Skip to content

Instantly share code, notes, and snippets.

@zr0n
Created January 21, 2026 01:24
Show Gist options
  • Select an option

  • Save zr0n/66bc12bab4ac53d68d3e1123f51d079e to your computer and use it in GitHub Desktop.

Select an option

Save zr0n/66bc12bab4ac53d68d3e1123f51d079e to your computer and use it in GitHub Desktop.
"""
SISTEMA DE DETECÇÃO DE AMEAÇAS EM REDES - CODE CHALLENGE
Instruções:
Implemente as funções abaixo seguindo os requisitos.
Não altere os nomes das funções ou parâmetros.
Teste sua solução com os asserts fornecidos.
"""
import collections
import datetime
import ipaddress
import re
from typing import Dict, List, Tuple, Set
# =================== DADOS DE CONFIGURAÇÃO ===================
# Portas conhecidas e seus serviços
WELL_KNOWN_PORTS = {
20: "FTP-DATA", 21: "FTP", 22: "SSH", 23: "TELNET", 25: "SMTP",
53: "DNS", 80: "HTTP", 110: "POP3", 143: "IMAP", 443: "HTTPS",
445: "SMB", 3306: "MYSQL", 3389: "RDP", 5900: "VNC", 8080: "HTTP-PROXY"
}
# Portas frequentemente usadas por malware/ataques
MALICIOUS_PORTS = {
4444: "Metasploit", 31337: "BackOrifice", 6667: "IRC-Botnet",
1337: "Leet", 12345: "NetBus", 27374: "SubSeven",
54321: "Backdoor", 65432: "Attack-Tool"
}
# Subredes internas da empresa (RFC 1918 + redes específicas)
INTERNAL_SUBNETS = [
ipaddress.ip_network('10.0.0.0/8'),
ipaddress.ip_network('172.16.0.0/12'),
ipaddress.ip_network('192.168.0.0/16'),
ipaddress.ip_network('192.168.1.0/24'), # Rede específica do escritório
]
# =================== FUNÇÕES A SEREM IMPLEMENTADAS ===================
def parse_firewall_log(log_entry: str) -> dict:
"""
Parseia uma entrada de log do firewall no formato:
"TIMESTAMP SRC_IP:SRC_PORT -> DST_IP:DST_PORT PROTOCOL ACTION"
Exemplo: "2024-01-15 14:30:25 192.168.1.100:54321 -> 10.0.0.5:80 TCP ALLOW"
Retorna um dicionário com as chaves:
- timestamp: datetime.datetime
- src_ip: str
- src_port: int
- dst_ip: str
- dst_port: int
- protocol: str
- action: str
Se o formato for inválido, retorna None.
"""
# TODO: Implementar a lógica de parsing
pass
def classify_ip_address(ip_str: str) -> str:
"""
Classifica um endereço IP em uma das categorias:
- "INTERNAL": IP interno da empresa
- "EXTERNAL": IP externo (internet)
- "RESERVED": IP reservado (multicast, loopback, etc)
- "INVALID": IP inválido
Use a lista INTERNAL_SUBNETS para determinar IPs internos.
"""
# TODO: Implementar a classificação
pass
def detect_port_scan(connections: List[dict], time_window_seconds: int = 60) -> List[Tuple[str, Dict]]:
"""
Detecta possíveis port scans baseado em:
1. Um IP tentando se conectar a múltiplas portas no mesmo destino
2. Múltiplas tentativas de conexão em pouco tempo
Parâmetros:
- connections: Lista de conexões parseadas
- time_window_seconds: Janela de tempo para análise (padrão: 60 segundos)
Retorna lista de tuplas: [(ip_suspeito, estatísticas), ...]
Onde estatísticas inclui:
- 'target_ip': IP alvo
- 'unique_ports': número de portas únicas acessadas
- 'attempts': número total de tentativas
- 'time_range': (início, fim) do ataque
"""
# TODO: Implementar detecção de port scan
pass
def calculate_threat_score(ip_stats: Dict) -> int:
"""
Calcula um score de ameaça para um IP (0-100) baseado em:
1. Número de tentativas de conexão
2. Portas acessadas (portas maliciosas valem mais pontos)
3. IPs internos vs externos
4. Variação de portas de origem
Regras de pontuação:
- +5 por cada tentativa de conexão
- +20 por cada porta maliciosa acessada
- +10 se origem for externa e destino interno
- +15 se usar múltiplas portas de origem (evasão)
- +30 se detectado port scan
"""
# TODO: Implementar cálculo de threat score
pass
def generate_incident_report(suspicious_ips: Dict[str, Dict]) -> str:
"""
Gera um relatório de incidente formatado para a equipe de SOC.
Formato:
=== INCIDENT REPORT ===
Timestamp: [DATA_HORA]
Total IPs suspeitos: X
[IP SUSPEITO 1]
Threat Score: XX/100
Classification: [HIGH/MEDIUM/LOW]
Detected Activities:
- [ATIVIDADE 1]
- [ATIVIDADE 2]
Recommended Actions:
1. [AÇÃO 1]
2. [AÇÃO 2]
[IP SUSPEITO 2]
...
"""
# TODO: Implementar geração de relatório
pass
def real_time_monitor(log_entries: List[str]) -> Dict:
"""
Função principal que orquestra o monitoramento em tempo real.
Retorna um dicionário com:
- 'total_connections': número total de conexões processadas
- 'suspicious_ips': dicionário de IPs suspeitos com seus dados
- 'incident_report': relatório formatado
- 'threat_level': 'LOW'/'MEDIUM'/'HIGH' baseado no pior score
"""
# TODO: Implementar monitor principal
pass
# =================== TESTES UNITÁRIOS ===================
def run_tests():
"""Executa todos os testes do sistema"""
print("🧪 Iniciando testes do Sistema de Detecção de Ameaças...\n")
# Teste 1: Parsing de logs
print("Teste 1: Parsing de logs de firewall")
test_log = "2024-01-15 14:30:25 192.168.1.100:54321 -> 10.0.0.5:80 TCP ALLOW"
parsed = parse_firewall_log(test_log)
assert parsed is not None, "Falha: Log válido retornou None"
assert parsed['src_ip'] == "192.168.1.100", f"Esperado: 192.168.1.100, Obtido: {parsed['src_ip']}"
assert parsed['dst_port'] == 80, f"Esperado: 80, Obtido: {parsed['dst_port']}"
assert parsed['action'] == "ALLOW", f"Esperado: ALLOW, Obtido: {parsed['action']}"
assert isinstance(parsed['timestamp'], datetime.datetime), "Timestamp deve ser datetime"
print("✅ Teste 1 passou!\n")
# Teste 2: Classificação de IPs
print("Teste 2: Classificação de endereços IP")
test_cases = [
("192.168.1.100", "INTERNAL"),
("10.5.5.5", "INTERNAL"),
("172.16.0.1", "INTERNAL"),
("8.8.8.8", "EXTERNAL"),
("224.0.0.1", "RESERVED"),
("127.0.0.1", "RESERVED"),
("300.400.500.600", "INVALID"),
("not-an-ip", "INVALID")
]
for ip, expected in test_cases:
result = classify_ip_address(ip)
assert result == expected, f"IP {ip}: Esperado {expected}, Obtido {result}"
print("✅ Teste 2 passou!\n")
# Teste 3: Dados de exemplo para detecção de port scan
print("Teste 3: Detecção de Port Scan")
# Criar dados de teste simulando um port scan
test_connections = []
base_time = datetime.datetime(2024, 1, 15, 14, 30, 0)
# IP 1: Comportamento normal (apenas portas 80, 443)
for i in range(3):
test_connections.append({
'timestamp': base_time + datetime.timedelta(seconds=i*10),
'src_ip': '192.168.1.50',
'src_port': 50000 + i,
'dst_ip': '10.0.0.1',
'dst_port': 80 if i % 2 == 0 else 443,
'protocol': 'TCP',
'action': 'ALLOW'
})
# IP 2: Port scan (múltiplas portas em pouco tempo)
for i, port in enumerate([22, 23, 25, 3389, 5900, 8080]):
test_connections.append({
'timestamp': base_time + datetime.timedelta(seconds=i*2), # 2 segundos entre tentativas
'src_ip': '203.0.113.45',
'src_port': 55000 + i,
'dst_ip': '10.0.0.5',
'dst_port': port,
'protocol': 'TCP',
'action': 'BLOCK' # Firewall bloqueando
})
scan_results = detect_port_scan(test_connections, time_window_seconds=30)
# Deve detectar apenas o IP 203.0.113.45 como scanner
assert len(scan_results) >= 1, "Deveria detectar pelo menos um port scan"
suspicious_ips = [ip for ip, _ in scan_results]
assert '203.0.113.45' in suspicious_ips, "IP do scanner não foi detectado"
assert '192.168.1.50' not in suspicious_ips, "IP normal foi erroneamente flagado"
print(f"✅ Teste 3 passou! Detectados {len(scan_results)} IPs suspeitos\n")
# Teste 4: Cálculo de Threat Score
print("Teste 4: Cálculo de Threat Score")
# Caso 1: IP com comportamento normal
normal_ip_stats = {
'ip': '192.168.1.50',
'connection_count': 5,
'unique_dst_ports': 2,
'malicious_ports_accessed': 0,
'source_ip_type': 'INTERNAL',
'destination_ip_type': 'INTERNAL',
'unique_src_ports': 3,
'port_scan_detected': False
}
# Caso 2: IP malicioso
malicious_ip_stats = {
'ip': '203.0.113.45',
'connection_count': 15,
'unique_dst_ports': 8,
'malicious_ports_accessed': 3,
'source_ip_type': 'EXTERNAL',
'destination_ip_type': 'INTERNAL',
'unique_src_ports': 12, # Muitas portas de origem diferentes
'port_scan_detected': True
}
normal_score = calculate_threat_score(normal_ip_stats)
malicious_score = calculate_threat_score(malicious_ip_stats)
assert 0 <= normal_score <= 100, f"Score normal fora do range: {normal_score}"
assert 0 <= malicious_score <= 100, f"Score malicioso fora do range: {malicious_score}"
assert malicious_score > normal_score, f"Score malicioso ({malicious_score}) deveria ser maior que normal ({normal_score})"
print(f"✅ Teste 4 passou! Score normal: {normal_score}, Score malicioso: {malicious_score}\n")
# Teste 5: Geração de relatório
print("Teste 5: Geração de Relatório de Incidente")
suspicious_ips_data = {
'203.0.113.45': {
'threat_score': malicious_score,
'activities': ['Port scanning detected', 'Multiple malicious port access'],
'connection_count': 15,
'classification': 'HIGH'
}
}
report = generate_incident_report(suspicious_ips_data)
assert isinstance(report, str), "Relatório deve ser string"
assert "INCIDENT REPORT" in report, "Relatório deve conter cabeçalho"
assert "203.0.113.45" in report, "Relatório deve mencionar IP suspeito"
assert "Recommended Actions" in report, "Relatório deve ter ações recomendadas"
print("✅ Teste 5 passou! Relatório gerado com sucesso.\n")
# Teste 6: Sistema Completo
print("Teste 6: Sistema de Monitoramento Completo")
# Logs de exemplo para teste completo
sample_logs = [
"2024-01-15 14:30:00 192.168.1.10:52000 -> 10.0.0.1:80 TCP ALLOW",
"2024-01-15 14:30:01 192.168.1.10:52001 -> 10.0.0.1:443 TCP ALLOW",
"2024-01-15 14:30:02 203.0.113.1:61000 -> 10.0.0.5:22 TCP BLOCK",
"2024-01-15 14:30:03 203.0.113.1:61001 -> 10.0.0.5:23 TCP BLOCK",
"2024-01-15 14:30:04 203.0.113.1:61002 -> 10.0.0.5:25 TCP BLOCK",
"2024-01-15 14:30:05 203.0.113.1:61003 -> 10.0.0.5:3389 TCP BLOCK",
"2024-01-15 14:30:06 203.0.113.1:61004 -> 10.0.0.5:4444 TCP BLOCK", # Porta maliciosa
"2024-01-15 14:30:07 8.8.8.8:53000 -> 10.0.0.1:53 UDP ALLOW", # DNS normal
"2024-01-15 14:30:08 192.168.1.50:54000 -> 10.0.0.10:445 TCP BLOCK", # SMB interno
]
monitoring_results = real_time_monitor(sample_logs)
assert 'total_connections' in monitoring_results
assert 'suspicious_ips' in monitoring_results
assert 'incident_report' in monitoring_results
assert 'threat_level' in monitoring_results
print(f"✅ Teste 6 passou!")
print(f" Conexões processadas: {monitoring_results['total_connections']}")
print(f" IPs suspeitos encontrados: {len(monitoring_results['suspicious_ips'])}")
print(f" Nível de ameaça: {monitoring_results['threat_level']}")
print("\n" + "="*50)
print("🎉 TODOS OS TESTES PASSARAM! SISTEMA OPERACIONAL.")
print("="*50)
return True
# =================== EXECUÇÃO PRINCIPAL ===================
if __name__ == "__main__":
try:
run_tests()
except AssertionError as e:
print(f"\n❌ TESTE FALHOU: {e}")
print("\n💡 Dica: Verifique sua implementação e tente novamente.")
except Exception as e:
print(f"\n⚠️ ERRO INESPERADO: {e}")
import traceback
traceback.print_exc()
# =================== EXERCÍCIO BÔNUS ===================
"""
EXERCÍCIO BÔNUS (Opcional, para desafio extra):
Implemente as seguintes funcionalidades extras:
1. **DETECÇÃO DE DDoS:**
- Detecte quando múltiplos IPs externos atacam o mesmo destino
- Considere a taxa de conexões por segundo
2. **ANÁLISE DE COMPORTAMENTO:**
- Aprenda padrões normais de cada IP interno
- Detecte desvios do comportamento normal
3. **INTEGRAÇÃO COM API DE THREAT INTELLIGENCE:**
- Consulte serviços como AbuseIPDB ou VirusTotal
- Use cache para evitar consultas repetidas
4. **SISTEMA DE ALERTAS EM TEMPO REAL:**
- Envie alertas via email/Slack quando ameaça HIGH for detectada
- Inclua informações contextuais no alerta
5. **ANÁLISE DE PERSISTÊNCIA:**
- Rastreie IPs que aparecem periodicamente
- Detecte tentativas de evasão (mudança de portas, IPs)
Implemente pelo menos 2 dessas funcionalidades para ganhar "pontos bônus"!
"""
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment