Last active
September 21, 2025 00:09
-
-
Save VeylanSolmira/25e0c4e39d9beeeafe96fabb7e083fcd to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| from dataclasses import dataclass, field | |
@dataclass
class Transaction:
    # Ledger entry for a single account operation.
    transaction_type: str  # one of "deposit", "transfer_in", "transfer_out", "payment"
    amount: int  # amount of money moved
    priority: int = 1  # set to 0 for payments; never read by balance calculations in this file
    transaction_id: int | None = None  # set only for payments (the global payment ordinal)
    timestamp: int | None = None  # time of the operation, in milliseconds
@dataclass
class Account:
    """A bank account: an identifier plus the full log of its transactions.

    The balance is never stored; it is recomputed from the log so that
    point-in-time queries are possible.
    """

    account_id: str
    transactions: list[Transaction] = field(default_factory=list)

    # Credit/debit sign per transaction type; unknown types contribute nothing.
    _SIGNS = {"deposit": 1, "transfer_in": 1, "transfer_out": -1, "payment": -1}

    def get_balance(self, timestamp) -> int:
        """Return the balance from all transactions at or before ``timestamp``."""
        return sum(
            self._SIGNS.get(txn.transaction_type, 0) * txn.amount
            for txn in self.transactions
            if txn.timestamp <= timestamp
        )
class BankAccount:
    """In-memory bank with per-account transaction logs.

    Balances are never stored directly: every operation appends a
    Transaction to the owning Account, and balances are recomputed from
    the log. This lets ``get_balance`` answer point-in-time queries and
    lets pending cashback (a deposit dated 24h in the future) apply
    itself automatically once the clock passes its timestamp.
    """

    MILLISECONDS_IN_1_DAY = 24 * 60 * 60 * 1000  # cashback waiting period

    def __init__(self):
        self.accounts: dict = {}  # account_id -> Account
        self.payment_num: int = 1  # global 1-based ordinal for payment ids

    # Level 1
    def create_account(self, timestamp: int, account_id: str) -> bool:
        """Create a new account; True on success, False if it already exists."""
        if account_id in self.accounts:
            return False
        self.accounts[account_id] = Account(account_id)
        return True

    def deposit(
        self, timestamp: int, account_id: str, amount: int
    ) -> int | None:
        """Deposit ``amount`` into ``account_id``.

        Returns the balance after the deposit, or None if the account
        doesn't exist.
        """
        account = self.accounts.get(account_id)
        if account is None:
            return None
        account.transactions.append(
            Transaction("deposit", amount, timestamp=timestamp)
        )
        return account.get_balance(timestamp)

    def transfer(
        self,
        timestamp: int,
        source_account_id: str,
        target_account_id: str,
        amount: int,
    ) -> int | None:
        """Move ``amount`` from source to target.

        Returns the source balance after the transfer, or None when either
        account is missing, the two ids are equal, or the source has
        insufficient funds at ``timestamp``.
        """
        if (
            source_account_id not in self.accounts
            or target_account_id not in self.accounts
            or source_account_id == target_account_id
        ):
            return None
        source = self.accounts[source_account_id]
        if source.get_balance(timestamp) < amount:
            return None
        source.transactions.append(
            Transaction("transfer_out", amount, timestamp=timestamp)
        )
        self.accounts[target_account_id].transactions.append(
            Transaction("transfer_in", amount, timestamp=timestamp)
        )
        return source.get_balance(timestamp)

    # Level 2
    def top_spenders(self, timestamp: int, n: int) -> list[str]:
        """Return the top ``n`` accounts by total outgoing money.

        Each entry is formatted ``"<account_id>(<total>)"``, sorted by
        total descending, ties broken by account_id ascending. Fix: only
        transactions at or before ``timestamp`` are counted — the query
        time was previously ignored. Cashback arrives as a "deposit", so
        it never counts as outgoing.
        """
        outgoing_types = ("transfer_out", "payment")
        totals = [
            (
                account_id,
                sum(
                    txn.amount
                    for txn in account.transactions
                    if txn.transaction_type in outgoing_types
                    and txn.timestamp <= timestamp
                ),
            )
            for account_id, account in self.accounts.items()
        ]
        totals.sort(key=lambda item: (-item[1], item[0]))
        return [f"{account_id}({total})" for account_id, total in totals[:n]]

    # Level 3
    def pay(self, timestamp: int, account_id: str, amount: int) -> str | None:
        """Withdraw ``amount`` and schedule a 2% cashback 24 hours later.

        Returns a unique id "payment<N>" on success, or None when the
        account is missing or underfunded. Cashback is recorded as an
        ordinary deposit dated ``timestamp + 24h``, so ``get_balance``
        picks it up automatically once that time is reached, and it
        survives account merges.
        """
        account = self.accounts.get(account_id)
        if account is None or account.get_balance(timestamp) < amount:
            return None
        # Integer arithmetic avoids float representation drift on large
        # amounts (was int(amount * 0.02)); still 2% rounded down.
        cashback = amount * 2 // 100
        self.deposit(
            timestamp + BankAccount.MILLISECONDS_IN_1_DAY, account_id, cashback
        )
        account.transactions.append(
            Transaction("payment", amount, 0, self.payment_num, timestamp)
        )
        payment_id = f"payment{self.payment_num}"
        self.payment_num += 1
        return payment_id

    def get_payment_status(
        self, timestamp: int, account_id: str, payment: str
    ) -> str | None:
        """Return "IN_PROGRESS" or "CASHBACK_RECEIVED" for ``payment``.

        Returns None when the account doesn't exist, the payment id is
        malformed, or the payment doesn't belong to this account.
        """
        account = self.accounts.get(account_id)
        if account is None:
            return None
        # Fix: a malformed id such as "refund7" or "payment" previously
        # raised ValueError/IndexError from int(payment.split(...)[1]).
        if not payment.startswith("payment"):
            return None
        try:
            payment_id = int(payment[len("payment"):])
        except ValueError:
            return None
        for txn in account.transactions:
            if (
                txn.transaction_type == "payment"
                and txn.transaction_id == payment_id
            ):
                if timestamp < txn.timestamp + BankAccount.MILLISECONDS_IN_1_DAY:
                    return "IN_PROGRESS"
                return "CASHBACK_RECEIVED"
        return None

    # Level 4
    def merge_accounts(
        self, timestamp: int, account_id_1: str, account_id_2: str
    ) -> bool:
        """Merge account 2 into account 1; True on success.

        Moving the raw transaction log means pending cashback deposits
        (future-dated) and payment-status lookups transparently follow
        the surviving account.
        """
        if account_id_1 == account_id_2:
            return False
        if (
            account_id_1 not in self.accounts
            or account_id_2 not in self.accounts
        ):
            return False
        self.accounts[account_id_1].transactions.extend(
            self.accounts[account_id_2].transactions
        )
        del self.accounts[account_id_2]
        return True

    def get_balance(
        self, timestamp: int, account_id: str, time_at: int
    ) -> int | None:
        """Return the balance of ``account_id`` as of ``time_at``.

        NOTE(review): the spec asks for None when the account did not yet
        exist at ``time_at``; creation times aren't recorded here, so a
        pre-creation query returns 0 instead — TODO confirm against tests.
        """
        account = self.accounts.get(account_id)
        if account is None:
            return None
        return account.get_balance(time_at)
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| from dataclasses import dataclass, field | |
| # from typing import Dict#, List | |
| from copy import deepcopy # , copy | |
@dataclass
class File:
    # A stored file. Size is kept as the raw string supplied by the caller
    # and converted with int() wherever arithmetic is needed.
    size: str
    owner: str = "admin"  # "admin" marks files added without a user (ADD_FILE)
@dataclass
class User:
    # A storage user and the files they own. The same file names also live
    # in CloudStorage.storage, so both maps must be updated together.
    name: str
    capacity: int  # total size budget for this user's files
    files: dict[str, File] = field(
        default_factory=dict
    )  # data also stored in system files, of course -- need dual updates!

    @property
    def capacity_used(self) -> int:
        # Derived rather than cached so merges/deletes can't leave it stale.
        return sum(int(file.size) for file in self.files.values())
class CloudStorage:
    """In-memory cloud storage: a global file namespace plus per-user quotas.

    Files live in two places at once — ``self.storage`` (global
    name -> File) and the owning ``User.files`` — so every mutation must
    update both maps.
    """

    def __init__(self):
        self.storage: dict[str, File] = {}  # global namespace: name -> File
        self.users = {}  # user_id -> User
        self.users_backup = {}  # user_id -> deepcopy of User.files at backup time

    # Level 1
    def ADD_FILE(self, name, size):
        """Add an admin-owned file; "true" on success, "false" if the name exists."""
        if name in self.storage:
            return "false"
        self.storage[name] = File(size)
        return "true"

    def GET_FILE_SIZE(self, name):
        """Return the file's size string, or "" if it doesn't exist."""
        file = self.storage.get(name, None)
        return file.size if file is not None else ""

    def DELETE_FILE(self, name):
        """Delete a file and return its size string, or "" if it doesn't exist."""
        file = self.storage.pop(name, None)
        if file is None:
            return ""
        if file.owner != "admin":
            # Keep the owner's view consistent with the global namespace.
            self.users[file.owner].files.pop(name, None)
        return file.size

    # Level 2
    def GET_N_LARGEST(self, prefix, n):
        """Return "name1(size1), name2(size2), ..." for the ``n`` largest
        files whose names start with ``prefix``, ties broken by name.

        Returns "" when ``n`` cannot be converted to an int.
        """
        try:
            n = int(n)
        except (ValueError, TypeError):
            return ""
        # Single sort on (-size, name) replaces the old two-pass
        # name-sort + stable size-sort; the resulting order is identical.
        matches = sorted(
            (
                (name, file)
                for name, file in self.storage.items()
                if name.startswith(prefix)
            ),
            key=lambda item: (-int(item[1].size), item[0]),
        )
        return ", ".join(f"{name}({file.size})" for name, file in matches[:n])

    # Level 3
    def ADD_USER(self, user_id, capacity):
        """Register a user with the given capacity; "false" if it exists."""
        if user_id in self.users:
            return "false"
        self.users[user_id] = User(user_id, int(capacity))
        return "true"

    def ADD_FILE_BY(self, user_id, name, size):
        """Add a file owned by ``user_id``.

        Returns the remaining capacity as a string, or "" when the name
        is taken, the user is unknown (fix: previously raised KeyError),
        or the user's quota would be exceeded.
        """
        if name in self.storage:
            return ""
        user = self.users.get(user_id)
        if user is None:
            return ""
        if user.capacity < user.capacity_used + int(size):
            return ""
        self.storage[name] = File(size, user_id)
        user.files[name] = File(size, user_id)
        return str(user.capacity - user.capacity_used)

    def DELETE_USER(self, user_id):
        """Drop the user record.

        NOTE(review): the user's files are left in ``self.storage`` with a
        dangling owner — confirm whether they should be removed too.
        """
        self.users.pop(user_id, None)

    def MERGE_USER(self, user_id_1, user_id_2):
        """Fold user 2's capacity and files into user 1 and delete user 2.

        Returns the merged user's remaining capacity as a string, or ""
        when the ids match or either user is missing. User 2's backup, if
        any, is discarded.
        """
        if (
            user_id_1 == user_id_2
            or user_id_1 not in self.users
            or user_id_2 not in self.users
        ):
            return ""
        self.users[user_id_1].capacity += self.users[user_id_2].capacity
        # Fix: re-own user 2's files so a later DELETE_FILE doesn't look up
        # a user id that no longer exists. The global map holds separate
        # File objects, so both copies must be updated.
        for name, file in self.users[user_id_2].files.items():
            file.owner = user_id_1
            if name in self.storage:
                self.storage[name].owner = user_id_1
        self.users[user_id_1].files.update(self.users[user_id_2].files)
        self.DELETE_USER(user_id_2)
        self.users_backup.pop(user_id_2, None)
        return str(
            self.users[user_id_1].capacity - self.users[user_id_1].capacity_used
        )

    # Level 4
    def BACKUP_USER(self, user_id):
        """Snapshot the user's files; return the file count, or "" if unknown."""
        if user_id not in self.users:
            return ""
        self.users_backup[user_id] = deepcopy(self.users[user_id].files)
        return str(len(self.users_backup[user_id]))

    def RESTORE_USER(self, user_id):
        """Replace the user's current files with their last backup.

        The user's current files are removed from global storage first;
        backed-up files are then restored unless another user has since
        claimed the name. Returns the restored file count as a string, or
        "" if the user is unknown. With no backup the user ends up with
        no files.
        """
        if user_id not in self.users:
            return ""
        for name in self.users[user_id].files:
            self.storage.pop(name, None)
        restored_files = {}
        for name, file in self.users_backup.get(user_id, {}).items():
            if name not in self.storage:
                self.storage[name] = file
                restored_files[name] = file
        self.users[user_id].files = restored_files
        return str(len(self.users[user_id].files))
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| 1. Be mindful of types, especially internal numeric types used for, or derived from, string arguments | |
| 2. embrace str representations but be very mindful of when to convert to int (or numeric) | |
| Sorting/comparison operations (GET_N_LARGEST) | |
| Capacity checks (ADD_FILE_BY) | |
| Math operations (remaining capacity) | |
| 3. we seem to have rarely received inputs of the wrong type, i.e. a non-intable string where an int is expected, but it did appear in one practice test and might occur -- let the tests fail and then add type checks as necessary | |
| 4. Be extremely diligent about what the actual types are of things, ESPECIALLY WHEN DOING MATH OPERATIONS ON THEM | |
| # Master guide for Anthropic's CodeSignal assessment preparation | |
| You're facing a 90-minute Python coding sprint that requires building a key-value database across 4 progressive levels, and success hinges on **speed over elegance**. With 9 years of experience, you already have the skills—what you need is the right preparation strategy and resources to code fast under pressure. | |
| ## The real structure behind Anthropic's assessment | |
| Anthropic's CodeSignal test follows the Industry Coding Framework, not traditional algorithmic problems. Based on candidate reports, you'll build a single project—typically an in-memory database or file system—that grows progressively complex across **4 sequential levels** you cannot skip. Level 1 takes 10-15 minutes for basic CRUD operations, Level 2 requires 20-30 minutes for data processing features, while Levels 3 and 4 each demand 30-60 minutes for advanced functionality like transactions and optimization. The scoring system ranges from **200-600 points**, with reports suggesting **570/600 as the borderline** for progression, though results at this score appear mixed. | |
| The assessment prioritizes practical implementation over algorithmic optimization. One candidate who achieved a perfect 600/600 score shared this critical insight: "No need to do algo optimizations, use brute force if it can save 3 minutes. Write code piece by piece, verify a piece is correct before starting next. Finishing all code, ran into bug and do debugging will eat all your time." Another key finding: the platform automatically captures your best test run even without formal submission, so focus on passing test cases rather than perfecting code quality. | |
| ## Essential GitHub repository for targeted practice | |
| The single most valuable resource is **PaulLockett/CodeSignal_Practice_Industry_Coding_Framework** on GitHub. This repository precisely mimics the real assessment with a 90-minute timer, 4 progressive levels, and Python 3.10.6 environment matching CodeSignal exactly. The practice scenario involves building a file storage system with progressive complexity—starting with basic upload/download, adding size limits and directories, implementing file operations, then handling advanced features. To use it effectively, install Python 3.10.6 locally, clone the repository, install requirements.txt, and run the test_simulation.py to practice under authentic conditions. | |
| For implementation patterns, study **Lujeni/LeakDB** for fast key-value operations using O(1) dictionary lookups and multiple storage backends. The **flow6979/In-Memory-File-System** repository demonstrates clean node-based architecture with command-line interfaces supporting mkdir, cd, ls, touch, and other file operations. For TTL implementations, examine **llorrac1/python-ttl-cache** which shows decorator patterns for easy time-based expiration. These repositories provide the exact patterns you'll need to implement quickly during the assessment. | |
| ## Speed-optimized Python implementation patterns | |
| For the key-value database component, start with this minimal viable pattern that can be extended rapidly: | |
| ```python | |
| from collections import defaultdict | |
| from dataclasses import dataclass, field | |
| import time | |
| import copy | |
| @dataclass | |
| class TTLValue: | |
| data: any | |
| expires_at: float = None | |
| def is_expired(self): | |
| return self.expires_at and time.time() > self.expires_at | |
class KeyValueStore:
    """Dict-backed key/value store whose entries may carry a time-to-live."""

    def __init__(self):
        self.data = {}  # key -> TTLValue
        self.transaction = None  # reserved for transaction support

    def set(self, key, value, ttl=None):
        """Store ``value``; a truthy ``ttl`` expires it that many seconds from now."""
        deadline = time.time() + ttl if ttl else None
        self.data[key] = TTLValue(value, deadline)

    def get(self, key):
        """Return the live value for ``key``; expired entries are purged and yield None."""
        entry = self.data.get(key)
        if entry is None:
            return None
        if entry.is_expired():
            del self.data[key]
            return None
        return entry.data
| ``` | |
| For transactions with rollback capability, use this context manager pattern that prioritizes simplicity: | |
| ```python | |
class Transaction:
    """Context manager that rolls a store's data back if the body raises."""

    def __init__(self, store):
        self.store = store
        self.backup = None  # deep snapshot taken on __enter__

    def __enter__(self):
        # Snapshot up front; the with-body mutates store.data directly.
        self.backup = copy.deepcopy(self.store.data)
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        failed = exc_type is not None
        if failed:
            self.store.data = self.backup
        # Returning None lets any exception propagate to the caller.
| The key insight is using Python's standard library extensively—**defaultdict** eliminates key checking, **dataclasses** provide instant struct-like objects, and **copy.deepcopy** handles transaction snapshots without complex logic. | |
| ## Practice platforms that simulate real conditions | |
| Access CodeSignal's official practice environment at **app.codesignal.com/assessments/practice** with a free developer account. This provides the exact IDE, autocomplete features, and testing framework you'll encounter. Companies cannot see practice session performance, allowing unlimited attempts. The platform includes Industry Coding Framework problems with progressive difficulty matching the real assessment. | |
| For additional timed practice, use the PaulLockett GitHub repository's 90-minute simulation daily. Set strict timers and practice transitioning between levels without perfectionism. Coderbyte and HackerRank offer similar timed environments, though they lack the specific Industry Coding Framework format. Focus your practice on CodeSignal's environment to build muscle memory for their specific interface quirks and testing patterns. | |
| ## Critical speed coding techniques and shortcuts | |
| Configure your IDE for maximum efficiency before the assessment. In VSCode, enable these settings: `"python.autoComplete.addBrackets": true`, `"editor.tabCompletion": "on"`, and `"editor.quickSuggestions": true`. Master these essential shortcuts: **Ctrl+D** for selecting next occurrence enabling multi-cursor editing, **Alt+Click** for multiple cursors, and **Ctrl+/** for instant commenting. In PyCharm, **Alt+Enter** provides quick fixes and **Ctrl+Alt+L** auto-formats to PEP 8 standards. | |
| For Python-specific speed patterns, memorize these one-liners that save crucial minutes: | |
| - Dictionary comprehensions: `{k: v for k, v in zip(keys, values)}` | |
| - List filtering: `[x for x in items if condition]` | |
| - Quick sorting: `sorted(items, key=lambda x: x.attribute)` | |
| - Default handling: `value = cache.get(key, default_value)` | |
| Build a personal snippet library in GitHub Gists or your IDE containing common patterns like transaction contexts, TTL implementations, and CRUD operations. Practice typing these patterns until they become automatic—sites like SpeedCoder.net offer Python-specific typing practice with real code. | |
| ## Time management strategy for the 90-minute window | |
| Allocate your time strictly: spend 2-3 minutes skimming all levels to understand the architecture, then 5 minutes planning your approach before coding. For Level 1, aim for **10-15 minutes maximum**—implement basic operations without optimization. Level 2 should take **20-30 minutes** for data processing features. Reserve **30 minutes each for Levels 3 and 4**, accepting that Level 4 may remain incomplete. | |
| The winning strategy is sequential completion with continuous testing. Never skip ahead—each level builds on the previous. Run tests after every function implementation rather than debugging everything at the end. When you have passing tests for a level, immediately refactor to support the next level's requirements. Most importantly, submit whatever passes tests—the platform tracks all runs, and partial Level 4 completion still contributes significant points. | |
| ## Python standard library quick reference essentials | |
| Master these modules for rapid development without external dependencies: | |
| - **collections.defaultdict**: Eliminates KeyError checking with automatic default values | |
| - **collections.Counter**: Instant frequency counting and most_common() operations | |
| - **collections.deque**: Efficient queue operations with maxlen for bounded queues | |
| - **dataclasses**: Replace verbose class definitions with decorated structs | |
| - **copy.deepcopy**: Transaction snapshots and state preservation | |
| - **time.time()**: Simple TTL implementations without datetime complexity | |
| Keep the Python documentation for these modules open in a browser tab during practice. Understanding when to use defaultdict versus regular dict, or when dataclasses save time over traditional classes, can save 5-10 minutes during implementation. | |
| ## Scoring insights and what really matters | |
| The scoring system values completion over optimization. You earn base points for completing questions within modules, with bonus points only for 100% module completion. Reports indicate **540/600 represents 96th percentile**, while **570/600 shows mixed results** for progression—some candidates advance, others don't, suggesting additional factors like role-specific requirements. | |
| The platform's design philosophy emphasizes real-world development skills over algorithmic prowess. Code quality isn't directly scored—only test case passing matters. This means using brute force solutions, global variables, or simple implementations is perfectly acceptable if it saves time. One successful candidate noted: "My advice is to code very very fast and not to worry about anything else other than passing test cases. I don't think anyone will look at your code—just the score." | |
| ## Your action plan for the next two weeks | |
| Start immediately with daily 30-minute sessions in CodeSignal's practice environment to build interface familiarity. Download and complete the PaulLockett repository assessment at least three times, focusing on improving your completion speed with each attempt. Build your snippet library with the implementation patterns provided above, customizing them for quick deployment. Practice typing common Python patterns on SpeedCoder.net for 15 minutes daily to increase raw typing speed. | |
| One week before your assessment, complete three full 90-minute mock assessments under strict conditions—no external resources, single monitor, exact time limits. Track which level consumes the most time and adjust your pacing accordingly. The night before, review your snippet library and ensure your development environment matches CodeSignal's Python 3.10.6 specification exactly. | |
| Remember: this assessment rewards practical builders who can deliver working features quickly, not algorithmic virtuosos crafting elegant solutions. Your 9 years of experience provides the foundation—these resources and strategies provide the speed advantage needed to demonstrate your capabilities within the time constraint. | |
| from dataclasses import dataclass | |
| import time | |
| import bisect | |
| start_time = time.time() | |
@dataclass
class File:
    # Minimal file record used by the scratch notes below.
    name: str
    size: int
    timestamp: float = 0.0  # presumably epoch seconds, 0.0 meaning unset — TODO confirm
| bisect.bisect(sorted_list, item) # return index of where item would be inserted to maintain sorted order | |
| bisect.insort(sorted_list, item) # insert item into sorted list | |
| # Only add keys that don't exist | |
| for key, value in dict2.items(): | |
| if key not in dict1: | |
| dict1[key] = value | |
| # Or using dict comprehension with union | |
| dict1 = {**dict1, **dict2} # dict2 still overwrites | |
| dict1 = {**dict2, **dict1} # dict1 values preserved | |
| sorted() - ascending by default | |
| dict.get(key, default) - returns default if key missing | |
| ''.join(list) - concatenate strings | |
| list[:] - shallow copy | |
| copy.deepcopy() - deep copy for nested structures | |
| try: | |
| n = int(n) | |
| except (ValueError, TypeError): | |
| return "" |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| from collections import defaultdict | |
class FileManager:
    """Minimal remote-storage model mapping file name -> size."""

    def __init__(self):  # fix: was `def init`, so the constructor never ran
        self.files = defaultdict(int)

    def FILE_UPLOAD(self, file_name, size):
        """Upload the file to the remote storage server.

        If a file with the same name already exists on the server, it
        throws a runtime exception.
        """
        if file_name in self.files:
            raise RuntimeError(f"File {file_name} already exists")
        self.files[file_name] = size

    def FILE_GET(self, file_name):
        """Return the size of the file, or None if the file doesn't exist."""
        return self.files.get(file_name, None)

    def FILE_COPY(self, source, dest):
        """Copy the source file to a new location.

        Raises RuntimeError if the source doesn't exist; an existing
        destination is silently overwritten.
        """
        if source not in self.files:
            raise RuntimeError(f"Source file {source} does not exist")
        self.files[dest] = self.files[source]

    def FILE_SEARCH(self, prefix):
        """Return up to 10 file names starting with ``prefix``, ordered by
        size descending, then by name ascending on ties.

        Fix: this was an unimplemented stub (a bare expression statement).
        Assumes sizes are numeric — TODO confirm callers pass ints.
        """
        matches = [name for name in self.files if name.startswith(prefix)]
        matches.sort(key=lambda name: (-self.files[name], name))
        return matches[:10]
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| from dataclasses import dataclass, field | |
| import json | |
| import math | |
| import string | |
| import re | |
| import random | |
| import sys | |
| import traceback | |
| import time | |
| from typing import Optional | |
| import functools | |
| from collections import OrderedDict | |
| # import numpy | |
| # import sortedcontainers | |
def parse_timestamp(timestamp):
    """Normalize a timestamp: ISO-8601 strings become epoch seconds;
    any non-string value is passed through unchanged."""
    from datetime import datetime

    if not isinstance(timestamp, str):
        return timestamp
    return datetime.fromisoformat(timestamp).timestamp()
def simulate_coding_framework(test_data):
    """
    Simulates a coding framework operation on a list of lists of strings.

    Each command list is ``[opcode, *args]``; the opcode is dispatched to
    the matching FileSystem method and a short human-readable summary
    string is appended to the returned list for every command.

    Parameters:
        test_data (List[List[str]]): A list of lists containing strings.

    Raises:
        RuntimeError: if a command's opcode is not recognized.
    """
    fs = FileSystem()
    results = []
    for command in test_data:
        op = command[0]
        args = command[1:]  # sometimes args2 is size and other times it's dest
        if op == "FILE_UPLOAD":
            file_name, size = args
            fs.FILE_UPLOAD(file_name, size)
            results.append(f"uploaded {file_name}")
        elif op == "FILE_GET":
            file_name = args[0]
            result = fs.FILE_GET(file_name)
            # NOTE(review): truthiness check — a falsy result (e.g. size 0)
            # would report "file not found"; confirm sizes are non-zero.
            if result:
                results.append(f"got {file_name}")
            else:
                results.append("file not found")
        elif op == "FILE_COPY":
            source, dest = args
            fs.FILE_COPY(source, dest)
            results.append(f"copied {source} to {dest}")
        elif op == "FILE_SEARCH":
            prefix = args[0]
            result = fs.FILE_SEARCH(prefix)
            formatted = str(result).replace("'", "")
            results.append(f"found {formatted}")
        elif op == "FILE_UPLOAD_AT":
            # ttl is optional; pad args so the unpack always yields 4 values.
            timestamp, file_name, size, ttl = (args + [None])[:4]
            fs.FILE_UPLOAD_AT(parse_timestamp(timestamp), file_name, size, ttl)
            results.append(f"uploaded at {file_name}")
        elif op == "FILE_GET_AT":
            timestamp, file_name = args
            result = fs.FILE_GET_AT(parse_timestamp(timestamp), file_name)
            if result:
                results.append(f"got at {file_name}")
            else:
                results.append("file not found")
        elif op == "FILE_COPY_AT":
            timestamp, source, dest = args
            fs.FILE_COPY_AT(parse_timestamp(timestamp), source, dest)
            results.append(f"copied at {source} to {dest}")
        elif op == "FILE_SEARCH_AT":
            timestamp, prefix = args
            result = fs.FILE_SEARCH_AT(parse_timestamp(timestamp), prefix)
            formatted = str(result).replace("'", "")
            results.append(f"found at {formatted}")
        elif op == "ROLLBACK":
            timestamp = args[0]
            # The rollback result is intentionally ignored in the summary.
            result = fs.ROLLBACK(parse_timestamp(timestamp))
            results.append(f"rollback to {timestamp}")
        else:
            raise RuntimeError("unknown command")
    return results
@dataclass
class File:
    """A stored file with an optional time-to-live.

    Attributes:
        file_name: name recorded at upload (may differ from the dict key
            after a rename/copy).
        size: file size, stored exactly as given by the caller.
        ttl: lifetime in seconds after upload_time; None means the file
            never expires.
        upload_time: upload timestamp; defaults to the wall clock when not
            supplied.
    """

    file_name: str
    size: int
    ttl: Optional[float] = None
    upload_time: Optional[float] = None

    def __post_init__(self):
        # `is None` (not truthiness) so an explicit epoch-0 upload_time is
        # preserved instead of being silently replaced with the current time.
        if self.upload_time is None:
            self.upload_time = time.time()

    def is_expired(self, timestamp):
        """Return True when the file is not alive at `timestamp`.

        A TTL'd file is alive from upload_time through upload_time + ttl
        inclusive; timestamps before upload_time also read as expired (the
        file doesn't exist yet at that point).  With ttl=None the file never
        expires.  `is None` replaces the original truthiness test so ttl=0
        means "expires immediately after upload" rather than "lives forever".
        """
        if self.ttl is None:
            return False
        return timestamp > self.upload_time + self.ttl or timestamp < self.upload_time
class FileSystem:
    """In-memory file store keyed by name, with optional per-file TTLs.

    Timestamps and TTLs are plain numbers (seconds).  The *_AT methods take
    an explicit timestamp instead of consulting the wall clock, so the store
    can be driven deterministically by a test harness.
    """

    def __init__(self):
        # file_name -> File
        self.files = {}

    def FILE_DELETE(self, file_name):
        """Remove the file if present; missing names are ignored."""
        self.files.pop(file_name, None)

    def FILE_RENAME(self, old_name, new_name):
        """Re-key the file under new_name; no-op when old_name is absent.

        NOTE(review): only the dictionary key changes — File.file_name keeps
        its old value, and that attribute is what the search methods sort by.
        """
        file = self.files.pop(old_name, None)
        if file:
            self.files[new_name] = file

    def FILE_EXISTS(self, file_name):
        """True when the name is stored (expiry is not considered)."""
        return file_name in self.files

    def FILE_EXISTS_AT(self, timestamp, file_name):
        """True when the file exists and is still alive at `timestamp`."""
        return file_name in self.files and not self.files[file_name].is_expired(timestamp)

    def FILE_COUNT(self):
        """Number of stored files, expired ones included."""
        return len(self.files)

    def FILE_LIST(self, sort_by="name"):
        """List file names sorted by name (default) or by size."""
        if sort_by == "size":
            return sorted(self.files.keys(), key=lambda name: self.files[name].size)
        return sorted(self.files.keys())

    def FILE_UPLOAD(self, file_name, size):
        """Store a new file stamped with the current wall-clock time.

        Raises:
            RuntimeError: if a file with the same name already exists.
        """
        if file_name in self.files:
            raise RuntimeError("duplicate file")
        self.files[file_name] = File(file_name, size)

    def FILE_GET(self, file_name):
        """Return the file's size, or None when the file doesn't exist."""
        file = self.files.get(file_name)
        return file.size if file else None

    def FILE_COPY(self, source, dest):
        """Copy source onto dest, overwriting any existing dest.

        Bug fix: a fresh File is stored so later mutations of one copy
        (e.g. FILE_REFRESH_AT changing the TTL) no longer leak into the
        other; previously both names aliased the same File object.  The
        copied file keeps the source's file_name attribute, preserving the
        original search-ordering behavior.

        Raises:
            RuntimeError: if source doesn't exist.
        """
        if source not in self.files:
            raise RuntimeError("source doesn't exist")
        src = self.files[source]
        self.files[dest] = File(src.file_name, src.size, src.ttl, src.upload_time)

    def _ranked_names(self, predicate):
        """Names of files matching predicate(name, file), ordered by size
        descending with ties broken by File.file_name ascending.

        Implemented as a stable size sort over a name-sorted list (rather
        than a single (-size, name) key) so sizes that arrive as strings —
        the simulation passes them through unconverted — still compare.
        """
        by_name = sorted(self.files.items(), key=lambda kv: kv[1].file_name)
        by_size = sorted(by_name, key=lambda kv: kv[1].size, reverse=True)
        return [name for name, file in by_size if predicate(name, file)]

    def FILE_SEARCH(self, prefix):
        """Top 10 files whose name starts with `prefix`, largest first,
        ties broken by file name."""
        return self._ranked_names(lambda name, _f: name.startswith(prefix))[:10]

    def FILE_SEARCH_REGEX(self, pattern):
        """Top 10 files whose name matches `pattern` anywhere."""
        import re

        return self._ranked_names(lambda name, _f: re.search(pattern, name))[:10]

    def FILE_GET_LARGE(self, n=10):
        """Names of the n largest files by size (no name tie-break)."""
        ordered = sorted(self.files.items(), key=lambda kv: kv[1].size, reverse=True)
        return [name for name, _file in ordered][:n]

    def FILE_SEARCH_BY_SIZE(self, min_size, max_size):
        """Top 10 files with min_size <= size <= max_size, largest first."""
        return self._ranked_names(lambda _n, f: min_size <= f.size <= max_size)[:10]

    def FILE_GET_TOTAL_SIZE(self):
        """Sum of all stored sizes, expired files included."""
        return sum(file.size for file in self.files.values())

    def FILE_MOVE_AT(self, timestamp, source, dest):
        """Move source to dest: copy, then delete source.

        Bug fix: the original signature was missing `self`, so every call
        raised a TypeError.  An expired source is deleted without creating
        dest (matching the original's intent).

        Raises:
            RuntimeError: if source doesn't exist.
        """
        if source not in self.files:
            raise RuntimeError("source doesn't exist")
        if not self.files[source].is_expired(timestamp):
            self.files[dest] = self.files[source]
        self.FILE_DELETE(source)

    def FILE_REFRESH_AT(self, timestamp, file_name, new_ttl):
        """Replace the TTL of a file that is alive at `timestamp`.

        Raises:
            RuntimeError: if the file is missing or already expired.
        """
        if not self.FILE_EXISTS_AT(timestamp, file_name):
            raise RuntimeError(f"{file_name} does not exist")
        self.files[file_name].ttl = new_ttl

    def FILE_BATCH_UPLOAD_AT(self, timestamp, files_dict):
        """Upload several files at once; fails fast on the first duplicate."""
        for file_name, file in files_dict.items():
            self.FILE_UPLOAD_AT(timestamp, file_name, file.size, file.ttl)

    def FILE_UPLOAD_AT(self, timestamp, file_name, file_size, ttl=None):
        """Upload a file that lives for `ttl` seconds from `timestamp`
        (forever when ttl is None).

        Raises:
            RuntimeError: if a file with the same name already exists.
        """
        if file_name in self.files:
            raise RuntimeError("duplicate file")
        self.files[file_name] = File(file_name, file_size, ttl, timestamp)

    def FILE_GET_AT(self, timestamp, file_name):
        """Size of the file if it exists and is alive at `timestamp`, else None."""
        file = self.files.get(file_name)
        return file.size if file and not file.is_expired(timestamp) else None

    def FILE_COPY_AT(self, timestamp, file_from, file_to):
        """Copy file_from to file_to at `timestamp`.

        Unlike FILE_COPY this refuses to overwrite.  An expired source is
        silently not copied (matching the original).  As in FILE_COPY, a
        fresh File object is stored so the copies don't share state.

        Raises:
            RuntimeError: if the source is missing or the destination exists.
        """
        if file_from not in self.files:
            raise RuntimeError("source doesn't exist")
        if file_to in self.files:
            raise RuntimeError("dest already exists")
        src = self.files[file_from]
        if not src.is_expired(timestamp):
            self.files[file_to] = File(src.file_name, src.size, src.ttl, src.upload_time)

    def FILE_SEARCH_AT(self, timestamp, prefix):
        """Top 10 prefix matches among files still alive at `timestamp`."""
        return self._ranked_names(
            lambda name, file: name.startswith(prefix) and not file.is_expired(timestamp)
        )[:10]

    def FILE_SEARCH_EXPIRING_AT(self, timestamp, within_seconds):
        """Files alive at `timestamp` whose TTL runs out within
        `within_seconds` seconds of it.

        Returns a list of (file_name, size) tuples sorted by size ascending,
        with file_name ascending to break ties — as the original docstring
        promised.  Bug fix: the original built a nonsense sort key (calling
        a File object as a function and mixing in wall-clock time.time())
        and raised on every call.
        """
        expiring = [
            (name, file.size)
            for name, file in self.files.items()
            if file.ttl is not None
            and not file.is_expired(timestamp)
            and (file.upload_time + file.ttl) - timestamp <= within_seconds
        ]
        return sorted(expiring, key=lambda pair: (pair[1], pair[0]))

    def FILE_GET_EXPIRES_AT(self, timestamp, file_name):
        """Absolute expiry time of a file alive at `timestamp`.

        Returns None when the file is missing/expired, or when it has no TTL
        and therefore never expires.  Bug fix: the original added None to a
        number for TTL-less files and raised a TypeError.
        """
        if self.FILE_EXISTS_AT(timestamp, file_name):
            file = self.files[file_name]
            if file.ttl is not None:
                return file.upload_time + file.ttl
        return None

    # Level 4
    def ROLLBACK(self, timestamp):
        """Drop every file uploaded after `timestamp`.

        Kept files need no TTL rewrite here: for TTL'd files,
        File.is_expired() already reports timestamps outside the
        [upload_time, upload_time + ttl] window as expired.
        """
        self.files = {
            name: file
            for name, file in self.files.items()
            if file.upload_time <= timestamp
        }

    def SNAPSHOT(self, timestamp):
        """Save current state with timestamp as ID (not implemented)."""
        pass

    def RESTORE(self, snapshot_id):
        """Restore to a saved snapshot (not implemented)."""
        pass

    def FILE_HISTORY(self, file_name):
        """Show all operations on a file (not implemented)."""
        pass

    def COMPARE(self, timestamp1, timestamp2):
        """Show differences between two states (not implemented)."""
        pass

    def BEGIN_TRANSACTION(self):
        """Transaction support (not implemented)."""
        pass

    def COMMIT(self):
        """Transaction support (not implemented)."""
        pass

    def ROLLBACK_TRANSACTION(self):
        """Transaction support (not implemented)."""
        pass
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| from collections import defaultdict | |
| from dataclasses import dataclass | |
| from copy import deepcopy | |
| @dataclass | |
| class FieldData: | |
| value: str | |
| timestamp: int | None = None | |
| ttl: int | None = None | |
| def __post_init__(self): | |
| if self.timestamp is not None: | |
| self.timestamp = int(self.timestamp) | |
| if self.ttl is not None: | |
| self.ttl = int(self.ttl) | |
| def is_expired(self, timestamp): | |
| # print(self) | |
| if self.timestamp is None or self.ttl is None: | |
| return False | |
| return int(timestamp) >= (self.timestamp + self.ttl) | |
class MemoryDatabase:
    """In-memory key -> {field -> FieldData} store with TTLs and backups.

    All operations return strings, mirroring the challenge protocol:
    "" for void/missing results and "true"/"false" for booleans.
    """

    def __init__(self):
        self.database = defaultdict(dict)  # key -> {field: FieldData}
        self.backup = {}  # backup timestamp (int) -> snapshot of database

    def _record(self, key):
        """Read-only lookup of a key's field dict.

        Bug fix: read paths previously indexed the defaultdict directly,
        silently inserting an empty dict for every unknown key queried.
        """
        return self.database.get(key, {})

    # Level 1
    def SET(self, key, field, value):
        """Set key.field to value with no timestamp/TTL metadata."""
        self.database[key][field] = FieldData(value)
        return ""

    def GET(self, key, field):
        """Value of key.field, or "" when the key/field is missing."""
        field_data = self._record(key).get(field)
        return field_data.value if field_data else ""

    def DELETE(self, key, field):
        """Remove key.field; "true" when something was actually removed."""
        removed = self._record(key).pop(field, None)
        return "true" if removed is not None else "false"

    # Level 2
    def SCAN(self, key):
        """All fields of key as "f1(v1), f2(v2)", field names ascending."""
        record = self._record(key)
        return ", ".join(
            f"{field}({record[field].value})" for field in sorted(record)
        )

    def SCAN_BY_PREFIX(self, key, prefix):
        """Like SCAN, restricted to fields starting with `prefix`."""
        record = self._record(key)
        return ", ".join(
            f"{field}({record[field].value})"
            for field in sorted(record)
            if field.startswith(prefix)
        )

    # Level 3
    def SET_AT(self, key, field, value, timestamp):
        """Set key.field at `timestamp` with no TTL (never expires)."""
        self.database[key][field] = FieldData(value, timestamp)
        return ""

    def SET_AT_WITH_TTL(self, key, field, value, timestamp, ttl):
        """Set key.field at `timestamp`, expiring `ttl` seconds later."""
        self.database[key][field] = FieldData(value, timestamp, ttl)
        return ""

    def DELETE_AT(self, key, field, timestamp):
        """Remove key.field at `timestamp`.

        Returns "true" only when the field existed and was still alive;
        an already-expired field is still physically removed but reports
        "false" (matching the original behavior).
        """
        field_data = self._record(key).pop(field, None)
        return (
            "true"
            if field_data is not None and not field_data.is_expired(timestamp)
            else "false"
        )

    def GET_AT(self, key, field, timestamp):
        """Value of key.field if it is alive at `timestamp`, else ""."""
        field_data = self._record(key).get(field)
        return (
            field_data.value
            if field_data is not None and not field_data.is_expired(timestamp)
            else ""
        )

    def SCAN_AT(self, key, timestamp):
        """Like SCAN, restricted to fields alive at `timestamp`."""
        record = self._record(key)
        return ", ".join(
            f"{field}({record[field].value})"
            for field in sorted(record)
            if not record[field].is_expired(timestamp)
        )

    def SCAN_BY_PREFIX_AT(self, key, prefix, timestamp):
        """Like SCAN_BY_PREFIX, restricted to fields alive at `timestamp`."""
        record = self._record(key)
        return ", ".join(
            f"{field}({record[field].value})"
            for field in sorted(record)
            if field.startswith(prefix) and not record[field].is_expired(timestamp)
        )

    # Level 4
    def BACKUP(self, timestamp):
        """Snapshot every field that is alive at `timestamp`.

        TTLs are rewritten to the *remaining* lifetime so RESTORE can later
        re-base them onto a new start time.  Returns the number of keys with
        at least one live field, as a string.
        """
        now = int(timestamp)
        snapshot = defaultdict(dict)
        for key, record in self.database.items():
            for field, field_data in record.items():
                if field_data.is_expired(timestamp):
                    continue
                copied = deepcopy(field_data)
                if copied.timestamp is not None and copied.ttl is not None:
                    copied.ttl -= now - copied.timestamp
                snapshot[key][field] = copied
        self.backup[now] = snapshot
        return str(len(snapshot))

    def RESTORE(self, timestamp, timestamp_to_restore):
        """Replace the database with the newest backup taken at or before
        `timestamp_to_restore`, re-basing TTL'd fields to start at
        `timestamp` (their stored ttl is already the remaining lifetime).

        Raises:
            ValueError: when no backup qualifies — previously this surfaced
                as an opaque `max() of empty sequence` error.
        """
        eligible = [t for t in self.backup if t <= int(timestamp_to_restore)]
        if not eligible:
            raise ValueError("no backup at or before timestamp_to_restore")
        backup_time = max(eligible)
        now = int(timestamp)
        restored = defaultdict(dict)
        for key, record in self.backup[backup_time].items():
            for field, field_data in record.items():
                copied = deepcopy(field_data)
                if copied.timestamp is not None and copied.ttl is not None:
                    copied.timestamp = now
                restored[key][field] = copied
        self.database = restored
        return ""
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| # Tier 0: ESSENTIAL (Must know cold) | |
| # Basic operations | |
| sorted(items) # Ascending by default | |
| sorted(items, reverse=True) # Descending | |
| dict.get(key, default_value) # Safe key access | |
| 'separator'.join(list_of_strings) # Join strings | |
| list[:] # Shallow copy | |
| len(), range(), enumerate() | |
| # String methods | |
| str.startswith(prefix) | |
| str.endswith(suffix) | |
| str.split(delimiter) | |
| str.strip() # Remove whitespace | |
| # List methods | |
| list.append(item) | |
| list.pop() # Remove and return last | |
| list.pop(0) # Remove and return first | |
| # Tier 1: HIGHLY LIKELY (The email specifically mentions these) | |
| # # collections | |
| from collections import defaultdict # dict with default values | |
| from collections import Counter # count occurrences | |
| from collections import deque # efficient pop/append both ends | |
| # copy | |
| from copy import deepcopy # Deep copy nested structures | |
| from copy import copy # Shallow copy | |
| # dataclasses | |
| from dataclasses import dataclass | |
| @dataclass | |
| class File: | |
| name: str | |
| size: int | |
| timestamp: float = 0.0 | |
| # Tier 2: VERY USEFUL (Common patterns) | |
| # Dict operations | |
| dict.keys(), dict.values(), dict.items() | |
| dict.update(other_dict) | |
| dict.setdefault(key, default) | |
| # List comprehensions | |
| [x for x in items if condition] | |
| {k: v for k, v in items} # Dict comprehension | |
| # Set operations (for uniqueness) | |
| set() # Create set | |
| set.add(), set.remove() | |
| set.intersection(), set.union() | |
| # Time operations | |
| import time | |
| time.time() # Current timestamp | |
| # Tier 3: GOOD TO KNOW (Might save time) | |
| # bisect (sorted list operations) | |
| import bisect | |
| bisect.insort(sorted_list, item) # Insert maintaining sort | |
| # heapq (priority queue) | |
| import heapq | |
| heapq.heappush(heap, item) | |
| heapq.heappop(heap) | |
| # itertools (probably overkill) | |
| from itertools import chain, groupby | |
| # datetime (if timestamps are complex) | |
| from datetime import datetime | |
| datetime.fromisoformat("2021-07-01T12:00:00") | |
| # Tier 4: PROBABLY UNNECESSARY (But complete list) | |
| import re | |
| # json, re, math, os, sys - unlikely needed | |
| # functools, operator - too advanced for this test | |
| # typing - they said skip type hints for speed | |
| re.search(pattern, string) # Find pattern anywhere in string (returns Match object or None) | |
| re.match(pattern, string) # Match pattern at START of string only | |
| re.findall(pattern, string) # Return list of all matches | |
| re.compile(pattern) # Pre-compile a pattern for reuse | |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| from dataclasses import dataclass, field | |
@dataclass
class Position:
    """A job position held from `start_timestamp` onward, with its pay rate.

    Numeric fields may arrive as strings and are normalized to int.
    """

    position: str
    compensation: int
    start_timestamp: int = 0

    def __post_init__(self):
        # Coerce both numeric fields so downstream arithmetic stays on ints.
        for attr in ("compensation", "start_timestamp"):
            setattr(self, attr, int(getattr(self, attr)))
@dataclass
class Worker:
    """A worker plus their check-in times, position history and pay-rate
    overrides.

    Attributes:
        times: alternating enter/leave timestamps, in registration order.
        pay_rate: maps an (enter, leave) session interval to a pay
            multiplier (used by DOUBLE_PAY / CALC_SALARY).
        positions: position name -> Position, seeded in __post_init__ with
            the starting position at timestamp 0.
    """

    worker_id: str
    position: str
    compensation: int
    times: list[int] = field(default_factory=list)
    # Bug fix: a dict keyed by an (int, int) pair is written
    # dict[tuple[int, int], int]; the original dict[(int, int), int] is not
    # valid typing syntax for that shape.
    pay_rate: dict[tuple[int, int], int] = field(default_factory=dict)

    def __post_init__(self):
        # compensation may arrive as a string; normalize before use.
        self.compensation = int(self.compensation)
        self.positions: dict[str, Position] = {
            self.position: Position(self.position, self.compensation)
        }
class WorkHours:
    """Office check-in/out register with positions, salaries and double-pay
    intervals.  String-in/string-out protocol: "" for unknown ids/void,
    "true"/"false" for booleans, numbers rendered with str().
    """

    def __init__(self):
        self.workers = {}  # worker_id -> Worker

    # Level 1
    def ADD_WORKER(self, worker_id, position, compensation):
        """Register a worker; "false" if the id already exists, else "true"."""
        if worker_id in self.workers:
            return "false"
        self.workers[worker_id] = Worker(worker_id, position, compensation)
        return "true"

    def REGISTER(self, worker_id, timestamp):
        """Record an enter/leave event (events alternate, starting with enter)."""
        if worker_id not in self.workers:
            return "invalid_request"
        self.workers[worker_id].times.append(int(timestamp))
        return "registered"

    def _sessions(self, worker_id):
        """Yield (enter, leave) pairs for finished sessions only; a trailing
        unmatched enter (worker still in the office) is ignored by zip."""
        times = self.workers[worker_id].times
        return zip(times[::2], times[1::2])

    def GET(self, worker_id):
        """Total time spent in finished sessions, or "" for unknown ids."""
        if worker_id not in self.workers:
            return ""
        return str(sum(leave - enter for enter, leave in self._sessions(worker_id)))

    # Level 2
    def TOP_N_WORKERS(self, n, position):
        """Top n workers currently holding `position`, ranked by time worked
        while holding that position (descending), ties broken by id
        (ascending).  Returns "" when nobody currently holds the position.
        """
        n = int(n)

        def time_in_position(worker_id):
            # Only count sessions that began while this position was held.
            return sum(
                leave - enter
                for enter, leave in self._sessions(worker_id)
                if self.compensation_period(worker_id, enter).position == position
            )

        candidates = [
            (worker_id, time_in_position(worker_id))
            for worker_id, worker in self.workers.items()
            if worker.position == position
        ]
        if not candidates:
            return ""
        candidates.sort(key=lambda pair: (-pair[1], pair[0]))
        return ", ".join(f"{wid}({total})" for wid, total in candidates[:n])

    # Level 3
    def PROMOTE(self, worker_id, new_position, new_compensation, start_timestamp):
        """Give the worker a new position effective from start_timestamp.

        Bug fix: the unknown-worker branch returned "invalid _request"
        (stray space); it now returns "invalid_request" consistently with
        the rest of the class.
        """
        if worker_id not in self.workers:
            return "invalid_request"
        worker = self.workers[worker_id]
        if new_position in worker.positions:
            return "invalid_request"
        worker.positions[new_position] = Position(
            new_position, new_compensation, start_timestamp
        )
        worker.position = new_position
        worker.compensation = new_compensation
        return "success"

    def compensation_period(self, worker_id, interval_begin):
        """The Position held at interval_begin: the latest entry in the
        worker's position history with start_timestamp <= interval_begin.
        (The initial position starts at 0, so any non-negative time matches.)
        """
        return max(
            (
                position
                for position in self.workers[worker_id].positions.values()
                if position.start_timestamp <= interval_begin
            ),
            key=lambda p: p.start_timestamp,
        )

    def CALC_SALARY(self, worker_id, start_timestamp, end_timestamp):
        """Salary earned over [start_timestamp, end_timestamp], or "" for
        unknown ids.

        Each finished session overlapping the window contributes its clipped
        duration, times any pay-rate multiplier registered for that exact
        session interval, times the compensation of the position held when
        the session began.
        """
        if worker_id not in self.workers:
            return ""
        start, end = int(start_timestamp), int(end_timestamp)
        pay_rate = self.workers[worker_id].pay_rate
        total = sum(
            (min(leave, end) - max(enter, start))
            * pay_rate.get((enter, leave), 1)
            * self.compensation_period(worker_id, enter).compensation
            for enter, leave in self._sessions(worker_id)
            if leave >= start and enter <= end
        )
        return str(total)

    # Level 4
    def DOUBLE_PAY(self, worker_id, interval_begin, interval_end):
        """Mark one session interval as double-paid.

        Bug fix: the bounds are cast to int so the key matches the int
        session times CALC_SALARY looks up; previously string arguments
        produced a key that could never match, making DOUBLE_PAY a no-op.
        """
        multiplier = 2
        if worker_id not in self.workers:
            return ""
        self.workers[worker_id].pay_rate[
            (int(interval_begin), int(interval_end))
        ] = multiplier
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment