Created
November 25, 2025 17:36
-
-
Save DJStompZone/2e96a35c059551722cc3430471ae7747 to your computer and use it in GitHub Desktop.
SBND - Simple Bespoke Network Diagnostics
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| #!/usr/bin/env python3 | |
| """ | |
| SBND - Simple Bespoke Network Diagnostics | |
| ICMP reachability and TCP port behavior analysis tool | |
| Designed to help distinguish between | |
| - Host down | |
| - Host reachable but port closed | |
| - Traffic filtered | |
| - Port-specific blocking | |
| - Broader filtering | |
| Author: DJ Stomp | |
| License: MIT | |
| """ | |
| import argparse | |
| import errno | |
| import os | |
| import select | |
| import socket | |
| import struct | |
| import time | |
| from typing import Optional, Sequence, Tuple | |
| def _checksum(data: bytes) -> int: | |
| """Compute the ICMP checksum.""" | |
| if len(data) % 2: | |
| data += b"\x00" | |
| s = 0 | |
| for i in range(0, len(data), 2): | |
| w = data[i] << 8 | data[i + 1] | |
| s = (s + w) & 0xFFFFFFFF | |
| s = (s >> 16) + (s & 0xFFFF) | |
| s += s >> 16 | |
| return (~s) & 0xFFFF | |
def icmp_ping(host: str, count: int = 4, timeout: float = 1.0) -> Optional[float]:
    """Send ICMP echo requests and return the average RTT in ms, or None.

    [!] Requires raw socket privileges.

    Args:
        host: IPv4 address or hostname to ping.
        count: Number of echo requests to send.
        timeout: Maximum number of seconds to wait for each reply.

    Returns:
        The mean round-trip time in milliseconds over the replies that were
        received, or None when the host cannot be resolved, the raw socket
        cannot be created, or no matching reply arrives.
    """
    try:
        dest_addr = socket.gethostbyname(host)
    except OSError:
        return None

    try:
        icmp_proto = socket.getprotobyname("icmp")
        sock = socket.socket(socket.AF_INET, socket.SOCK_RAW, icmp_proto)
    except PermissionError:
        # No raw socket access
        print("Oopsiepoopsie. Try again with sudo?")
        return None
    except OSError:
        return None

    ident = os.getpid() & 0xFFFF
    rtts: list[float] = []

    # The context manager closes the socket on every exit path, including
    # unexpected exceptions.
    with sock:
        sock.settimeout(timeout)
        for seq in range(1, count + 1):
            # The sequence field is 16 bits wide; wrap instead of letting
            # struct.pack raise for very large counts.
            seq_id = seq & 0xFFFF

            # ICMP header: type (8), code (0), checksum, id, seq. The
            # checksum is computed over the packet with a zeroed checksum
            # field and then packed into the final header.
            payload = struct.pack("!d", time.time()) + b"DJSTOMP"
            chk = _checksum(struct.pack("!BBHHH", 8, 0, 0, ident, seq_id) + payload)
            packet = struct.pack("!BBHHH", 8, 0, chk, ident, seq_id) + payload

            # Monotonic clock: RTT must not be affected by wall-clock jumps.
            send_time = time.monotonic()
            # One fixed deadline per probe, so unrelated ICMP traffic from
            # the target cannot extend the wait indefinitely.
            deadline = send_time + timeout
            try:
                sock.sendto(packet, (dest_addr, 1))
                while True:
                    remaining = deadline - time.monotonic()
                    if remaining <= 0:
                        break
                    ready, _, _ = select.select([sock], [], [], remaining)
                    if not ready:
                        # Timed out
                        break
                    recv_time = time.monotonic()
                    recv_packet, addr = sock.recvfrom(1024)
                    if addr[0] != dest_addr:
                        # Some other ICMP noise, ignore
                        continue
                    # The IPv4 header length lives in the low nibble of the
                    # first byte, in 32-bit words; it exceeds 20 bytes when
                    # IP options are present.
                    ihl = (recv_packet[0] & 0x0F) * 4 if recv_packet else 0
                    if len(recv_packet) < ihl + 8:
                        # Too short to contain an ICMP header
                        continue
                    icmp_type, _code, _csum, recv_id, recv_seq = struct.unpack_from(
                        "!BBHHH", recv_packet, ihl
                    )
                    if icmp_type == 0 and recv_id == ident and recv_seq == seq_id:
                        rtts.append((recv_time - send_time) * 1000.0)
                        break
            except OSError:
                # Covers send/receive failures and socket timeouts; move on
                # to the next probe.
                continue

    if not rtts:
        return None
    return sum(rtts) / len(rtts)
def tcp_connect_test(host: str, port: int, timeout: float = 5.0) -> Tuple[str, float]:
    """Attempt a TCP connection and classify the result.

    Args:
        host: IPv4 address or hostname to connect to.
        port: TCP port number.
        timeout: Connect timeout in seconds.

    Returns:
        (status, elapsed_ms)

        Status is one of:
        - "open"        : connection succeeded
        - "refused"     : actively refused (ECONNREFUSED)
        - "unreachable" : network/host unreachable (ENETUNREACH/EHOSTUNREACH)
        - "timeout"     : connect timed out (likely filtered/blocked)
        - "error:<str>" : other socket error description, including a
                          failure to resolve the host name

        elapsed_ms is the duration of the connect attempt, or of the failed
        name resolution when the host could not be resolved.
    """
    resolve_start = time.perf_counter()
    try:
        addr_info = socket.getaddrinfo(host, port, socket.AF_INET, socket.SOCK_STREAM)
    except (OSError, UnicodeError) as exc:
        # gaierror is an OSError; UnicodeError comes from the IDNA codec for
        # names with empty or over-long labels. Report both as a status
        # instead of letting a traceback escape.
        reason = getattr(exc, "strerror", None) or str(exc)
        return f"error:{reason}", (time.perf_counter() - resolve_start) * 1000.0

    af, socktype, proto, _, sa = addr_info[0]
    with socket.socket(af, socktype, proto) as sock:
        sock.settimeout(timeout)
        start = time.perf_counter()
        # Use connect_ex to get errno
        err = sock.connect_ex(sa)
        elapsed_ms = (time.perf_counter() - start) * 1000.0

    if err == 0:
        return "open", elapsed_ms
    if err == errno.ECONNREFUSED:
        return "refused", elapsed_ms
    if err in (errno.EHOSTUNREACH, errno.ENETUNREACH):
        return "unreachable", elapsed_ms
    if err in (errno.ETIMEDOUT, errno.EINPROGRESS, errno.EAGAIN, errno.EWOULDBLOCK):
        # With a socket timeout set, connect_ex reports an expired timeout
        # as EAGAIN/EWOULDBLOCK rather than ETIMEDOUT; all of these mean
        # "no response within the allowed time".
        return "timeout", elapsed_ms
    return f"error:{os.strerror(err)}", elapsed_ms
def multi_port_probe(host: str, ports: Sequence[int], timeout: float = 5.0) -> list[Tuple[int, str, float]]:
    """Run tcp_connect_test against each port of host, one after another.

    Returns one (port, status, elapsed_ms) tuple per entry in ports, in the
    order the ports were given.
    """
    # The generator is consumed lazily by the comprehension below, so ports
    # are still probed strictly in sequence.
    outcomes = ((port, tcp_connect_test(host, port, timeout)) for port in ports)
    return [(port, status, elapsed_ms) for port, (status, elapsed_ms) in outcomes]
def pretty_print_results(
    target: str,
    target_port: int,
    ping_rtt: Optional[float],
    target_status: Tuple[str, float],
    extra_ports: Optional[list[Tuple[int, str, float]]] = None,
    compare: Optional[Tuple[str, int, Tuple[str, float]]] = None,
) -> None:
    """Print diagnostic results with a bit of flourish, because why not.

    Args:
        target: Host that was tested.
        target_port: Primary TCP port that was tested.
        ping_rtt: Average ICMP RTT in ms, or None when no RTT is available.
        target_status: (status, elapsed_ms) for the primary port.
        extra_ports: Optional (port, status, elapsed_ms) results for further
            ports on the same host.
        compare: Optional (host, port, (status, elapsed_ms)) result for a
            known-good reference host.
    """
    print(
        "\nSBND - Simple Bespoke Network Diagnostics",
        " ICMP reachability and TCP port behavior analysis tool",
        f"\n=== Network diagnostics for {target}:{target_port} ===",
        sep="\n"
    )
    status, elapsed = target_status
    print(
        "\n=== ICMP ===\n",
        f"Ping {target}:",
        "unavailable" if ping_rtt is None else
        f"host reachable, avg RTT = {ping_rtt:.2f} ms",
        "\n\n=== TCP ===\n",
        f"{target}:{target_port} ->",
        f"{status} (attempt took {elapsed:.1f} ms)"
    )

    # Fixed statuses map straight to an explanation; "error:<str>" and
    # anything unrecognised are handled below.
    explanations = {
        "open": "→ Port is reachable and accepting connections.",
        "refused": "→ Host is reachable; port is closed or service not listening.",
        "unreachable": "→ Network/host unreachable from this client. Likely indicates a routing issue.",
        "timeout": "→ No response: traffic likely filtered or dropped. Could indicate ISP or modem issue.",
    }
    if status in explanations:
        verdict = explanations[status]
    elif status.startswith("error:"):
        verdict = f"→ Socket error: {status.split(':', 1)[1]}."
    else:
        # Must be a single string: a comma-separated pair here would build a
        # tuple and print its repr instead of a readable message.
        verdict = f"Unexpected result/output for TCP test: {status}"
    print("\n", verdict)

    if extra_ports:
        print("\nAdditional port probes on target:")
        for port, st, el in extra_ports:
            print(f" - {target}:{port:<5} -> {st:<10} ({el:.1f} ms)")

    if compare:
        cmp_host, cmp_port, (c_status, c_elapsed) = compare
        print(f"\nComparison vs known-good host {cmp_host}:{cmp_port}:")
        print(f" TCP {cmp_host}:{cmp_port} -> {c_status} ({c_elapsed:.1f} ms)")
        if c_status == "open":
            print(" → Local network stack and outbound connectivity look fine.")
        elif c_status in ("timeout", "unreachable"):
            print(" → Your local path to the internet may be broken, not just the target.")
        else:
            print(" → Comparison result is inconclusive but useful as context.")
def _tcp_port(value: str) -> int:
    """argparse type: parse value as a TCP port number in 1..65535."""
    try:
        port = int(value)
    except ValueError:
        raise argparse.ArgumentTypeError(f"invalid port number: {value!r}") from None
    if not 1 <= port <= 65535:
        raise argparse.ArgumentTypeError(f"port must be between 1 and 65535, got {port}")
    return port


def main(argv: Optional[Sequence[str]] = None) -> None:
    """Parse command-line arguments, run the diagnostics and print a report.

    Args:
        argv: Argument list to parse instead of the process arguments.
            None (the default) makes argparse read sys.argv[1:].

    Raises:
        SystemExit: Raised by argparse on invalid arguments, including a
            port outside 1..65535, and for --help.
    """
    parser = argparse.ArgumentParser(
        description="SBND (Simple Bespoke Network Diagnostics) - ICMP reachability and TCP port behavior analysis tool"
    )
    parser.add_argument("host", help="Target IP or hostname to test")
    # Ports are validated up front so an out-of-range value is rejected with
    # a usage error instead of being handed to the socket layer.
    parser.add_argument("port", type=_tcp_port, help="Primary TCP port to test")
    parser.add_argument(
        "--ports",
        type=_tcp_port,
        nargs="*",
        default=[],
        help="Additional TCP ports to probe on the same host (e.g. 22 80 443).",
    )
    parser.add_argument(
        "--timeout",
        type=float,
        default=5.0,
        help="TCP connect timeout in seconds (default: 5.0).",
    )
    parser.add_argument(
        "--ping-count",
        type=int,
        default=4,
        help="Number of ICMP echo requests to send (default: 4).",
    )
    parser.add_argument(
        "--compare-host",
        help="Optional known-good host to compare against (e.g. 1.1.1.1).",
    )
    parser.add_argument(
        "--compare-port",
        type=_tcp_port,
        default=443,
        help="Port on compare-host to test (default: 443).",
    )
    args = parser.parse_args(argv)

    # --timeout applies to the TCP probes only; the ICMP probe keeps its own
    # default per-reply timeout.
    ping_rtt = icmp_ping(args.host, count=args.ping_count)
    target_status = tcp_connect_test(args.host, args.port, args.timeout)

    extra_ports = None
    if args.ports:
        extra_ports = multi_port_probe(args.host, args.ports, args.timeout)

    compare_result = None
    if args.compare_host:
        cmp_status = tcp_connect_test(args.compare_host, args.compare_port, args.timeout)
        compare_result = (args.compare_host, args.compare_port, cmp_status)

    pretty_print_results(
        target=args.host,
        target_port=args.port,
        ping_rtt=ping_rtt,
        target_status=target_status,
        extra_ports=extra_ports,
        compare=compare_result,
    )


# Run the CLI only when executed as a script, not when imported.
if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment