Skip to content

Instantly share code, notes, and snippets.

@h4sh5
Created August 6, 2025 14:04
Show Gist options
  • Select an option

  • Save h4sh5/ba54be1e8809d14fdcf8fe3a6af4aa0f to your computer and use it in GitHub Desktop.

Select an option

Save h4sh5/ba54be1e8809d14fdcf8fe3a6af4aa0f to your computer and use it in GitHub Desktop.
Totally not generated by Qwen3-235B (tested)
#!/usr/bin/env python3
import re
import sys
from collections import defaultdict
# prompt: write a python script that would take a web server log file access.log and scan it for evidence of data exfiltration after a vulnerability has been exploited to create a web shell
# --- Detection tuning -------------------------------------------------------

# A GET query string longer than this many characters is reported.
LONG_QUERY_THRESHOLD = 1_000

# A 200 response from a script resource larger than this many bytes (100KB)
# is reported.
LARGE_RESPONSE_THRESHOLD = 100_000

# File names commonly used for dropped web shells / backdoors.
SUSPICIOUS_PATHS = [
    'shell.php',
    'cmd.php',
    'x.php',
    'backdoor.php',
    'upload.php',
    'wp-admin.php',
    'db.php',
    'sql.php',
    'r57.php',
    'c99.php',
]

# Path suffixes treated as server-side script resources.
SCRIPT_EXTENSIONS = [
    '.php',
    '.asp',
    '.aspx',
    '.jsp',
    '.cfm',
    '.pl',
    '.py',
]
# Leading fields of an Apache common/combined access-log line:
#   host ident authuser [timestamp] "request" status bytes
# The request field accepts backslash escapes (Apache logs a literal double
# quote inside the request as \"), so a quote in the URL does not end the
# field early and cause the line to be skipped.  The trailing `\\?` keeps
# lines from loggers that do not escape: a lone backslash right before the
# closing quote is still accepted.
_LOG_LINE_RE = re.compile(
    r'^(\S+) \S+ \S+ \[([^\]]+)\] "((?:[^"\\]|\\.)*\\?)" (\d{3}) (\S+)'
)


def parse_log_line(line):
    """Parse one access-log line in Apache common/combined format.

    Args:
        line: A single raw line from the access log.

    Returns:
        A dict with the keys ``ip``, ``timestamp``, ``method``, ``path``,
        ``query`` (text after the first ``?``, or ``''``), ``status`` (the
        three-digit code as a string) and ``size`` (int, ``0`` when the log
        records ``-`` or a non-numeric value).  Returns ``None`` when the
        line does not match the expected layout or when the request field
        does not contain the three parts "METHOD target PROTOCOL".
    """
    match = _LOG_LINE_RE.match(line)
    if match is None:
        return None

    ip, timestamp, request, status, raw_size = match.groups()

    # '-' means "no bytes sent"; any other non-numeric token is treated the
    # same way instead of aborting the whole scan with a ValueError.
    try:
        size = int(raw_size)
    except ValueError:
        size = 0

    # Request line is "METHOD target PROTOCOL"; anything shorter is skipped.
    req_parts = request.split(maxsplit=2)
    if len(req_parts) < 3:
        return None
    method, path_query, _protocol = req_parts

    # Split only on the first '?' so the query keeps any later '?' characters.
    path, _sep, query = path_query.partition('?')

    return {
        'ip': ip,
        'timestamp': timestamp,
        'method': method,
        'path': path,
        'query': query,
        'status': status,
        'size': size,
    }
def analyze_log(log_file, frequency_threshold=50):
    """Scan an access log for indicators of data exfiltration.

    Each parseable line is checked against three per-request conditions, and
    per-(IP, path) request counts are collected for a final frequency check:

    1. The final path segment is one of ``SUSPICIOUS_PATHS``.
    2. A GET request whose query string is longer than
       ``LONG_QUERY_THRESHOLD`` characters.
    3. A status-200 response larger than ``LARGE_RESPONSE_THRESHOLD`` bytes
       from a path ending in one of ``SCRIPT_EXTENSIONS``.
    4. More than ``frequency_threshold`` requests from one IP to one path.

    Args:
        log_file: Path of the access log to read.
        frequency_threshold: Request count per (IP, path) above which a
            high-frequency event is reported.  Defaults to 50.

    Returns:
        A list of ``(line_num, reason, entry)`` tuples.  ``line_num`` is the
        1-based line number, or ``-1`` for high-frequency events, which are
        not tied to a single line; for those, ``entry`` only contains the
        ``ip`` and ``path`` keys.

    Raises:
        OSError: If ``log_file`` cannot be opened.
    """
    exfiltration_events = []
    ip_resource_counts = defaultdict(lambda: defaultdict(int))

    # Normalise the signatures once, outside the per-line loop.
    suspicious_names = frozenset(name.lower() for name in SUSPICIOUS_PATHS)
    script_extensions = tuple(ext.lower() for ext in SCRIPT_EXTENSIONS)

    # Access logs routinely contain raw attacker-controlled bytes; decode
    # leniently so one undecodable byte does not abort the whole scan.
    with open(log_file, 'r', encoding='utf-8', errors='replace') as f:
        for line_num, line in enumerate(f, 1):
            entry = parse_log_line(line)
            if not entry:
                continue

            path = entry['path']
            lowered_path = path.lower()
            # Compare the final path segment exactly: a plain suffix test on
            # the whole path would flag '/index.php' for 'x.php' and
            # '/wp-db.php' for 'db.php'.
            file_name = lowered_path.rsplit('/', 1)[-1]

            # Condition 1: Suspicious web shell paths
            if file_name in suspicious_names:
                exfiltration_events.append((
                    line_num,
                    f"Suspicious path: {path}",
                    entry,
                ))

            # Condition 2: Abnormally long query strings (GET exfiltration)
            query_length = len(entry['query'])
            if entry['method'] == 'GET' and query_length > LONG_QUERY_THRESHOLD:
                exfiltration_events.append((
                    line_num,
                    f"Long query string ({query_length} chars)",
                    entry,
                ))

            # Condition 3: Large responses from script resources
            if (entry['status'] == '200'
                    and entry['size'] > LARGE_RESPONSE_THRESHOLD
                    and lowered_path.endswith(script_extensions)):
                exfiltration_events.append((
                    line_num,
                    f"Large script response ({entry['size']} bytes)",
                    entry,
                ))

            # Track high-frequency resource access
            ip_resource_counts[entry['ip']][path] += 1

    # Condition 4: High-frequency access to same resource (possible chunked
    # exfiltration).  Reported after the scan because it is an aggregate.
    for ip, resources in ip_resource_counts.items():
        for path, count in resources.items():
            if count > frequency_threshold:
                exfiltration_events.append((
                    -1,
                    f"High-frequency access ({count} requests) to {path}",
                    {'ip': ip, 'path': path},
                ))

    return exfiltration_events
def main():
    """Command-line entry point: scan the given access log and print a report.

    Expects exactly one command-line argument, the log file path.  Prints a
    usage line and exits with status 1 otherwise.  Each reported indicator
    shows its line number (or N/A), reason, IP, path and, when the entry
    carries them, the query length, response size and status.
    """
    argv = sys.argv
    if len(argv) != 2:
        print("Usage: python exfiltration_scanner.py <access.log>")
        sys.exit(1)

    findings = analyze_log(argv[1])
    if not findings:
        print("No data exfiltration indicators found")
        return

    print(f"Found {len(findings)} potential exfiltration indicators:")
    position = 0
    for line_num, reason, entry in findings:
        position += 1
        # Aggregate findings carry a non-positive line number.
        line_label = line_num if line_num > 0 else 'N/A'
        print(f"\n[{position}] Line {line_label} | {reason}")
        print(f" IP: {entry.get('ip', 'N/A')}")
        print(f" Path: {entry.get('path', 'N/A')}")

        # Optional fields: aggregate entries only have 'ip' and 'path'.
        query = entry.get('query')
        if query:
            print(f" Query length: {len(query)}")
        if 'size' in entry:
            print(f" Response size: {entry['size']} bytes")
        if 'status' in entry:
            print(f" Status: {entry['status']}")


if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment