Created
March 3, 2025 17:21
-
-
Save dtornow/8162c311c7a61e762dedffef07f9d666 to your computer and use it in GitHub Desktop.
Minimal Distributed Systems Simulator
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| from typing import List, Any, Callable | |
class FilterableList(List[Any]):
    """A list that can extract every element matching a predicate.

    NOTE: ``remove`` intentionally replaces the value-based ``list.remove``
    with a predicate-based bulk removal that returns the removed items.
    """

    def remove(self, predicate: Callable[[Any], bool]) -> List[Any]:
        """Remove and return all items for which ``predicate(item)`` is truthy.

        The predicate is evaluated exactly once per item, in list order, so
        a stateful or side-effecting predicate can never cause an item to be
        both kept and removed (or neither).

        Args:
            predicate: Callable deciding whether an item should be removed.

        Returns:
            A plain list of the removed items, in their original order.
            Items that did not match stay in this list, order preserved.
        """
        removed: List[Any] = []
        kept: List[Any] = []
        for item in self:
            # Single evaluation decides the item's destination.
            (removed if predicate(item) else kept).append(item)
        # Slice assignment mutates in place so existing references stay valid.
        self[:] = kept
        return removed
class Message:
    """A single message travelling between two addresses in the simulation."""

    def __init__(self, send: str, recv: str, data: Any, time: int):
        """Store the sender address, receiver address, payload and timestamp."""
        self.send, self.recv = send, recv
        self.data, self.time = data, time

    def __repr__(self):
        """Return a readable one-line description of the message."""
        return f"Message(from={self.send}, to={self.recv}, payload={self.data}, time={self.time})"
class Component:
    """Base class for simulated components; subclasses override ``step``."""

    def __init__(self, uni: str, any: str):
        """Record the component's two addresses and initialise its state.

        Args:
            uni: Address stored on ``self.uni``.
            any: Address stored on ``self.any`` (the parameter name shadows
                the builtin, but it is part of the public signature).
        """
        self.uni, self.any = uni, any
        self.time = 0
        self.outgoing = []

    def step(self, time: int, messages: List[Message]) -> List[Message]:
        """Process one tick; the base implementation emits no messages.

        Args:
            time: Current time passed in by the caller.
            messages: Messages delivered to this component for this tick.

        Returns:
            A new empty list.
        """
        return list()

    def __repr__(self):
        """Return a readable description showing both addresses."""
        return f"Component(uni={self.uni}, any={self.any})"
class Simulator:
    """Round-based driver that routes buffered messages to components."""

    def __init__(self):
        """Start at time 0 with no components and an empty message buffer."""
        self.time = 0
        self.components: List[Component] = []
        self.buffer: List[Message] = FilterableList()

    def add_component(self, component: Component):
        """Register ``component``; components are stepped in registration order."""
        self.components.append(component)

    def step(self):
        """Advance the simulation by one tick.

        Messages addressed to ``"E"`` are removed from the buffer and printed.
        Then each component, in registration order, receives (and removes
        from the buffer) every message whose ``recv`` equals its ``uni`` or
        ``any`` address, and is stepped with them. Messages returned by the
        components are only appended to the buffer once every component has
        been stepped, so they are delivered on a later tick at the earliest.
        Messages matching no address remain in the buffer.
        """
        self.time += 1
        print(f"\n=== Step {self.time} ===")

        # Drain everything destined for the environment first.
        to_environment = self.buffer.remove(lambda msg: msg.recv == "E")
        for msg in to_environment:
            print("Message to environment:", msg)

        produced = []
        for component in self.components:

            def is_addressed(msg, target=component):
                # Match on either of the component's two addresses.
                return msg.recv == target.uni or msg.recv == target.any

            inbox = self.buffer.remove(is_addressed)
            for msg in inbox:
                print("Message to component:", msg)

            # Hand the inbox to the component and collect what it emits.
            replies = component.step(self.time, inbox)
            produced.extend(replies)

        # Queue this tick's output for subsequent ticks.
        self.buffer.extend(produced)

    def run(self, steps: int):
        """Call ``step`` ``steps`` times in a row."""
        for _tick in range(steps):
            self.step()
| # Example usage: | |
class EchoComponent(Component):
    """Example component that answers every incoming message with an echo."""

    def step(self, time: int, messages: List[Message]):
        """Return one reply per received message, addressed to its sender.

        Each reply is sent from this component's ``uni`` address, carries the
        original payload prefixed with ``"Echo: "``, and is stamped with
        ``time``.
        """
        replies = []
        for msg in messages:
            replies.append(Message(self.uni, msg.send, f"Echo: {msg.data}", time))
        return replies
# Build a simulator with two echo components that share the "default" address.
sim = Simulator()
c1, c2 = EchoComponent("A", "default"), EchoComponent("B", "default")
sim.add_component(c1)
sim.add_component(c2)

# Seed the buffer with one message from the environment ("E") to component "A",
# then run five ticks.
sim.buffer.append(Message(send="E", recv="A", data="Hello", time=0))
sim.run(steps=5)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment