Created
August 6, 2025 14:04
-
-
Save h4sh5/ba54be1e8809d14fdcf8fe3a6af4aa0f to your computer and use it in GitHub Desktop.
Totally not generated by Qwen3-235B (tested)
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| #!/usr/bin/env python3 | |
| import re | |
| import sys | |
| from collections import defaultdict | |
| # prompt: write a python script that would take a web server log file access.log and scan it for evidence of data exfiltration after a vulnerability has been exploited to create a web shell | |
# Detection tuning knobs.
# A GET query string longer than this many characters is reported.
LONG_QUERY_THRESHOLD = 1_000  # characters
# A 200 response from a script resource larger than this is reported.
LARGE_RESPONSE_THRESHOLD = 100_000  # bytes (100KB)

# File names commonly used for dropped web shells / backdoors.
SUSPICIOUS_PATHS = [
    'shell.php',
    'cmd.php',
    'x.php',
    'backdoor.php',
    'upload.php',
    'wp-admin.php',
    'db.php',
    'sql.php',
    'r57.php',
    'c99.php',
]

# Extensions treated as server-side script resources.
SCRIPT_EXTENSIONS = [
    '.php',
    '.asp',
    '.aspx',
    '.jsp',
    '.cfm',
    '.pl',
    '.py',
]
# Prefix of an Apache common/combined log line:
#   host ident user [timestamp] "request" status size
# - The request group accepts backslash-escaped characters, because Apache
#   writes an embedded double quote as \" inside the quoted request field.
# - The size group only accepts digits or "-", so a malformed size makes the
#   line unparseable instead of raising ValueError in int().
# Compiled once at import time rather than on every call.
_LOG_LINE_RE = re.compile(
    r'^(\S+) \S+ \S+ \[([^\]]+)\] "((?:[^"\\]|\\.)*)" (\d{3}) (\d+|-)(?:\s|$)'
)


def parse_log_line(line):
    """Parse one access-log line in Apache common/combined format.

    Args:
        line: A single raw log line (a trailing newline is tolerated).

    Returns:
        A dict with keys ``ip``, ``timestamp``, ``method``, ``path``,
        ``query`` (empty string when the URL has no ``?``), ``status``
        (three-digit string) and ``size`` (int; ``-`` is reported as 0).
        Returns ``None`` when the line does not match the expected layout
        or the request field does not have the three parts
        ``METHOD URL PROTOCOL``.
    """
    match = _LOG_LINE_RE.match(line)
    if match is None:
        return None
    ip, timestamp, request, status, size = match.groups()

    # The request field must look like "METHOD URL PROTOCOL"; anything
    # shorter (e.g. "-" logged for timed-out connections) is skipped.
    req_parts = request.split(maxsplit=2)
    if len(req_parts) < 3:
        return None
    method, path_query = req_parts[0], req_parts[1]

    # Split on the first "?" only; query is '' when there is none.
    path, _, query = path_query.partition('?')

    return {
        'ip': ip,
        'timestamp': timestamp,
        'method': method,
        'path': path,
        'query': query,
        'status': status,
        'size': 0 if size == '-' else int(size),
    }
def analyze_log(log_file, high_frequency_threshold=50):
    """Scan an access log for indicators of web-shell data exfiltration.

    Four checks are applied:
      1. The final path segment is one of ``SUSPICIOUS_PATHS``.
      2. A GET request whose query string exceeds ``LONG_QUERY_THRESHOLD``.
      3. A 200 response larger than ``LARGE_RESPONSE_THRESHOLD`` bytes from a
         path ending in one of ``SCRIPT_EXTENSIONS``.
      4. One IP requesting the same path more than
         ``high_frequency_threshold`` times over the whole file.

    Args:
        log_file: Path of the access log to read.
        high_frequency_threshold: Request count per (IP, path) above which
            check 4 fires. Defaults to 50.

    Returns:
        A list of ``(line_num, reason, entry)`` tuples. For checks 1-3
        ``line_num`` is the 1-based line number and ``entry`` is the dict
        returned by ``parse_log_line``. For check 4 ``line_num`` is -1 and
        ``entry`` only has the keys ``ip`` and ``path``.

    Raises:
        OSError: If ``log_file`` cannot be opened.
    """
    # Normalise the name lists once, outside the per-line loop.
    suspicious_names = frozenset(name.lower() for name in SUSPICIOUS_PATHS)
    script_extensions = tuple(ext.lower() for ext in SCRIPT_EXTENSIONS)

    exfiltration_events = []
    ip_resource_counts = defaultdict(lambda: defaultdict(int))

    # Access logs can contain arbitrary bytes sent by clients; decode
    # leniently so one bad byte does not abort the whole scan.
    with open(log_file, 'r', encoding='utf-8', errors='replace') as f:
        for line_num, line in enumerate(f, 1):
            entry = parse_log_line(line)
            if not entry:
                continue

            path = entry['path']
            path_lower = path.lower()

            # Condition 1: Suspicious web shell paths.
            # Compare the final path segment exactly: a plain suffix test
            # would make 'x.php' match every request for 'index.php'.
            if path_lower.rsplit('/', 1)[-1] in suspicious_names:
                exfiltration_events.append((
                    line_num,
                    f"Suspicious path: {path}",
                    entry,
                ))

            # Condition 2: Abnormally long query strings (GET exfiltration)
            query_length = len(entry['query'])
            if entry['method'] == 'GET' and query_length > LONG_QUERY_THRESHOLD:
                exfiltration_events.append((
                    line_num,
                    f"Long query string ({query_length} chars)",
                    entry,
                ))

            # Condition 3: Large responses from script resources
            if (entry['status'] == '200'
                    and entry['size'] > LARGE_RESPONSE_THRESHOLD
                    and path_lower.endswith(script_extensions)):
                exfiltration_events.append((
                    line_num,
                    f"Large script response ({entry['size']} bytes)",
                    entry,
                ))

            # Track how often each IP hits each path for condition 4.
            ip_resource_counts[entry['ip']][path] += 1

    # Condition 4: High-frequency access to same resource
    # (possible chunked exfiltration). Not tied to one log line, so the
    # line number is reported as -1.
    for ip, resources in ip_resource_counts.items():
        for path, count in resources.items():
            if count > high_frequency_threshold:
                exfiltration_events.append((
                    -1,
                    f"High-frequency access ({count} requests) to {path}",
                    {'ip': ip, 'path': path},
                ))

    return exfiltration_events
def main():
    """Command-line entry point.

    Expects exactly one argument, the access-log path. Prints a usage
    message and exits with status 1 otherwise. Runs ``analyze_log`` on the
    file and prints every reported indicator, or a single line when
    nothing was found.
    """
    argv = sys.argv
    if len(argv) != 2:
        print("Usage: python exfiltration_scanner.py <access.log>")
        sys.exit(1)

    events = analyze_log(argv[1])
    if not events:
        print("No data exfiltration indicators found")
        return

    print(f"Found {len(events)} potential exfiltration indicators:")
    index = 0
    for line_num, reason, entry in events:
        index += 1
        # Aggregate findings carry a non-positive line number.
        shown_line = line_num if line_num > 0 else 'N/A'
        print(f"\n[{index}] Line {shown_line} | {reason}")
        print(f" IP: {entry.get('ip', 'N/A')}")
        print(f" Path: {entry.get('path', 'N/A')}")
        # Optional fields are only present on per-line findings.
        if entry.get('query'):
            print(f" Query length: {len(entry['query'])}")
        if 'size' in entry:
            print(f" Response size: {entry['size']} bytes")
        if 'status' in entry:
            print(f" Status: {entry['status']}")


if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment