Base AutoGen agent class for agent workflows
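The constructor below accepts a list of autogen_core FunctionTool instances to expose to the model. As a minimal sketch of what such a tool looks like (the matmul_flops function is illustrative only, not part of this gist):

# Minimal sketch of a tool that could be passed in the `tools` list.
# matmul_flops is a hypothetical example, not part of this gist.
from autogen_core.tools import FunctionTool


async def matmul_flops(m: int, n: int, k: int) -> str:
    """Estimate FLOPs for an (m x k) @ (k x n) matrix multiplication."""
    return f"{2 * m * n * k} FLOPs"


flops_tool = FunctionTool(matmul_flops, description="Estimate matmul FLOPs.")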
import json
import logging
from typing import List

from autogen_core import (
    DefaultTopicId,
    MessageContext,
    RoutedAgent,
    message_handler,
)
from autogen_core.models import (
    AssistantMessage,
    ChatCompletionClient,
    FunctionExecutionResult,
    FunctionExecutionResultMessage,
    LLMMessage,
    SystemMessage,
    UserMessage,
)
from autogen_core.tools import FunctionTool
from pydantic import BaseModel
from rich.console import Console
from rich.markdown import Markdown
from rich.panel import Panel
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
    handlers=[logging.StreamHandler()],
)
logging.getLogger("autogen_core").setLevel(logging.ERROR)
logging.getLogger("autogen_agentchat").setLevel(logging.ERROR)
# logging.getLogger("autogen_core.events").setLevel(logging.ERROR)
logger = logging.getLogger(__name__)
logger.info("Initialising the model...")

console = Console()
# --- Message protocol ---
class GroupChatMessage(BaseModel):
    body: UserMessage


class RequestToSpeak(BaseModel):
    pass
# ---------------------------
class BaseMatMulAgent(RoutedAgent):
    """
    A generic agent that can handle RequestToSpeak, call tools, and publish messages.
    """

    def __init__(
        self,
        description: str,
        group_chat_topic_type: str,
        model_client: ChatCompletionClient,
        system_message: str,
        tools: List[FunctionTool] | None = None,
    ) -> None:
        super().__init__(description=description)
        self._group_chat_topic_type = group_chat_topic_type
        self._model_client = model_client
        self._system_message = SystemMessage(content=system_message)
        self._chat_history: List[LLMMessage] = []
        # Avoid a mutable default argument; each instance gets its own list.
        self._tools = tools or []
    @message_handler
    async def handle_message(self, message: GroupChatMessage, ctx: MessageContext) -> None:
        # Ignore our own echoed messages.
        # We already added our output to history as an AssistantMessage when we
        # generated it. If we add it again as a UserMessage (which is how group
        # messages arrive), it confuses the model.
        if message.body.source == self.id.type:
            return
        new_msg = message.body
        # Merge consecutive UserMessages.
        # LlamaCpp throws an error if the history is [User, User], so we must
        # consolidate them into a single merged UserMessage.
        if self._chat_history and isinstance(self._chat_history[-1], UserMessage):
            last_msg = self._chat_history[-1]
            # Append the new content to the previous message with a clear separator.
            # This keeps the history strict: System -> User -> Assistant...
            last_msg.content = (
                f"{last_msg.content}\n\n--- Message from {new_msg.source} ---\n{new_msg.content}"
            )
        else:
            # Otherwise, append normally.
            self._chat_history.append(new_msg)
    @message_handler
    async def handle_request_to_speak(self, message: RequestToSpeak, ctx: MessageContext) -> None:
        console.print(f"\n[bold yellow]➤ {self.id.type} is thinking...[/bold yellow]")
        # Call the LLM (with tools, if any are registered).
        completion = await self._model_client.create(
            [self._system_message] + self._chat_history,
            tools=self._tools,
            cancellation_token=ctx.cancellation_token,
        )
        # Handle tool calls (if any). A tool-calling completion returns a list
        # of FunctionCall objects instead of a string.
        response_content = completion.content
        if isinstance(response_content, list) and response_content and hasattr(response_content[0], "name"):
            results = []
            for tool_call in response_content:
                tool = next((t for t in self._tools if t.name == tool_call.name), None)
                if tool:
                    console.print(f"[cyan] {self.id.type} calls tool: {tool_call.name}[/cyan]")
                    args = json.loads(tool_call.arguments)
                    console.print(f"[yellow] Tool args: {args}[/yellow]")
                    result = await tool.run_json(args, ctx.cancellation_token)
                    # run_json returns the tool's native result; convert it to a
                    # string before logging it or storing it in the history.
                    result_str = tool.return_value_as_string(result)
                    console.print(Panel(f"{result_str[:300]}...", title=f"Tool result: {tool.name}", border_style="green"))
                    results.append(
                        FunctionExecutionResult(
                            content=result_str,
                            call_id=tool_call.id,
                            name=tool_call.name,  # required by recent autogen_core releases
                        )
                    )
            # Record the tool calls and their results. The results must be wrapped
            # in a FunctionExecutionResultMessage, since the history only holds
            # LLMMessage instances. Then call the model again for a final answer.
            self._chat_history.append(AssistantMessage(content=response_content, source=self.id.type))
            self._chat_history.append(FunctionExecutionResultMessage(content=results))
            completion = await self._model_client.create(
                [self._system_message] + self._chat_history,
                tools=self._tools,
                cancellation_token=ctx.cancellation_token,
            )
            response_content = completion.content
        # Publish the final response.
        if isinstance(response_content, str):
            console.print(Panel(Markdown(response_content), title=f" {self.id.type} Output", border_style="cyan"))
            # Add to our own history as an AssistantMessage (strict alternation).
            self._chat_history.append(AssistantMessage(content=response_content, source=self.id.type))
            # Broadcast to the group.
            await self.publish_message(
                GroupChatMessage(body=UserMessage(content=response_content, source=self.id.type)),
                topic_id=DefaultTopicId(type=self._group_chat_topic_type),
            )
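For context, a minimal usage sketch, not part of this gist: the agent is registered with a SingleThreadedAgentRuntime, subscribed to the shared group topic and to its own topic (over which RequestToSpeak arrives), and then prompted. The topic and agent names, the model choice, and the OpenAIChatCompletionClient are assumptions; any ChatCompletionClient works.

import asyncio

from autogen_core import SingleThreadedAgentRuntime, TopicId, TypeSubscription
from autogen_ext.models.openai import OpenAIChatCompletionClient


async def main() -> None:
    runtime = SingleThreadedAgentRuntime()
    # Any ChatCompletionClient works; an OpenAI client is assumed here.
    model_client = OpenAIChatCompletionClient(model="gpt-4o-mini")

    await BaseMatMulAgent.register(
        runtime,
        "kernel_writer",  # hypothetical agent type
        lambda: BaseMatMulAgent(
            description="Writes matmul kernels.",
            group_chat_topic_type="matmul_group",  # hypothetical topic
            model_client=model_client,
            system_message="You write and optimise matrix multiplication kernels.",
        ),
    )
    # Subscribe the agent to the group topic and to its own topic,
    # which is used to deliver RequestToSpeak.
    await runtime.add_subscription(TypeSubscription("matmul_group", "kernel_writer"))
    await runtime.add_subscription(TypeSubscription("kernel_writer", "kernel_writer"))

    runtime.start()
    await runtime.publish_message(
        GroupChatMessage(body=UserMessage(content="Optimise a 4096x4096 matmul.", source="user")),
        topic_id=TopicId("matmul_group", source="default"),
    )
    await runtime.publish_message(RequestToSpeak(), topic_id=TopicId("kernel_writer", source="default"))
    await runtime.stop_when_idle()


asyncio.run(main())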