Last active
August 29, 2025 18:58
-
-
Save Hamid-K/abba3b3ee0ca8dc13404ed3a112cb499 to your computer and use it in GitHub Desktop.
update: rewrite
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| #!/usr/bin/env python3 | |
| """ | |
| Tor Identity Manager - A tool to manage Tor identities and exit nodes. | |
| This script allows you to renew your Tor identity and optionally set the exit node | |
| country. It provides enhanced error handling, configuration options, and feedback. | |
| hamid@darkcell.se | |
| """ | |
| import requests | |
| import argparse | |
| import time | |
| import logging | |
| import socket | |
| import sys | |
| import os | |
| from stem import Signal, SocketError | |
| from stem.control import Controller | |
| from stem.connection import MissingPassword, AuthenticationFailure | |
| from contextlib import contextmanager | |
| # Try to import whois or alternative libraries | |
| try: | |
| import whois | |
| WHOIS_AVAILABLE = True | |
| except ImportError: | |
| try: | |
| import geoip2.database | |
| import geoip2.errors | |
| GEOIP2_AVAILABLE = True | |
| WHOIS_AVAILABLE = False | |
| except ImportError: | |
| WHOIS_AVAILABLE = False | |
| GEOIP2_AVAILABLE = False | |
| # Setup logging | |
| logging.basicConfig( | |
| level=logging.INFO, | |
| format='%(asctime)s - %(levelname)s - %(message)s', | |
| datefmt='%Y-%m-%d %H:%M:%S' | |
| ) | |
| logger = logging.getLogger('tor_identity_manager') | |
| # Default configuration | |
| DEFAULT_CONFIG = { | |
| 'tor_control_port': 9052, # Default is 9051 | |
| 'tor_socks_port': 9050, | |
| 'max_retries': 3, | |
| 'retry_delay': 5, | |
| 'timeout': 30, | |
| 'verify_ssl': True, | |
| 'geoip_db_path': '/usr/share/GeoIP/GeoLite2-Country.mmdb' | |
| } | |
| @contextmanager | |
| def tor_controller_session(port=DEFAULT_CONFIG['tor_control_port'], password=None): | |
| """ | |
| Context manager for handling Tor controller connections with proper error handling. | |
| Args: | |
| port (int): Tor control port | |
| password (str, optional): Password for authentication | |
| Yields: | |
| Controller: Authenticated Tor controller | |
| Raises: | |
| Various exceptions from stem for connection and authentication problems | |
| """ | |
| controller = None | |
| try: | |
| controller = Controller.from_port(port=port) | |
| if password: | |
| controller.authenticate(password=password) | |
| else: | |
| try: | |
| controller.authenticate() # Try cookie authentication | |
| except (MissingPassword, AuthenticationFailure): | |
| logger.error("Authentication failed. You may need to provide a password.") | |
| raise | |
| logger.debug("Successfully connected to Tor controller") | |
| yield controller | |
| except SocketError as e: | |
| logger.error(f"Could not connect to Tor control port at 127.0.0.1:{port}") | |
| logger.error(f"Is Tor running with ControlPort enabled? Error: {e}") | |
| raise | |
| except Exception as e: | |
| logger.error(f"Error connecting to Tor controller: {e}") | |
| raise | |
| finally: | |
| if controller and controller.is_alive(): | |
| controller.close() | |
| logger.debug("Tor controller connection closed") | |
| def renew_identity(country_code=None, port=DEFAULT_CONFIG['tor_control_port'], password=None, | |
| retries=DEFAULT_CONFIG['max_retries']): | |
| """ | |
| Renew the Tor identity and optionally set an exit node country. | |
| Args: | |
| country_code (str, optional): Two-letter country code for exit node | |
| port (int): Tor control port | |
| password (str, optional): Password for authentication | |
| retries (int): Maximum number of retries | |
| Returns: | |
| bool: True if successful, False otherwise | |
| """ | |
| attempt = 0 | |
| while attempt < retries: | |
| try: | |
| with tor_controller_session(port=port, password=password) as controller: | |
| # Check if Tor is actually running with the necessary feature enabled | |
| if not controller.get_info("status/circuit-established") == "1": | |
| logger.warning("Tor doesn't appear to have established circuits yet") | |
| # Set the exit node if a country code is provided | |
| if country_code: | |
| country_code = country_code.upper() # Ensure the country code is uppercase | |
| logger.info(f"Setting exit node to country: {country_code}") | |
| try: | |
| controller.set_conf('ExitNodes', f'{{{country_code}}}') | |
| controller.set_conf('StrictNodes', '1') | |
| except Exception as e: | |
| logger.error(f"Failed to set exit node: {e}") | |
| return False | |
| # Signal for a new identity | |
| logger.info("Requesting new Tor identity...") | |
| controller.signal(Signal.NEWNYM) | |
| # Check if we can request a new identity again (Tor limits this) | |
| time_to_wait = controller.get_newnym_wait() | |
| if time_to_wait > 0: | |
| logger.info(f"Must wait {time_to_wait} seconds before requesting another new identity") | |
| return True | |
| except Exception as e: | |
| logger.error(f"Attempt {attempt+1}/{retries} failed: {e}") | |
| attempt += 1 | |
| if attempt < retries: | |
| wait_time = DEFAULT_CONFIG['retry_delay'] * attempt | |
| logger.info(f"Retrying in {wait_time} seconds...") | |
| time.sleep(wait_time) | |
| logger.error(f"Failed to renew Tor identity after {retries} attempts") | |
| return False | |
| def get_tor_ip(socks_port=DEFAULT_CONFIG['tor_socks_port'], timeout=DEFAULT_CONFIG['timeout'], | |
| verify_ssl=DEFAULT_CONFIG['verify_ssl'], retries=DEFAULT_CONFIG['max_retries']): | |
| """ | |
| Get the current IP address when using Tor. | |
| Args: | |
| socks_port (int): Tor SOCKS proxy port | |
| timeout (int): Request timeout in seconds | |
| verify_ssl (bool): Whether to verify SSL certificates | |
| retries (int): Maximum number of retries | |
| Returns: | |
| str: IP address or None if failed | |
| """ | |
| # Use multiple IP checking services for redundancy | |
| ip_services = [ | |
| 'https://api.ipify.org', | |
| 'https://api64.ipify.org', | |
| 'https://icanhazip.com', | |
| 'https://ident.me' | |
| ] | |
| # Set up the Tor SOCKS proxy | |
| proxies = { | |
| 'http': f'socks5h://127.0.0.1:{socks_port}', | |
| 'https': f'socks5h://127.0.0.1:{socks_port}' | |
| } | |
| session = requests.Session() | |
| for attempt in range(retries): | |
| for service in ip_services: | |
| try: | |
| logger.debug(f"Attempting to get IP from {service}") | |
| response = session.get( | |
| service, | |
| proxies=proxies, | |
| timeout=timeout, | |
| verify=verify_ssl | |
| ) | |
| if response.status_code == 200: | |
| ip = response.text.strip() | |
| if is_valid_ip(ip): | |
| return ip | |
| else: | |
| logger.warning(f"Received invalid IP format: {ip}") | |
| except requests.exceptions.RequestException as e: | |
| logger.debug(f"Error fetching IP from {service}: {e}") | |
| continue | |
| # If we've tried all services without success, wait before retrying | |
| if attempt < retries - 1: | |
| wait_time = DEFAULT_CONFIG['retry_delay'] * (attempt + 1) | |
| logger.info(f"Failed to get IP. Retrying in {wait_time} seconds...") | |
| time.sleep(wait_time) | |
| logger.error("Failed to get Tor IP address from any service") | |
| return None | |
| def is_valid_ip(ip): | |
| """Check if a string is a valid IPv4 or IPv6 address.""" | |
| try: | |
| socket.inet_pton(socket.AF_INET, ip) | |
| return True | |
| except socket.error: | |
| try: | |
| socket.inet_pton(socket.AF_INET6, ip) | |
| return True | |
| except socket.error: | |
| return False | |
| def get_ip_country(ip): | |
| """ | |
| Get the country for an IP address using either whois or GeoIP2. | |
| Args: | |
| ip (str): IP address to look up | |
| Returns: | |
| tuple: (country_code, country_name) or (None, 'Unknown') | |
| """ | |
| if not ip or not is_valid_ip(ip): | |
| return None, 'Unknown' | |
| # Try using python-whois if available | |
| if WHOIS_AVAILABLE: | |
| try: | |
| ip_info = whois.whois(ip) | |
| country = ip_info.country | |
| if isinstance(country, list) and country: | |
| country = country[0] | |
| return country, country | |
| except Exception as e: | |
| logger.debug(f"WHOIS lookup failed: {e}") | |
| # Try using GeoIP2 if available | |
| if GEOIP2_AVAILABLE: | |
| try: | |
| with geoip2.database.Reader(DEFAULT_CONFIG['geoip_db_path']) as reader: | |
| response = reader.country(ip) | |
| return response.country.iso_code, response.country.name | |
| except (geoip2.errors.AddressNotFoundError, FileNotFoundError) as e: | |
| logger.debug(f"GeoIP2 lookup failed: {e}") | |
| # Fallback to online service if local lookups failed | |
| try: | |
| response = requests.get(f"https://ipinfo.io/{ip}/json", timeout=DEFAULT_CONFIG['timeout']) | |
| if response.status_code == 200: | |
| data = response.json() | |
| return data.get('country'), data.get('country') | |
| except Exception as e: | |
| logger.debug(f"Online IP lookup failed: {e}") | |
| return None, 'Unknown' | |
| def test_tor_connection(socks_port=DEFAULT_CONFIG['tor_socks_port']): | |
| """ | |
| Test if the Tor connection is working properly. | |
| Args: | |
| socks_port (int): Tor SOCKS proxy port | |
| Returns: | |
| bool: True if connection is working, False otherwise | |
| """ | |
| try: | |
| ip = get_tor_ip(socks_port=socks_port) | |
| if not ip: | |
| return False | |
| # Check if we're actually using Tor | |
| proxies = { | |
| 'http': f'socks5h://127.0.0.1:{socks_port}', | |
| 'https': f'socks5h://127.0.0.1:{socks_port}' | |
| } | |
| # Try to connect to the Tor check service | |
| response = requests.get( | |
| 'https://check.torproject.org/api/ip', | |
| proxies=proxies, | |
| timeout=DEFAULT_CONFIG['timeout'], | |
| verify=DEFAULT_CONFIG['verify_ssl'] | |
| ) | |
| if response.status_code == 200: | |
| data = response.json() | |
| return data.get('IsTor', False) | |
| return False | |
| except Exception as e: | |
| logger.error(f"Error testing Tor connection: {e}") | |
| return False | |
| def main(): | |
| """Main function to parse arguments and execute the program.""" | |
| parser = argparse.ArgumentParser( | |
| description='Tor Identity Manager - Renew Tor identity and manage exit nodes', | |
| formatter_class=argparse.ArgumentDefaultsHelpFormatter | |
| ) | |
| parser.add_argument('-c', '--country', type=str, help='Two-letter country code for exit node (e.g., DE for Germany)') | |
| parser.add_argument('-p', '--control-port', type=int, default=DEFAULT_CONFIG['tor_control_port'], | |
| help='Tor control port') | |
| parser.add_argument('-s', '--socks-port', type=int, default=DEFAULT_CONFIG['tor_socks_port'], | |
| help='Tor SOCKS proxy port') | |
| parser.add_argument('--password', type=str, help='Password for Tor control port authentication') | |
| parser.add_argument('-r', '--retries', type=int, default=DEFAULT_CONFIG['max_retries'], | |
| help='Maximum number of retries for operations') | |
| parser.add_argument('-t', '--timeout', type=int, default=DEFAULT_CONFIG['timeout'], | |
| help='Timeout for network operations in seconds') | |
| parser.add_argument('-v', '--verbose', action='store_true', help='Enable verbose output') | |
| parser.add_argument('--no-verify-ssl', action='store_true', help='Disable SSL certificate verification') | |
| parser.add_argument('--test', action='store_true', help='Test Tor connection and exit') | |
| args = parser.parse_args() | |
| # Configure logging based on verbosity | |
| if args.verbose: | |
| logger.setLevel(logging.DEBUG) | |
| # Update configuration based on arguments | |
| config = DEFAULT_CONFIG.copy() | |
| config['tor_control_port'] = args.control_port | |
| config['tor_socks_port'] = args.socks_port | |
| config['max_retries'] = args.retries | |
| config['timeout'] = args.timeout | |
| config['verify_ssl'] = not args.no_verify_ssl | |
| # If just testing, do that and exit | |
| if args.test: | |
| logger.info("Testing Tor connection...") | |
| if test_tor_connection(socks_port=config['tor_socks_port']): | |
| logger.info("✓ Tor is working correctly!") | |
| return 0 | |
| else: | |
| logger.error("✗ Tor connection test failed") | |
| return 1 | |
| # First, check if Tor is running | |
| logger.info("Checking Tor connection...") | |
| if not test_tor_connection(socks_port=config['tor_socks_port']): | |
| logger.error("Tor doesn't appear to be working correctly. Please check your Tor installation.") | |
| return 1 | |
| # Get the old IP before renewing the identity | |
| logger.info("Getting current Tor IP...") | |
| old_ip = get_tor_ip(socks_port=config['tor_socks_port'], | |
| timeout=config['timeout'], | |
| verify_ssl=config['verify_ssl'], | |
| retries=config['max_retries']) | |
| if old_ip: | |
| old_country_code, old_country_name = get_ip_country(old_ip) | |
| if old_country_code: | |
| logger.info(f"Current Tor IP: {old_ip} (Country: {old_country_name} [{old_country_code}])") | |
| else: | |
| logger.info(f"Current Tor IP: {old_ip} (Country: Unknown)") | |
| else: | |
| logger.warning("Could not determine current Tor IP") | |
| # Renew the identity with optional country code | |
| logger.info("Renewing Tor identity...") | |
| if renew_identity( | |
| country_code=args.country, | |
| port=config['tor_control_port'], | |
| password=args.password, | |
| retries=config['max_retries'] | |
| ): | |
| logger.info("✓ Tor identity renewal requested successfully") | |
| # Give Tor a moment to establish the new circuits | |
| logger.info("Waiting for new circuits to be established...") | |
| time.sleep(5) | |
| # Get the new IP after renewing the identity | |
| logger.info("Getting new Tor IP...") | |
| new_ip = get_tor_ip(socks_port=config['tor_socks_port'], | |
| timeout=config['timeout'], | |
| verify_ssl=config['verify_ssl'], | |
| retries=config['max_retries']) | |
| if new_ip: | |
| new_country_code, new_country_name = get_ip_country(new_ip) | |
| # Check if the IP actually changed | |
| if old_ip and new_ip == old_ip: | |
| logger.warning("IP address did not change. This can happen if Tor reuses circuits.") | |
| else: | |
| logger.info(f"New Tor IP: {new_ip} (Country: {new_country_name} [{new_country_code}])") | |
| # Verify the country of the new IP if a country code was provided | |
| if args.country and new_country_code: | |
| if new_country_code.upper() == args.country.upper(): | |
| logger.info(f"✓ Success: New IP is from the specified country: {new_country_name}") | |
| else: | |
| logger.warning(f"✗ Warning: New IP is from {new_country_name}, not the specified country: {args.country}") | |
| logger.info("This can happen if Tor cannot establish a circuit through the specified country.") | |
| else: | |
| logger.error("Could not determine new Tor IP") | |
| return 1 | |
| else: | |
| logger.error("Failed to renew Tor identity") | |
| return 1 | |
| return 0 | |
| if __name__ == "__main__": | |
| try: | |
| sys.exit(main()) | |
| except KeyboardInterrupt: | |
| logger.info("Operation cancelled by user") | |
| sys.exit(130) | |
| except Exception as e: | |
| logger.critical(f"Unhandled error: {e}", exc_info=True) | |
| sys.exit(1) | |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment