Skip to content

Instantly share code, notes, and snippets.

@Steboss
Created January 16, 2026 10:13
Show Gist options
  • Select an option

  • Save Steboss/a861d3f3916783956f79c2b89e9e147e to your computer and use it in GitHub Desktop.

Select an option

Save Steboss/a861d3f3916783956f79c2b89e9e147e to your computer and use it in GitHub Desktop.
Base AutoGen class for agent workflows
import json
import logging
from typing import List, Optional

from autogen_core import (
    DefaultTopicId,
    MessageContext,
    RoutedAgent,
    message_handler,
)
from autogen_core.models import (
    AssistantMessage,
    ChatCompletionClient,
    FunctionExecutionResult,
    FunctionExecutionResultMessage,
    LLMMessage,
    SystemMessage,
    UserMessage,
)
from autogen_core.tools import FunctionTool
from pydantic import BaseModel
from rich.console import Console
from rich.markdown import Markdown
from rich.panel import Panel
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
handlers=[
logging.StreamHandler()
]
)
logging.getLogger("autogen_core").setLevel(logging.ERROR)
logging.getLogger("autogen_agentchat").setLevel(logging.ERROR)
#logging.getLogger("autogen_core.events").setLevel(logging.ERROR)
logger = logging.getLogger(__name__)
logger.info("Initialising the model...")
# Shared rich console used for all status / result output printed by this module.
console = Console()
# --- Message protocol ---
class GroupChatMessage(BaseModel):
    """Envelope for a chat message shared with the group-chat participants."""

    # The message being shared; its ``source`` names the sender.
    body: UserMessage
class RequestToSpeak(BaseModel):
    """Payload-free message asking the receiving agent to take its turn."""
# ---------------------------
class BaseMatMulAgent(RoutedAgent):
    """
    A generic agent that can handle RequestToSpeak, call tools, and publish messages.

    The agent keeps its own view of the conversation in ``_chat_history``:
    messages received from other participants are stored as ``UserMessage``
    entries, its own replies as ``AssistantMessage`` entries.
    """

    def __init__(
        self,
        description: str,
        group_chat_topic_type: str,
        model_client: ChatCompletionClient,
        system_message: str,
        tools: Optional[List[FunctionTool]] = None,
    ) -> None:
        """Create the agent.

        Args:
            description: Agent description forwarded to ``RoutedAgent``.
            group_chat_topic_type: Topic type on which replies are published.
            model_client: Client used to request completions.
            system_message: Text of the system prompt sent with every request.
            tools: Tools the model may call. Defaults to no tools.
        """
        super().__init__(description=description)
        self._group_chat_topic_type = group_chat_topic_type
        self._model_client = model_client
        self._system_message = SystemMessage(content=system_message)
        self._chat_history: List[LLMMessage] = []
        # Copy into a fresh list so instances never share one (default) list object.
        self._tools: List[FunctionTool] = list(tools) if tools is not None else []

    @message_handler
    async def handle_message(self, message: GroupChatMessage, ctx: MessageContext) -> None:
        """Record a group-chat message from another participant in the history."""
        incoming = message.body
        # Ignore our own echoed messages.
        # We already added our output to history as an 'AssistantMessage' when we generated it.
        # If we add it again as a 'UserMessage' (which is how group messages arrive),
        # it confuses the model.
        if incoming.source == self.id.type:
            return

        previous = self._chat_history[-1] if self._chat_history else None
        if isinstance(previous, UserMessage):
            # Merge consecutive UserMessages.
            # LlamaCpp throws an error if history is [User, User].
            # We must consolidate them into [User (merged)], with a clear separator.
            # This keeps the history strict: System -> User -> Assistant...
            merged = f"{previous.content}\n\n--- Message from {incoming.source} ---\n{incoming.content}"
            # Replace the entry instead of mutating it: the stored object came from
            # an incoming message and may be shared with other subscribers.
            self._chat_history[-1] = UserMessage(content=merged, source=previous.source)
        else:
            # Otherwise, append normally
            self._chat_history.append(incoming)

    @message_handler
    async def handle_request_to_speak(self, message: RequestToSpeak, ctx: MessageContext) -> None:
        """Ask the model for a reply, run any requested tools, and publish the text answer."""
        console.print(f"\n[bold yellow]➤ {self.id.type} is thinking...[/bold yellow]")
        # Call the LLM (with tools if available)
        completion = await self._model_client.create(
            [self._system_message] + self._chat_history,
            tools=self._tools,
            cancellation_token=ctx.cancellation_token,
        )
        response_content = completion.content

        # Handle Tool Calls (if any)
        if isinstance(response_content, list) and response_content and hasattr(response_content[0], 'name'):
            console.print(f"[cyan] {self.id.type} calls tool: {response_content[0].name}[/cyan]")
            results = []
            for tool_call in response_content:
                results.append(await self._execute_tool_call(tool_call, ctx))
            # Record the tool request and the results as one result message,
            # then let the model turn the tool output into a reply.
            self._chat_history.append(AssistantMessage(content=response_content, source=self.id.type))
            self._chat_history.append(FunctionExecutionResultMessage(content=results))
            completion = await self._model_client.create(
                [self._system_message] + self._chat_history,
                tools=self._tools,
                cancellation_token=ctx.cancellation_token,
            )
            response_content = completion.content

        # Only a text response can be published.
        if not isinstance(response_content, str):
            console.print(f"[red] {self.id.type} returned no text response; nothing was published.[/red]")
            return

        # Publish the final response
        console.print(Markdown(f"**{self.id.type}**: {response_content}"))
        console.print(Panel(response_content, title=f" {self.id.type} Output", border_style="cyan"))
        # Add to own history as AssistantMessage (Strict alternation)
        self._chat_history.append(AssistantMessage(content=response_content, source=self.id.type))
        # Broadcast to group
        await self.publish_message(
            GroupChatMessage(body=UserMessage(content=response_content, source=self.id.type)),
            topic_id=DefaultTopicId(type=self._group_chat_topic_type),
        )

    async def _execute_tool_call(self, tool_call, ctx: MessageContext) -> FunctionExecutionResult:
        """Run one tool call requested by the model and wrap its outcome.

        Unknown tool names and malformed JSON arguments are reported back as
        error results so that every requested call gets a matching result.
        Exceptions raised by the tool itself are not caught here.
        """
        tool = next((t for t in self._tools if t.name == tool_call.name), None)
        if tool is None:
            console.print(f"[red] Unknown tool requested: {tool_call.name}[/red]")
            return FunctionExecutionResult(
                content=f"Error: unknown tool '{tool_call.name}'.",
                name=tool_call.name,
                call_id=tool_call.id,
                is_error=True,
            )

        try:
            args = json.loads(tool_call.arguments)
        except json.JSONDecodeError as exc:
            console.print(f"[red] Invalid arguments for tool {tool.name}: {exc}[/red]")
            return FunctionExecutionResult(
                content=f"Error: invalid JSON arguments for tool '{tool.name}': {exc}",
                name=tool.name,
                call_id=tool_call.id,
                is_error=True,
            )

        console.print(f"[yellow] Tool args: {args}[/yellow]")
        raw_result = await tool.run_json(args, ctx.cancellation_token)
        # The result content must be text; let the tool render its own return value.
        result_str = tool.return_value_as_string(raw_result)
        console.print(f"[green] Tool result: {result_str[:200]}[/green]")
        console.print(Panel(f"{result_str[:300]}...", title=f"Tool result: {tool.name}", border_style="green"))
        console.print(f"[dim] -> Result: {result_str[:100]}...[/dim]")
        return FunctionExecutionResult(
            content=result_str,
            name=tool.name,
            call_id=tool_call.id,
            is_error=False,
        )
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment