Skip to content

Instantly share code, notes, and snippets.

@Hamid-K
Last active August 29, 2025 18:58
Show Gist options
  • Select an option

  • Save Hamid-K/abba3b3ee0ca8dc13404ed3a112cb499 to your computer and use it in GitHub Desktop.

Select an option

Save Hamid-K/abba3b3ee0ca8dc13404ed3a112cb499 to your computer and use it in GitHub Desktop.
update: rewrite
#!/usr/bin/env python3
"""
Tor Identity Manager - A tool to manage Tor identities and exit nodes.
This script allows you to renew your Tor identity and optionally set the exit node
country. It provides enhanced error handling, configuration options, and feedback.
hamid@darkcell.se
"""
import requests
import argparse
import time
import logging
import socket
import sys
import os
from stem import Signal, SocketError
from stem.control import Controller
from stem.connection import MissingPassword, AuthenticationFailure
from contextlib import contextmanager
# Try to import whois or alternative libraries
try:
import whois
WHOIS_AVAILABLE = True
except ImportError:
try:
import geoip2.database
import geoip2.errors
GEOIP2_AVAILABLE = True
WHOIS_AVAILABLE = False
except ImportError:
WHOIS_AVAILABLE = False
GEOIP2_AVAILABLE = False
# Setup logging
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s',
datefmt='%Y-%m-%d %H:%M:%S'
)
logger = logging.getLogger('tor_identity_manager')
# Default configuration
DEFAULT_CONFIG = {
'tor_control_port': 9052, # Default is 9051
'tor_socks_port': 9050,
'max_retries': 3,
'retry_delay': 5,
'timeout': 30,
'verify_ssl': True,
'geoip_db_path': '/usr/share/GeoIP/GeoLite2-Country.mmdb'
}
@contextmanager
def tor_controller_session(port=DEFAULT_CONFIG['tor_control_port'], password=None):
"""
Context manager for handling Tor controller connections with proper error handling.
Args:
port (int): Tor control port
password (str, optional): Password for authentication
Yields:
Controller: Authenticated Tor controller
Raises:
Various exceptions from stem for connection and authentication problems
"""
controller = None
try:
controller = Controller.from_port(port=port)
if password:
controller.authenticate(password=password)
else:
try:
controller.authenticate() # Try cookie authentication
except (MissingPassword, AuthenticationFailure):
logger.error("Authentication failed. You may need to provide a password.")
raise
logger.debug("Successfully connected to Tor controller")
yield controller
except SocketError as e:
logger.error(f"Could not connect to Tor control port at 127.0.0.1:{port}")
logger.error(f"Is Tor running with ControlPort enabled? Error: {e}")
raise
except Exception as e:
logger.error(f"Error connecting to Tor controller: {e}")
raise
finally:
if controller and controller.is_alive():
controller.close()
logger.debug("Tor controller connection closed")
def renew_identity(country_code=None, port=DEFAULT_CONFIG['tor_control_port'], password=None,
retries=DEFAULT_CONFIG['max_retries']):
"""
Renew the Tor identity and optionally set an exit node country.
Args:
country_code (str, optional): Two-letter country code for exit node
port (int): Tor control port
password (str, optional): Password for authentication
retries (int): Maximum number of retries
Returns:
bool: True if successful, False otherwise
"""
attempt = 0
while attempt < retries:
try:
with tor_controller_session(port=port, password=password) as controller:
# Check if Tor is actually running with the necessary feature enabled
if not controller.get_info("status/circuit-established") == "1":
logger.warning("Tor doesn't appear to have established circuits yet")
# Set the exit node if a country code is provided
if country_code:
country_code = country_code.upper() # Ensure the country code is uppercase
logger.info(f"Setting exit node to country: {country_code}")
try:
controller.set_conf('ExitNodes', f'{{{country_code}}}')
controller.set_conf('StrictNodes', '1')
except Exception as e:
logger.error(f"Failed to set exit node: {e}")
return False
# Signal for a new identity
logger.info("Requesting new Tor identity...")
controller.signal(Signal.NEWNYM)
# Check if we can request a new identity again (Tor limits this)
time_to_wait = controller.get_newnym_wait()
if time_to_wait > 0:
logger.info(f"Must wait {time_to_wait} seconds before requesting another new identity")
return True
except Exception as e:
logger.error(f"Attempt {attempt+1}/{retries} failed: {e}")
attempt += 1
if attempt < retries:
wait_time = DEFAULT_CONFIG['retry_delay'] * attempt
logger.info(f"Retrying in {wait_time} seconds...")
time.sleep(wait_time)
logger.error(f"Failed to renew Tor identity after {retries} attempts")
return False
def get_tor_ip(socks_port=DEFAULT_CONFIG['tor_socks_port'], timeout=DEFAULT_CONFIG['timeout'],
verify_ssl=DEFAULT_CONFIG['verify_ssl'], retries=DEFAULT_CONFIG['max_retries']):
"""
Get the current IP address when using Tor.
Args:
socks_port (int): Tor SOCKS proxy port
timeout (int): Request timeout in seconds
verify_ssl (bool): Whether to verify SSL certificates
retries (int): Maximum number of retries
Returns:
str: IP address or None if failed
"""
# Use multiple IP checking services for redundancy
ip_services = [
'https://api.ipify.org',
'https://api64.ipify.org',
'https://icanhazip.com',
'https://ident.me'
]
# Set up the Tor SOCKS proxy
proxies = {
'http': f'socks5h://127.0.0.1:{socks_port}',
'https': f'socks5h://127.0.0.1:{socks_port}'
}
session = requests.Session()
for attempt in range(retries):
for service in ip_services:
try:
logger.debug(f"Attempting to get IP from {service}")
response = session.get(
service,
proxies=proxies,
timeout=timeout,
verify=verify_ssl
)
if response.status_code == 200:
ip = response.text.strip()
if is_valid_ip(ip):
return ip
else:
logger.warning(f"Received invalid IP format: {ip}")
except requests.exceptions.RequestException as e:
logger.debug(f"Error fetching IP from {service}: {e}")
continue
# If we've tried all services without success, wait before retrying
if attempt < retries - 1:
wait_time = DEFAULT_CONFIG['retry_delay'] * (attempt + 1)
logger.info(f"Failed to get IP. Retrying in {wait_time} seconds...")
time.sleep(wait_time)
logger.error("Failed to get Tor IP address from any service")
return None
def is_valid_ip(ip):
"""Check if a string is a valid IPv4 or IPv6 address."""
try:
socket.inet_pton(socket.AF_INET, ip)
return True
except socket.error:
try:
socket.inet_pton(socket.AF_INET6, ip)
return True
except socket.error:
return False
def get_ip_country(ip):
"""
Get the country for an IP address using either whois or GeoIP2.
Args:
ip (str): IP address to look up
Returns:
tuple: (country_code, country_name) or (None, 'Unknown')
"""
if not ip or not is_valid_ip(ip):
return None, 'Unknown'
# Try using python-whois if available
if WHOIS_AVAILABLE:
try:
ip_info = whois.whois(ip)
country = ip_info.country
if isinstance(country, list) and country:
country = country[0]
return country, country
except Exception as e:
logger.debug(f"WHOIS lookup failed: {e}")
# Try using GeoIP2 if available
if GEOIP2_AVAILABLE:
try:
with geoip2.database.Reader(DEFAULT_CONFIG['geoip_db_path']) as reader:
response = reader.country(ip)
return response.country.iso_code, response.country.name
except (geoip2.errors.AddressNotFoundError, FileNotFoundError) as e:
logger.debug(f"GeoIP2 lookup failed: {e}")
# Fallback to online service if local lookups failed
try:
response = requests.get(f"https://ipinfo.io/{ip}/json", timeout=DEFAULT_CONFIG['timeout'])
if response.status_code == 200:
data = response.json()
return data.get('country'), data.get('country')
except Exception as e:
logger.debug(f"Online IP lookup failed: {e}")
return None, 'Unknown'
def test_tor_connection(socks_port=DEFAULT_CONFIG['tor_socks_port']):
"""
Test if the Tor connection is working properly.
Args:
socks_port (int): Tor SOCKS proxy port
Returns:
bool: True if connection is working, False otherwise
"""
try:
ip = get_tor_ip(socks_port=socks_port)
if not ip:
return False
# Check if we're actually using Tor
proxies = {
'http': f'socks5h://127.0.0.1:{socks_port}',
'https': f'socks5h://127.0.0.1:{socks_port}'
}
# Try to connect to the Tor check service
response = requests.get(
'https://check.torproject.org/api/ip',
proxies=proxies,
timeout=DEFAULT_CONFIG['timeout'],
verify=DEFAULT_CONFIG['verify_ssl']
)
if response.status_code == 200:
data = response.json()
return data.get('IsTor', False)
return False
except Exception as e:
logger.error(f"Error testing Tor connection: {e}")
return False
def main():
"""Main function to parse arguments and execute the program."""
parser = argparse.ArgumentParser(
description='Tor Identity Manager - Renew Tor identity and manage exit nodes',
formatter_class=argparse.ArgumentDefaultsHelpFormatter
)
parser.add_argument('-c', '--country', type=str, help='Two-letter country code for exit node (e.g., DE for Germany)')
parser.add_argument('-p', '--control-port', type=int, default=DEFAULT_CONFIG['tor_control_port'],
help='Tor control port')
parser.add_argument('-s', '--socks-port', type=int, default=DEFAULT_CONFIG['tor_socks_port'],
help='Tor SOCKS proxy port')
parser.add_argument('--password', type=str, help='Password for Tor control port authentication')
parser.add_argument('-r', '--retries', type=int, default=DEFAULT_CONFIG['max_retries'],
help='Maximum number of retries for operations')
parser.add_argument('-t', '--timeout', type=int, default=DEFAULT_CONFIG['timeout'],
help='Timeout for network operations in seconds')
parser.add_argument('-v', '--verbose', action='store_true', help='Enable verbose output')
parser.add_argument('--no-verify-ssl', action='store_true', help='Disable SSL certificate verification')
parser.add_argument('--test', action='store_true', help='Test Tor connection and exit')
args = parser.parse_args()
# Configure logging based on verbosity
if args.verbose:
logger.setLevel(logging.DEBUG)
# Update configuration based on arguments
config = DEFAULT_CONFIG.copy()
config['tor_control_port'] = args.control_port
config['tor_socks_port'] = args.socks_port
config['max_retries'] = args.retries
config['timeout'] = args.timeout
config['verify_ssl'] = not args.no_verify_ssl
# If just testing, do that and exit
if args.test:
logger.info("Testing Tor connection...")
if test_tor_connection(socks_port=config['tor_socks_port']):
logger.info("✓ Tor is working correctly!")
return 0
else:
logger.error("✗ Tor connection test failed")
return 1
# First, check if Tor is running
logger.info("Checking Tor connection...")
if not test_tor_connection(socks_port=config['tor_socks_port']):
logger.error("Tor doesn't appear to be working correctly. Please check your Tor installation.")
return 1
# Get the old IP before renewing the identity
logger.info("Getting current Tor IP...")
old_ip = get_tor_ip(socks_port=config['tor_socks_port'],
timeout=config['timeout'],
verify_ssl=config['verify_ssl'],
retries=config['max_retries'])
if old_ip:
old_country_code, old_country_name = get_ip_country(old_ip)
if old_country_code:
logger.info(f"Current Tor IP: {old_ip} (Country: {old_country_name} [{old_country_code}])")
else:
logger.info(f"Current Tor IP: {old_ip} (Country: Unknown)")
else:
logger.warning("Could not determine current Tor IP")
# Renew the identity with optional country code
logger.info("Renewing Tor identity...")
if renew_identity(
country_code=args.country,
port=config['tor_control_port'],
password=args.password,
retries=config['max_retries']
):
logger.info("✓ Tor identity renewal requested successfully")
# Give Tor a moment to establish the new circuits
logger.info("Waiting for new circuits to be established...")
time.sleep(5)
# Get the new IP after renewing the identity
logger.info("Getting new Tor IP...")
new_ip = get_tor_ip(socks_port=config['tor_socks_port'],
timeout=config['timeout'],
verify_ssl=config['verify_ssl'],
retries=config['max_retries'])
if new_ip:
new_country_code, new_country_name = get_ip_country(new_ip)
# Check if the IP actually changed
if old_ip and new_ip == old_ip:
logger.warning("IP address did not change. This can happen if Tor reuses circuits.")
else:
logger.info(f"New Tor IP: {new_ip} (Country: {new_country_name} [{new_country_code}])")
# Verify the country of the new IP if a country code was provided
if args.country and new_country_code:
if new_country_code.upper() == args.country.upper():
logger.info(f"✓ Success: New IP is from the specified country: {new_country_name}")
else:
logger.warning(f"✗ Warning: New IP is from {new_country_name}, not the specified country: {args.country}")
logger.info("This can happen if Tor cannot establish a circuit through the specified country.")
else:
logger.error("Could not determine new Tor IP")
return 1
else:
logger.error("Failed to renew Tor identity")
return 1
return 0
if __name__ == "__main__":
try:
sys.exit(main())
except KeyboardInterrupt:
logger.info("Operation cancelled by user")
sys.exit(130)
except Exception as e:
logger.critical(f"Unhandled error: {e}", exc_info=True)
sys.exit(1)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment