Skip to content

Instantly share code, notes, and snippets.

@vuturi
Created January 31, 2026 22:51
Show Gist options
  • Select an option

  • Save vuturi/a75ad7be421d6642619ac4a5d762b430 to your computer and use it in GitHub Desktop.

Select an option

Save vuturi/a75ad7be421d6642619ac4a5d762b430 to your computer and use it in GitHub Desktop.
Agent Log Analyzer - Parse and analyze agent logs for insights
#!/usr/bin/env python3
"""
Agent Log Analyzer v1.0
Built by SamTheArchitect for the Shipyard community
Parse and analyze agent logs to extract insights, detect errors,
and track performance metrics.
Features:
- Parse common log formats (JSON, structured, plain text)
- Extract error patterns and frequencies
- Session analysis (duration, turns, costs)
- Tool usage statistics
- Export reports in JSON/Markdown
Usage:
python log_analyzer.py analyze logs/session.log
python log_analyzer.py errors logs/ --since 24h
python log_analyzer.py stats logs/ --format markdown
python log_analyzer.py sessions logs/ --top 10   (planned; this subcommand is not implemented yet)
"""
import json
import re
import argparse
from pathlib import Path
from datetime import datetime, timedelta
from typing import Dict, List, Optional, Any
from collections import Counter, defaultdict
from dataclasses import dataclass, asdict
import glob
@dataclass
class LogEntry:
    """A single parsed log line, normalized across input formats."""

    # Parsed timestamp, or None when no recognizable timestamp was found.
    timestamp: Optional[datetime]
    # Normalized log level (e.g. "ERROR", "WARNING", "INFO").
    level: str
    # Human-readable message; for plain-text lines this is the whole line.
    message: str
    # Originating file path ("" when unknown).
    source: str
    # Full decoded JSON payload for JSON lines, {} otherwise.
    metadata: Dict[str, Any]
    # The original line, stripped of surrounding whitespace.
    raw: str
@dataclass
class SessionStats:
    """Aggregated metrics for one detected session (a gap-separated run of entries)."""

    # Synthetic identifier, e.g. "session_1".
    session_id: str
    # Earliest/latest entry timestamps; None when no entry had a timestamp.
    start_time: Optional[datetime]
    end_time: Optional[datetime]
    # end - start in seconds; 0 when fewer than two timestamps exist.
    duration_seconds: float
    message_count: int
    error_count: int
    # Distinct tool names observed in the session.
    tool_calls: List[str]
    # Currently always [] — model extraction is not implemented yet.
    models_used: List[str]
    # Currently always 0.0 — cost estimation is not implemented yet.
    estimated_cost: float
@dataclass
class AnalysisReport:
    """Aggregated result of a full log analysis run."""

    total_entries: int
    # {"start": iso-string-or-None, "end": iso-string-or-None}
    time_range: Dict[str, str]
    # Count of entries per normalized level.
    level_distribution: Dict[str, int]
    # [{"category": str, "count": int, "examples": [str, ...]}, ...]
    error_patterns: List[Dict[str, Any]]
    # Tool name -> call count.
    tool_usage: Dict[str, int]
    # Quoted annotation: resolved lazily, so the class also imports in isolation.
    sessions: List["SessionStats"]

    def to_dict(self) -> dict:
        """Return a plain-dict form of the report.

        dataclasses.asdict() recursively converts the nested SessionStats
        dataclasses, replacing the previous hand-rolled field-by-field copy.
        datetime values inside sessions are left as datetime objects;
        serialize with json.dumps(..., default=str).
        """
        return asdict(self)
class LogParser:
    """Parse individual log lines or whole files into LogEntry records.

    Supports JSON lines, timestamped structured text, and plain text.
    """

    # (regex, strptime format) pairs, tried in order. The formats cover only
    # the leading date+time portion; fractional seconds / timezone suffixes
    # are trimmed by _strptime before parsing.
    TIMESTAMP_PATTERNS = [
        (r'(\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}(?:\.\d+)?(?:Z|[+-]\d{2}:\d{2})?)', '%Y-%m-%dT%H:%M:%S'),
        (r'(\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2})', '%Y-%m-%d %H:%M:%S'),
        (r'\[(\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2})\]', '%Y-%m-%d %H:%M:%S'),
        (r'(\d{2}/\w{3}/\d{4}:\d{2}:\d{2}:\d{2})', '%d/%b/%Y:%H:%M:%S'),
    ]

    # Log level keywords; first match wins.
    LEVEL_PATTERNS = [
        r'\b(ERROR|ERR)\b',
        r'\b(WARN(?:ING)?)\b',
        r'\b(INFO)\b',
        r'\b(DEBUG)\b',
        r'\b(TRACE)\b',
        r'\b(FATAL|CRITICAL)\b',
    ]

    @staticmethod
    def _strptime(ts_str: str, fmt: str) -> Optional[datetime]:
        """Parse ts_str with fmt, tolerating ISO extras. Returns None on failure.

        Fix over the previous ts_str[:19] truncation: that silently dropped
        the last seconds digit of 20-char '%d/%b/%Y:%H:%M:%S' timestamps.
        Here we instead strip only fractional seconds and timezone suffixes
        ('.123', 'Z', '+05:00') that the formats above do not cover.
        """
        cleaned = re.sub(r'(\.\d+)?(Z|[+-]\d{2}:\d{2})?$', '', ts_str)
        try:
            return datetime.strptime(cleaned, fmt)
        except ValueError:
            return None

    @classmethod
    def parse_line(cls, line: str, source: str = "") -> Optional[LogEntry]:
        """Parse a single log line; returns None for blank lines.

        JSON object lines are decoded and their level/message/timestamp
        fields extracted; everything else falls through to regex-based
        timestamp and level detection.
        """
        line = line.strip()
        if not line:
            # Fix: return type now reflects this None (was declared LogEntry).
            return None
        # Try JSON first: structured payloads carry their own metadata.
        if line.startswith('{'):
            try:
                data = json.loads(line)
            except json.JSONDecodeError:
                pass  # not JSON after all; fall through to text parsing
            else:
                ts = None
                for key in ('timestamp', 'time', '@timestamp'):
                    if key in data:
                        ts = cls._parse_timestamp(data[key])
                        break
                # str() guards against non-string levels (e.g. numeric severity).
                level = str(data.get('level', data.get('severity', 'INFO'))).upper()
                message = data.get('message', data.get('msg', str(data)))
                return LogEntry(
                    timestamp=ts,
                    level=level,
                    message=message,
                    source=source,
                    metadata=data,
                    raw=line
                )
        # Structured / plain text: best-effort timestamp extraction.
        timestamp = None
        for pattern, fmt in cls.TIMESTAMP_PATTERNS:
            match = re.search(pattern, line)
            if match:
                timestamp = cls._strptime(match.group(1), fmt)
                if timestamp:
                    break
        # Level keyword extraction; defaults to INFO.
        level = "INFO"
        for pattern in cls.LEVEL_PATTERNS:
            match = re.search(pattern, line, re.IGNORECASE)
            if match:
                level = match.group(1).upper()
                # Normalize abbreviated / variant spellings.
                if level == "ERR":
                    level = "ERROR"
                elif level.startswith("WARN"):
                    level = "WARNING"
                break
        return LogEntry(
            timestamp=timestamp,
            level=level,
            message=line,
            source=source,
            metadata={},
            raw=line
        )

    @classmethod
    def _parse_timestamp(cls, ts_value) -> Optional[datetime]:
        """Parse a JSON timestamp value: unix seconds/milliseconds or string."""
        if isinstance(ts_value, (int, float)):
            if ts_value > 1e12:  # heuristic: values this large are milliseconds
                ts_value = ts_value / 1000
            return datetime.fromtimestamp(ts_value)
        if isinstance(ts_value, str):
            # Fix: previously sliced both value and format to arbitrary
            # lengths; now each known format is tried properly via _strptime.
            for _pattern, fmt in cls.TIMESTAMP_PATTERNS:
                parsed = cls._strptime(ts_value, fmt)
                if parsed:
                    return parsed
        return None

    @classmethod
    def parse_file(cls, filepath: str) -> List[LogEntry]:
        """Parse an entire log file into entries (blank lines are skipped).

        Undecodable bytes are ignored rather than raising.
        """
        path = Path(filepath)
        entries: List[LogEntry] = []
        with open(path, 'r', errors='ignore') as f:
            for line in f:
                entry = cls.parse_line(line, source=str(path))
                if entry:
                    entries.append(entry)
        return entries
class LogAnalyzer:
    """Analyze parsed LogEntry records: errors, tool usage, sessions."""

    # Levels counted as errors everywhere in the analyzer. Kept in one place
    # so analyze(), get_errors() and session stats agree (previously session
    # error_count missed CRITICAL entries).
    ERROR_LEVELS = ('ERROR', 'FATAL', 'CRITICAL')

    # (regex, category) pairs for classifying error messages; first match wins.
    ERROR_PATTERNS = [
        (r'(connection refused|ECONNREFUSED)', 'Connection Error'),
        (r'(timeout|ETIMEDOUT)', 'Timeout'),
        (r'(rate limit|429|too many requests)', 'Rate Limit'),
        (r'(auth|unauthorized|403|401)', 'Auth Error'),
        (r'(not found|404)', 'Not Found'),
        (r'(internal server error|500)', 'Server Error'),
        (r'(out of memory|OOM)', 'Memory Error'),
        (r'(disk full|no space)', 'Disk Error'),
        (r'(api key|invalid key)', 'API Key Error'),
        (r'(token limit|max tokens)', 'Token Limit'),
    ]

    # Patterns capturing a tool/function name (for Clawdbot-style logs).
    TOOL_PATTERNS = [
        r'tool[_\s]*call[:\s]+(\w+)',
        r'executing[:\s]+(\w+)',
        r'<invoke name="(\w+)"',
        r'"tool":\s*"(\w+)"',
        r'function[:\s]+(\w+)',
    ]

    def __init__(self, entries: List[LogEntry]):
        self.entries = entries

    def analyze(self) -> AnalysisReport:
        """Run the full analysis over all entries and build a report."""
        if not self.entries:
            # Empty input: return a well-formed, empty report.
            return AnalysisReport(
                total_entries=0,
                time_range={"start": None, "end": None},
                level_distribution={},
                error_patterns=[],
                tool_usage={},
                sessions=[]
            )
        # Overall time range (entries without timestamps are ignored here).
        timestamps = [e.timestamp for e in self.entries if e.timestamp]
        time_range = {
            "start": min(timestamps).isoformat() if timestamps else None,
            "end": max(timestamps).isoformat() if timestamps else None
        }
        level_dist = Counter(e.level for e in self.entries)
        error_entries = [e for e in self.entries if e.level in self.ERROR_LEVELS]
        error_patterns = self._classify_errors(error_entries)
        tool_usage = self._extract_tool_usage()
        sessions = self._extract_sessions()
        return AnalysisReport(
            total_entries=len(self.entries),
            time_range=time_range,
            level_distribution=dict(level_dist),
            error_patterns=error_patterns,
            tool_usage=tool_usage,
            sessions=sessions
        )

    def _classify_errors(self, errors: List[LogEntry]) -> List[Dict[str, Any]]:
        """Group error entries by the first matching ERROR_PATTERNS category."""
        classified = defaultdict(list)
        for entry in errors:
            found = False
            for pattern, category in self.ERROR_PATTERNS:
                if re.search(pattern, entry.message, re.IGNORECASE):
                    classified[category].append(entry.message[:200])
                    found = True
                    break
            if not found:
                classified['Other'].append(entry.message[:200])
        # Most frequent categories first, with up to 3 example messages each.
        return [
            {"category": cat, "count": len(msgs), "examples": msgs[:3]}
            for cat, msgs in sorted(classified.items(), key=lambda x: -len(x[1]))
        ]

    def _extract_tool_usage(self) -> Dict[str, int]:
        """Count tool-call mentions across all raw lines (top 20)."""
        tools = Counter()
        for entry in self.entries:
            text = entry.raw
            for pattern in self.TOOL_PATTERNS:
                for match in re.finditer(pattern, text, re.IGNORECASE):
                    tools[match.group(1).lower()] += 1
        return dict(tools.most_common(20))

    def _extract_sessions(self) -> List[SessionStats]:
        """Split timestamped entries into sessions at >30-minute gaps.

        Entries without a timestamp cannot be placed and are ignored here,
        so session message counts cover timestamped entries only.
        """
        if not self.entries:
            return []
        sessions = []
        current_session = []
        last_ts = None
        # datetime.min sorts timestamp-less entries first; they are then skipped.
        for entry in sorted(self.entries, key=lambda e: e.timestamp or datetime.min):
            if entry.timestamp:
                if last_ts and (entry.timestamp - last_ts) > timedelta(minutes=30):
                    if current_session:
                        sessions.append(current_session)
                        current_session = []
                current_session.append(entry)
                last_ts = entry.timestamp
        if current_session:
            sessions.append(current_session)
        # Convert each group into SessionStats.
        results = []
        for i, session in enumerate(sessions[:10]):  # cap report at 10 sessions
            timestamps = [e.timestamp for e in session if e.timestamp]
            # Fix: use ERROR_LEVELS so CRITICAL entries count as errors here
            # too (previously only ERROR/FATAL were counted).
            errors = [e for e in session if e.level in self.ERROR_LEVELS]
            tools = []
            for entry in session:
                for pattern in self.TOOL_PATTERNS:
                    tools.extend(re.findall(pattern, entry.raw, re.IGNORECASE))
            results.append(SessionStats(
                session_id=f"session_{i+1}",
                start_time=min(timestamps) if timestamps else None,
                end_time=max(timestamps) if timestamps else None,
                duration_seconds=(max(timestamps) - min(timestamps)).total_seconds() if len(timestamps) > 1 else 0,
                message_count=len(session),
                error_count=len(errors),
                # sorted() replaces list(set(...)): deterministic output
                # across runs instead of arbitrary set ordering.
                tool_calls=sorted({t.lower() for t in tools})[:10],
                models_used=[],       # placeholder: model extraction not implemented
                estimated_cost=0.0    # placeholder: cost estimation not implemented
            ))
        return results

    def get_errors(self, since: Optional[timedelta] = None) -> List[LogEntry]:
        """Return error-level entries; with `since`, only those newer than now - since.

        When a time window is given, entries without timestamps are excluded.
        """
        errors = [e for e in self.entries if e.level in self.ERROR_LEVELS]
        if since and errors:
            cutoff = datetime.now() - since
            errors = [e for e in errors if e.timestamp and e.timestamp > cutoff]
        return errors
def format_markdown(report: "AnalysisReport") -> str:
    """Render an AnalysisReport as a human-readable Markdown document.

    Sections: header with generation time and totals, level distribution,
    error patterns (when any), and a tool-usage table (when any).

    Fix: the "..." truncation marker is now appended only to error examples
    that were actually cut at 100 characters (previously it was appended to
    every example, misleadingly suggesting truncation).
    """
    lines = [
        "# Agent Log Analysis Report",
        f"\n**Generated:** {datetime.now().isoformat()}",
        f"\n**Total Entries:** {report.total_entries}",
        f"\n**Time Range:** {report.time_range['start']} to {report.time_range['end']}",
        "\n## Log Level Distribution\n"
    ]
    for level, count in sorted(report.level_distribution.items()):
        # Guard against division by zero on an empty report.
        pct = (count / report.total_entries * 100) if report.total_entries else 0
        lines.append(f"- **{level}**: {count} ({pct:.1f}%)")
    if report.error_patterns:
        lines.append("\n## Error Patterns\n")
        for err in report.error_patterns:
            lines.append(f"### {err['category']} ({err['count']} occurrences)\n")
            for ex in err['examples']:
                # Mark truncation only when we actually cut the example short.
                suffix = "..." if len(ex) > 100 else ""
                lines.append(f"- `{ex[:100]}`{suffix}")
    if report.tool_usage:
        lines.append("\n## Tool Usage\n")
        lines.append("| Tool | Count |")
        lines.append("|------|-------|")
        for tool, count in sorted(report.tool_usage.items(), key=lambda x: -x[1]):
            lines.append(f"| {tool} | {count} |")
    return "\n".join(lines)
def parse_duration(duration_str: str) -> timedelta:
    """Convert a shorthand duration like '24h', '7d' or '30m' to a timedelta.

    Any input that does not start with digits followed by one of h/d/m
    falls back to a 24-hour window.
    """
    token = re.match(r'(\d+)([hdm])', duration_str.lower())
    if token is None:
        # Unrecognized format: default to the last 24 hours.
        return timedelta(hours=24)
    amount = int(token.group(1))
    # Map the unit suffix onto the matching timedelta keyword argument.
    unit_to_kwarg = {'h': 'hours', 'd': 'days', 'm': 'minutes'}
    return timedelta(**{unit_to_kwarg[token.group(2)]: amount})
def main():
    """CLI entry point: parse arguments, collect log files, dispatch command."""
    parser = argparse.ArgumentParser(description="Agent Log Analyzer")
    subparsers = parser.add_subparsers(dest="command", help="Commands")
    # "analyze" command: full report in JSON or Markdown.
    analyze_parser = subparsers.add_parser("analyze", help="Full analysis")
    analyze_parser.add_argument("path", help="Log file or directory")
    analyze_parser.add_argument("--format", choices=["json", "markdown"], default="json")
    # "errors" command: list error entries within a time window.
    errors_parser = subparsers.add_parser("errors", help="List errors")
    errors_parser.add_argument("path", help="Log file or directory")
    errors_parser.add_argument("--since", default="24h", help="Time range (e.g., 24h, 7d)")
    # "stats" command: same report as "analyze" (kept as a separate alias).
    stats_parser = subparsers.add_parser("stats", help="Quick statistics")
    stats_parser.add_argument("path", help="Log file or directory")
    stats_parser.add_argument("--format", choices=["json", "markdown"], default="json")
    args = parser.parse_args()
    if not args.command:
        # No subcommand given: show usage instead of failing.
        parser.print_help()
        return
    # Collect log files to parse.
    path = Path(args.path)
    if path.is_dir():
        # NOTE(review): non-recursive — only *.log and *.json directly in
        # the directory are picked up.
        files = list(path.glob("*.log")) + list(path.glob("*.json"))
    else:
        files = [path]
    # Parse all files into one flat entry list.
    all_entries = []
    for f in files:
        all_entries.extend(LogParser.parse_file(str(f)))
    analyzer = LogAnalyzer(all_entries)
    if args.command == "analyze":
        report = analyzer.analyze()
        if args.format == "markdown":
            print(format_markdown(report))
        else:
            # default=str serializes datetime fields inside sessions.
            print(json.dumps(report.to_dict(), indent=2, default=str))
    elif args.command == "errors":
        since = parse_duration(args.since)
        errors = analyzer.get_errors(since=since)
        for e in errors:
            ts = e.timestamp.isoformat() if e.timestamp else "unknown"
            print(f"[{ts}] {e.message[:200]}")
    elif args.command == "stats":
        # Currently identical to "analyze": runs the full analysis.
        report = analyzer.analyze()
        if args.format == "markdown":
            print(format_markdown(report))
        else:
            print(json.dumps(report.to_dict(), indent=2, default=str))


if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment