Created
January 16, 2026 10:25
-
-
Save Steboss/6046a71f9db23ee7260b21583fe352db to your computer and use it in GitHub Desktop.
Matrix Multiplication Autogen's manager to orchestrate all the agents
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| import logging | |
| import re | |
| from typing import List | |
| from autogen_core import ( | |
| DefaultTopicId, | |
| MessageContext, | |
| RoutedAgent, | |
| message_handler, | |
| ) | |
| from autogen_core.models import ( | |
| UserMessage, | |
| ) | |
| from rich.console import Console | |
| from rag_processing.rag_agent_tools import add_to_rag_memory | |
| from config import WORKSPACE_DIR | |
| logging.basicConfig( | |
| level=logging.INFO, | |
| format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', | |
| handlers=[ | |
| logging.StreamHandler() | |
| ] | |
| ) | |
| logging.getLogger("autogen_core").setLevel(logging.ERROR) | |
| logging.getLogger("autogen_agentchat").setLevel(logging.ERROR) | |
| logger = logging.getLogger(__name__) | |
| logger.info("Initialising the model...") | |
| console = Console() | |
| class MatMulManager(RoutedAgent): | |
| """ | |
| Manages the workflow: Proposer -> Verifier -> Coder -> Tester -> Proposer | |
| """ | |
| def __init__(self, participant_topics: List[str], group_topic_type: str) -> None: | |
| super().__init__("Manager") | |
| self._participants = participant_topics | |
| self._group_topic_type = group_topic_type | |
| self._max_rounds = 100 | |
| self._current_algorithm_content = "" | |
| state = load_manager_state() | |
| self._round = state.get("round", 0) | |
| console.print(f"[bold magenta]Manager initialized at rount {self._round}[/bold magenta]") | |
| @message_handler | |
| async def handle_message(self, message: GroupChatMessage, ctx: MessageContext) -> None: | |
| sender = message.body.source | |
| content = str(message.body.content) | |
| # Ignore our own messages to prevent infinite loops | |
| if sender == "Manager": return | |
| console.print(f"[bold magenta]Manager received message from {sender}[/bold magenta]") | |
| next_speaker = None | |
| # --- THE BRAIN (State Machine) --- | |
| # --- Capture Algorithm from Proposer/Verifier --- | |
| # When Verifier quotes the algo in verification, or when Proposer sends it. | |
| # It's safest to capture it when Verification PASSES. | |
| if sender == "Verifier" and "[Verification: PASSED]" in content: | |
| # For the moment pass the verifier | |
| pass | |
| if sender == "User": | |
| next_speaker = "Proposer" | |
| elif sender == "Proposer": | |
| # Capture the algorithm text here! | |
| if "[ALGORITHM]" in content: | |
| self._current_algorithm_content = content | |
| save_manager_state(self._round, self._current_algorithm_content) | |
| next_speaker = "Verifier" | |
| elif sender == "Verifier": | |
| if "[Verification: PASSED]" in content: | |
| console.print("[bold green] Verification Passed! Moving to Code.[/bold green]") | |
| instruction = ( | |
| f"Verification passed. Coder, proceed.\n" | |
| f"IMPORTANT: Save the files using this prefix: 'iter_{self._round}_'." | |
| ) | |
| await self.publish_message( | |
| GroupChatMessage(body=UserMessage(content=instruction, source="Manager")), | |
| topic_id=DefaultTopicId(type=self._group_topic_type) | |
| ) | |
| next_speaker = "Coder" | |
| else: | |
| console.print("[bold red] Verification Failed! Back to Proposer.[/bold red]") | |
| next_speaker = "Proposer" | |
| elif sender == "Coder": | |
| # save the code, at the moment Rust | |
| code_match = re.search(r'```(?:rust)?\n(.*?)```', content, re.DOTALL) | |
| if code_match: | |
| rust_code = code_match.group(1).strip() | |
| filename = f"iter_{self._round}_algo.rs" | |
| filepath = WORKSPACE_DIR / filename | |
| filepath.write_text(rust_code) | |
| console.print(f"[green]✅ Manager saved code to {filepath}[/green]") | |
| # Notify Tester | |
| await self.publish_message( | |
| GroupChatMessage(body=UserMessage( | |
| content=f"Code saved to {filename}. Proceed with testing.", | |
| source="Manager" | |
| )), | |
| topic_id=DefaultTopicId(type=self._group_topic_type) | |
| ) | |
| next_speaker = "Tester" | |
| else: | |
| console.print("[bold red]⚠️ Manager could not find code block in Coder response![/bold red]") | |
| # Send it back to Coder to fix formatting | |
| await self.publish_message( | |
| GroupChatMessage(body=UserMessage( | |
| content="ERROR: No markdown code block found. Please output the code inside ```rust ... ``` blocks.", | |
| source="Manager" | |
| )), | |
| topic_id=DefaultTopicId(type=self._group_topic_type) | |
| ) | |
| next_speaker = "Coder" | |
| elif sender == "Tester": | |
| if "PERFORMANCE_REPORT" in content or "Benchmark report" in content or "Gige multiply-adds" in content: | |
| console.print("[bold green]Manager: received benchmark report.[/bold green]") | |
| full_memory_entry = ( | |
| f"### ITERATION {self._round} ARCHIVE\n" | |
| f"1. ALGORITHM PROPOSED:\n{self._current_algorithm_content}\n\n" | |
| f"2. PERFORMANCE RESULT:\n{content}" | |
| ) | |
| await add_to_rag_memory( | |
| full_memory_entry, | |
| source_label=f"iteration_{self._round}_complete" | |
| ) | |
| next_round = self._round + 1 | |
| save_manager_state(next_round, "") | |
| console.print(f"[bold blue]💾 Manager: saved iteration {self._round}") | |
| # stop | |
| return | |
| # Trigger the next agent | |
| if next_speaker: | |
| await self.publish_message( | |
| RequestToSpeak(), | |
| DefaultTopicId(type=next_speaker) | |
| ) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment