Last active
February 24, 2026 16:42
-
-
Save ankurpandeyvns/7df0bf41841d96afc82fe3ff97037fb5 to your computer and use it in GitHub Desktop.
Excitel ZTE ZXIC 2K05X ONT in bridge mode
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| #!/usr/bin/env python3 | |
| """ | |
| Excitel ONT Bridge Mode Auto-Setup | |
| =================================== | |
| Automates setting up (or reverting) the ZTE ZXIC 2K05X ONT in bridge mode | |
| so a downstream router (e.g. UniFi Dream Machine) can handle PPPoE directly. | |
| Architecture Overview | |
| --------------------- | |
| In factory (route) mode the ONT runs its own PPPoE client and NATs traffic | |
| through br0. Bridge mode (ConnType=4) makes the ONT a transparent L2 bridge: | |
| PPPoE frames from the downstream router pass straight through to the OLT. | |
| The ONT has a MAC collision bug: eth0 (LAN-facing) and nbif0 (WAN/PON-facing) | |
| share the same MAC. The kernel bridge merges their FDB entries, so frames from | |
| the ISP destined for the router get looped back up nbif0 instead of being | |
| forwarded to eth0. We fix this by changing eth0+br0 to (WAN_MAC - 1) at boot | |
| via a persistent script injected into the rootfs overlay on mtdblock9. | |
| Prerequisites | |
| ------------- | |
| - Python 3.10+ | |
| - pycryptodome: pip install pycryptodome | |
| (used to AES-decrypt the ONT config backup for manu password extraction | |
| and to read encrypted .bin backups during revert) | |
| - Network access to the ONT (default 192.168.0.1) | |
| - Web UI credentials (factory default: excitel / exc@123) | |
| Usage | |
| ----- | |
| Setup (bridge mode): | |
| python3 excitel-bridge-setup.py # full automated setup | |
| python3 excitel-bridge-setup.py --dry-run # preview without changes | |
| python3 excitel-bridge-setup.py --skip-to 6 # resume from step 6 | |
| python3 excitel-bridge-setup.py --no-reboot # skip final reboot | |
| python3 excitel-bridge-setup.py --ont-ip 10.0.0.1 # custom ONT IP | |
| Revert (restore route mode): | |
| python3 excitel-bridge-setup.py --revert # revert all changes | |
| python3 excitel-bridge-setup.py --revert --dry-run # preview revert | |
| python3 excitel-bridge-setup.py --revert --backup-file ~/ont_backup_full.xml | |
| Setup Steps | |
| ----------- | |
| 1. Web UI Login | |
| 2. Enable Telnet via Web UI | |
| 3. Derive `manu` superuser credentials | |
| - Method 1: Download config backup via web UI, decrypt, extract from XML | |
| - Method 2: sismac via ISP telnet account (if shell not restricted) | |
| - Method 3: Prompt user | |
| 4. Extract current config (PPPoE creds, WAN MAC, TR-069, PortBinding, etc.) | |
| 5. Backup config to local files (~/ont_backup_<timestamp>_full.xml + _tables.txt) | |
| *** POINT OF NO RETURN — user must confirm before proceeding *** | |
| 6. Disable TR-069 management (prevents ISP from reverting changes) | |
| 7. Set bridge mode (ConnType=4, IsNAT=0, ServList=1) | |
| 8. Configure PortBinding (ETH1+ETH2 → WCD1.WCPPP1 for LAN→WAN forwarding) | |
| 9. Inject persistent MAC fix boot script | |
| - /userconfig/fix-eth0-mac.sh (jffs2, persists across reboots) | |
| - /etc/rcS.d/S98fixmac via rootfs overlay on mtdblock9 | |
| 10. Reboot ONT | |
| 11. Display downstream router configuration (PPPoE creds, MAC to clone, etc.) | |
| Revert Steps (--revert) | |
| ------------------------ | |
| Undoes ALL changes made by this script, restoring the ONT to factory route mode. | |
| Original values are read from the backup file saved in step 5 (auto-detected | |
| from ~/ont_backup_*_full.xml). If no backup is found, use --backup-file to | |
| specify one (supports both decrypted .xml and encrypted .bin), or Excitel | |
| factory defaults are used (ConnType=1, WANCName=1_TR069_INTERNET_R_VID_). | |
| R1. Web UI Login (same as setup) | |
| R2. Enable Telnet via Web UI (same as setup) | |
| R3. Derive manu credentials (same as setup) | |
| R4. Read current config, load original values from backup, show planned changes | |
| R5. Remove MAC fix boot scripts (/userconfig/fix-eth0-mac.sh + S98fixmac) | |
| R6. Remove PortBinding entries (restore to empty table) | |
| R7. Restore route mode (ConnType, IsNAT, WANCName, ServList from backup) | |
| R8. Re-enable TR-069 (Tr069Enable, PeriodicInformEnable, STUNEnable) | |
| R9. Reboot ONT | |
| """ | |
| from __future__ import annotations | |
| import argparse | |
| import glob as _glob | |
| import re | |
| import select | |
| import socket | |
| import struct | |
| import sys | |
| import time | |
| import urllib.parse | |
| from datetime import datetime | |
| from http.cookiejar import CookieJar | |
| from pathlib import Path | |
| from urllib.error import URLError | |
| from urllib.request import HTTPCookieProcessor, Request, build_opener | |
| from xml.etree import ElementTree as ET | |
| try: | |
| from Crypto.Cipher import AES | |
| _HAS_AES = True | |
| except ImportError: | |
| _HAS_AES = False | |
| # ── Colors (auto-disabled when not a TTY) ───────────────────── | |
| _USE_COLOR = sys.stdout.isatty() | |
| R = "\033[0;31m" if _USE_COLOR else "" | |
| G = "\033[0;32m" if _USE_COLOR else "" | |
| Y = "\033[1;33m" if _USE_COLOR else "" | |
| C = "\033[0;36m" if _USE_COLOR else "" | |
| B = "\033[1m" if _USE_COLOR else "" | |
| N = "\033[0m" if _USE_COLOR else "" | |
| def info(msg: str) -> None: | |
| print(f"{G}[+]{N} {msg}") | |
| def warn(msg: str) -> None: | |
| print(f"{Y}[!]{N} {msg}") | |
| def error(msg: str) -> None: | |
| print(f"{R}[-]{N} {msg}") | |
| def header(title: str) -> None: | |
| print(f"\n{B}{'=' * 60}") | |
| print(f" {title}") | |
| print(f"{'=' * 60}{N}") | |
| def kv(key: str, val: str, indent: int = 2) -> None: | |
| print(f"{' ' * indent}{key + ':':<28s} {val}") | |
| def step_header(num: int, title: str) -> None: | |
| print(f"\n{B}{'─' * 60}") | |
| print(f" Step {num}: {title}") | |
| print(f"{'─' * 60}{N}") | |
| def confirm(prompt: str) -> bool: | |
| try: | |
| answer = input(f"{Y}[?]{N} {prompt} (y/N): ").strip().lower() | |
| return answer == "y" | |
| except (EOFError, KeyboardInterrupt): | |
| print() | |
| return False | |
| # ── ZTE Config Decryption ───────────────────────────────────── | |
| # Ported from ppc_config_tool.py — decrypts web backup .bin files | |
| # so we can extract manu password and PPPoE credentials without telnet. | |
| _XOR_KEY = b'*&(*H65GFRUY6KH53%#74BUG^%^RFIOO*&*^&^RRU6YOK8PE(&(#TI_+7(U9(7!U(HF*(ET6FGHKDIO8E@67!R#@#' | |
| _AES_KEY = bytes(16) # All zeros — yes, really | |
| _CRC_PATTERN = bytes.fromhex('b1ca8498cc0d83389e5555c03011ecdc') | |
| _ZTE_HEADER_MAGIC = bytes.fromhex('999999994444444455555555aaaaaaaa') | |
| _PPCT_MAGIC = b'PPCT' | |
| def _xor_data(data: bytes, key: bytes) -> bytes: | |
| return bytes(byte ^ key[i % len(key)] for i, byte in enumerate(data)) | |
| def _remove_chunk_markers(data: bytes) -> bytes: | |
| result = bytearray() | |
| i = 0 | |
| while i < len(data): | |
| if i + 12 <= len(data): | |
| s1 = int.from_bytes(data[i:i + 4], 'big') | |
| s2 = int.from_bytes(data[i + 4:i + 8], 'big') | |
| if s1 == s2 and 100 < s1 < 100000: | |
| i += 12 | |
| continue | |
| result.append(data[i]) | |
| i += 1 | |
| return bytes(result) | |
| def _remove_crc_blocks(data: bytes) -> bytes: | |
| result = bytearray() | |
| i = 0 | |
| while i < len(data): | |
| if i + 16 <= len(data) and data[i:i + 16] == _CRC_PATTERN: | |
| i += 16 | |
| else: | |
| result.append(data[i]) | |
| i += 1 | |
| return bytes(result) | |
| def decrypt_web_backup(data: bytes) -> bytes: | |
| """Decrypt a ZTE web backup .bin file to XML. | |
| Structure: 128-byte ZTE header → XOR layer → PPCT header → AES-ECB layer. | |
| Returns the <DB>...</DB> XML content. | |
| """ | |
| if not _HAS_AES: | |
| raise RuntimeError("pycryptodome required: pip install pycryptodome") | |
| if len(data) < 256 or data[:16] != _ZTE_HEADER_MAGIC: | |
| raise ValueError("Not a valid ZTE backup file") | |
| xor_dec = _xor_data(data[128:], _XOR_KEY) | |
| if xor_dec[12:16] != _PPCT_MAGIC: | |
| raise ValueError("Invalid PPCT header") | |
| enc_prefix = xor_dec[0x58:0x78] # 32 bytes | |
| enc_data = xor_dec[128:][8:] # Skip 8-byte CRC suffix | |
| clean = _remove_crc_blocks(_remove_chunk_markers(enc_data)) | |
| full = enc_prefix + clean | |
| aligned = len(full) - (len(full) % 16) | |
| dec = AES.new(_AES_KEY, AES.MODE_ECB).decrypt(full[:aligned]) | |
| s = dec.find(b'<DB>') | |
| e = dec.rfind(b'</DB>') | |
| if s >= 0 and e >= 0: | |
| return dec[s:e + 5] | |
| return dec | |
| # ── Telnet ──────────────────────────────────────────────────── | |
| class ONTTelnet: | |
| """Manage telnet sessions to the ONT via raw sockets.""" | |
| def __init__(self, host: str, user: str, passwd: str, timeout: int = 15): | |
| self.host = host | |
| self.user = user | |
| self.passwd = passwd | |
| self.timeout = timeout | |
| def _read_until(self, sock: socket.socket, marker: bytes, | |
| timeout: float | None = None) -> bytes: | |
| timeout = timeout or self.timeout | |
| sock.settimeout(timeout) | |
| buf = b"" | |
| deadline = time.time() + timeout | |
| while time.time() < deadline: | |
| try: | |
| chunk = sock.recv(4096) | |
| if not chunk: | |
| break | |
| buf += chunk | |
| if marker in buf: | |
| break | |
| except socket.timeout: | |
| break | |
| return buf | |
| def _strip_telnet_negotiation(self, data: bytes) -> bytes: | |
| result = b"" | |
| i = 0 | |
| while i < len(data): | |
| if data[i:i + 1] == b"\xff" and i + 1 < len(data): | |
| cmd = data[i + 1:i + 2] | |
| if cmd in (b"\xfb", b"\xfc", b"\xfd", b"\xfe"): | |
| i += 3 | |
| elif cmd == b"\xfa": | |
| end = data.find(b"\xff\xf0", i) | |
| i = end + 2 if end >= 0 else len(data) | |
| else: | |
| i += 2 | |
| else: | |
| result += data[i:i + 1] | |
| i += 1 | |
| return result | |
| def _connect_and_login(self) -> socket.socket | None: | |
| try: | |
| sock = socket.create_connection((self.host, 23), timeout=self.timeout) | |
| except Exception as e: | |
| error(f"Telnet connect failed: {e}") | |
| return None | |
| data = self._read_until(sock, b"Login:") | |
| self._strip_telnet_negotiation(data) | |
| sock.sendall(self.user.encode() + b"\r\n") | |
| self._read_until(sock, b"Password:") | |
| sock.sendall(self.passwd.encode() + b"\r\n") | |
| data = self._read_until(sock, b"/ #") | |
| text = self._strip_telnet_negotiation(data).decode("utf-8", errors="replace") | |
| if "Login incorrect" in text or "Login:" in text: | |
| sock.close() | |
| return None | |
| return sock | |
| def _parse_output(self, raw: bytes, cmd: str) -> list[str]: | |
| raw = self._strip_telnet_negotiation(raw) | |
| text = raw.decode("utf-8", errors="replace") | |
| text = re.sub(r"\x1b\[[0-9;]*m", "", text) | |
| lines = text.splitlines() | |
| if lines and cmd.strip() in lines[0]: | |
| lines = lines[1:] | |
| if lines and "/ #" in lines[-1]: | |
| last = lines[-1].replace("/ #", "").rstrip() | |
| lines[-1:] = [last] if last else [] | |
| return lines | |
| def run(self, *cmds: str) -> str: | |
| sock = self._connect_and_login() | |
| if not sock: | |
| return "" | |
| try: | |
| output_lines: list[str] = [] | |
| for cmd in cmds: | |
| sock.sendall(cmd.encode() + b"\r\n") | |
| raw = self._read_until(sock, b"/ #") | |
| output_lines.extend(self._parse_output(raw, cmd)) | |
| sock.sendall(b"exit\r\n") | |
| sock.close() | |
| return "\n".join(output_lines) | |
| except Exception as e: | |
| error(f"Telnet error: {e}") | |
| try: | |
| sock.close() | |
| except Exception: | |
| pass | |
| return "" | |
| def run_long(self, *cmds: str, timeout: float = 30) -> str: | |
| sock = self._connect_and_login() | |
| if not sock: | |
| return "" | |
| try: | |
| output_lines: list[str] = [] | |
| for cmd in cmds: | |
| sock.sendall(cmd.encode() + b"\r\n") | |
| raw = self._read_until(sock, b"/ #", timeout=timeout) | |
| output_lines.extend(self._parse_output(raw, cmd)) | |
| sock.sendall(b"exit\r\n") | |
| sock.close() | |
| return "\n".join(output_lines) | |
| except Exception as e: | |
| error(f"Telnet error: {e}") | |
| try: | |
| sock.close() | |
| except Exception: | |
| pass | |
| return "" | |
| def write_file(self, path: str, content: str, mode: int = 0o755) -> bool: | |
| """Write a file on the ONT via telnet heredoc with inter-line delay.""" | |
| sock = self._connect_and_login() | |
| if not sock: | |
| return False | |
| try: | |
| sock.sendall(f"cat > {path} << 'HEREDOC_EOF'\r\n".encode()) | |
| time.sleep(0.1) | |
| for line in content.splitlines(): | |
| sock.sendall(line.encode() + b"\r\n") | |
| time.sleep(0.02) | |
| sock.sendall(b"HEREDOC_EOF\r\n") | |
| self._read_until(sock, b"/ #", timeout=5) | |
| sock.sendall(f"chmod {oct(mode)[2:]} {path}\r\n".encode()) | |
| self._read_until(sock, b"/ #", timeout=3) | |
| sock.sendall(b"exit\r\n") | |
| sock.close() | |
| return True | |
| except Exception as e: | |
| error(f"Telnet write_file error: {e}") | |
| try: | |
| sock.close() | |
| except Exception: | |
| pass | |
| return False | |
| def check_login(self) -> bool: | |
| sock = self._connect_and_login() | |
| if sock: | |
| try: | |
| sock.sendall(b"exit\r\n") | |
| sock.close() | |
| except Exception: | |
| pass | |
| return True | |
| return False | |
| # ── HTTP / Web UI ───────────────────────────────────────────── | |
| class ONTWeb: | |
| """Manage HTTP sessions to the ONT web UI. | |
| The ONT only serves plain HTTP. If we get an SSL/HTTPS response, | |
| we're hitting the wrong device (e.g. the downstream router). | |
| """ | |
| def __init__(self, base_url: str): | |
| self.base_url = base_url | |
| self.host = base_url.replace("http://", "").split("/")[0] | |
| self.cookies = CookieJar() | |
| self.opener = build_opener(HTTPCookieProcessor(self.cookies)) | |
| self.session_token = "" | |
| self.logged_in = False | |
| def _request(self, path: str, data: dict | None = None, | |
| timeout: int = 10) -> str: | |
| url = f"{self.base_url}/{path}" if path else self.base_url + "/" | |
| try: | |
| if data is not None: | |
| encoded = urllib.parse.urlencode(data).encode() | |
| req = Request(url, data=encoded) | |
| else: | |
| req = Request(url) | |
| resp = self.opener.open(req, timeout=timeout) | |
| if resp.url and resp.url.startswith("https://"): | |
| error(f"Redirected to HTTPS ({resp.url}) — this is not the ONT") | |
| return "" | |
| return resp.read().decode("utf-8", errors="replace") | |
| except URLError as e: | |
| if "SSL" in str(e) or "certificate" in str(e).lower(): | |
| error("Got SSL/HTTPS response — this is not the ONT (ONT is HTTP only)") | |
| else: | |
| error(f"HTTP error: {e}") | |
| return "" | |
| def get(self, path: str) -> str: | |
| return self._request(path) | |
| def post(self, path: str, data: dict) -> str: | |
| return self._request(path, data) | |
| def get_page(self, page_name: str) -> str: | |
| return self.get(f"getpage.gch?pid=1002&nextpage={page_name}") | |
| def login(self, username: str, password: str) -> bool: | |
| """Login to web UI. Returns True on success.""" | |
| self.cookies.clear() | |
| self.logged_in = False | |
| html = self._request("") | |
| if not html: | |
| error("Cannot reach ONT web UI") | |
| return False | |
| m = re.search(r'Frm_Logintoken.*?value\s*=\s*"(\d*)"', html) | |
| token = m.group(1) if m else "0" | |
| if not token: | |
| token = "0" | |
| self.post("", { | |
| "action": "login", | |
| "Frm_Logintoken": token, | |
| "Username": username, | |
| "Password": password, | |
| }) | |
| # Verify login by fetching a protected page | |
| check = self.get_page("status_dev_info_t.gch") | |
| if "logout_redirect" in check: | |
| return False | |
| self.logged_in = True | |
| return True | |
| def logout(self) -> None: | |
| self.post("", {"logout": "1", "logout_from": "login_timeout"}) | |
| self.cookies.clear() | |
| self.logged_in = False | |
| def enable_telnet(self) -> bool: | |
| """Enable telnet on the ONT via the firewall/services page.""" | |
| html = self.get_page("sec_sc_t.gch") | |
| if not html: | |
| error("Could not fetch firewall settings page") | |
| return False | |
| m = re.search(r'session_token\s*=\s*"([^"]*)"', html) | |
| if not m: | |
| error("Could not extract session token from sec_sc_t.gch") | |
| return False | |
| self.session_token = m.group(1) | |
| form_data = { | |
| "_SESSION_TOKEN": self.session_token, | |
| "IF_ACTION": "apply", | |
| "IF_ERRORSTR": "SUCC", | |
| "IF_ERRORPARAM": "SUCC", | |
| "IF_ERRORTYPE": "-1286224680", | |
| "ViewName": "NULL", | |
| "Enable": "1", | |
| "INCViewName": "IGD.LD1", | |
| "INCName": "LAN", | |
| "MinSrcIp": "192.168.0.1", | |
| "MinSrcMask": "NULL", | |
| "MaxSrcIp": "192.168.255.255", | |
| "FilterTarget": "1", | |
| "Servise": "10", | |
| "ViewName0": "IGD.FWSc.FWSC1", | |
| "Enable0": "1", | |
| "INCViewName0": "IGD.WANIF", | |
| "INCName0": "WAN", | |
| "MinSrcIp0": "", | |
| "MinSrcMask0": "0.0.0.0", | |
| "MaxSrcIp0": "", | |
| "FilterTarget0": "1", | |
| "Servise0": "1", | |
| "ViewName1": "IGD.FWSc.FWSC2", | |
| "Enable1": "1", | |
| "INCViewName1": "IGD.LD1", | |
| "INCName1": "LAN", | |
| "MinSrcIp1": "192.168.0.1", | |
| "MinSrcMask1": "0.0.0.0", | |
| "MaxSrcIp1": "192.168.255.255", | |
| "FilterTarget1": "1", | |
| "Servise1": "10", | |
| "IF_INDEX": "1", | |
| "IF_INSTNUM": "2", | |
| } | |
| resp = self.post("getpage.gch?pid=1002&nextpage=sec_sc_t.gch", form_data) | |
| return bool(resp) | |
| def download_config_backup(self) -> bytes: | |
| """Download encrypted config backup via the web UI. | |
| Uses the manager_dev_config_t.gch page with raw socket for reliable | |
| large-file download. Returns raw encrypted .bin bytes. | |
| """ | |
| # Get session token from backup page | |
| page = self.get_page("manager_dev_config_t.gch") | |
| m = re.search(r'session_token\s*=\s*"([^"]*)"', page) | |
| if not m or not m.group(1): | |
| raise RuntimeError("Could not get session token from backup page") | |
| session_token = m.group(1) | |
| cookie_str = "; ".join(f"{c.name}={c.value}" for c in self.cookies) | |
| # Raw socket download (urllib can't handle the multipart POST reliably) | |
| sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) | |
| sock.settimeout(180) | |
| sock.connect((self.host, 80)) | |
| boundary = "----WebKitFormBoundary7MA4YWxkTrZu0gW" | |
| body = ( | |
| f"--{boundary}\r\n" | |
| f'Content-Disposition: form-data; name="config"\r\n\r\n\r\n' | |
| f"--{boundary}\r\n" | |
| f'Content-Disposition: form-data; name="_SESSION_TOKEN"\r\n' | |
| f"\r\n{session_token}\r\n" | |
| f"--{boundary}--\r\n" | |
| ) | |
| body_bytes = body.encode() | |
| http_req = ( | |
| f"POST /getpage.gch?pid=101&nextpage=manager_dev_config_t.gch HTTP/1.1\r\n" | |
| f"Host: {self.host}\r\n" | |
| f"Content-Type: multipart/form-data; boundary={boundary}\r\n" | |
| f"Content-Length: {len(body_bytes)}\r\n" | |
| ) | |
| if cookie_str: | |
| http_req += f"Cookie: {cookie_str}\r\n" | |
| http_req += "Connection: keep-alive\r\n\r\n" | |
| sock.sendall(http_req.encode() + body_bytes) | |
| # Read HTTP response headers | |
| resp_hdr = b"" | |
| while not resp_hdr.endswith(b"\r\n\r\n"): | |
| c = sock.recv(1) | |
| if not c: | |
| raise RuntimeError("Connection closed during header read") | |
| resp_hdr += c | |
| expected = 0 | |
| for line in resp_hdr.decode().split("\r\n"): | |
| if line.lower().startswith("content-length:"): | |
| expected = int(line.split(":")[1].strip()) | |
| if expected == 0: | |
| sock.close() | |
| raise RuntimeError("No Content-Length in backup download response") | |
| # Read body | |
| content = b"" | |
| remaining = expected | |
| while remaining > 0: | |
| ready = select.select([sock], [], [], 60) | |
| if ready[0]: | |
| chunk = sock.recv(min(65536, remaining)) | |
| if not chunk: | |
| break | |
| content += chunk | |
| remaining -= len(chunk) | |
| else: | |
| break | |
| sock.close() | |
| if len(content) != expected: | |
| raise RuntimeError( | |
| f"Download incomplete: got {len(content)}, expected {expected}") | |
| if content[:16] != _ZTE_HEADER_MAGIC: | |
| raise RuntimeError("Downloaded file is not a valid ZTE backup") | |
| return content | |
| # ── sendcmd XML parser ──────────────────────────────────────── | |
| def parse_sendcmd_xml(text: str) -> list[dict[str, str]]: | |
| m = re.search(r"(<Tbl.*</Tbl>)", text, re.DOTALL) | |
| if not m: | |
| return [] | |
| try: | |
| root = ET.fromstring(m.group(1)) | |
| except ET.ParseError: | |
| return [] | |
| rows = [] | |
| for row in root.findall("Row"): | |
| d = {} | |
| for dm in row.findall("DM"): | |
| d[dm.get("name", "")] = dm.get("val", "") | |
| rows.append(d) | |
| return rows | |
| # ── Decrypted XML parser ───────────────────────────────────── | |
| def parse_config_xml_table(xml_str: str, table_name: str) -> list[dict[str, str]]: | |
| """Extract rows from a named <Tbl> in the decrypted config XML.""" | |
| pattern = rf'<Tbl\s+name="{re.escape(table_name)}"[^>]*>(.*?)</Tbl>' | |
| m = re.search(pattern, xml_str, re.DOTALL) | |
| if not m: | |
| return [] | |
| tbl_xml = m.group(0) | |
| try: | |
| root = ET.fromstring(tbl_xml) | |
| except ET.ParseError: | |
| return [] | |
| rows = [] | |
| for row in root.findall("Row"): | |
| d = {} | |
| for dm in row.findall("DM"): | |
| d[dm.get("name", "")] = dm.get("val", "") | |
| rows.append(d) | |
| return rows | |
| # ── Network Helpers ─────────────────────────────────────────── | |
| def port_open(host: str, port: int, timeout: float = 2) -> bool: | |
| try: | |
| sock = socket.create_connection((host, port), timeout=timeout) | |
| sock.close() | |
| return True | |
| except (OSError, socket.timeout): | |
| return False | |
| def wait_for_port(host: str, port: int, timeout: float = 60, | |
| interval: float = 5, want_open: bool = True) -> bool: | |
| deadline = time.time() + timeout | |
| while time.time() < deadline: | |
| if port_open(host, port) == want_open: | |
| return True | |
| remaining = deadline - time.time() | |
| if remaining > 0: | |
| time.sleep(min(interval, remaining)) | |
| return False | |
| def mac_decrement(mac: str) -> str: | |
| clean = mac.replace(":", "").replace("-", "") | |
| num = int(clean, 16) - 1 | |
| hex_str = f"{num:012X}" | |
| return ":".join(hex_str[i:i + 2] for i in range(0, 12, 2)) | |
| # ── Main Setup Class ───────────────────────────────────────── | |
| class ExcitelBridgeSetup: | |
| """Orchestrates the full bridge mode setup across all 11 steps.""" | |
| DEFAULT_WEB_USER = "excitel" | |
| DEFAULT_WEB_PASS = "exc@123" | |
| ISP_TELNET_USER = "excitel" | |
| ISP_TELNET_PASS = "3xCMon1t@123" | |
| # Known Excitel route-mode defaults (used when no backup file available) | |
| EXCITEL_ROUTE_DEFAULTS = { | |
| "ConnType": "1", # Route/DHCP (Excitel factory default) | |
| "IsNAT": "1", # NAT enabled | |
| "WANCName": "1_TR069_INTERNET_R_VID_", # Factory name includes TR069 | |
| "ServList": "3", # INTERNET + TR069 | |
| "Tr069Enable": "1", | |
| "PeriodicInformEnable": "1", | |
| "STUNEnable": "1", | |
| } | |
| def __init__(self, ont_ip: str, dry_run: bool, skip_to: int, | |
| no_reboot: bool, revert: bool = False, | |
| backup_file: str | None = None): | |
| self.ont_ip = ont_ip | |
| self.ont_url = f"http://{ont_ip}" | |
| self.dry_run = dry_run | |
| self.skip_to = skip_to | |
| self.no_reboot = no_reboot | |
| self.revert = revert | |
| self.backup_file = backup_file | |
| self.web: ONTWeb | None = None | |
| self.web_user = "" | |
| self.web_pass = "" | |
| self.manu_pass = "" | |
| self.pppoe_user = "" | |
| self.pppoe_pass = "" | |
| self.wan_mac = "" | |
| self.base_mac = "" | |
| self.serial_number = "" | |
| self.model_name = "" | |
| self.conn_type = "" | |
| self.wan_name = "" | |
| self.serv_list = "" | |
| self.is_nat = "" | |
| self.tr069_enabled = "" | |
| self.periodic_inform = "" | |
| self.stun_enabled = "" | |
| self.port_binding_rows: list[dict] = [] | |
| self.backup_dir = Path.home() | |
| self._decrypted_xml: str = "" # Full decrypted config XML string | |
| self._original_values: dict[str, str] = {} # Original values from backup | |
| def _tn(self, user: str | None = None, | |
| passwd: str | None = None) -> ONTTelnet: | |
| if user and passwd: | |
| return ONTTelnet(self.ont_ip, user, passwd) | |
| if self.manu_pass: | |
| return ONTTelnet(self.ont_ip, "manu", self.manu_pass) | |
| return ONTTelnet(self.ont_ip, self.ISP_TELNET_USER, | |
| self.ISP_TELNET_PASS) | |
| def should_run(self, step: int) -> bool: | |
| return step >= self.skip_to | |
| # ── Step 1: Web UI Login ────────────────────────────────── | |
| def step1_web_login(self) -> bool: | |
| """Login to the ONT web UI. | |
| Tries factory default credentials first, then prompts. | |
| If login fails due to a stale session (ONT only allows one), | |
| instructs user to power-cycle or clears via telnet if available. | |
| """ | |
| step_header(1, "Web UI Login") | |
| if not port_open(self.ont_ip, 80): | |
| error(f"Cannot reach {self.ont_ip}:80 — is the ONT powered on and connected?") | |
| return False | |
| info(f"ONT web UI reachable at {self.ont_url}") | |
| if self.dry_run: | |
| info("[DRY RUN] Would attempt login with factory credentials") | |
| self.web_user = self.DEFAULT_WEB_USER | |
| self.web_pass = self.DEFAULT_WEB_PASS | |
| self.web = ONTWeb(self.ont_url) | |
| return True | |
| self.web = ONTWeb(self.ont_url) | |
| # Try factory defaults | |
| info(f"Trying factory credentials: {self.DEFAULT_WEB_USER} / {self.DEFAULT_WEB_PASS}") | |
| if self.web.login(self.DEFAULT_WEB_USER, self.DEFAULT_WEB_PASS): | |
| info("Logged in with factory credentials") | |
| self.web_user = self.DEFAULT_WEB_USER | |
| self.web_pass = self.DEFAULT_WEB_PASS | |
| return True | |
| warn("Factory credentials failed") | |
| # Prompt user | |
| for attempt in range(1, 4): | |
| print(f"\n Attempt {attempt}/3:") | |
| try: | |
| user = input(" Web username: ").strip() | |
| passwd = input(" Web password: ").strip() | |
| except (EOFError, KeyboardInterrupt): | |
| print() | |
| return False | |
| if not user or not passwd: | |
| warn("Empty username or password") | |
| continue | |
| self.web = ONTWeb(self.ont_url) | |
| if self.web.login(user, passwd): | |
| info(f"Logged in as {user}") | |
| self.web_user = user | |
| self.web_pass = passwd | |
| return True | |
| warn("Login failed, try again") | |
| error("All login attempts exhausted") | |
| return False | |
| # ── Step 2: Enable Telnet ───────────────────────────────── | |
| def step2_enable_telnet(self) -> bool: | |
| """Enable telnet via web UI firewall page. Skips if port 23 already open.""" | |
| step_header(2, "Enable Telnet via Web UI") | |
| if port_open(self.ont_ip, 23): | |
| info("Telnet port 23 already open — skipping") | |
| return True | |
| if self.dry_run: | |
| info("[DRY RUN] Would POST firewall form to enable telnet") | |
| return True | |
| if not self.web or not self.web.logged_in: | |
| error("Web UI not logged in — cannot enable telnet") | |
| return False | |
| info("Enabling telnet via firewall settings...") | |
| if not self.web.enable_telnet(): | |
| error("Failed to POST telnet enable form") | |
| return False | |
| info("Waiting for telnet port to open...") | |
| if wait_for_port(self.ont_ip, 23, timeout=8, interval=1): | |
| info("Telnet is now available on port 23") | |
| return True | |
| error("Telnet port 23 did not open within 8 seconds") | |
| return False | |
| # ── Step 3: Derive manu Credentials ─────────────────────── | |
| def _try_config_backup_derive(self) -> str | None: | |
| """Download config backup via web UI, decrypt, extract manu password. | |
| The decrypted XML contains TelnetUser and AuthUser tables with the | |
| manu username and password in cleartext. | |
| """ | |
| if not _HAS_AES: | |
| warn("pycryptodome not installed — cannot decrypt config backup") | |
| warn("Install with: pip install pycryptodome") | |
| return None | |
| if not self.web or not self.web.logged_in: | |
| return None | |
| info("Downloading config backup via web UI...") | |
| try: | |
| raw = self.web.download_config_backup() | |
| info(f"Downloaded {len(raw)} bytes, decrypting...") | |
| xml_bytes = decrypt_web_backup(raw) | |
| self._decrypted_xml = xml_bytes.decode("utf-8", errors="replace") | |
| info(f"Decrypted config: {len(self._decrypted_xml)} chars") | |
| except Exception as e: | |
| warn(f"Config backup/decrypt failed: {e}") | |
| return None | |
| # Search TelnetUser table | |
| rows = parse_config_xml_table(self._decrypted_xml, "TelnetUser") | |
| for row in rows: | |
| if row.get("Username") == "manu" and row.get("Password"): | |
| return row["Password"] | |
| # Search AuthUser table | |
| rows = parse_config_xml_table(self._decrypted_xml, "AuthUser") | |
| for row in rows: | |
| if row.get("User") == "manu" and row.get("Pass"): | |
| return row["Pass"] | |
| # Brute-force search | |
| m = re.search( | |
| r'<DM\s+name="(?:User(?:name)?|User)"\s+val="manu"/>\s*' | |
| r'<DM\s+name="(?:Pass(?:word)?|Pass)"\s+val="([^"]+)"', | |
| self._decrypted_xml) | |
| if m: | |
| return m.group(1) | |
| return None | |
| def _try_sismac_derive(self) -> str | None: | |
| """Try to derive manu password via sismac on the ISP telnet account.""" | |
| info(f"Trying sismac via ISP account ({self.ISP_TELNET_USER})...") | |
| tn_isp = ONTTelnet(self.ont_ip, self.ISP_TELNET_USER, | |
| self.ISP_TELNET_PASS) | |
| out = tn_isp.run("sismac 2 2194") | |
| if not out or "Parse error" in out: | |
| return None | |
| info("sismac output:") | |
| for line in out.splitlines(): | |
| print(f" {line}") | |
| hex_bytes = re.findall(r"\b[0-9a-fA-F]{2}\b", out) | |
| if not hex_bytes: | |
| return None | |
| ascii_str = "" | |
| for hb in hex_bytes: | |
| val = int(hb, 16) | |
| if 0x20 <= val <= 0x7E: | |
| ascii_str += chr(val) | |
| return ascii_str[:8] if len(ascii_str) >= 8 else None | |
| def _verify_manu(self, password: str) -> bool: | |
| tn = ONTTelnet(self.ont_ip, "manu", password) | |
| verify = tn.run("echo TELNET_OK") | |
| return "TELNET_OK" in verify | |
| def step3_derive_manu(self) -> bool: | |
| """Obtain the manu superuser password. | |
| Tries three methods in order: | |
| 1. Download config backup via web UI → decrypt → extract from TelnetUser table | |
| 2. sismac via ISP telnet account (if the shell isn't restricted) | |
| 3. Prompt user | |
| """ | |
| step_header(3, "Derive manu Credentials & Verify Telnet") | |
| if self.dry_run: | |
| info("[DRY RUN] Would derive manu password via config backup or sismac") | |
| self.manu_pass = "<derived>" | |
| return True | |
| # Method 1: Config backup decrypt (most reliable) | |
| derived = self._try_config_backup_derive() | |
| if derived: | |
| info(f"Extracted manu password from config backup: {derived}") | |
| if self._verify_manu(derived): | |
| self.manu_pass = derived | |
| info("manu login verified successfully") | |
| return True | |
| warn(f"Extracted password '{derived}' failed telnet verification") | |
| # Method 2: sismac via ISP telnet account | |
| derived = self._try_sismac_derive() | |
| if derived: | |
| info(f"Derived manu password via sismac: {derived}") | |
| if self._verify_manu(derived): | |
| self.manu_pass = derived | |
| info("manu login verified successfully") | |
| return True | |
| warn(f"Derived password '{derived}' failed verification") | |
| # Method 3: Prompt user | |
| warn("Automatic manu password extraction failed") | |
| info("Please provide the manu password manually.") | |
| info("(Check ONT sticker, ISP records, or ask your ISP)") | |
| for attempt in range(1, 4): | |
| try: | |
| passwd = input(f"\n manu password (attempt {attempt}/3): ").strip() | |
| except (EOFError, KeyboardInterrupt): | |
| print() | |
| return False | |
| if not passwd: | |
| warn("Empty password") | |
| continue | |
| info(f"Testing manu / {passwd}...") | |
| if self._verify_manu(passwd): | |
| self.manu_pass = passwd | |
| info("manu login verified successfully") | |
| return True | |
| warn("Login failed") | |
| error("Could not obtain working manu credentials") | |
| return False | |
| # ── Step 4: Extract Current Config ──────────────────────── | |
| def step4_extract_config(self) -> bool: | |
| """Extract all relevant configuration from the ONT. | |
| Uses the decrypted config XML (from step 3) if available, otherwise | |
| falls back to telnet sendcmd queries. | |
| """ | |
| step_header(4, "Extract Current Configuration") | |
| if self.dry_run: | |
| info("[DRY RUN] Would read WANCPPP, WANC, DevInfo, MgtServer, PortBinding, BrGrp") | |
| self.pppoe_user = "<pppoe_user>" | |
| self.pppoe_pass = "<pppoe_pass>" | |
| self.wan_mac = "A2:3E:D1:7B:44:F6" | |
| self.base_mac = "A2:3E:D1:7B:44:F5" | |
| return True | |
| tn = self._tn() | |
| # ── PPPoE credentials ── | |
| # Try decrypted XML first (has cleartext password) | |
| if self._decrypted_xml: | |
| info("Extracting PPPoE credentials from decrypted config...") | |
| ppp_rows = parse_config_xml_table(self._decrypted_xml, "WANCPPP") | |
| if ppp_rows: | |
| self.pppoe_user = ppp_rows[0].get("UserName", "") | |
| self.pppoe_pass = ppp_rows[0].get("Password", "") | |
| kv("PPPoE Username", self.pppoe_user) | |
| kv("PPPoE Password", self.pppoe_pass) | |
| else: | |
| # Fallback: telnet | |
| info("Reading WANCPPP via telnet...") | |
| out = tn.run("sendcmd 1 DB p WANCPPP") | |
| rows = parse_sendcmd_xml(out) | |
| if rows: | |
| self.pppoe_user = rows[0].get("UserName", "") | |
| kv("PPPoE Username", self.pppoe_user) | |
| info("Decrypting config via telnet for PPPoE password...") | |
| tn.run("sendcmd 1 DB decry /userconfig/cfg/db_user_cfg.xml") | |
| time.sleep(1) | |
| decrypted = tn.run_long("cat /var/tmp/debug-decry-cfg", timeout=45) | |
| tn.run("rm -f /var/tmp/debug-decry-cfg") | |
| if decrypted: | |
| self._decrypted_xml = decrypted | |
| ppp_match = re.search( | |
| r'<Tbl\s+name="WANCPPP"[^>]*>.*?' | |
| r'<DM\s+name="Password"\s+val="([^"]*)"', | |
| decrypted, re.DOTALL) | |
| if ppp_match: | |
| self.pppoe_pass = ppp_match.group(1) | |
| kv("PPPoE Password", self.pppoe_pass) | |
| # ── WANC (WAN connection settings) — always via telnet for live state ── | |
| info("Reading WANC (WAN connection)...") | |
| out = tn.run("sendcmd 1 DB p WANC") | |
| rows = parse_sendcmd_xml(out) | |
| if rows: | |
| row0 = rows[0] | |
| self.wan_mac = row0.get("WorkIFMac", "") | |
| self.conn_type = row0.get("ConnType", "") | |
| self.wan_name = row0.get("WANCName", "") | |
| self.serv_list = row0.get("ServList", "") | |
| ct_names = {"1": "Route/DHCP", "2": "Route/PPPoE", "4": "Bridge"} | |
| kv("WAN MAC", self.wan_mac) | |
| kv("ConnType", f"{self.conn_type} ({ct_names.get(self.conn_type, '?')})") | |
| kv("WANCName", self.wan_name) | |
| kv("ServList", self.serv_list) | |
| if self.wan_mac: | |
| self.base_mac = mac_decrement(self.wan_mac) | |
| kv("Base MAC (WAN-1)", self.base_mac) | |
| # ── DevInfo ── | |
| info("Reading DevInfo...") | |
| out = tn.run("sendcmd 1 DB p DevInfo") | |
| rows = parse_sendcmd_xml(out) | |
| if rows: | |
| self.serial_number = rows[0].get("SerialNumber", "") | |
| self.model_name = rows[0].get("ModelName", "") | |
| kv("Serial Number", self.serial_number) | |
| kv("Model", self.model_name) | |
| # ── MgtServer (TR-069) ── | |
| info("Reading MgtServer (TR-069)...") | |
| out = tn.run("sendcmd 1 DB p MgtServer") | |
| rows = parse_sendcmd_xml(out) | |
| if rows: | |
| self.tr069_enabled = rows[0].get("Tr069Enable", "") | |
| self.periodic_inform = rows[0].get("PeriodicInformEnable", "") | |
| self.stun_enabled = rows[0].get("STUNEnable", "") | |
| kv("TR-069 Enable", self.tr069_enabled) | |
| kv("Periodic Inform", self.periodic_inform) | |
| kv("STUN Enable", self.stun_enabled) | |
| # ── PortBinding ── | |
| info("Reading PortBinding...") | |
| out = tn.run("sendcmd 1 DB p PortBinding") | |
| self.port_binding_rows = parse_sendcmd_xml(out) | |
| kv("PortBinding entries", str(len(self.port_binding_rows))) | |
| for r in self.port_binding_rows: | |
| print(f" {r.get('WANViewName', '?')} <-> {r.get('LANViewName', '?')}") | |
| # ── BrGrp ── | |
| info("Reading BrGrp...") | |
| out = tn.run("sendcmd 1 DB p BrGrp") | |
| brgrp_rows = parse_sendcmd_xml(out) | |
| kv("BrGrp entries", str(len(brgrp_rows))) | |
| # ── Warnings ── | |
| if self.conn_type == "4": | |
| warn("ONT is ALREADY in bridge mode (ConnType=4)") | |
| if not self.pppoe_user: | |
| warn("PPPoE username not found — you'll need it for your router config") | |
| if not self.pppoe_pass: | |
| warn("PPPoE password not extracted — you'll need it for your router config") | |
| return True | |
| # ── Step 5: Backup Config ───────────────────────────────── | |
| def step5_backup(self) -> bool: | |
| """Save config to local files. POINT OF NO RETURN gate.""" | |
| step_header(5, "Backup Configuration") | |
| ts = datetime.now().strftime("%Y%m%d_%H%M%S") | |
| if self.dry_run: | |
| info(f"[DRY RUN] Would save backup to:") | |
| info(f" {self.backup_dir}/ont_backup_{ts}_full.xml") | |
| info(f" {self.backup_dir}/ont_backup_{ts}_tables.txt") | |
| return True | |
| tn = self._tn() | |
| # Full decrypted XML | |
| if self._decrypted_xml: | |
| xml_path = self.backup_dir / f"ont_backup_{ts}_full.xml" | |
| xml_path.write_text(self._decrypted_xml, encoding="utf-8") | |
| info(f"Full config saved: {xml_path}") | |
| else: | |
| warn("No decrypted config available — skipping full XML backup") | |
| # Key tables dump | |
| tables_path = self.backup_dir / f"ont_backup_{ts}_tables.txt" | |
| tables = ["WANC", "WANCPPP", "MgtServer", "PortBinding", "BrGrp", "DevInfo"] | |
| lines: list[str] = [] | |
| for tbl in tables: | |
| lines.append(f"\n{'=' * 40}") | |
| lines.append(f"Table: {tbl}") | |
| lines.append(f"{'=' * 40}") | |
| out = tn.run(f"sendcmd 1 DB p {tbl}") | |
| rows = parse_sendcmd_xml(out) | |
| if rows: | |
| for i, row in enumerate(rows): | |
| lines.append(f" --- Row {i} ---") | |
| for k, v in row.items(): | |
| lines.append(f" {k + ':':<28s} {v}") | |
| else: | |
| lines.append(" (no rows)") | |
| tables_path.write_text("\n".join(lines), encoding="utf-8") | |
| info(f"Tables dump saved: {tables_path}") | |
| # ── POINT OF NO RETURN ── | |
| print(f"\n{Y}{'=' * 60}") | |
| print(f" POINT OF NO RETURN") | |
| print(f" The next steps will modify the ONT configuration.") | |
| print(f" Backups saved to: {self.backup_dir}") | |
| print(f"{'=' * 60}{N}") | |
| if not confirm("Proceed with bridge mode setup?"): | |
| info("Aborted by user") | |
| return False | |
| return True | |
| # ── Step 6: Disable TR-069 ──────────────────────────────── | |
| def step6_disable_tr069(self) -> bool: | |
| """Disable TR-069 so the ISP can't revert bridge mode.""" | |
| step_header(6, "Disable TR-069") | |
| if self.tr069_enabled == "0" and self.periodic_inform == "0" and self.stun_enabled == "0": | |
| info("TR-069 already disabled — skipping") | |
| return True | |
| if self.dry_run: | |
| info("[DRY RUN] Would set MgtServer: Tr069Enable=0, PeriodicInformEnable=0, STUNEnable=0") | |
| return True | |
| tn = self._tn() | |
| info("Disabling TR-069...") | |
| out = tn.run( | |
| "sendcmd 1 DB set MgtServer 0 Tr069Enable 0", | |
| "sendcmd 1 DB set MgtServer 0 PeriodicInformEnable 0", | |
| "sendcmd 1 DB set MgtServer 0 STUNEnable 0", | |
| "sendcmd 1 DB save", | |
| ) | |
| for line in out.splitlines(): | |
| stripped = line.strip() | |
| if stripped: | |
| print(f" {stripped}") | |
| info("Verifying...") | |
| out = tn.run("sendcmd 1 DB p MgtServer") | |
| rows = parse_sendcmd_xml(out) | |
| if rows: | |
| ok = (rows[0].get("Tr069Enable") == "0" and | |
| rows[0].get("PeriodicInformEnable") == "0" and | |
| rows[0].get("STUNEnable") == "0") | |
| if ok: | |
| info("TR-069 disabled successfully") | |
| else: | |
| warn("Some TR-069 fields may not have been set correctly") | |
| return True | |
| # ── Step 7: Set Bridge Mode ─────────────────────────────── | |
| def step7_set_bridge(self) -> bool: | |
| """Set WAN connection to bridge/passthrough mode (ConnType=4).""" | |
| step_header(7, "Set Bridge Mode") | |
| if self.conn_type == "4": | |
| info("Already in bridge mode (ConnType=4) — skipping") | |
| return True | |
| if self.dry_run: | |
| info("[DRY RUN] Would set WANC: ConnType=4, IsNAT=0, WANCName=1_INTERNET_B_VID_, ServList=1") | |
| return True | |
| tn = self._tn() | |
| info("Setting bridge mode (ConnType=4)...") | |
| out = tn.run( | |
| "sendcmd 1 DB set WANC 0 ConnType 4", | |
| "sendcmd 1 DB set WANC 0 IsNAT 0", | |
| "sendcmd 1 DB set WANC 0 WANCName 1_INTERNET_B_VID_", | |
| "sendcmd 1 DB set WANC 0 ServList 1", | |
| "sendcmd 1 DB save", | |
| ) | |
| for line in out.splitlines(): | |
| stripped = line.strip() | |
| if stripped: | |
| print(f" {stripped}") | |
| info("Verifying...") | |
| out = tn.run("sendcmd 1 DB p WANC") | |
| rows = parse_sendcmd_xml(out) | |
| if rows and rows[0].get("ConnType") == "4": | |
| info("Bridge mode set successfully (ConnType=4)") | |
| return True | |
| error("Failed to verify ConnType=4") | |
| return False | |
| # ── Step 8: Configure PortBinding ───────────────────────── | |
| def step8_port_binding(self) -> bool: | |
| """Configure PortBinding entries for LAN→WAN forwarding.""" | |
| step_header(8, "Configure PortBinding") | |
| has_eth1 = any( | |
| "WCD1" in r.get("WANViewName", "") and "ETH1" in r.get("LANViewName", "") | |
| for r in self.port_binding_rows) | |
| has_eth2 = any( | |
| "WCD1" in r.get("WANViewName", "") and "ETH2" in r.get("LANViewName", "") | |
| for r in self.port_binding_rows) | |
| if has_eth1 and has_eth2: | |
| info("PortBinding already has ETH1+ETH2 → WCD1 entries — skipping") | |
| return True | |
| if self.dry_run: | |
| info("[DRY RUN] Would create PortBinding entries:") | |
| info(" IGD.Binding1: IGD.WD1.WCD1.WCPPP1 <-> IGD.LD1.ETH1") | |
| info(" IGD.Binding2: IGD.WD1.WCD1.WCPPP1 <-> IGD.LD1.ETH2") | |
| return True | |
| tn = self._tn() | |
| existing_count = len(self.port_binding_rows) | |
| if existing_count > 0: | |
| info(f"Clearing {existing_count} existing PortBinding entries...") | |
| for i in range(existing_count - 1, -1, -1): | |
| tn.run(f"sendcmd 1 DB delr PortBinding {i}") | |
| info("Creating PortBinding entries...") | |
| out = tn.run( | |
| "sendcmd 1 DB addr PortBinding", | |
| "sendcmd 1 DB set PortBinding 0 ViewName IGD.Binding1", | |
| "sendcmd 1 DB set PortBinding 0 WANViewName IGD.WD1.WCD1.WCPPP1", | |
| "sendcmd 1 DB set PortBinding 0 LANViewName IGD.LD1.ETH1", | |
| "sendcmd 1 DB addr PortBinding", | |
| "sendcmd 1 DB set PortBinding 1 ViewName IGD.Binding2", | |
| "sendcmd 1 DB set PortBinding 1 WANViewName IGD.WD1.WCD1.WCPPP1", | |
| "sendcmd 1 DB set PortBinding 1 LANViewName IGD.LD1.ETH2", | |
| "sendcmd 1 DB save", | |
| ) | |
| for line in out.splitlines(): | |
| stripped = line.strip() | |
| if stripped: | |
| print(f" {stripped}") | |
| info("Verifying...") | |
| out = tn.run("sendcmd 1 DB p PortBinding") | |
| rows = parse_sendcmd_xml(out) | |
| if len(rows) >= 2: | |
| info(f"PortBinding has {len(rows)} entries:") | |
| for r in rows: | |
| print(f" {r.get('WANViewName', '?')} <-> {r.get('LANViewName', '?')}") | |
| else: | |
| warn(f"Expected 2 PortBinding entries, got {len(rows)}") | |
| return True | |
| # ── Step 9: Inject Persistent MAC Fix ───────────────────── | |
| def step9_inject_mac_fix(self) -> bool: | |
| """Inject persistent boot script to fix the eth0/nbif0 MAC collision.""" | |
| step_header(9, "Inject Persistent MAC Fix Boot Script") | |
| if not self.wan_mac: | |
| error("WAN MAC not known — cannot create MAC fix script") | |
| return False | |
| if not self.base_mac: | |
| self.base_mac = mac_decrement(self.wan_mac) | |
| wan_mac_lower = self.wan_mac.lower() | |
| base_mac_lower = self.base_mac.lower() | |
| info(f"WAN MAC (nbif0): {wan_mac_lower}") | |
| info(f"Fix MAC (eth0/br0): {base_mac_lower}") | |
| fix_script = f"""#!/bin/sh | |
| # fix-eth0-mac.sh — Fix MAC collision between eth0 and nbif0 | |
| # Generated by excitel-bridge-setup.py on {datetime.now().isoformat()} | |
| # | |
| # Problem: eth0 and nbif0 share MAC {wan_mac_lower}. The kernel bridge | |
| # merges their FDB entries, causing frames to loop back on nbif0. | |
| # Fix: Set eth0+br0 to {base_mac_lower} (WAN MAC - 1). | |
| LOG="/var/tmp/fix-mac-boot.log" | |
| echo "$(date) fix-eth0-mac.sh starting" >> "$LOG" | |
| # Wait for nbif0 to join br0 | |
| MAX_WAIT=240 | |
| WAITED=0 | |
| while [ $WAITED -lt $MAX_WAIT ]; do | |
| if brctl show br0 2>/dev/null | grep -q nbif0; then | |
| echo "$(date) nbif0 joined br0 after ${{WAITED}}s" >> "$LOG" | |
| break | |
| fi | |
| sleep 2 | |
| WAITED=$((WAITED + 2)) | |
| done | |
| if [ $WAITED -ge $MAX_WAIT ]; then | |
| echo "$(date) TIMEOUT waiting for nbif0 in br0" >> "$LOG" | |
| exit 1 | |
| fi | |
| ip link set dev eth0 address {base_mac_lower} | |
| ip link set dev br0 address {base_mac_lower} | |
| echo "$(date) Set eth0+br0 MAC to {base_mac_lower}" >> "$LOG" | |
| bridge fdb del {wan_mac_lower} dev nbif0 master 2>/dev/null | |
| echo "$(date) Deleted FDB entry for {wan_mac_lower} on nbif0" >> "$LOG" | |
| echo 0 > /proc/sys/net/bridge/bridge-nf-call-iptables 2>/dev/null | |
| echo "$(date) Disabled bridge-nf-call-iptables" >> "$LOG" | |
| echo "$(date) fix-eth0-mac.sh completed" >> "$LOG" | |
| """ | |
| boot_trigger = """#!/bin/sh | |
| # S98fixmac — Trigger the MAC fix script from /userconfig at boot | |
| /userconfig/fix-eth0-mac.sh & | |
| """ | |
| if self.dry_run: | |
| info("[DRY RUN] Would write /userconfig/fix-eth0-mac.sh:") | |
| for line in fix_script.splitlines()[:5]: | |
| print(f" {line}") | |
| print(f" ... ({len(fix_script.splitlines())} lines total)") | |
| info("[DRY RUN] Would mount mtdblock9 and write /etc/rcS.d/S98fixmac") | |
| return True | |
| tn = self._tn() | |
| info("Writing /userconfig/fix-eth0-mac.sh...") | |
| if not tn.write_file("/userconfig/fix-eth0-mac.sh", fix_script, 0o755): | |
| error("Failed to write fix script") | |
| return False | |
| verify = tn.run("ls -la /userconfig/fix-eth0-mac.sh") | |
| if "fix-eth0-mac.sh" in verify: | |
| info("Fix script written successfully") | |
| print(f" {verify.strip()}") | |
| else: | |
| error("Fix script verification failed") | |
| return False | |
| info("Mounting secondary rootfs (mtdblock9) to inject boot trigger...") | |
| tn.run( | |
| "mkdir -p /var/tmp/rootmnt", | |
| "mount -t jffs2 /dev/mtdblock9 /var/tmp/rootmnt", | |
| "mount -o remount,rw /var/tmp/rootmnt", | |
| ) | |
| info("Writing S98fixmac boot trigger...") | |
| if not tn.write_file("/var/tmp/rootmnt/etc/rcS.d/S98fixmac", | |
| boot_trigger, 0o755): | |
| error("Failed to write boot trigger") | |
| tn.run("umount /var/tmp/rootmnt 2>/dev/null") | |
| return False | |
| verify = tn.run("ls -la /var/tmp/rootmnt/etc/rcS.d/S98fixmac") | |
| if "S98fixmac" in verify: | |
| info("Boot trigger written successfully") | |
| print(f" {verify.strip()}") | |
| else: | |
| warn("Could not verify boot trigger — it may still work") | |
| tn.run("sync", "mount -o remount,ro /var/tmp/rootmnt") | |
| info("MAC fix boot script injected successfully") | |
| return True | |
| # ── Step 10: Reboot ─────────────────────────────────────── | |
| def step10_reboot(self) -> bool: | |
| """Reboot the ONT to apply all changes.""" | |
| step_header(10, "Reboot ONT") | |
| if self.no_reboot: | |
| info("Skipping reboot (--no-reboot flag)") | |
| info(f" Reboot manually: telnet {self.ont_ip} → manu → reboot") | |
| return True | |
| if self.dry_run: | |
| info("[DRY RUN] Would reboot the ONT and wait for it to come back") | |
| return True | |
| if not confirm("Reboot the ONT now? (it will be offline for ~90s)"): | |
| info("Skipping reboot — remember to reboot manually!") | |
| return True | |
| tn = self._tn() | |
| info("Sending reboot command...") | |
| tn.run("reboot") | |
| info("Waiting for ONT to go down...") | |
| time.sleep(5) | |
| info("Waiting for ONT to come back (up to 225s)...") | |
| if wait_for_port(self.ont_ip, 80, timeout=225, interval=5): | |
| info("ONT is back online!") | |
| else: | |
| warn("ONT did not respond within 225s — check manually") | |
| return True | |
| # ── Step 11: Show Router Config ─────────────────────────── | |
| def step11_show_config(self) -> bool: | |
| """Display downstream router configuration.""" | |
| step_header(11, "Downstream Router Configuration") | |
| print(f""" | |
| {B}Configure your downstream router with these settings:{N} | |
| {B}PPPoE Connection{N} | |
| {'─' * 40}""") | |
| kv("PPPoE Username", self.pppoe_user or "(not extracted — check backup files)") | |
| kv("PPPoE Password", self.pppoe_pass or "(not extracted — check backup files)") | |
| kv("MAC to Clone", self.wan_mac or "(not extracted)") | |
| kv("MTU", "1492") | |
| kv("MRU", "1492") | |
| kv("Service Name", "(leave blank / auto)") | |
| kv("Auth Type", "PAP/CHAP/MS-CHAP (auto)") | |
| print(f""" | |
| {B}IPv6{N} | |
| {'─' * 40}""") | |
| kv("IPv6 Mode", "DHCPv6 (Prefix Delegation)") | |
| kv("Prefix Hint", "/64") | |
| kv("Note", "Excitel delegates a /64 via DHCPv6 PD") | |
| print(f""" | |
| {B}Important Notes{N} | |
| {'─' * 40} | |
| - You MUST clone the WAN MAC ({self.wan_mac}) on your router's WAN interface. | |
| Without this, Excitel's OLT will not forward traffic to your router. | |
| - MTU 1492 = 1500 - 8 bytes PPPoE overhead. | |
| - After connecting PPPoE, verify with: ping -s 1464 -M do 8.8.8.8 | |
| (1464 + 20 IP + 8 ICMP = 1492 = MTU, should not fragment) | |
| """) | |
| if self.wan_mac: | |
| print(f""" {B}UniFi Dream Machine Users{N} | |
| {'─' * 40} | |
| - MAC Clone: Settings → Internet → Primary (WAN1) → Advanced → MAC Clone | |
| Set to: {self.wan_mac} | |
| - To access ONT after bridge mode: add a secondary IP on the UDM LAN: | |
| Settings → Networks → Default → Add static IP: 192.168.0.2/24 | |
| Then browse to http://{self.ont_ip} | |
| """) | |
| if not self.dry_run and not self.no_reboot and port_open(self.ont_ip, 23, timeout=3): | |
| info("Checking MAC fix status on ONT...") | |
| tn = self._tn() | |
| log = tn.run("cat /var/tmp/fix-mac-boot.log 2>/dev/null") | |
| if "completed" in log: | |
| info("MAC fix boot script ran successfully!") | |
| for line in log.strip().splitlines(): | |
| print(f" {line}") | |
| elif log.strip(): | |
| warn("MAC fix log exists but may not have completed:") | |
| for line in log.strip().splitlines(): | |
| print(f" {line}") | |
| else: | |
| warn("MAC fix log not found yet — check in a minute") | |
| return True | |
| # ── Revert Steps ───────────────────────────────────────── | |
| def _decrypt_bin_backup(self, path: Path) -> str: | |
| """Decrypt an encrypted .bin config backup to XML string.""" | |
| if not _HAS_AES: | |
| warn("pycryptodome not installed — cannot decrypt .bin backup") | |
| return "" | |
| try: | |
| raw = path.read_bytes() | |
| xml_bytes = decrypt_web_backup(raw) | |
| xml_str = xml_bytes.decode("utf-8", errors="replace") | |
| info(f"Decrypted {path.name}: {len(xml_str)} chars") | |
| return xml_str | |
| except Exception as e: | |
| warn(f"Failed to decrypt {path.name}: {e}") | |
| return "" | |
| def _load_backup_values(self) -> dict[str, str]: | |
| """Load original ONT values from a backup file. | |
| Searches only in the script's own backup directory (backup_dir), | |
| which is where step 5 saves backups before making changes. | |
| Order: | |
| 1. Explicit --backup-file path (XML or .bin) | |
| 2. Most recent ont_backup_*_full.xml in backup_dir | |
| 3. Excitel route-mode defaults | |
| """ | |
| xml_content = "" | |
| # Try explicit backup file | |
| if self.backup_file: | |
| p = Path(self.backup_file) | |
| if p.exists(): | |
| info(f"Loading backup from: {p}") | |
| if p.suffix == ".bin": | |
| xml_content = self._decrypt_bin_backup(p) | |
| else: | |
| xml_content = p.read_text(encoding="utf-8", errors="replace") | |
| else: | |
| warn(f"Backup file not found: {p}") | |
| # Auto-detect from backup_dir only (where step 5 saves) | |
| if not xml_content: | |
| files = sorted( | |
| _glob.glob(str(self.backup_dir / "ont_backup_*_full.xml")), | |
| key=lambda f: Path(f).stat().st_mtime, | |
| reverse=True, | |
| ) | |
| if files: | |
| info(f"Found backup: {files[0]}") | |
| xml_content = Path(files[0]).read_text( | |
| encoding="utf-8", errors="replace") | |
| if not xml_content: | |
| warn(f"No backup file found in {self.backup_dir}") | |
| warn("Use --backup-file to specify one, or Excitel defaults will be used") | |
| return dict(self.EXCITEL_ROUTE_DEFAULTS) | |
| vals: dict[str, str] = {} | |
| # Extract WANC values | |
| wanc_rows = parse_config_xml_table(xml_content, "WANC") | |
| if wanc_rows: | |
| r = wanc_rows[0] | |
| vals["ConnType"] = r.get("ConnType", "2") | |
| vals["IsNAT"] = r.get("IsNAT", "1") | |
| vals["WANCName"] = r.get("WANCName", "1_INTERNET_R_VID_") | |
| vals["ServList"] = r.get("ServList", "3") | |
| # Extract MgtServer values | |
| mgt_rows = parse_config_xml_table(xml_content, "MgtServer") | |
| if mgt_rows: | |
| r = mgt_rows[0] | |
| vals["Tr069Enable"] = r.get("Tr069Enable", "1") | |
| vals["PeriodicInformEnable"] = r.get("PeriodicInformEnable", "1") | |
| vals["STUNEnable"] = r.get("STUNEnable", "1") | |
| # Fill missing values with defaults | |
| for k, v in self.EXCITEL_ROUTE_DEFAULTS.items(): | |
| vals.setdefault(k, v) | |
| return vals | |
| def revert_step4_read_and_plan(self) -> bool: | |
| """Read current config and determine what needs reverting.""" | |
| step_header(4, "Read Current Config & Load Original Values") | |
| if self.dry_run: | |
| info("[DRY RUN] Would read current config and load backup values") | |
| self._original_values = dict(self.EXCITEL_ROUTE_DEFAULTS) | |
| return True | |
| tn = self._tn() | |
| # Read current WANC state | |
| info("Reading current WANC...") | |
| out = tn.run("sendcmd 1 DB p WANC") | |
| rows = parse_sendcmd_xml(out) | |
| if rows: | |
| r = rows[0] | |
| self.conn_type = r.get("ConnType", "") | |
| self.wan_name = r.get("WANCName", "") | |
| self.serv_list = r.get("ServList", "") | |
| self.is_nat = r.get("IsNAT", "") | |
| self.wan_mac = r.get("WorkIFMac", "") | |
| ct_names = {"1": "Route/DHCP", "2": "Route/PPPoE", "4": "Bridge"} | |
| kv("ConnType (current)", f"{self.conn_type} ({ct_names.get(self.conn_type, '?')})") | |
| kv("WANCName (current)", self.wan_name) | |
| kv("ServList (current)", self.serv_list) | |
| kv("IsNAT (current)", self.is_nat) | |
| kv("WAN MAC", self.wan_mac) | |
| if self.wan_mac: | |
| self.base_mac = mac_decrement(self.wan_mac) | |
| # Read current MgtServer state | |
| info("Reading current MgtServer (TR-069)...") | |
| out = tn.run("sendcmd 1 DB p MgtServer") | |
| rows = parse_sendcmd_xml(out) | |
| if rows: | |
| self.tr069_enabled = rows[0].get("Tr069Enable", "") | |
| self.periodic_inform = rows[0].get("PeriodicInformEnable", "") | |
| self.stun_enabled = rows[0].get("STUNEnable", "") | |
| kv("TR-069 (current)", self.tr069_enabled) | |
| kv("Periodic Inform (current)", self.periodic_inform) | |
| kv("STUN (current)", self.stun_enabled) | |
| # Read current PortBinding state | |
| info("Reading current PortBinding...") | |
| out = tn.run("sendcmd 1 DB p PortBinding") | |
| self.port_binding_rows = parse_sendcmd_xml(out) | |
| kv("PortBinding entries", str(len(self.port_binding_rows))) | |
| for r in self.port_binding_rows: | |
| print(f" {r.get('WANViewName', '?')} <-> {r.get('LANViewName', '?')}") | |
| # Check for MAC fix scripts | |
| info("Checking for MAC fix scripts...") | |
| out = tn.run("ls -la /userconfig/fix-eth0-mac.sh 2>/dev/null") | |
| has_fix_script = "fix-eth0-mac.sh" in out | |
| kv("fix-eth0-mac.sh", "present" if has_fix_script else "not found") | |
| out = tn.run( | |
| "mkdir -p /var/tmp/rootmnt", | |
| "mount -t jffs2 /dev/mtdblock9 /var/tmp/rootmnt 2>/dev/null", | |
| "ls -la /var/tmp/rootmnt/etc/rcS.d/S98fixmac 2>/dev/null", | |
| ) | |
| has_boot_trigger = "S98fixmac" in out | |
| kv("S98fixmac", "present" if has_boot_trigger else "not found") | |
| tn.run("umount /var/tmp/rootmnt 2>/dev/null") | |
| # Load original values | |
| self._original_values = self._load_backup_values() | |
| # Show what will change | |
| print(f"\n{B} Planned revert actions:{N}") | |
| changes: list[str] = [] | |
| orig_ct = self._original_values.get("ConnType", "2") | |
| if self.conn_type != orig_ct: | |
| ct_names = {"1": "Route/DHCP", "2": "Route/PPPoE", "4": "Bridge"} | |
| changes.append( | |
| f"ConnType: {self.conn_type} ({ct_names.get(self.conn_type, '?')}) " | |
| f"-> {orig_ct} ({ct_names.get(orig_ct, '?')})") | |
| orig_nat = self._original_values.get("IsNAT", "1") | |
| if self.is_nat != orig_nat: | |
| changes.append(f"IsNAT: {self.is_nat} -> {orig_nat}") | |
| orig_name = self._original_values.get("WANCName", "") | |
| if orig_name and self.wan_name != orig_name: | |
| changes.append(f"WANCName: {self.wan_name} -> {orig_name}") | |
| orig_sl = self._original_values.get("ServList", "3") | |
| if self.serv_list != orig_sl: | |
| changes.append(f"ServList: {self.serv_list} -> {orig_sl}") | |
| if self.tr069_enabled != self._original_values.get("Tr069Enable", "1"): | |
| changes.append(f"Tr069Enable: {self.tr069_enabled} -> {self._original_values['Tr069Enable']}") | |
| if self.periodic_inform != self._original_values.get("PeriodicInformEnable", "1"): | |
| changes.append(f"PeriodicInformEnable: {self.periodic_inform} -> {self._original_values['PeriodicInformEnable']}") | |
| if self.stun_enabled != self._original_values.get("STUNEnable", "1"): | |
| changes.append(f"STUNEnable: {self.stun_enabled} -> {self._original_values['STUNEnable']}") | |
| if self.port_binding_rows: | |
| changes.append(f"PortBinding: remove {len(self.port_binding_rows)} entries") | |
| if has_fix_script: | |
| changes.append("Remove /userconfig/fix-eth0-mac.sh") | |
| if has_boot_trigger: | |
| changes.append("Remove /etc/rcS.d/S98fixmac from rootfs overlay") | |
| if not changes: | |
| info("Nothing to revert — ONT appears to be in factory state already") | |
| return False | |
| for ch in changes: | |
| print(f" - {ch}") | |
| print() | |
| if not confirm("Proceed with revert?"): | |
| info("Aborted by user") | |
| return False | |
| return True | |
| def revert_step5_remove_mac_fix(self) -> bool: | |
| """Remove the MAC fix boot scripts from the ONT. | |
| Removes: | |
| - /userconfig/fix-eth0-mac.sh (jffs2 on mtdblock5, always writable) | |
| - /etc/rcS.d/S98fixmac on the secondary rootfs overlay (mtdblock9) | |
| """ | |
| step_header(5, "Remove MAC Fix Boot Scripts") | |
| if self.dry_run: | |
| info("[DRY RUN] Would remove /userconfig/fix-eth0-mac.sh") | |
| info("[DRY RUN] Would mount mtdblock9 and remove /etc/rcS.d/S98fixmac") | |
| return True | |
| tn = self._tn() | |
| # Remove the fix script from /userconfig | |
| info("Removing /userconfig/fix-eth0-mac.sh...") | |
| out = tn.run("rm -f /userconfig/fix-eth0-mac.sh") | |
| verify = tn.run("ls /userconfig/fix-eth0-mac.sh 2>&1") | |
| if "No such file" in verify or "fix-eth0-mac.sh" not in verify: | |
| info("fix-eth0-mac.sh removed") | |
| else: | |
| warn("fix-eth0-mac.sh may still exist — check manually") | |
| # Remove boot trigger from rootfs overlay | |
| info("Mounting secondary rootfs (mtdblock9)...") | |
| tn.run( | |
| "mkdir -p /var/tmp/rootmnt", | |
| "mount -t jffs2 /dev/mtdblock9 /var/tmp/rootmnt 2>/dev/null", | |
| "mount -o remount,rw /var/tmp/rootmnt", | |
| ) | |
| info("Removing S98fixmac boot trigger...") | |
| tn.run("rm -f /var/tmp/rootmnt/etc/rcS.d/S98fixmac") | |
| verify = tn.run("ls /var/tmp/rootmnt/etc/rcS.d/S98fixmac 2>&1") | |
| if "No such file" in verify or "S98fixmac" not in verify: | |
| info("S98fixmac removed") | |
| else: | |
| warn("S98fixmac may still exist — check manually") | |
| tn.run("sync", "mount -o remount,ro /var/tmp/rootmnt") | |
| # Clean up any leftover MAC fix log | |
| tn.run("rm -f /var/tmp/fix-mac-boot.log") | |
| info("MAC fix boot scripts removed") | |
| return True | |
| def revert_step6_remove_port_binding(self) -> bool: | |
| """Remove PortBinding entries added by the setup script. | |
| In factory route mode, the PortBinding table is empty (0 rows). | |
| Bridge mode added entries binding ETH1+ETH2 to WCD1.WCPPP1. | |
| """ | |
| step_header(6, "Remove PortBinding Entries") | |
| if not self.port_binding_rows: | |
| info("No PortBinding entries to remove — already empty") | |
| return True | |
| if self.dry_run: | |
| info(f"[DRY RUN] Would delete {len(self.port_binding_rows)} PortBinding entries") | |
| return True | |
| tn = self._tn() | |
| count = len(self.port_binding_rows) | |
| info(f"Deleting {count} PortBinding entries...") | |
| # Delete in reverse order to keep indices stable | |
| for i in range(count - 1, -1, -1): | |
| tn.run(f"sendcmd 1 DB delr PortBinding {i}") | |
| tn.run("sendcmd 1 DB save") | |
| info("Verifying...") | |
| out = tn.run("sendcmd 1 DB p PortBinding") | |
| rows = parse_sendcmd_xml(out) | |
| if len(rows) == 0: | |
| info("PortBinding table is now empty") | |
| else: | |
| warn(f"PortBinding still has {len(rows)} entries — may need manual cleanup") | |
| return True | |
| def revert_step7_restore_route_mode(self) -> bool: | |
| """Restore WAN connection to route/PPPoE mode (ConnType=2). | |
| Reverses step 7 of the setup. Restores: | |
| - ConnType: 4 (bridge) -> 2 (route/PPPoE) | |
| - IsNAT: 0 -> 1 | |
| - WANCName: 1_INTERNET_B_VID_ -> 1_INTERNET_R_VID_ | |
| - ServList: 1 -> 3 (INTERNET + TR069) | |
| """ | |
| step_header(7, "Restore Route Mode") | |
| orig = self._original_values | |
| ct = orig.get("ConnType", "2") | |
| nat = orig.get("IsNAT", "1") | |
| name = orig.get("WANCName", "1_INTERNET_R_VID_") | |
| sl = orig.get("ServList", "3") | |
| # Check if already in the target state | |
| if (self.conn_type == ct and self.is_nat == nat | |
| and self.wan_name == name and self.serv_list == sl): | |
| info("WAN connection already in target state — skipping") | |
| return True | |
| if self.dry_run: | |
| info(f"[DRY RUN] Would set WANC: ConnType={ct}, IsNAT={nat}, " | |
| f"WANCName={name}, ServList={sl}") | |
| return True | |
| tn = self._tn() | |
| info(f"Restoring route mode (ConnType={ct})...") | |
| out = tn.run( | |
| f"sendcmd 1 DB set WANC 0 ConnType {ct}", | |
| f"sendcmd 1 DB set WANC 0 IsNAT {nat}", | |
| f"sendcmd 1 DB set WANC 0 WANCName {name}", | |
| f"sendcmd 1 DB set WANC 0 ServList {sl}", | |
| "sendcmd 1 DB save", | |
| ) | |
| for line in out.splitlines(): | |
| stripped = line.strip() | |
| if stripped: | |
| print(f" {stripped}") | |
| info("Verifying...") | |
| out = tn.run("sendcmd 1 DB p WANC") | |
| rows = parse_sendcmd_xml(out) | |
| if rows and rows[0].get("ConnType") == ct: | |
| ct_names = {"1": "Route/DHCP", "2": "Route/PPPoE", "4": "Bridge"} | |
| info(f"Route mode restored (ConnType={ct}, {ct_names.get(ct, '?')})") | |
| return True | |
| error(f"Failed to verify ConnType={ct}") | |
| return False | |
| def revert_step8_restore_tr069(self) -> bool: | |
| """Re-enable TR-069 management. | |
| Reverses step 6 of the setup. The ISP uses TR-069 (CWMP) to manage | |
| the ONT remotely. Re-enabling it restores normal ISP management. | |
| """ | |
| step_header(8, "Re-enable TR-069") | |
| orig = self._original_values | |
| tr069 = orig.get("Tr069Enable", "1") | |
| periodic = orig.get("PeriodicInformEnable", "1") | |
| stun = orig.get("STUNEnable", "1") | |
| if (self.tr069_enabled == tr069 and self.periodic_inform == periodic | |
| and self.stun_enabled == stun): | |
| info("TR-069 settings already in target state — skipping") | |
| return True | |
| if self.dry_run: | |
| info(f"[DRY RUN] Would set MgtServer: Tr069Enable={tr069}, " | |
| f"PeriodicInformEnable={periodic}, STUNEnable={stun}") | |
| return True | |
| tn = self._tn() | |
| info("Re-enabling TR-069...") | |
| out = tn.run( | |
| f"sendcmd 1 DB set MgtServer 0 Tr069Enable {tr069}", | |
| f"sendcmd 1 DB set MgtServer 0 PeriodicInformEnable {periodic}", | |
| f"sendcmd 1 DB set MgtServer 0 STUNEnable {stun}", | |
| "sendcmd 1 DB save", | |
| ) | |
| for line in out.splitlines(): | |
| stripped = line.strip() | |
| if stripped: | |
| print(f" {stripped}") | |
| info("Verifying...") | |
| out = tn.run("sendcmd 1 DB p MgtServer") | |
| rows = parse_sendcmd_xml(out) | |
| if rows: | |
| ok = (rows[0].get("Tr069Enable") == tr069 and | |
| rows[0].get("PeriodicInformEnable") == periodic and | |
| rows[0].get("STUNEnable") == stun) | |
| if ok: | |
| info("TR-069 re-enabled successfully") | |
| else: | |
| warn("Some TR-069 fields may not have been set correctly") | |
| return True | |
| def revert_step9_reboot(self) -> bool: | |
| """Reboot the ONT to apply the reverted configuration.""" | |
| step_header(9, "Reboot ONT") | |
| if self.no_reboot: | |
| info("Skipping reboot (--no-reboot flag)") | |
| info(f" Reboot manually: telnet {self.ont_ip} → manu → reboot") | |
| return True | |
| if self.dry_run: | |
| info("[DRY RUN] Would reboot the ONT and wait for it to come back") | |
| return True | |
| if not confirm("Reboot the ONT now? (it will be offline for ~90s)"): | |
| info("Skipping reboot — remember to reboot manually!") | |
| return True | |
| tn = self._tn() | |
| info("Sending reboot command...") | |
| tn.run("reboot") | |
| info("Waiting for ONT to go down...") | |
| time.sleep(5) | |
| info("Waiting for ONT to come back (up to 225s)...") | |
| if wait_for_port(self.ont_ip, 80, timeout=225, interval=5): | |
| info("ONT is back online!") | |
| else: | |
| warn("ONT did not respond within 225s — check manually") | |
| return True | |
| def run_revert(self) -> bool: | |
| """Run the full revert process to undo bridge mode changes.""" | |
| header("Excitel ONT Bridge Mode — REVERT") | |
| print(f" ONT IP: {self.ont_ip}") | |
| print(f" Dry Run: {self.dry_run}") | |
| print(f" No Reboot: {self.no_reboot}") | |
| if self.backup_file: | |
| print(f" Backup File: {self.backup_file}") | |
| if not _HAS_AES: | |
| warn("pycryptodome not installed — config backup decrypt will be unavailable") | |
| if self.dry_run: | |
| print(f"\n {Y}DRY RUN MODE — no changes will be made{N}") | |
| print(f"\n {R}This will revert ALL bridge mode changes and restore the ONT") | |
| print(f" to factory route mode. The ONT will handle PPPoE directly.{N}") | |
| revert_steps: list[tuple[int, str, callable]] = [ | |
| (1, "Web UI Login", self.step1_web_login), | |
| (2, "Enable Telnet", self.step2_enable_telnet), | |
| (3, "Derive manu Credentials", self.step3_derive_manu), | |
| (4, "Read Config & Plan Revert", self.revert_step4_read_and_plan), | |
| (5, "Remove MAC Fix Scripts", self.revert_step5_remove_mac_fix), | |
| (6, "Remove PortBinding", self.revert_step6_remove_port_binding), | |
| (7, "Restore Route Mode", self.revert_step7_restore_route_mode), | |
| (8, "Re-enable TR-069", self.revert_step8_restore_tr069), | |
| (9, "Reboot", self.revert_step9_reboot), | |
| ] | |
| for num, name, func in revert_steps: | |
| if not func(): | |
| error(f"Revert step {num} ({name}) failed — aborting") | |
| return False | |
| header("Revert Complete!") | |
| if self.dry_run: | |
| info("This was a dry run — no changes were made") | |
| info("Run without --dry-run to apply revert") | |
| else: | |
| info("Your ONT has been reverted to route mode") | |
| info("The ONT will now handle PPPoE directly (no downstream router needed)") | |
| info("If you had a downstream router doing PPPoE, disconnect it and") | |
| info("connect your devices directly to the ONT's LAN ports") | |
| return True | |
| # ── Main Run ────────────────────────────────────────────── | |
| def run(self) -> bool: | |
| header("Excitel ONT Bridge Mode Auto-Setup") | |
| print(f" ONT IP: {self.ont_ip}") | |
| print(f" Dry Run: {self.dry_run}") | |
| print(f" Skip To: Step {self.skip_to}") | |
| print(f" No Reboot: {self.no_reboot}") | |
| if not _HAS_AES: | |
| warn("pycryptodome not installed — config backup decrypt will be unavailable") | |
| warn("Install with: pip install pycryptodome") | |
| if self.dry_run: | |
| print(f"\n {Y}DRY RUN MODE — no changes will be made{N}") | |
| steps: list[tuple[int, str, callable]] = [ | |
| (1, "Web UI Login", self.step1_web_login), | |
| (2, "Enable Telnet", self.step2_enable_telnet), | |
| (3, "Derive manu Credentials", self.step3_derive_manu), | |
| (4, "Extract Config", self.step4_extract_config), | |
| (5, "Backup Config", self.step5_backup), | |
| (6, "Disable TR-069", self.step6_disable_tr069), | |
| (7, "Set Bridge Mode", self.step7_set_bridge), | |
| (8, "Configure PortBinding", self.step8_port_binding), | |
| (9, "Inject MAC Fix Script", self.step9_inject_mac_fix), | |
| (10, "Reboot", self.step10_reboot), | |
| (11, "Show Router Config", self.step11_show_config), | |
| ] | |
| for num, name, func in steps: | |
| if not self.should_run(num): | |
| info(f"Skipping step {num}: {name}") | |
| continue | |
| if not func(): | |
| error(f"Step {num} ({name}) failed — aborting") | |
| return False | |
| header("Setup Complete!") | |
| if self.dry_run: | |
| info("This was a dry run — no changes were made") | |
| info("Run without --dry-run to apply changes") | |
| else: | |
| info("Your ONT is now in bridge mode") | |
| info("Configure your downstream router with the settings shown above") | |
| info("If anything goes wrong, restore from the backup files saved in Step 5") | |
| return True | |
| # ── CLI ─────────────────────────────────────────────────────── | |
| def main() -> None: | |
| parser = argparse.ArgumentParser( | |
| description="Excitel ONT Bridge Mode Auto-Setup for ZTE ZXIC 2K05X", | |
| formatter_class=argparse.RawDescriptionHelpFormatter, | |
| epilog="""\ | |
| Examples: | |
| %(prog)s Full automated setup | |
| %(prog)s --dry-run Preview without making changes | |
| %(prog)s --skip-to 6 Resume from step 6 | |
| %(prog)s --no-reboot Skip the reboot step | |
| %(prog)s --ont-ip 10.0.0.1 Custom ONT IP address | |
| %(prog)s --revert Revert all changes back to route mode | |
| %(prog)s --revert --dry-run Preview what revert would do | |
| %(prog)s --revert --backup-file ~/ont_backup_full.xml | |
| """, | |
| ) | |
| parser.add_argument("--ont-ip", default="192.168.0.1", | |
| help="ONT IP address (default: 192.168.0.1)") | |
| parser.add_argument("--dry-run", action="store_true", | |
| help="Show what would happen without making changes") | |
| parser.add_argument("--skip-to", type=int, default=1, metavar="N", | |
| help="Resume from step N (1-11)") | |
| parser.add_argument("--no-reboot", action="store_true", | |
| help="Skip the reboot step (step 10)") | |
| parser.add_argument("--revert", action="store_true", | |
| help="Revert ALL changes — restore ONT to route mode") | |
| parser.add_argument("--backup-file", metavar="FILE", | |
| help="Decrypted XML backup file to restore original values from " | |
| "(auto-detected if not specified)") | |
| args = parser.parse_args() | |
| if not args.revert and args.skip_to < 1 or args.skip_to > 11: | |
| parser.error("--skip-to must be between 1 and 11") | |
| if args.backup_file and not args.revert: | |
| parser.error("--backup-file only makes sense with --revert") | |
| setup = ExcitelBridgeSetup( | |
| ont_ip=args.ont_ip, | |
| dry_run=args.dry_run, | |
| skip_to=args.skip_to, | |
| no_reboot=args.no_reboot, | |
| revert=args.revert, | |
| backup_file=args.backup_file, | |
| ) | |
| try: | |
| if args.revert: | |
| success = setup.run_revert() | |
| else: | |
| success = setup.run() | |
| except KeyboardInterrupt: | |
| print() | |
| error("Interrupted by user") | |
| success = False | |
| sys.exit(0 if success else 1) | |
| if __name__ == "__main__": | |
| main() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment