Skip to content

Instantly share code, notes, and snippets.

@Steboss
Created January 16, 2026 10:25
Show Gist options
  • Select an option

  • Save Steboss/6046a71f9db23ee7260b21583fe352db to your computer and use it in GitHub Desktop.

Select an option

Save Steboss/6046a71f9db23ee7260b21583fe352db to your computer and use it in GitHub Desktop.
Matrix Multiplication Autogen's manager to orchestrate all the agents
import logging
import re
from typing import List
from autogen_core import (
DefaultTopicId,
MessageContext,
RoutedAgent,
message_handler,
)
from autogen_core.models import (
UserMessage,
)
from rich.console import Console
from rag_processing.rag_agent_tools import add_to_rag_memory
from config import WORKSPACE_DIR
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
handlers=[
logging.StreamHandler()
]
)
logging.getLogger("autogen_core").setLevel(logging.ERROR)
logging.getLogger("autogen_agentchat").setLevel(logging.ERROR)
logger = logging.getLogger(__name__)
logger.info("Initialising the model...")
console = Console()
class MatMulManager(RoutedAgent):
"""
Manages the workflow: Proposer -> Verifier -> Coder -> Tester -> Proposer
"""
def __init__(self, participant_topics: List[str], group_topic_type: str) -> None:
super().__init__("Manager")
self._participants = participant_topics
self._group_topic_type = group_topic_type
self._max_rounds = 100
self._current_algorithm_content = ""
state = load_manager_state()
self._round = state.get("round", 0)
console.print(f"[bold magenta]Manager initialized at rount {self._round}[/bold magenta]")
@message_handler
async def handle_message(self, message: GroupChatMessage, ctx: MessageContext) -> None:
sender = message.body.source
content = str(message.body.content)
# Ignore our own messages to prevent infinite loops
if sender == "Manager": return
console.print(f"[bold magenta]Manager received message from {sender}[/bold magenta]")
next_speaker = None
# --- THE BRAIN (State Machine) ---
# --- Capture Algorithm from Proposer/Verifier ---
# When Verifier quotes the algo in verification, or when Proposer sends it.
# It's safest to capture it when Verification PASSES.
if sender == "Verifier" and "[Verification: PASSED]" in content:
# For the moment pass the verifier
pass
if sender == "User":
next_speaker = "Proposer"
elif sender == "Proposer":
# Capture the algorithm text here!
if "[ALGORITHM]" in content:
self._current_algorithm_content = content
save_manager_state(self._round, self._current_algorithm_content)
next_speaker = "Verifier"
elif sender == "Verifier":
if "[Verification: PASSED]" in content:
console.print("[bold green] Verification Passed! Moving to Code.[/bold green]")
instruction = (
f"Verification passed. Coder, proceed.\n"
f"IMPORTANT: Save the files using this prefix: 'iter_{self._round}_'."
)
await self.publish_message(
GroupChatMessage(body=UserMessage(content=instruction, source="Manager")),
topic_id=DefaultTopicId(type=self._group_topic_type)
)
next_speaker = "Coder"
else:
console.print("[bold red] Verification Failed! Back to Proposer.[/bold red]")
next_speaker = "Proposer"
elif sender == "Coder":
# save the code, at the moment Rust
code_match = re.search(r'```(?:rust)?\n(.*?)```', content, re.DOTALL)
if code_match:
rust_code = code_match.group(1).strip()
filename = f"iter_{self._round}_algo.rs"
filepath = WORKSPACE_DIR / filename
filepath.write_text(rust_code)
console.print(f"[green]✅ Manager saved code to {filepath}[/green]")
# Notify Tester
await self.publish_message(
GroupChatMessage(body=UserMessage(
content=f"Code saved to {filename}. Proceed with testing.",
source="Manager"
)),
topic_id=DefaultTopicId(type=self._group_topic_type)
)
next_speaker = "Tester"
else:
console.print("[bold red]⚠️ Manager could not find code block in Coder response![/bold red]")
# Send it back to Coder to fix formatting
await self.publish_message(
GroupChatMessage(body=UserMessage(
content="ERROR: No markdown code block found. Please output the code inside ```rust ... ``` blocks.",
source="Manager"
)),
topic_id=DefaultTopicId(type=self._group_topic_type)
)
next_speaker = "Coder"
elif sender == "Tester":
if "PERFORMANCE_REPORT" in content or "Benchmark report" in content or "Gige multiply-adds" in content:
console.print("[bold green]Manager: received benchmark report.[/bold green]")
full_memory_entry = (
f"### ITERATION {self._round} ARCHIVE\n"
f"1. ALGORITHM PROPOSED:\n{self._current_algorithm_content}\n\n"
f"2. PERFORMANCE RESULT:\n{content}"
)
await add_to_rag_memory(
full_memory_entry,
source_label=f"iteration_{self._round}_complete"
)
next_round = self._round + 1
save_manager_state(next_round, "")
console.print(f"[bold blue]💾 Manager: saved iteration {self._round}")
# stop
return
# Trigger the next agent
if next_speaker:
await self.publish_message(
RequestToSpeak(),
DefaultTopicId(type=next_speaker)
)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment