Created
January 21, 2026 04:47
-
-
Save nanpuyue/3c5999c30a4e1d03b3709fb9745ae12e to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| import socket | |
| import struct | |
| import argparse | |
| from typing import Tuple, List | |
class LocalResolver:
    """Local DNS resolver that keeps IPv4 and IPv6 lookups explicitly separate."""

    @staticmethod
    def _first_address(domain: str, family: int) -> str:
        """Return the host part of the first ``getaddrinfo`` entry for *family*.

        Any error raised by ``socket.getaddrinfo`` propagates to the caller.
        """
        entries = socket.getaddrinfo(domain, None, family)
        # Each entry is (family, type, proto, canonname, sockaddr); the
        # textual address is the first element of sockaddr.
        sockaddr = entries[0][4]
        return sockaddr[0]

    @staticmethod
    def resolve_ipv4(domain: str) -> str:
        """Resolve *domain* locally and return its first IPv4 address."""
        return LocalResolver._first_address(domain, socket.AF_INET)

    @staticmethod
    def resolve_ipv6(domain: str) -> str:
        """Resolve *domain* locally and return its first IPv6 address."""
        return LocalResolver._first_address(domain, socket.AF_INET6)
class Socks5Client:
    """SOCKS5 TCP control connection.

    Opens a TCP connection to a SOCKS5 proxy, negotiates the
    "no authentication" method and issues CONNECT requests, printing the
    request and the proxy's reply as it goes.
    """

    def __init__(self, host: str, port: int):
        """Remember the proxy endpoint; no network activity happens here."""
        self.host = host
        self.port = port
        self.sock: socket.socket | None = None

    def connect(self) -> None:
        """Connect to the proxy and perform the method-selection handshake.

        Raises:
            RuntimeError: if the proxy does not accept the no-auth method or
                closes the connection early.
            OSError: if the TCP connection cannot be established.
        """
        self.sock = socket.create_connection((self.host, self.port))
        try:
            self._handshake()
        except BaseException:
            # Do not leave a half-negotiated connection open on failure.
            self.close()
            raise

    def _handshake(self) -> None:
        """Offer exactly one method (0x00, no authentication) and check the reply."""
        self.sock.sendall(b"\x05\x01\x00")
        resp = self._recv_exact(2)
        if resp != b"\x05\x00":
            raise RuntimeError("SOCKS5 handshake failed")

    def connect_target(self, atyp: str, addr: str, port: int) -> socket.socket:
        """Send a CONNECT request and return the now-tunnelled socket.

        Args:
            atyp: address type name, one of "IPV4", "IPV6" or "DOMAIN".
            addr: destination address in the textual form matching *atyp*.
            port: destination TCP port.

        Returns:
            The control socket itself, which carries the tunnelled stream
            after a successful reply.

        Raises:
            RuntimeError: if the proxy replies with a non-zero REP code or a
                malformed reply.
            ValueError: if *atyp* is not a known name or the domain is too long.
        """
        req = self._build_connect_request(atyp, addr, port)
        print(">>> TCP SOCKS5 CONNECT")
        print(f" ATYP={atyp:6} DST={self.format_dst(addr, port, atyp)}")
        self.sock.sendall(req)
        rep, bnd_atyp, bnd_addr, bnd_port = self._recv_connect_reply()
        if rep != 0x00:
            raise RuntimeError(f"CONNECT failed, REP={rep}")
        print("<<< TCP SOCKS5 CONNECT")
        print(f" REP=SUCCESS ATYP={bnd_atyp} BND={self.format_dst(bnd_addr, bnd_port, bnd_atyp)}")
        return self.sock

    def _recv_connect_reply(self) -> Tuple[int, str, str, int]:
        """Read one CONNECT reply.

        Returns:
            ``(rep, atyp_name, bnd_addr, bnd_port)`` where *atyp_name* is
            "IPV4", "IPV6" or "DOMAIN".

        Raises:
            RuntimeError: on a wrong version byte, an unknown ATYP, or EOF.
        """
        header = self._recv_exact(4)  # VER, REP, RSV, ATYP
        if header[0] != 0x05:
            raise RuntimeError("Invalid SOCKS version in reply")
        rep = header[1]
        atyp = header[3]
        if atyp == 1:
            addr = socket.inet_ntop(socket.AF_INET, self._recv_exact(4))
            atyp_name = "IPV4"
        elif atyp == 4:
            addr = socket.inet_ntop(socket.AF_INET6, self._recv_exact(16))
            atyp_name = "IPV6"
        elif atyp == 3:
            # Domain names are prefixed with a single length byte.
            ln = self._recv_exact(1)[0]
            addr = self._recv_exact(ln).decode()
            atyp_name = "DOMAIN"
        else:
            raise RuntimeError("Unknown ATYP in reply")
        port = struct.unpack("!H", self._recv_exact(2))[0]
        return rep, atyp_name, addr, port

    @staticmethod
    def _build_connect_request(atyp: str, addr: str, port: int) -> bytes:
        """Serialize a CONNECT request for the given address type.

        Raises:
            ValueError: if *atyp* is not "IPV4", "IPV6" or "DOMAIN", or if the
                encoded domain name does not fit the one-byte length field.
        """
        if atyp == "IPV4":
            dst = b"\x01" + socket.inet_pton(socket.AF_INET, addr)
        elif atyp == "IPV6":
            dst = b"\x04" + socket.inet_pton(socket.AF_INET6, addr)
        elif atyp == "DOMAIN":
            encoded = addr.encode()
            # The length byte counts encoded bytes, not characters; the two
            # differ for any non-ASCII name.
            if len(encoded) > 255:
                raise ValueError("Domain name too long for SOCKS5 (max 255 bytes)")
            dst = b"\x03" + bytes([len(encoded)]) + encoded
        else:
            raise ValueError("Invalid ATYP")
        # VER=5, CMD=1 (CONNECT), RSV=0, then ATYP + address + port.
        return b"\x05\x01\x00" + dst + struct.pack("!H", port)

    @staticmethod
    def format_dst(addr: str, port: int, atyp: str) -> str:
        """Format ``addr:port`` for display, bracketing IPv6 addresses."""
        if atyp == "IPV6":
            return f"[{addr}]:{port}"
        return f"{addr}:{port}"

    def _recv_exact(self, n: int) -> bytes:
        """Read exactly *n* bytes from the socket.

        Raises:
            RuntimeError: if the peer closes the connection first.
        """
        chunks = []
        remaining = n
        while remaining > 0:
            data = self.sock.recv(remaining)
            if not data:
                raise RuntimeError("Unexpected EOF")
            chunks.append(data)
            remaining -= len(data)
        return b"".join(chunks)

    def close(self) -> None:
        """Close the control socket if one was opened."""
        if self.sock is not None:
            self.sock.close()
class TcpTester:
    """TCP CONNECT behaviour test.

    Issues one CONNECT per address type through the configured SOCKS5
    proxy and sends a plain HTTP GET over each resulting tunnel.
    """

    def __init__(self, socks_host: str, socks_port: int):
        """Remember the SOCKS5 proxy endpoint used for every test."""
        self.socks_host = socks_host
        self.socks_port = socks_port

    def run(self, domain: str, port: int) -> None:
        """Test *domain*:*port* via its IPv4 address, IPv6 address and name.

        Both addresses are resolved locally before the first test starts, so
        a resolution error propagates before any connection is made.
        """
        tests = [
            ("IPV4", LocalResolver.resolve_ipv4(domain)),
            ("IPV6", LocalResolver.resolve_ipv6(domain)),
            ("DOMAIN", domain),
        ]
        for atyp, target in tests:
            self.run_single(atyp, target, domain, port)

    def run_single(self, atyp: str, target: str, host_header: str, port: int) -> None:
        """Run one CONNECT + HTTP GET round trip over a fresh proxy connection.

        Args:
            atyp: address type name passed to the SOCKS5 client.
            target: destination address in the form matching *atyp*.
            host_header: value sent in the HTTP ``Host`` header.
            port: destination TCP port.
        """
        print(f"\n=== TCP connect via {target} (ATYP={atyp}) ===")
        client = Socks5Client(self.socks_host, self.socks_port)
        try:
            client.connect()
            sock = client.connect_target(atyp, target, port)
            self.send_http(sock, host_header)
        finally:
            # Release the proxy connection even when a step above fails.
            client.close()

    @staticmethod
    def send_http(sock: socket.socket, host: str) -> None:
        """Send ``GET /`` over *sock* and print the response header lines.

        Reading stops at the first blank line (end of the header) or at EOF,
        whichever comes first; the response body is not consumed.
        """
        req = (
            f"GET / HTTP/1.1\r\n"
            f"Host: {host}\r\n"
            f"User-Agent: socks5-test\r\n"
            f"Connection: close\r\n\r\n"
        ).encode()
        print(">>> HTTP request")
        sock.sendall(req)
        resp = b""
        while b"\r\n\r\n" not in resp:
            data = sock.recv(4096)
            if not data:
                break
            resp += data
        header = resp.split(b"\r\n\r\n", 1)[0].decode(errors="replace")
        print("<<< HTTP response header")
        for line in header.splitlines():
            print(f" {line}")
class ProgramOptions:
    """Command-line options for the SOCKS5 TCP CONNECT tester."""

    def __init__(self):
        """Build the argument parser with the required ``-x HOST:PORT`` option."""
        self.parser = argparse.ArgumentParser(description="SOCKS5 TCP CONNECT tester")
        self.parser.add_argument(
            "-x",
            required=True,
            metavar="HOST:PORT",
            help="SOCKS5 proxy address, e.g. 127.0.0.1:1080 or [::1]:1080",
        )

    def parse(self, argv: "List[str] | None" = None) -> Tuple[str, int]:
        """Parse the command line and return the proxy ``(host, port)``.

        Args:
            argv: argument list to parse; ``None`` (the default) makes
                argparse read ``sys.argv[1:]``.

        Returns:
            The proxy host (brackets around an IPv6 literal removed) and the
            proxy port as an integer.

        Malformed values are reported through ``parser.error``, which prints
        the usage message and exits with status 2.
        """
        args = self.parser.parse_args(argv)
        # Split on the LAST colon so IPv6 literals keep their inner colons.
        host, sep, port_text = args.x.rpartition(":")
        if not sep:
            self.parser.error(f"expected HOST:PORT, got {args.x!r}")
        if host.startswith("[") and host.endswith("]"):
            # "[::1]" is URL-style notation; the socket API wants the bare address.
            host = host[1:-1]
        try:
            port = int(port_text)
        except ValueError:
            self.parser.error(f"invalid port: {port_text!r}")
        if not 0 < port < 65536:
            self.parser.error(f"port out of range (1-65535): {port}")
        return host, port
if __name__ == "__main__":
    # Read the proxy endpoint from the command line, announce it, then run
    # the three CONNECT tests against the fixed target bing.com:80.
    proxy_host, proxy_port = ProgramOptions().parse()
    print(f"SOCKS5 server: {proxy_host}:{proxy_port}")
    TcpTester(proxy_host, proxy_port).run("bing.com", 80)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment