Skip to content

Instantly share code, notes, and snippets.

View kausmeows's full-sized avatar
:octocat:
Exploiting the plasticity of human brain

Kaustubh kausmeows

:octocat:
Exploiting the plasticity of human brain
View GitHub Profile
"""
Parser Model — Structured Output Debug
=======================================
Proves that:
- Primary model call: response_format is None (no structured output params)
- Parser model call: response_format contains the full JSON schema (native structured output)
"""
import json
"""
Custom Retriever with RunContext
=============================
Demonstrates how to pass application-controlled data (like a project_id
or file_name) into a custom retriever using RunContext.dependencies.
This is useful when:
- Your retriever needs scoping (e.g., per-project, per-tenant)
- You want to pass runtime filters that the LLM shouldn't control
"""
Test Team HITL Continue Run API
===============================
This script tests the team continue run endpoint by:
1. Starting an AgentOS server with a team that has a tool requiring confirmation
2. Creating a run via the API (which will pause)
3. Continuing the run via the /continue API endpoint
"""
"""
Example: CustomEvents with Teams (respond_directly=False)
This demonstrates how CustomEvents from member agents are streamed
to the user-facing stream even when respond_directly=False.
"""
import time
from agno.agent import Agent
from agno.team import Team
from typing import AsyncIterator, Union
from agno.agent import Agent
from agno.db.in_memory import InMemoryDb
from agno.models.anthropic import Claude
from agno.os import AgentOS
from agno.run.workflow import WorkflowRunOutputEvent
from agno.workflow import Step, Workflow
from agno.workflow.steps import Steps
from agno.workflow.types import StepInput, StepOutput
@kausmeows
kausmeows / mcp_errors.py
Created February 1, 2026 18:32
MCP tool failure isolation in AgentOS
"""
Example demonstrating MCP tool failure isolation in AgentOS.
This example shows that when an AgentOS app includes multiple agents - some with MCP tools
and some without - a connection failure to an MCP server should NOT cause the entire app
to fail. Only the agent using the failing MCP tool should be affected.
Run this example with:
python cookbook/05_agent_os/mcp_demo/mcp_tools_failure_isolation.py
"""
Example: Workflow with Conversational Intake and Background Research
This demonstrates a Workflow where:
1. An intake agent step has a conversation with the user
2. Once the user's name is captured, background research kicks off
3. The research results are stored in the workflow's shared session_state
4. Subsequent workflow steps/turns can access the research
Run with: python libs/agno/agno/test.py
# paper2saas_agent.py
import argparse
import os
import json
from typing import Dict, Any, List
from pydantic import BaseModel, Field
# Agno SDK Imports
from agno.workflow import Workflow, Step, Parallel
@kausmeows
kausmeows / router_with_early_stopping.py
Created December 28, 2025 23:33
To show how router can return a step (with stop=True in the custom function step that will end the workflow)
from typing import List
from agno.agent.agent import Agent
from agno.models.openai import OpenAIChat
from agno.workflow.router import Router
from agno.workflow.step import Step
from agno.workflow.types import StepInput, StepOutput
from agno.workflow.workflow import Workflow
# Define your agents/steps
@kausmeows
kausmeows / router_step_returns_step.py
Created December 28, 2025 23:24
Showing how a router step returns one/multiple steps.
from typing import List
from agno.agent.agent import Agent
from agno.models.openai import OpenAIChat
from agno.workflow.router import Router
from agno.workflow.step import Step
from agno.workflow.types import StepInput
from agno.workflow.workflow import Workflow
# Define your agents/steps