Created
January 31, 2026 22:51
-
-
Save vuturi/a75ad7be421d6642619ac4a5d762b430 to your computer and use it in GitHub Desktop.
Agent Log Analyzer - Parse and analyze agent logs for insights
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| #!/usr/bin/env python3 | |
| """ | |
| Agent Log Analyzer v1.0 | |
| Built by SamTheArchitect for the Shipyard community | |
| Parse and analyze agent logs to extract insights, detect errors, | |
| and track performance metrics. | |
| Features: | |
| - Parse common log formats (JSON, structured, plain text) | |
| - Extract error patterns and frequencies | |
| - Session analysis (duration, turns, costs) | |
| - Tool usage statistics | |
| - Export reports in JSON/Markdown | |
| Usage: | |
| python log_analyzer.py analyze logs/session.log | |
| python log_analyzer.py errors logs/ --since 24h | |
| python log_analyzer.py stats logs/ --format markdown | |
| python log_analyzer.py sessions logs/ --top 10 | |
| """ | |
| import json | |
| import re | |
| import argparse | |
| from pathlib import Path | |
| from datetime import datetime, timedelta | |
| from typing import Dict, List, Optional, Any | |
| from collections import Counter, defaultdict | |
| from dataclasses import dataclass, asdict | |
| import glob | |
@dataclass
class LogEntry:
    """A single parsed log line in a format-independent shape."""
    timestamp: Optional[datetime]  # None when no timestamp could be parsed from the line
    level: str  # upper-cased level, e.g. ERROR / WARNING / INFO (default INFO)
    message: str  # extracted message (falls back to the whole line)
    source: str  # originating file path; "" when parsed from a bare string
    metadata: Dict[str, Any]  # full JSON payload for JSON lines, {} otherwise
    raw: str  # the original line, stripped
@dataclass
class SessionStats:
    """Aggregated statistics for one session (entries grouped by time gaps)."""
    session_id: str  # synthetic id such as "session_1"
    start_time: Optional[datetime]  # earliest timestamped entry, or None
    end_time: Optional[datetime]  # latest timestamped entry, or None
    duration_seconds: float  # 0 when fewer than two timestamped entries
    message_count: int  # total entries in the session
    error_count: int  # entries at error-like levels
    tool_calls: List[str]  # distinct lower-cased tool names (capped at 10)
    models_used: List[str]  # not populated by the analyzer yet
    estimated_cost: float  # not populated by the analyzer yet (always 0.0)
@dataclass
class AnalysisReport:
    """Top-level result of LogAnalyzer.analyze()."""
    total_entries: int
    time_range: Dict[str, str]  # {"start": iso-string-or-None, "end": iso-string-or-None}
    level_distribution: Dict[str, int]  # level name -> count
    error_patterns: List[Dict[str, Any]]  # [{"category", "count", "examples"}, ...]
    tool_usage: Dict[str, int]  # tool name -> call count
    sessions: List["SessionStats"]

    def to_dict(self) -> dict:
        """Serialize the report (sessions included) to plain dicts/lists.

        Uses dataclasses.asdict so fields added to this class later are
        picked up automatically instead of being silently dropped by a
        hand-maintained field-by-field mapping (asdict recurses into the
        SessionStats dataclasses exactly as the old code did).
        """
        return asdict(self)
class LogParser:
    """Parse JSON, structured, and plain-text log lines into LogEntry records."""

    # (regex, strptime format) pairs for timestamps commonly seen in logs.
    TIMESTAMP_PATTERNS = [
        (r'(\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}(?:\.\d+)?(?:Z|[+-]\d{2}:\d{2})?)', '%Y-%m-%dT%H:%M:%S'),
        (r'(\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2})', '%Y-%m-%d %H:%M:%S'),
        (r'\[(\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2})\]', '%Y-%m-%d %H:%M:%S'),
        (r'(\d{2}/\w{3}/\d{4}:\d{2}:\d{2}:\d{2})', '%d/%b/%Y:%H:%M:%S'),
    ]

    # Log level tokens, most severe variants first where aliased.
    LEVEL_PATTERNS = [
        r'\b(ERROR|ERR)\b',
        r'\b(WARN(?:ING)?)\b',
        r'\b(INFO)\b',
        r'\b(DEBUG)\b',
        r'\b(TRACE)\b',
        r'\b(FATAL|CRITICAL)\b',
    ]

    @staticmethod
    def _normalize_ts(ts_str: str) -> str:
        """Strip trailing 'Z', fractional seconds, and numeric UTC offsets.

        The formats in TIMESTAMP_PATTERNS cover only whole seconds, so the
        sub-second / timezone tail must be removed before strptime. This
        replaces the old fixed [:19] truncation, which corrupted 20-char
        Apache-style timestamps (%d/%b/%Y:%H:%M:%S) and made them unparseable.
        """
        if ts_str.endswith('Z'):
            ts_str = ts_str[:-1]
        ts_str = re.sub(r'\.\d+', '', ts_str)
        return re.sub(r'[+-]\d{2}:\d{2}$', '', ts_str)

    @classmethod
    def parse_line(cls, line: str, source: str = "") -> Optional["LogEntry"]:
        """Parse a single log line.

        Returns None for blank lines (return type was previously annotated
        as a bare LogEntry, which was wrong). JSON lines that fail to decode
        fall through to structured/plain-text parsing.
        """
        line = line.strip()
        if not line:
            return None
        # Try JSON first.
        if line.startswith('{'):
            try:
                data = json.loads(line)
            except json.JSONDecodeError:
                pass
            else:
                ts = None
                for key in ('timestamp', 'time', '@timestamp'):
                    if key in data:
                        ts = cls._parse_timestamp(data[key])
                        break
                # str() guards against non-string levels (e.g. numeric severity),
                # which previously raised AttributeError on .upper().
                level = str(data.get('level', data.get('severity', 'INFO'))).upper()
                message = data.get('message', data.get('msg', str(data)))
                return LogEntry(
                    timestamp=ts,
                    level=level,
                    message=message,
                    source=source,
                    metadata=data,
                    raw=line
                )
        # Structured / plain text: find a timestamp...
        timestamp = None
        for pattern, fmt in cls.TIMESTAMP_PATTERNS:
            match = re.search(pattern, line)
            if match:
                try:
                    timestamp = datetime.strptime(cls._normalize_ts(match.group(1)), fmt)
                    break
                except ValueError:
                    continue
        # ...then a level token (default INFO), normalizing aliases.
        level = "INFO"
        for pattern in cls.LEVEL_PATTERNS:
            match = re.search(pattern, line, re.IGNORECASE)
            if match:
                level = match.group(1).upper()
                if level == "ERR":
                    level = "ERROR"
                elif level.startswith("WARN"):
                    level = "WARNING"
                break
        return LogEntry(
            timestamp=timestamp,
            level=level,
            message=line,
            source=source,
            metadata={},
            raw=line
        )

    @classmethod
    def _parse_timestamp(cls, ts_value) -> Optional[datetime]:
        """Parse a timestamp from epoch seconds/milliseconds or a known string format.

        Returns None when no format matches.
        """
        if isinstance(ts_value, (int, float)):
            # Heuristic: values above 1e12 are epoch milliseconds.
            if ts_value > 1e12:
                ts_value = ts_value / 1000
            return datetime.fromtimestamp(ts_value)
        if isinstance(ts_value, str):
            cleaned = cls._normalize_ts(ts_value)
            # Try every known format against the full cleaned string; the old
            # fmt[:len(ts_str)+2] slicing was a no-op at best and broke the
            # Apache format by pairing it with a truncated value.
            for _pattern, fmt in cls.TIMESTAMP_PATTERNS:
                try:
                    return datetime.strptime(cleaned, fmt)
                except ValueError:
                    continue
        return None

    @classmethod
    def parse_file(cls, filepath: str) -> List["LogEntry"]:
        """Parse an entire log file, skipping blank lines; bad bytes are ignored."""
        entries = []
        path = Path(filepath)
        with open(path, 'r', errors='ignore') as f:
            for line in f:
                entry = cls.parse_line(line, source=str(path))
                if entry:
                    entries.append(entry)
        return entries
class LogAnalyzer:
    """Analyze parsed log entries for errors, tool usage, and sessions."""

    # Levels treated as "errors" everywhere in this class. Previously
    # _extract_sessions omitted CRITICAL while analyze()/get_errors()
    # included it; the shared constant removes that inconsistency.
    ERROR_LEVELS = ('ERROR', 'FATAL', 'CRITICAL')

    # (regex, human-readable category) pairs used to bucket error messages.
    ERROR_PATTERNS = [
        (r'(connection refused|ECONNREFUSED)', 'Connection Error'),
        (r'(timeout|ETIMEDOUT)', 'Timeout'),
        (r'(rate limit|429|too many requests)', 'Rate Limit'),
        (r'(auth|unauthorized|403|401)', 'Auth Error'),
        (r'(not found|404)', 'Not Found'),
        (r'(internal server error|500)', 'Server Error'),
        (r'(out of memory|OOM)', 'Memory Error'),
        (r'(disk full|no space)', 'Disk Error'),
        (r'(api key|invalid key)', 'API Key Error'),
        (r'(token limit|max tokens)', 'Token Limit'),
    ]

    # Patterns capturing a tool/function name from raw log text
    # (Clawdbot-style logs).
    TOOL_PATTERNS = [
        r'tool[_\s]*call[:\s]+(\w+)',
        r'executing[:\s]+(\w+)',
        r'<invoke name="(\w+)"',
        r'"tool":\s*"(\w+)"',
        r'function[:\s]+(\w+)',
    ]

    def __init__(self, entries: List["LogEntry"]):
        self.entries = entries

    def analyze(self) -> "AnalysisReport":
        """Run the full analysis and return an AnalysisReport."""
        if not self.entries:
            return AnalysisReport(
                total_entries=0,
                time_range={"start": None, "end": None},
                level_distribution={},
                error_patterns=[],
                tool_usage={},
                sessions=[]
            )
        # Overall time range from the timestamped entries only.
        timestamps = [e.timestamp for e in self.entries if e.timestamp]
        time_range = {
            "start": min(timestamps).isoformat() if timestamps else None,
            "end": max(timestamps).isoformat() if timestamps else None
        }
        level_dist = Counter(e.level for e in self.entries)
        error_entries = [e for e in self.entries if e.level in self.ERROR_LEVELS]
        return AnalysisReport(
            total_entries=len(self.entries),
            time_range=time_range,
            level_distribution=dict(level_dist),
            error_patterns=self._classify_errors(error_entries),
            tool_usage=self._extract_tool_usage(),
            sessions=self._extract_sessions()
        )

    def _classify_errors(self, errors: List["LogEntry"]) -> List[Dict[str, Any]]:
        """Bucket error entries by ERROR_PATTERNS; unmatched ones go to 'Other'.

        Returns buckets sorted by descending count, each with up to 3 example
        messages (truncated to 200 chars).
        """
        classified = defaultdict(list)
        for entry in errors:
            for pattern, category in self.ERROR_PATTERNS:
                if re.search(pattern, entry.message, re.IGNORECASE):
                    classified[category].append(entry.message[:200])
                    break
            else:
                classified['Other'].append(entry.message[:200])
        return [
            {"category": cat, "count": len(msgs), "examples": msgs[:3]}
            for cat, msgs in sorted(classified.items(), key=lambda x: -len(x[1]))
        ]

    def _extract_tool_usage(self) -> Dict[str, int]:
        """Count tool-call mentions across all raw lines (top 20, lower-cased)."""
        tools = Counter()
        for entry in self.entries:
            for pattern in self.TOOL_PATTERNS:
                for match in re.finditer(pattern, entry.raw, re.IGNORECASE):
                    tools[match.group(1).lower()] += 1
        return dict(tools.most_common(20))

    def _extract_sessions(self, gap_minutes: int = 30) -> List["SessionStats"]:
        """Group entries into sessions separated by gaps of `gap_minutes`.

        Simplified heuristic: entries are sorted by timestamp (untimestamped
        entries sort first and join whatever session is open), and a new
        session starts whenever consecutive timestamps are more than
        `gap_minutes` apart. At most 10 sessions are reported.
        """
        if not self.entries:
            return []
        gap = timedelta(minutes=gap_minutes)
        sessions = []
        current_session = []
        last_ts = None
        for entry in sorted(self.entries, key=lambda e: e.timestamp or datetime.min):
            if entry.timestamp:
                if last_ts and (entry.timestamp - last_ts) > gap and current_session:
                    sessions.append(current_session)
                    current_session = []
                last_ts = entry.timestamp
            current_session.append(entry)
        if current_session:
            sessions.append(current_session)
        # Convert the grouped entries into SessionStats records.
        results = []
        for i, session in enumerate(sessions[:10]):  # cap the report at 10 sessions
            timestamps = [e.timestamp for e in session if e.timestamp]
            # ERROR_LEVELS now includes CRITICAL, matching analyze()/get_errors().
            errors = [e for e in session if e.level in self.ERROR_LEVELS]
            tools = []
            for entry in session:
                for pattern in self.TOOL_PATTERNS:
                    tools.extend(re.findall(pattern, entry.raw, re.IGNORECASE))
            results.append(SessionStats(
                session_id=f"session_{i+1}",
                start_time=min(timestamps) if timestamps else None,
                end_time=max(timestamps) if timestamps else None,
                duration_seconds=(max(timestamps) - min(timestamps)).total_seconds() if len(timestamps) > 1 else 0,
                message_count=len(session),
                error_count=len(errors),
                tool_calls=list(set(t.lower() for t in tools))[:10],
                models_used=[],  # model extraction not implemented
                estimated_cost=0.0  # cost estimation not implemented
            ))
        return results

    def get_errors(self, since: Optional[timedelta] = None) -> List["LogEntry"]:
        """Return error-level entries, optionally only those newer than now - `since`.

        When `since` is given, entries without a timestamp are excluded.
        """
        errors = [e for e in self.entries if e.level in self.ERROR_LEVELS]
        if since and errors:
            cutoff = datetime.now() - since
            errors = [e for e in errors if e.timestamp and e.timestamp > cutoff]
        return errors
def format_markdown(report: "AnalysisReport") -> str:
    """Render an AnalysisReport as a Markdown document.

    The error-pattern and tool-usage sections are emitted only when they
    have content. Level percentages are 0 when there are no entries.
    """
    lines = [
        "# Agent Log Analysis Report",
        f"\n**Generated:** {datetime.now().isoformat()}",
        f"\n**Total Entries:** {report.total_entries}",
        f"\n**Time Range:** {report.time_range['start']} to {report.time_range['end']}",
        "\n## Log Level Distribution\n"
    ]
    for level, count in sorted(report.level_distribution.items()):
        pct = (count / report.total_entries * 100) if report.total_entries else 0
        lines.append(f"- **{level}**: {count} ({pct:.1f}%)")
    if report.error_patterns:
        lines.append("\n## Error Patterns\n")
        for err in report.error_patterns:
            lines.append(f"### {err['category']} ({err['count']} occurrences)\n")
            for ex in err['examples']:
                # Only show an ellipsis when the example really was truncated;
                # the old code appended "..." unconditionally, implying
                # truncation even for short messages.
                suffix = "..." if len(ex) > 100 else ""
                lines.append(f"- `{ex[:100]}{suffix}`")
    if report.tool_usage:
        lines.append("\n## Tool Usage\n")
        lines.append("| Tool | Count |")
        lines.append("|------|-------|")
        for tool, count in sorted(report.tool_usage.items(), key=lambda x: -x[1]):
            lines.append(f"| {tool} | {count} |")
    return "\n".join(lines)
def parse_duration(duration_str: str) -> timedelta:
    """Parse a duration string such as '24h', '7d', or '30m'.

    Unrecognized input falls back to a 24-hour window.
    """
    parsed = re.match(r'(\d+)([hdm])', duration_str.lower())
    if parsed is None:
        return timedelta(hours=24)
    amount = int(parsed.group(1))
    builders = {
        'h': lambda n: timedelta(hours=n),
        'd': lambda n: timedelta(days=n),
        'm': lambda n: timedelta(minutes=n),
    }
    return builders[parsed.group(2)](amount)
def main():
    """CLI entry point: dispatch analyze / errors / stats / sessions commands."""
    parser = argparse.ArgumentParser(description="Agent Log Analyzer")
    subparsers = parser.add_subparsers(dest="command", help="Commands")
    # Analyze command
    analyze_parser = subparsers.add_parser("analyze", help="Full analysis")
    analyze_parser.add_argument("path", help="Log file or directory")
    analyze_parser.add_argument("--format", choices=["json", "markdown"], default="json")
    # Errors command
    errors_parser = subparsers.add_parser("errors", help="List errors")
    errors_parser.add_argument("path", help="Log file or directory")
    errors_parser.add_argument("--since", default="24h", help="Time range (e.g., 24h, 7d)")
    # Stats command
    stats_parser = subparsers.add_parser("stats", help="Quick statistics")
    stats_parser.add_argument("path", help="Log file or directory")
    stats_parser.add_argument("--format", choices=["json", "markdown"], default="json")
    # Sessions command — advertised in the module usage text but previously
    # never wired up; implemented here.
    sessions_parser = subparsers.add_parser("sessions", help="Session breakdown")
    sessions_parser.add_argument("path", help="Log file or directory")
    sessions_parser.add_argument("--top", type=int, default=10, help="Show at most N sessions")
    args = parser.parse_args()
    if not args.command:
        parser.print_help()
        return
    # Collect log files (a directory yields its *.log and *.json children).
    path = Path(args.path)
    if path.is_dir():
        files = list(path.glob("*.log")) + list(path.glob("*.json"))
    else:
        files = [path]
    # Parse all files into one entry list.
    all_entries = []
    for f in files:
        all_entries.extend(LogParser.parse_file(str(f)))
    analyzer = LogAnalyzer(all_entries)
    if args.command in ("analyze", "stats"):
        report = analyzer.analyze()
        if args.format == "markdown":
            print(format_markdown(report))
        else:
            print(json.dumps(report.to_dict(), indent=2, default=str))
    elif args.command == "errors":
        since = parse_duration(args.since)
        for e in analyzer.get_errors(since=since):
            ts = e.timestamp.isoformat() if e.timestamp else "unknown"
            print(f"[{ts}] {e.message[:200]}")
    elif args.command == "sessions":
        report = analyzer.analyze()
        for s in report.sessions[:args.top]:
            start = s.start_time.isoformat() if s.start_time else "unknown"
            print(f"{s.session_id}: {s.message_count} messages, "
                  f"{s.error_count} errors, {s.duration_seconds:.0f}s (start {start})")
if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment