Created
July 15, 2025 18:26
-
-
Save pavlovmilen/757729e42c947c0b133a9aa4159badc5 to your computer and use it in GitHub Desktop.
agents_with_mcp
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| # model client | |
def get_fresh_model_client():
    """Build and return a new Azure OpenAI chat-completion client.

    Each call constructs a separate ``AzureOpenAIChatCompletionClient`` from
    the module-level ``AZURE_OPENAI_*`` settings, using ``timeout=180.0`` and
    ``max_retries=2``.
    """
    client_settings = {
        "model": AZURE_OPENAI_MODEL,
        "api_version": AZURE_OPENAI_API_VERSION,
        "azure_deployment": AZURE_OPENAI_DEPLOYMENT,
        "azure_endpoint": AZURE_OPENAI_ENDPOINT_SWEDEN,
        "api_key": AZURE_OPENAI_API_KEY_SWEDEN,
        "timeout": 180.0,
        "max_retries": 2,
    }
    return AzureOpenAIChatCompletionClient(**client_settings)
| # Image upload tool and agent | |
| from autogen_agentchat.agents import AssistantAgent | |
| from tools.file_upload_service import upload_automation_image | |
| # Create async wrapper for upload function with base64 conversion | |
| import os, base64, aiofiles | |
| from pathlib import Path | |
| from typing import Union, Dict, Any, List | |
| from autogen_core.tools import FunctionTool | |
| # existing upload helper you already have | |
| from tools.file_upload_service import upload_automation_image | |
async def upload_automation_image_async(
    payload: Union[str, Dict[str, Any], List[Any]],
    domain_name: str,
) -> str:
    """Upload image bytes through ``upload_automation_image``.

    Args:
        payload: One of
            * a path to an existing file,
            * a ``{"type": "file", "path": ...}`` dict,
            * a base-64 string, optionally wrapped in a ``data:`` URI,
            * a list of such values (the first ``str``/``dict`` item is
              used, falling back to the first element).
        domain_name: Domain such as ``example.com``; dots are replaced by
            underscores before it is passed to the uploader.

    Returns:
        Whatever ``upload_automation_image`` returns for the image bytes.

    Raises:
        ValueError: If ``payload`` is an empty list, contains no data, or is
            a string that is neither an existing file nor valid base-64.
        TypeError: If the payload does not reduce to a string.
        KeyError: If a ``{"type": "file"}`` dict has no ``"path"`` key.
    """
    # normalise input -------------------------------------------------
    if isinstance(payload, list):
        if not payload:
            # The fallback ``payload[0]`` below is evaluated eagerly, so an
            # empty list must be rejected before reaching it.
            raise ValueError("payload list is empty")
        payload = next(
            (item for item in payload if isinstance(item, (str, dict))),
            payload[0],
        )
    if isinstance(payload, dict) and payload.get("type") == "file":
        payload = payload["path"]
    if not isinstance(payload, str):
        raise TypeError(
            "payload must resolve to a file path or base-64 string, "
            f"got {type(payload).__name__}"
        )

    # read bytes ------------------------------------------------------
    if os.path.isfile(payload):
        # NOTE(review): the path is supplied by the tool caller, so any file
        # readable by this process can be uploaded. Consider restricting it
        # to the screenshot output directory.
        async with aiofiles.open(Path(payload), "rb") as f:
            data = await f.read()
    else:  # base-64
        if payload.startswith("data:"):
            # Keep only what follows the first comma of the data URI; a URI
            # without a comma yields an empty string and is rejected below.
            payload = payload.partition(",")[2]
        # Strip whitespace/newlines first so the padding length is computed
        # from the real base-64 characters.
        encoded = "".join(payload.split())
        if not encoded:
            raise ValueError("payload contains no image data")
        encoded += "=" * (-len(encoded) % 4)
        try:
            # validate=True stops a mistyped or missing file path from being
            # silently decoded into garbage bytes and uploaded.
            data = base64.b64decode(encoded, validate=True)
        except ValueError as exc:  # binascii.Error is a ValueError subclass
            raise ValueError(
                "payload is neither an existing file nor valid base-64 data"
            ) from exc

    # upload ----------------------------------------------------------
    domain = domain_name.replace(".", "_")
    return await upload_automation_image(data, domain)
| # Create FunctionTool wrapper | |
| from autogen_core.tools import FunctionTool | |
# Expose the async upload helper to agents as a callable tool.
upload_tool = FunctionTool(
    upload_automation_image_async,
    description=(
        "Upload base64 encoded screenshot content to Azure Blob Storage using domain name"
    ),
)

# Agent dedicated to uploading; the upload tool is the only tool it is given.
report_upload_agent = AssistantAgent(
    name="report_upload_agent",
    description="Upload the {domain_name}_screenshot file to azure blob storage. ",
    tools=[upload_tool],
    model_client=get_fresh_model_client(),
)
| from autogen_agentchat.base import TaskResult | |
| from tools.llm_usage_tracker import LLMUsageTracker | |
| from autogen_agentchat.conditions import MaxMessageTermination, TextMentionTermination | |
| from autogen_agentchat.messages import BaseChatMessage | |
| from autogen_agentchat.teams import SelectorGroupChat | |
| import logging | |
| from autogen_core import EVENT_LOGGER_NAME | |
# Usage tracking: the tracker is installed as the sole handler of the logger
# named by EVENT_LOGGER_NAME; its prompt_tokens / completion_tokens counters
# are printed later in this script.
my_handler = LLMUsageTracker()
logger = logging.getLogger(EVENT_LOGGER_NAME)
logger.setLevel(logging.INFO)
logger.handlers = [my_handler]  # assignment replaces any existing handlers

# Task description handed to the agent team.
automation_task = (
    "Navigate to https://example.com, take a screenshot, "
    "and upload to Azure Blob Storage"
)
runtime = SingleThreadedAgentRuntime()
| print(f"π Task: {automation_task}") | |
| print(f"π URL to navigate: https://example.com") | |
| try: | |
| # Configure server parameters | |
| server_params = SseServerParams( | |
| url="http://localhost:8931/sse", | |
| timeout=90.0, | |
| ) | |
| print("π Connecting to Playwright MCP server...") | |
| # Connect to workbench | |
| async with create_mcp_server_session(server_params) as session: | |
| await session.initialize() | |
| mcp_tools = await mcp_server_tools(server_params=server_params, session=session) | |
| print(f"Tools: {[tool.name for tool in mcp_tools]}") | |
| # Create AssistantAgent with MCP tools | |
| all_tools = mcp_tools | |
| playwright_agent = AssistantAgent( | |
| name="playwright_agent", | |
| model_client=get_fresh_model_client(), | |
| description="Playwright web automation agent for browser automation tasks", | |
| system_message="""You are a Playwright web automation agent. You can: | |
| NAVIGATION: | |
| - Use browser_navigate with proper URL parameter | |
| - Always include full URL with https:// protocol | |
| - Example: {"url": "https://example.com"} | |
| SCREENSHOTS: | |
| - Use browser_take_screenshot with filename and raw parameters | |
| - Set raw=false always (never true) | |
| - Extract domain from URL for filename (e.g., "example" from "https://example.com") | |
| - Example: {"filename": "example.png", "raw": false} | |
| WORKFLOW: | |
| 1. Navigate to the website using browser_navigate | |
| 2. Take screenshot using browser_take_screenshot | |
| 3. Return the screenshot file path | |
| PARAMETER FORMAT: | |
| - For browser_navigate: Pass URL as {"url": "https://example.com"} | |
| - For browser_take_screenshot: Pass {"filename": "domain.png", "raw": false} | |
| Always complete ALL requested steps. Provide clear feedback about each action performed.""", | |
| tools=all_tools # type: ignore | |
| ) | |
| print("β Playwright agent created successfully") | |
| # Create a simple team with just the playwright agent | |
| planning_agent = AssistantAgent( | |
| "PlanningAgent", | |
| description="An agent for planning tasks, this agent should be the first to engage when given a new task.", | |
| model_client=get_fresh_model_client(), | |
| system_message=""" | |
| You are a planning agent. | |
| Your job is to break down complex tasks into smaller, manageable subtasks. | |
| Your team members are: | |
| 1. playwright_agent: Handles web browser automation | |
| - Navigate to websites: Specify complete URLs with https:// protocol | |
| - Take screenshots: Returns file path of saved screenshot | |
| - URL format must be: "https://example.com" (complete with protocol) | |
| 2. report_upload_agent: Uploads screenshots to Azure Blob Storage | |
| - Requires screenshot file path from playwright_agent | |
| - Needs domain name for storage organization | |
| You only plan and delegate tasks - you do not execute them yourself. | |
| When assigning tasks, use this format: | |
| 1. <agent> : <task> | |
| After all tasks are complete, summarize the findings. Only when all tasks are complete, complete the workflow by responding with "TERMINATE". | |
| """, | |
| ) | |
| text_mention_termination = TextMentionTermination("TERMINATE") | |
| max_messages_termination = MaxMessageTermination(max_messages=8) | |
| termination = text_mention_termination | max_messages_termination | |
| selector_prompt = """Select an agent to perform task. | |
| {roles} | |
| Current conversation context: | |
| {history} | |
| Read the above conversation, then select an agent from {participants} to perform the next task. | |
| Make sure the planner agent has assigned tasks before other agents start working. | |
| Only select one agent at a time. | |
| """ | |
| team = SelectorGroupChat( | |
| [planning_agent, playwright_agent, report_upload_agent], | |
| termination_condition=termination, | |
| allow_repeated_speaker=True, | |
| model_client=get_fresh_model_client(), | |
| selector_prompt=selector_prompt | |
| ) | |
| from autogen_agentchat.messages import ToolCallRequestEvent, ToolCallExecutionEvent, ToolCallSummaryMessage | |
| def _tool_names(msg: BaseChatMessage) -> str | None: | |
| """Return comma-separated tool names or None.""" | |
| if hasattr(msg, "tool_calls") and msg.tool_calls: # Request / summary | |
| return ", ".join(tc.name for tc in msg.tool_calls) | |
| if isinstance(msg, ToolCallExecutionEvent): # Execution result | |
| return msg.content[0].name if msg.content else None | |
| return None | |
| messages = [] | |
| runtime.on_tool_start = lambda ev: print("βΆ", ev.tool_call.tool_name) # type: ignore | |
| runtime.on_tool_end = lambda ev: print("β", ev.tool_call.tool_name, ev.result) # type: ignore | |
| runtime.start() | |
| message_count = 0 | |
| async for message in team.run_stream(task=automation_task): | |
| if isinstance(message, TaskResult): | |
| print("Stop Reason:", message.stop_reason) | |
| if isinstance(message, BaseChatMessage): | |
| message_count += 1 | |
| print(f'.................Message {message_count}.....................') | |
| print(f"\033[93m........ Source: {message.source}\033[0m") | |
| show_content = True | |
| tools = _tool_names(message) | |
| if tools: | |
| print(f"π οΈ tool call β {tools}") | |
| if "browser_take_screenshot" in tools: | |
| show_content = False | |
| if show_content: | |
| print(f"\033[94m{message.content}\033[0m") # type: ignore | |
| if isinstance(message, ToolCallSummaryMessage) and show_content: | |
| print(f"π οΈ tool call summary β {message.content}") | |
| print(f"\033[95m Input tokens so far:{my_handler.prompt_tokens}\033[0m") | |
| print(f"\033[95m Output tokens so far:{my_handler.completion_tokens}\033[0m") | |
| print(f"\033[91mTotal input tokens: {my_handler.prompt_tokens}\033[0m") | |
| print(f"\033[92mTotal output tokens: {my_handler.completion_tokens}\033[0m") | |
| print(f"\033[91mPrice of input tokens: Β£{my_handler.prompt_tokens * 0.30 / 1000000}\033[0m") | |
| print(f"\033[92mPrice of output tokens: Β£{my_handler.completion_tokens * 1.20 / 1000000}\033[0m") | |
| my_handler.reset() | |
| await team.reset() | |
| await runtime.stop() | |
| print("β Task completed successfully") | |
| print(f"Total messages: {len(messages)}") | |
| # Show final message if available | |
| if messages: | |
| final_message = messages[-1] | |
| if hasattr(final_message, 'content'): | |
| print(f"Final response: {final_message.content}") | |
| except Exception as e: | |
| print(f"β Error: {e}") | |
| import traceback | |
| traceback.print_exc() | |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment