Skip to content

Instantly share code, notes, and snippets.

@pavlovmilen
Created July 15, 2025 18:26
Show Gist options
  • Select an option

  • Save pavlovmilen/757729e42c947c0b133a9aa4159badc5 to your computer and use it in GitHub Desktop.

Select an option

Save pavlovmilen/757729e42c947c0b133a9aa4159badc5 to your computer and use it in GitHub Desktop.
agents_with_mcp
# model client
def get_fresh_model_client():
    """Build and return a new Azure OpenAI chat-completion client.

    Each call constructs a separate ``AzureOpenAIChatCompletionClient`` from
    the module-level ``AZURE_OPENAI_*`` settings, with ``timeout=180.0`` and
    ``max_retries=2``.
    """
    client_settings = dict(
        model=AZURE_OPENAI_MODEL,
        api_version=AZURE_OPENAI_API_VERSION,
        azure_deployment=AZURE_OPENAI_DEPLOYMENT,
        azure_endpoint=AZURE_OPENAI_ENDPOINT_SWEDEN,
        api_key=AZURE_OPENAI_API_KEY_SWEDEN,
        timeout=180.0,
        max_retries=2,
    )
    return AzureOpenAIChatCompletionClient(**client_settings)
# Image upload tool and agent
from autogen_agentchat.agents import AssistantAgent
from tools.file_upload_service import upload_automation_image
# Create async wrapper for upload function with base64 conversion
import os, base64, aiofiles
from pathlib import Path
from typing import Union, Dict, Any, List
from autogen_core.tools import FunctionTool
# existing upload helper you already have
from tools.file_upload_service import upload_automation_image
async def upload_automation_image_async(
    payload: Union[str, Dict[str, Any], List[Any]],
    domain_name: str,
) -> str:
    """Upload a screenshot supplied as a file path or as base64 text.

    Args:
        payload: One of
            * a path to an existing file,
            * a ``{"type": "file", "path": ...}`` dict,
            * a base64 string, optionally prefixed by a ``data:...,`` header,
            * a list holding any of the above (the first ``str``/``dict``
              item is used).
        domain_name: Domain such as ``example.com``; dots are replaced with
            underscores before it is passed to ``upload_automation_image``.

    Returns:
        The value returned by ``upload_automation_image``.

    Raises:
        ValueError: ``payload`` is an empty list.
        KeyError: a ``{"type": "file"}`` dict has no ``"path"`` entry.
        TypeError: the normalised payload is not a string.
        binascii.Error: the payload is neither an existing file nor valid
            base64 (``binascii.Error`` is a ``ValueError`` subclass).
    """
    import binascii  # only needed for the error type raised below

    # normalise input -------------------------------------------------
    if isinstance(payload, list):
        if not payload:
            raise ValueError("payload list is empty")
        payload = next(
            (item for item in payload if isinstance(item, (str, dict))),
            payload[0],
        )
    if isinstance(payload, dict) and payload.get("type") == "file":
        payload = payload["path"]
    if not isinstance(payload, str):
        raise TypeError(
            "payload must be a file path, a file dict or a base64 string, "
            f"got {type(payload).__name__}"
        )

    # read bytes ------------------------------------------------------
    if os.path.isfile(payload):
        async with aiofiles.open(Path(payload), "rb") as f:
            data = await f.read()
    else:  # base-64
        encoded = payload
        if encoded.startswith("data:"):
            _header, separator, encoded = encoded.partition(",")
            if not separator:
                raise binascii.Error("malformed data URL: missing ',' separator")
        # Drop line breaks/spaces first so the padding maths sees only real
        # base64 characters.
        encoded = "".join(encoded.split())
        encoded += "=" * (-len(encoded) % 4)
        try:
            # validate=True rejects stray characters instead of silently
            # discarding them, so a mistyped file path is reported rather
            # than decoded into garbage and uploaded.
            data = base64.b64decode(encoded, validate=True)
        except binascii.Error as exc:
            raise binascii.Error(
                "payload is neither an existing file nor valid base64"
            ) from exc

    # upload ----------------------------------------------------------
    domain = domain_name.replace(".", "_")
    return await upload_automation_image(data, domain)
# Create FunctionTool wrapper
from autogen_core.tools import FunctionTool
# Expose the async upload helper to agents as a callable tool.
upload_tool = FunctionTool(
    upload_automation_image_async,
    description="Upload base64 encoded screenshot content to Azure Blob Storage using domain name",
)

# Agent whose only tool is the uploader above.
# NOTE: the description is a plain string (not an f-string), so the text
# "{domain_name}" is passed through literally.
report_upload_agent = AssistantAgent(
    name="report_upload_agent",
    description="Upload the {domain_name}_screenshot file to azure blob storage. ",
    model_client=get_fresh_model_client(),
    tools=[upload_tool],
)
from autogen_agentchat.base import TaskResult
from tools.llm_usage_tracker import LLMUsageTracker
from autogen_agentchat.conditions import MaxMessageTermination, TextMentionTermination
from autogen_agentchat.messages import BaseChatMessage
from autogen_agentchat.teams import SelectorGroupChat
import logging
from autogen_core import EVENT_LOGGER_NAME
# Route AutoGen's event log through the usage tracker; its prompt/completion
# token counters are read when the run is reported further down.
my_handler = LLMUsageTracker()
logger = logging.getLogger(EVENT_LOGGER_NAME)
logger.setLevel(logging.INFO)
logger.handlers = [my_handler]  # replaces any handlers already attached
runtime = SingleThreadedAgentRuntime()
automation_task = "Navigate to https://example.com, take a screenshot, and upload to Azure Blob Storage"
print(f"πŸš€ Task: {automation_task}")
print(f"πŸ” URL to navigate: https://example.com")
try:
# Configure server parameters
server_params = SseServerParams(
url="http://localhost:8931/sse",
timeout=90.0,
)
print("πŸ”— Connecting to Playwright MCP server...")
# Connect to workbench
async with create_mcp_server_session(server_params) as session:
await session.initialize()
mcp_tools = await mcp_server_tools(server_params=server_params, session=session)
print(f"Tools: {[tool.name for tool in mcp_tools]}")
# Create AssistantAgent with MCP tools
all_tools = mcp_tools
playwright_agent = AssistantAgent(
name="playwright_agent",
model_client=get_fresh_model_client(),
description="Playwright web automation agent for browser automation tasks",
system_message="""You are a Playwright web automation agent. You can:
NAVIGATION:
- Use browser_navigate with proper URL parameter
- Always include full URL with https:// protocol
- Example: {"url": "https://example.com"}
SCREENSHOTS:
- Use browser_take_screenshot with filename and raw parameters
- Set raw=false always (never true)
- Extract domain from URL for filename (e.g., "example" from "https://example.com")
- Example: {"filename": "example.png", "raw": false}
WORKFLOW:
1. Navigate to the website using browser_navigate
2. Take screenshot using browser_take_screenshot
3. Return the screenshot file path
PARAMETER FORMAT:
- For browser_navigate: Pass URL as {"url": "https://example.com"}
- For browser_take_screenshot: Pass {"filename": "domain.png", "raw": false}
Always complete ALL requested steps. Provide clear feedback about each action performed.""",
tools=all_tools # type: ignore
)
print("βœ… Playwright agent created successfully")
# Create a simple team with just the playwright agent
planning_agent = AssistantAgent(
"PlanningAgent",
description="An agent for planning tasks, this agent should be the first to engage when given a new task.",
model_client=get_fresh_model_client(),
system_message="""
You are a planning agent.
Your job is to break down complex tasks into smaller, manageable subtasks.
Your team members are:
1. playwright_agent: Handles web browser automation
- Navigate to websites: Specify complete URLs with https:// protocol
- Take screenshots: Returns file path of saved screenshot
- URL format must be: "https://example.com" (complete with protocol)
2. report_upload_agent: Uploads screenshots to Azure Blob Storage
- Requires screenshot file path from playwright_agent
- Needs domain name for storage organization
You only plan and delegate tasks - you do not execute them yourself.
When assigning tasks, use this format:
1. <agent> : <task>
After all tasks are complete, summarize the findings. Only when all tasks are complete, complete the workflow by responding with "TERMINATE".
""",
)
text_mention_termination = TextMentionTermination("TERMINATE")
max_messages_termination = MaxMessageTermination(max_messages=8)
termination = text_mention_termination | max_messages_termination
selector_prompt = """Select an agent to perform task.
{roles}
Current conversation context:
{history}
Read the above conversation, then select an agent from {participants} to perform the next task.
Make sure the planner agent has assigned tasks before other agents start working.
Only select one agent at a time.
"""
team = SelectorGroupChat(
[planning_agent, playwright_agent, report_upload_agent],
termination_condition=termination,
allow_repeated_speaker=True,
model_client=get_fresh_model_client(),
selector_prompt=selector_prompt
)
from autogen_agentchat.messages import ToolCallRequestEvent, ToolCallExecutionEvent, ToolCallSummaryMessage
def _tool_names(msg: BaseChatMessage) -> str | None:
"""Return comma-separated tool names or None."""
if hasattr(msg, "tool_calls") and msg.tool_calls: # Request / summary
return ", ".join(tc.name for tc in msg.tool_calls)
if isinstance(msg, ToolCallExecutionEvent): # Execution result
return msg.content[0].name if msg.content else None
return None
messages = []
runtime.on_tool_start = lambda ev: print("β–Ά", ev.tool_call.tool_name) # type: ignore
runtime.on_tool_end = lambda ev: print("βœ”", ev.tool_call.tool_name, ev.result) # type: ignore
runtime.start()
message_count = 0
async for message in team.run_stream(task=automation_task):
if isinstance(message, TaskResult):
print("Stop Reason:", message.stop_reason)
if isinstance(message, BaseChatMessage):
message_count += 1
print(f'.................Message {message_count}.....................')
print(f"\033[93m........ Source: {message.source}\033[0m")
show_content = True
tools = _tool_names(message)
if tools:
print(f"πŸ› οΈ tool call β†’ {tools}")
if "browser_take_screenshot" in tools:
show_content = False
if show_content:
print(f"\033[94m{message.content}\033[0m") # type: ignore
if isinstance(message, ToolCallSummaryMessage) and show_content:
print(f"πŸ› οΈ tool call summary β†’ {message.content}")
print(f"\033[95m Input tokens so far:{my_handler.prompt_tokens}\033[0m")
print(f"\033[95m Output tokens so far:{my_handler.completion_tokens}\033[0m")
print(f"\033[91mTotal input tokens: {my_handler.prompt_tokens}\033[0m")
print(f"\033[92mTotal output tokens: {my_handler.completion_tokens}\033[0m")
print(f"\033[91mPrice of input tokens: Β£{my_handler.prompt_tokens * 0.30 / 1000000}\033[0m")
print(f"\033[92mPrice of output tokens: Β£{my_handler.completion_tokens * 1.20 / 1000000}\033[0m")
my_handler.reset()
await team.reset()
await runtime.stop()
print("βœ… Task completed successfully")
print(f"Total messages: {len(messages)}")
# Show final message if available
if messages:
final_message = messages[-1]
if hasattr(final_message, 'content'):
print(f"Final response: {final_message.content}")
except Exception as e:
print(f"❌ Error: {e}")
import traceback
traceback.print_exc()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment