Skip to content

Instantly share code, notes, and snippets.

@ankurpandeyvns
Last active February 24, 2026 16:42
Show Gist options
  • Select an option

  • Save ankurpandeyvns/7df0bf41841d96afc82fe3ff97037fb5 to your computer and use it in GitHub Desktop.

Select an option

Save ankurpandeyvns/7df0bf41841d96afc82fe3ff97037fb5 to your computer and use it in GitHub Desktop.
Excitel ZTE ZXIC 2K05X ONT in bridge mode
#!/usr/bin/env python3
"""
Excitel ONT Bridge Mode Auto-Setup
===================================
Automates setting up (or reverting) the ZTE ZXIC 2K05X ONT in bridge mode
so a downstream router (e.g. UniFi Dream Machine) can handle PPPoE directly.
Architecture Overview
---------------------
In factory (route) mode the ONT runs its own PPPoE client and NATs traffic
through br0. Bridge mode (ConnType=4) makes the ONT a transparent L2 bridge:
PPPoE frames from the downstream router pass straight through to the OLT.
The ONT has a MAC collision bug: eth0 (LAN-facing) and nbif0 (WAN/PON-facing)
share the same MAC. The kernel bridge merges their FDB entries, so frames from
the ISP destined for the router get looped back up nbif0 instead of being
forwarded to eth0. We fix this by changing eth0+br0 to (WAN_MAC - 1) at boot
via a persistent script injected into the rootfs overlay on mtdblock9.
Prerequisites
-------------
- Python 3.10+
- pycryptodome: pip install pycryptodome
(used to AES-decrypt the ONT config backup for manu password extraction
and to read encrypted .bin backups during revert)
- Network access to the ONT (default 192.168.0.1)
- Web UI credentials (factory default: excitel / exc@123)
Usage
-----
Setup (bridge mode):
python3 excitel-bridge-setup.py # full automated setup
python3 excitel-bridge-setup.py --dry-run # preview without changes
python3 excitel-bridge-setup.py --skip-to 6 # resume from step 6
python3 excitel-bridge-setup.py --no-reboot # skip final reboot
python3 excitel-bridge-setup.py --ont-ip 10.0.0.1 # custom ONT IP
Revert (restore route mode):
python3 excitel-bridge-setup.py --revert # revert all changes
python3 excitel-bridge-setup.py --revert --dry-run # preview revert
python3 excitel-bridge-setup.py --revert --backup-file ~/ont_backup_full.xml
Setup Steps
-----------
1. Web UI Login
2. Enable Telnet via Web UI
3. Derive `manu` superuser credentials
- Method 1: Download config backup via web UI, decrypt, extract from XML
- Method 2: sismac via ISP telnet account (if shell not restricted)
- Method 3: Prompt user
4. Extract current config (PPPoE creds, WAN MAC, TR-069, PortBinding, etc.)
5. Backup config to local files (~/ont_backup_<timestamp>_full.xml + _tables.txt)
*** POINT OF NO RETURN — user must confirm before proceeding ***
6. Disable TR-069 management (prevents ISP from reverting changes)
7. Set bridge mode (ConnType=4, IsNAT=0, ServList=1)
8. Configure PortBinding (ETH1+ETH2 → WCD1.WCPPP1 for LAN→WAN forwarding)
9. Inject persistent MAC fix boot script
- /userconfig/fix-eth0-mac.sh (jffs2, persists across reboots)
- /etc/rcS.d/S98fixmac via rootfs overlay on mtdblock9
10. Reboot ONT
11. Display downstream router configuration (PPPoE creds, MAC to clone, etc.)
Revert Steps (--revert)
------------------------
Undoes ALL changes made by this script, restoring the ONT to factory route mode.
Original values are read from the backup file saved in step 5 (auto-detected
from ~/ont_backup_*_full.xml). If no backup is found, use --backup-file to
specify one (supports both decrypted .xml and encrypted .bin), or Excitel
factory defaults are used (ConnType=1, WANCName=1_TR069_INTERNET_R_VID_).
R1. Web UI Login (same as setup)
R2. Enable Telnet via Web UI (same as setup)
R3. Derive manu credentials (same as setup)
R4. Read current config, load original values from backup, show planned changes
R5. Remove MAC fix boot scripts (/userconfig/fix-eth0-mac.sh + S98fixmac)
R6. Remove PortBinding entries (restore to empty table)
R7. Restore route mode (ConnType, IsNAT, WANCName, ServList from backup)
R8. Re-enable TR-069 (Tr069Enable, PeriodicInformEnable, STUNEnable)
R9. Reboot ONT
"""
from __future__ import annotations
import argparse
import glob as _glob
import re
import select
import socket
import struct
import sys
import time
import urllib.parse
from datetime import datetime
from http.cookiejar import CookieJar
from pathlib import Path
from urllib.error import URLError
from urllib.request import HTTPCookieProcessor, Request, build_opener
from xml.etree import ElementTree as ET
try:
from Crypto.Cipher import AES
_HAS_AES = True
except ImportError:
_HAS_AES = False
# ── Colors (auto-disabled when not a TTY) ─────────────────────
_USE_COLOR = sys.stdout.isatty()
R = "\033[0;31m" if _USE_COLOR else ""
G = "\033[0;32m" if _USE_COLOR else ""
Y = "\033[1;33m" if _USE_COLOR else ""
C = "\033[0;36m" if _USE_COLOR else ""
B = "\033[1m" if _USE_COLOR else ""
N = "\033[0m" if _USE_COLOR else ""
def info(msg: str) -> None:
print(f"{G}[+]{N} {msg}")
def warn(msg: str) -> None:
print(f"{Y}[!]{N} {msg}")
def error(msg: str) -> None:
print(f"{R}[-]{N} {msg}")
def header(title: str) -> None:
print(f"\n{B}{'=' * 60}")
print(f" {title}")
print(f"{'=' * 60}{N}")
def kv(key: str, val: str, indent: int = 2) -> None:
print(f"{' ' * indent}{key + ':':<28s} {val}")
def step_header(num: int, title: str) -> None:
print(f"\n{B}{'─' * 60}")
print(f" Step {num}: {title}")
print(f"{'─' * 60}{N}")
def confirm(prompt: str) -> bool:
try:
answer = input(f"{Y}[?]{N} {prompt} (y/N): ").strip().lower()
return answer == "y"
except (EOFError, KeyboardInterrupt):
print()
return False
# ── ZTE Config Decryption ─────────────────────────────────────
# Ported from ppc_config_tool.py — decrypts web backup .bin files
# so we can extract manu password and PPPoE credentials without telnet.
_XOR_KEY = b'*&(*H65GFRUY6KH53%#74BUG^%^RFIOO*&*^&^RRU6YOK8PE(&(#TI_+7(U9(7!U(HF*(ET6FGHKDIO8E@67!R#@#'
_AES_KEY = bytes(16) # All zeros — yes, really
_CRC_PATTERN = bytes.fromhex('b1ca8498cc0d83389e5555c03011ecdc')
_ZTE_HEADER_MAGIC = bytes.fromhex('999999994444444455555555aaaaaaaa')
_PPCT_MAGIC = b'PPCT'
def _xor_data(data: bytes, key: bytes) -> bytes:
return bytes(byte ^ key[i % len(key)] for i, byte in enumerate(data))
def _remove_chunk_markers(data: bytes) -> bytes:
result = bytearray()
i = 0
while i < len(data):
if i + 12 <= len(data):
s1 = int.from_bytes(data[i:i + 4], 'big')
s2 = int.from_bytes(data[i + 4:i + 8], 'big')
if s1 == s2 and 100 < s1 < 100000:
i += 12
continue
result.append(data[i])
i += 1
return bytes(result)
def _remove_crc_blocks(data: bytes) -> bytes:
result = bytearray()
i = 0
while i < len(data):
if i + 16 <= len(data) and data[i:i + 16] == _CRC_PATTERN:
i += 16
else:
result.append(data[i])
i += 1
return bytes(result)
def decrypt_web_backup(data: bytes) -> bytes:
"""Decrypt a ZTE web backup .bin file to XML.
Structure: 128-byte ZTE header → XOR layer → PPCT header → AES-ECB layer.
Returns the <DB>...</DB> XML content.
"""
if not _HAS_AES:
raise RuntimeError("pycryptodome required: pip install pycryptodome")
if len(data) < 256 or data[:16] != _ZTE_HEADER_MAGIC:
raise ValueError("Not a valid ZTE backup file")
xor_dec = _xor_data(data[128:], _XOR_KEY)
if xor_dec[12:16] != _PPCT_MAGIC:
raise ValueError("Invalid PPCT header")
enc_prefix = xor_dec[0x58:0x78] # 32 bytes
enc_data = xor_dec[128:][8:] # Skip 8-byte CRC suffix
clean = _remove_crc_blocks(_remove_chunk_markers(enc_data))
full = enc_prefix + clean
aligned = len(full) - (len(full) % 16)
dec = AES.new(_AES_KEY, AES.MODE_ECB).decrypt(full[:aligned])
s = dec.find(b'<DB>')
e = dec.rfind(b'</DB>')
if s >= 0 and e >= 0:
return dec[s:e + 5]
return dec
# ── Telnet ────────────────────────────────────────────────────
class ONTTelnet:
"""Manage telnet sessions to the ONT via raw sockets."""
def __init__(self, host: str, user: str, passwd: str, timeout: int = 15):
self.host = host
self.user = user
self.passwd = passwd
self.timeout = timeout
def _read_until(self, sock: socket.socket, marker: bytes,
timeout: float | None = None) -> bytes:
timeout = timeout or self.timeout
sock.settimeout(timeout)
buf = b""
deadline = time.time() + timeout
while time.time() < deadline:
try:
chunk = sock.recv(4096)
if not chunk:
break
buf += chunk
if marker in buf:
break
except socket.timeout:
break
return buf
def _strip_telnet_negotiation(self, data: bytes) -> bytes:
result = b""
i = 0
while i < len(data):
if data[i:i + 1] == b"\xff" and i + 1 < len(data):
cmd = data[i + 1:i + 2]
if cmd in (b"\xfb", b"\xfc", b"\xfd", b"\xfe"):
i += 3
elif cmd == b"\xfa":
end = data.find(b"\xff\xf0", i)
i = end + 2 if end >= 0 else len(data)
else:
i += 2
else:
result += data[i:i + 1]
i += 1
return result
def _connect_and_login(self) -> socket.socket | None:
try:
sock = socket.create_connection((self.host, 23), timeout=self.timeout)
except Exception as e:
error(f"Telnet connect failed: {e}")
return None
data = self._read_until(sock, b"Login:")
self._strip_telnet_negotiation(data)
sock.sendall(self.user.encode() + b"\r\n")
self._read_until(sock, b"Password:")
sock.sendall(self.passwd.encode() + b"\r\n")
data = self._read_until(sock, b"/ #")
text = self._strip_telnet_negotiation(data).decode("utf-8", errors="replace")
if "Login incorrect" in text or "Login:" in text:
sock.close()
return None
return sock
def _parse_output(self, raw: bytes, cmd: str) -> list[str]:
raw = self._strip_telnet_negotiation(raw)
text = raw.decode("utf-8", errors="replace")
text = re.sub(r"\x1b\[[0-9;]*m", "", text)
lines = text.splitlines()
if lines and cmd.strip() in lines[0]:
lines = lines[1:]
if lines and "/ #" in lines[-1]:
last = lines[-1].replace("/ #", "").rstrip()
lines[-1:] = [last] if last else []
return lines
def run(self, *cmds: str) -> str:
sock = self._connect_and_login()
if not sock:
return ""
try:
output_lines: list[str] = []
for cmd in cmds:
sock.sendall(cmd.encode() + b"\r\n")
raw = self._read_until(sock, b"/ #")
output_lines.extend(self._parse_output(raw, cmd))
sock.sendall(b"exit\r\n")
sock.close()
return "\n".join(output_lines)
except Exception as e:
error(f"Telnet error: {e}")
try:
sock.close()
except Exception:
pass
return ""
def run_long(self, *cmds: str, timeout: float = 30) -> str:
sock = self._connect_and_login()
if not sock:
return ""
try:
output_lines: list[str] = []
for cmd in cmds:
sock.sendall(cmd.encode() + b"\r\n")
raw = self._read_until(sock, b"/ #", timeout=timeout)
output_lines.extend(self._parse_output(raw, cmd))
sock.sendall(b"exit\r\n")
sock.close()
return "\n".join(output_lines)
except Exception as e:
error(f"Telnet error: {e}")
try:
sock.close()
except Exception:
pass
return ""
def write_file(self, path: str, content: str, mode: int = 0o755) -> bool:
"""Write a file on the ONT via telnet heredoc with inter-line delay."""
sock = self._connect_and_login()
if not sock:
return False
try:
sock.sendall(f"cat > {path} << 'HEREDOC_EOF'\r\n".encode())
time.sleep(0.1)
for line in content.splitlines():
sock.sendall(line.encode() + b"\r\n")
time.sleep(0.02)
sock.sendall(b"HEREDOC_EOF\r\n")
self._read_until(sock, b"/ #", timeout=5)
sock.sendall(f"chmod {oct(mode)[2:]} {path}\r\n".encode())
self._read_until(sock, b"/ #", timeout=3)
sock.sendall(b"exit\r\n")
sock.close()
return True
except Exception as e:
error(f"Telnet write_file error: {e}")
try:
sock.close()
except Exception:
pass
return False
def check_login(self) -> bool:
sock = self._connect_and_login()
if sock:
try:
sock.sendall(b"exit\r\n")
sock.close()
except Exception:
pass
return True
return False
# ── HTTP / Web UI ─────────────────────────────────────────────
class ONTWeb:
"""Manage HTTP sessions to the ONT web UI.
The ONT only serves plain HTTP. If we get an SSL/HTTPS response,
we're hitting the wrong device (e.g. the downstream router).
"""
def __init__(self, base_url: str):
self.base_url = base_url
self.host = base_url.replace("http://", "").split("/")[0]
self.cookies = CookieJar()
self.opener = build_opener(HTTPCookieProcessor(self.cookies))
self.session_token = ""
self.logged_in = False
def _request(self, path: str, data: dict | None = None,
timeout: int = 10) -> str:
url = f"{self.base_url}/{path}" if path else self.base_url + "/"
try:
if data is not None:
encoded = urllib.parse.urlencode(data).encode()
req = Request(url, data=encoded)
else:
req = Request(url)
resp = self.opener.open(req, timeout=timeout)
if resp.url and resp.url.startswith("https://"):
error(f"Redirected to HTTPS ({resp.url}) — this is not the ONT")
return ""
return resp.read().decode("utf-8", errors="replace")
except URLError as e:
if "SSL" in str(e) or "certificate" in str(e).lower():
error("Got SSL/HTTPS response — this is not the ONT (ONT is HTTP only)")
else:
error(f"HTTP error: {e}")
return ""
def get(self, path: str) -> str:
return self._request(path)
def post(self, path: str, data: dict) -> str:
return self._request(path, data)
def get_page(self, page_name: str) -> str:
return self.get(f"getpage.gch?pid=1002&nextpage={page_name}")
def login(self, username: str, password: str) -> bool:
"""Login to web UI. Returns True on success."""
self.cookies.clear()
self.logged_in = False
html = self._request("")
if not html:
error("Cannot reach ONT web UI")
return False
m = re.search(r'Frm_Logintoken.*?value\s*=\s*"(\d*)"', html)
token = m.group(1) if m else "0"
if not token:
token = "0"
self.post("", {
"action": "login",
"Frm_Logintoken": token,
"Username": username,
"Password": password,
})
# Verify login by fetching a protected page
check = self.get_page("status_dev_info_t.gch")
if "logout_redirect" in check:
return False
self.logged_in = True
return True
def logout(self) -> None:
self.post("", {"logout": "1", "logout_from": "login_timeout"})
self.cookies.clear()
self.logged_in = False
def enable_telnet(self) -> bool:
"""Enable telnet on the ONT via the firewall/services page."""
html = self.get_page("sec_sc_t.gch")
if not html:
error("Could not fetch firewall settings page")
return False
m = re.search(r'session_token\s*=\s*"([^"]*)"', html)
if not m:
error("Could not extract session token from sec_sc_t.gch")
return False
self.session_token = m.group(1)
form_data = {
"_SESSION_TOKEN": self.session_token,
"IF_ACTION": "apply",
"IF_ERRORSTR": "SUCC",
"IF_ERRORPARAM": "SUCC",
"IF_ERRORTYPE": "-1286224680",
"ViewName": "NULL",
"Enable": "1",
"INCViewName": "IGD.LD1",
"INCName": "LAN",
"MinSrcIp": "192.168.0.1",
"MinSrcMask": "NULL",
"MaxSrcIp": "192.168.255.255",
"FilterTarget": "1",
"Servise": "10",
"ViewName0": "IGD.FWSc.FWSC1",
"Enable0": "1",
"INCViewName0": "IGD.WANIF",
"INCName0": "WAN",
"MinSrcIp0": "",
"MinSrcMask0": "0.0.0.0",
"MaxSrcIp0": "",
"FilterTarget0": "1",
"Servise0": "1",
"ViewName1": "IGD.FWSc.FWSC2",
"Enable1": "1",
"INCViewName1": "IGD.LD1",
"INCName1": "LAN",
"MinSrcIp1": "192.168.0.1",
"MinSrcMask1": "0.0.0.0",
"MaxSrcIp1": "192.168.255.255",
"FilterTarget1": "1",
"Servise1": "10",
"IF_INDEX": "1",
"IF_INSTNUM": "2",
}
resp = self.post("getpage.gch?pid=1002&nextpage=sec_sc_t.gch", form_data)
return bool(resp)
def download_config_backup(self) -> bytes:
"""Download encrypted config backup via the web UI.
Uses the manager_dev_config_t.gch page with raw socket for reliable
large-file download. Returns raw encrypted .bin bytes.
"""
# Get session token from backup page
page = self.get_page("manager_dev_config_t.gch")
m = re.search(r'session_token\s*=\s*"([^"]*)"', page)
if not m or not m.group(1):
raise RuntimeError("Could not get session token from backup page")
session_token = m.group(1)
cookie_str = "; ".join(f"{c.name}={c.value}" for c in self.cookies)
# Raw socket download (urllib can't handle the multipart POST reliably)
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.settimeout(180)
sock.connect((self.host, 80))
boundary = "----WebKitFormBoundary7MA4YWxkTrZu0gW"
body = (
f"--{boundary}\r\n"
f'Content-Disposition: form-data; name="config"\r\n\r\n\r\n'
f"--{boundary}\r\n"
f'Content-Disposition: form-data; name="_SESSION_TOKEN"\r\n'
f"\r\n{session_token}\r\n"
f"--{boundary}--\r\n"
)
body_bytes = body.encode()
http_req = (
f"POST /getpage.gch?pid=101&nextpage=manager_dev_config_t.gch HTTP/1.1\r\n"
f"Host: {self.host}\r\n"
f"Content-Type: multipart/form-data; boundary={boundary}\r\n"
f"Content-Length: {len(body_bytes)}\r\n"
)
if cookie_str:
http_req += f"Cookie: {cookie_str}\r\n"
http_req += "Connection: keep-alive\r\n\r\n"
sock.sendall(http_req.encode() + body_bytes)
# Read HTTP response headers
resp_hdr = b""
while not resp_hdr.endswith(b"\r\n\r\n"):
c = sock.recv(1)
if not c:
raise RuntimeError("Connection closed during header read")
resp_hdr += c
expected = 0
for line in resp_hdr.decode().split("\r\n"):
if line.lower().startswith("content-length:"):
expected = int(line.split(":")[1].strip())
if expected == 0:
sock.close()
raise RuntimeError("No Content-Length in backup download response")
# Read body
content = b""
remaining = expected
while remaining > 0:
ready = select.select([sock], [], [], 60)
if ready[0]:
chunk = sock.recv(min(65536, remaining))
if not chunk:
break
content += chunk
remaining -= len(chunk)
else:
break
sock.close()
if len(content) != expected:
raise RuntimeError(
f"Download incomplete: got {len(content)}, expected {expected}")
if content[:16] != _ZTE_HEADER_MAGIC:
raise RuntimeError("Downloaded file is not a valid ZTE backup")
return content
# ── sendcmd XML parser ────────────────────────────────────────
def parse_sendcmd_xml(text: str) -> list[dict[str, str]]:
m = re.search(r"(<Tbl.*</Tbl>)", text, re.DOTALL)
if not m:
return []
try:
root = ET.fromstring(m.group(1))
except ET.ParseError:
return []
rows = []
for row in root.findall("Row"):
d = {}
for dm in row.findall("DM"):
d[dm.get("name", "")] = dm.get("val", "")
rows.append(d)
return rows
# ── Decrypted XML parser ─────────────────────────────────────
def parse_config_xml_table(xml_str: str, table_name: str) -> list[dict[str, str]]:
"""Extract rows from a named <Tbl> in the decrypted config XML."""
pattern = rf'<Tbl\s+name="{re.escape(table_name)}"[^>]*>(.*?)</Tbl>'
m = re.search(pattern, xml_str, re.DOTALL)
if not m:
return []
tbl_xml = m.group(0)
try:
root = ET.fromstring(tbl_xml)
except ET.ParseError:
return []
rows = []
for row in root.findall("Row"):
d = {}
for dm in row.findall("DM"):
d[dm.get("name", "")] = dm.get("val", "")
rows.append(d)
return rows
# ── Network Helpers ───────────────────────────────────────────
def port_open(host: str, port: int, timeout: float = 2) -> bool:
try:
sock = socket.create_connection((host, port), timeout=timeout)
sock.close()
return True
except (OSError, socket.timeout):
return False
def wait_for_port(host: str, port: int, timeout: float = 60,
interval: float = 5, want_open: bool = True) -> bool:
deadline = time.time() + timeout
while time.time() < deadline:
if port_open(host, port) == want_open:
return True
remaining = deadline - time.time()
if remaining > 0:
time.sleep(min(interval, remaining))
return False
def mac_decrement(mac: str) -> str:
clean = mac.replace(":", "").replace("-", "")
num = int(clean, 16) - 1
hex_str = f"{num:012X}"
return ":".join(hex_str[i:i + 2] for i in range(0, 12, 2))
# ── Main Setup Class ─────────────────────────────────────────
class ExcitelBridgeSetup:
"""Orchestrates the full bridge mode setup across all 11 steps."""
DEFAULT_WEB_USER = "excitel"
DEFAULT_WEB_PASS = "exc@123"
ISP_TELNET_USER = "excitel"
ISP_TELNET_PASS = "3xCMon1t@123"
# Known Excitel route-mode defaults (used when no backup file available)
EXCITEL_ROUTE_DEFAULTS = {
"ConnType": "1", # Route/DHCP (Excitel factory default)
"IsNAT": "1", # NAT enabled
"WANCName": "1_TR069_INTERNET_R_VID_", # Factory name includes TR069
"ServList": "3", # INTERNET + TR069
"Tr069Enable": "1",
"PeriodicInformEnable": "1",
"STUNEnable": "1",
}
def __init__(self, ont_ip: str, dry_run: bool, skip_to: int,
no_reboot: bool, revert: bool = False,
backup_file: str | None = None):
self.ont_ip = ont_ip
self.ont_url = f"http://{ont_ip}"
self.dry_run = dry_run
self.skip_to = skip_to
self.no_reboot = no_reboot
self.revert = revert
self.backup_file = backup_file
self.web: ONTWeb | None = None
self.web_user = ""
self.web_pass = ""
self.manu_pass = ""
self.pppoe_user = ""
self.pppoe_pass = ""
self.wan_mac = ""
self.base_mac = ""
self.serial_number = ""
self.model_name = ""
self.conn_type = ""
self.wan_name = ""
self.serv_list = ""
self.is_nat = ""
self.tr069_enabled = ""
self.periodic_inform = ""
self.stun_enabled = ""
self.port_binding_rows: list[dict] = []
self.backup_dir = Path.home()
self._decrypted_xml: str = "" # Full decrypted config XML string
self._original_values: dict[str, str] = {} # Original values from backup
def _tn(self, user: str | None = None,
passwd: str | None = None) -> ONTTelnet:
if user and passwd:
return ONTTelnet(self.ont_ip, user, passwd)
if self.manu_pass:
return ONTTelnet(self.ont_ip, "manu", self.manu_pass)
return ONTTelnet(self.ont_ip, self.ISP_TELNET_USER,
self.ISP_TELNET_PASS)
def should_run(self, step: int) -> bool:
return step >= self.skip_to
# ── Step 1: Web UI Login ──────────────────────────────────
def step1_web_login(self) -> bool:
"""Login to the ONT web UI.
Tries factory default credentials first, then prompts.
If login fails due to a stale session (ONT only allows one),
instructs user to power-cycle or clears via telnet if available.
"""
step_header(1, "Web UI Login")
if not port_open(self.ont_ip, 80):
error(f"Cannot reach {self.ont_ip}:80 — is the ONT powered on and connected?")
return False
info(f"ONT web UI reachable at {self.ont_url}")
if self.dry_run:
info("[DRY RUN] Would attempt login with factory credentials")
self.web_user = self.DEFAULT_WEB_USER
self.web_pass = self.DEFAULT_WEB_PASS
self.web = ONTWeb(self.ont_url)
return True
self.web = ONTWeb(self.ont_url)
# Try factory defaults
info(f"Trying factory credentials: {self.DEFAULT_WEB_USER} / {self.DEFAULT_WEB_PASS}")
if self.web.login(self.DEFAULT_WEB_USER, self.DEFAULT_WEB_PASS):
info("Logged in with factory credentials")
self.web_user = self.DEFAULT_WEB_USER
self.web_pass = self.DEFAULT_WEB_PASS
return True
warn("Factory credentials failed")
# Prompt user
for attempt in range(1, 4):
print(f"\n Attempt {attempt}/3:")
try:
user = input(" Web username: ").strip()
passwd = input(" Web password: ").strip()
except (EOFError, KeyboardInterrupt):
print()
return False
if not user or not passwd:
warn("Empty username or password")
continue
self.web = ONTWeb(self.ont_url)
if self.web.login(user, passwd):
info(f"Logged in as {user}")
self.web_user = user
self.web_pass = passwd
return True
warn("Login failed, try again")
error("All login attempts exhausted")
return False
# ── Step 2: Enable Telnet ─────────────────────────────────
def step2_enable_telnet(self) -> bool:
"""Enable telnet via web UI firewall page. Skips if port 23 already open."""
step_header(2, "Enable Telnet via Web UI")
if port_open(self.ont_ip, 23):
info("Telnet port 23 already open — skipping")
return True
if self.dry_run:
info("[DRY RUN] Would POST firewall form to enable telnet")
return True
if not self.web or not self.web.logged_in:
error("Web UI not logged in — cannot enable telnet")
return False
info("Enabling telnet via firewall settings...")
if not self.web.enable_telnet():
error("Failed to POST telnet enable form")
return False
info("Waiting for telnet port to open...")
if wait_for_port(self.ont_ip, 23, timeout=8, interval=1):
info("Telnet is now available on port 23")
return True
error("Telnet port 23 did not open within 8 seconds")
return False
# ── Step 3: Derive manu Credentials ───────────────────────
def _try_config_backup_derive(self) -> str | None:
"""Download config backup via web UI, decrypt, extract manu password.
The decrypted XML contains TelnetUser and AuthUser tables with the
manu username and password in cleartext.
"""
if not _HAS_AES:
warn("pycryptodome not installed — cannot decrypt config backup")
warn("Install with: pip install pycryptodome")
return None
if not self.web or not self.web.logged_in:
return None
info("Downloading config backup via web UI...")
try:
raw = self.web.download_config_backup()
info(f"Downloaded {len(raw)} bytes, decrypting...")
xml_bytes = decrypt_web_backup(raw)
self._decrypted_xml = xml_bytes.decode("utf-8", errors="replace")
info(f"Decrypted config: {len(self._decrypted_xml)} chars")
except Exception as e:
warn(f"Config backup/decrypt failed: {e}")
return None
# Search TelnetUser table
rows = parse_config_xml_table(self._decrypted_xml, "TelnetUser")
for row in rows:
if row.get("Username") == "manu" and row.get("Password"):
return row["Password"]
# Search AuthUser table
rows = parse_config_xml_table(self._decrypted_xml, "AuthUser")
for row in rows:
if row.get("User") == "manu" and row.get("Pass"):
return row["Pass"]
# Brute-force search
m = re.search(
r'<DM\s+name="(?:User(?:name)?|User)"\s+val="manu"/>\s*'
r'<DM\s+name="(?:Pass(?:word)?|Pass)"\s+val="([^"]+)"',
self._decrypted_xml)
if m:
return m.group(1)
return None
def _try_sismac_derive(self) -> str | None:
"""Try to derive manu password via sismac on the ISP telnet account."""
info(f"Trying sismac via ISP account ({self.ISP_TELNET_USER})...")
tn_isp = ONTTelnet(self.ont_ip, self.ISP_TELNET_USER,
self.ISP_TELNET_PASS)
out = tn_isp.run("sismac 2 2194")
if not out or "Parse error" in out:
return None
info("sismac output:")
for line in out.splitlines():
print(f" {line}")
hex_bytes = re.findall(r"\b[0-9a-fA-F]{2}\b", out)
if not hex_bytes:
return None
ascii_str = ""
for hb in hex_bytes:
val = int(hb, 16)
if 0x20 <= val <= 0x7E:
ascii_str += chr(val)
return ascii_str[:8] if len(ascii_str) >= 8 else None
def _verify_manu(self, password: str) -> bool:
tn = ONTTelnet(self.ont_ip, "manu", password)
verify = tn.run("echo TELNET_OK")
return "TELNET_OK" in verify
def step3_derive_manu(self) -> bool:
"""Obtain the manu superuser password.
Tries three methods in order:
1. Download config backup via web UI → decrypt → extract from TelnetUser table
2. sismac via ISP telnet account (if the shell isn't restricted)
3. Prompt user
"""
step_header(3, "Derive manu Credentials & Verify Telnet")
if self.dry_run:
info("[DRY RUN] Would derive manu password via config backup or sismac")
self.manu_pass = "<derived>"
return True
# Method 1: Config backup decrypt (most reliable)
derived = self._try_config_backup_derive()
if derived:
info(f"Extracted manu password from config backup: {derived}")
if self._verify_manu(derived):
self.manu_pass = derived
info("manu login verified successfully")
return True
warn(f"Extracted password '{derived}' failed telnet verification")
# Method 2: sismac via ISP telnet account
derived = self._try_sismac_derive()
if derived:
info(f"Derived manu password via sismac: {derived}")
if self._verify_manu(derived):
self.manu_pass = derived
info("manu login verified successfully")
return True
warn(f"Derived password '{derived}' failed verification")
# Method 3: Prompt user
warn("Automatic manu password extraction failed")
info("Please provide the manu password manually.")
info("(Check ONT sticker, ISP records, or ask your ISP)")
for attempt in range(1, 4):
try:
passwd = input(f"\n manu password (attempt {attempt}/3): ").strip()
except (EOFError, KeyboardInterrupt):
print()
return False
if not passwd:
warn("Empty password")
continue
info(f"Testing manu / {passwd}...")
if self._verify_manu(passwd):
self.manu_pass = passwd
info("manu login verified successfully")
return True
warn("Login failed")
error("Could not obtain working manu credentials")
return False
# ── Step 4: Extract Current Config ────────────────────────
def step4_extract_config(self) -> bool:
"""Extract all relevant configuration from the ONT.
Uses the decrypted config XML (from step 3) if available, otherwise
falls back to telnet sendcmd queries.
"""
step_header(4, "Extract Current Configuration")
if self.dry_run:
info("[DRY RUN] Would read WANCPPP, WANC, DevInfo, MgtServer, PortBinding, BrGrp")
self.pppoe_user = "<pppoe_user>"
self.pppoe_pass = "<pppoe_pass>"
self.wan_mac = "A2:3E:D1:7B:44:F6"
self.base_mac = "A2:3E:D1:7B:44:F5"
return True
tn = self._tn()
# ── PPPoE credentials ──
# Try decrypted XML first (has cleartext password)
if self._decrypted_xml:
info("Extracting PPPoE credentials from decrypted config...")
ppp_rows = parse_config_xml_table(self._decrypted_xml, "WANCPPP")
if ppp_rows:
self.pppoe_user = ppp_rows[0].get("UserName", "")
self.pppoe_pass = ppp_rows[0].get("Password", "")
kv("PPPoE Username", self.pppoe_user)
kv("PPPoE Password", self.pppoe_pass)
else:
# Fallback: telnet
info("Reading WANCPPP via telnet...")
out = tn.run("sendcmd 1 DB p WANCPPP")
rows = parse_sendcmd_xml(out)
if rows:
self.pppoe_user = rows[0].get("UserName", "")
kv("PPPoE Username", self.pppoe_user)
info("Decrypting config via telnet for PPPoE password...")
tn.run("sendcmd 1 DB decry /userconfig/cfg/db_user_cfg.xml")
time.sleep(1)
decrypted = tn.run_long("cat /var/tmp/debug-decry-cfg", timeout=45)
tn.run("rm -f /var/tmp/debug-decry-cfg")
if decrypted:
self._decrypted_xml = decrypted
ppp_match = re.search(
r'<Tbl\s+name="WANCPPP"[^>]*>.*?'
r'<DM\s+name="Password"\s+val="([^"]*)"',
decrypted, re.DOTALL)
if ppp_match:
self.pppoe_pass = ppp_match.group(1)
kv("PPPoE Password", self.pppoe_pass)
# ── WANC (WAN connection settings) — always via telnet for live state ──
info("Reading WANC (WAN connection)...")
out = tn.run("sendcmd 1 DB p WANC")
rows = parse_sendcmd_xml(out)
if rows:
row0 = rows[0]
self.wan_mac = row0.get("WorkIFMac", "")
self.conn_type = row0.get("ConnType", "")
self.wan_name = row0.get("WANCName", "")
self.serv_list = row0.get("ServList", "")
ct_names = {"1": "Route/DHCP", "2": "Route/PPPoE", "4": "Bridge"}
kv("WAN MAC", self.wan_mac)
kv("ConnType", f"{self.conn_type} ({ct_names.get(self.conn_type, '?')})")
kv("WANCName", self.wan_name)
kv("ServList", self.serv_list)
if self.wan_mac:
self.base_mac = mac_decrement(self.wan_mac)
kv("Base MAC (WAN-1)", self.base_mac)
# ── DevInfo ──
info("Reading DevInfo...")
out = tn.run("sendcmd 1 DB p DevInfo")
rows = parse_sendcmd_xml(out)
if rows:
self.serial_number = rows[0].get("SerialNumber", "")
self.model_name = rows[0].get("ModelName", "")
kv("Serial Number", self.serial_number)
kv("Model", self.model_name)
# ── MgtServer (TR-069) ──
info("Reading MgtServer (TR-069)...")
out = tn.run("sendcmd 1 DB p MgtServer")
rows = parse_sendcmd_xml(out)
if rows:
self.tr069_enabled = rows[0].get("Tr069Enable", "")
self.periodic_inform = rows[0].get("PeriodicInformEnable", "")
self.stun_enabled = rows[0].get("STUNEnable", "")
kv("TR-069 Enable", self.tr069_enabled)
kv("Periodic Inform", self.periodic_inform)
kv("STUN Enable", self.stun_enabled)
# ── PortBinding ──
info("Reading PortBinding...")
out = tn.run("sendcmd 1 DB p PortBinding")
self.port_binding_rows = parse_sendcmd_xml(out)
kv("PortBinding entries", str(len(self.port_binding_rows)))
for r in self.port_binding_rows:
print(f" {r.get('WANViewName', '?')} <-> {r.get('LANViewName', '?')}")
# ── BrGrp ──
info("Reading BrGrp...")
out = tn.run("sendcmd 1 DB p BrGrp")
brgrp_rows = parse_sendcmd_xml(out)
kv("BrGrp entries", str(len(brgrp_rows)))
# ── Warnings ──
if self.conn_type == "4":
warn("ONT is ALREADY in bridge mode (ConnType=4)")
if not self.pppoe_user:
warn("PPPoE username not found — you'll need it for your router config")
if not self.pppoe_pass:
warn("PPPoE password not extracted — you'll need it for your router config")
return True
# ── Step 5: Backup Config ─────────────────────────────────
def step5_backup(self) -> bool:
"""Save config to local files. POINT OF NO RETURN gate."""
step_header(5, "Backup Configuration")
ts = datetime.now().strftime("%Y%m%d_%H%M%S")
if self.dry_run:
info(f"[DRY RUN] Would save backup to:")
info(f" {self.backup_dir}/ont_backup_{ts}_full.xml")
info(f" {self.backup_dir}/ont_backup_{ts}_tables.txt")
return True
tn = self._tn()
# Full decrypted XML
if self._decrypted_xml:
xml_path = self.backup_dir / f"ont_backup_{ts}_full.xml"
xml_path.write_text(self._decrypted_xml, encoding="utf-8")
info(f"Full config saved: {xml_path}")
else:
warn("No decrypted config available — skipping full XML backup")
# Key tables dump
tables_path = self.backup_dir / f"ont_backup_{ts}_tables.txt"
tables = ["WANC", "WANCPPP", "MgtServer", "PortBinding", "BrGrp", "DevInfo"]
lines: list[str] = []
for tbl in tables:
lines.append(f"\n{'=' * 40}")
lines.append(f"Table: {tbl}")
lines.append(f"{'=' * 40}")
out = tn.run(f"sendcmd 1 DB p {tbl}")
rows = parse_sendcmd_xml(out)
if rows:
for i, row in enumerate(rows):
lines.append(f" --- Row {i} ---")
for k, v in row.items():
lines.append(f" {k + ':':<28s} {v}")
else:
lines.append(" (no rows)")
tables_path.write_text("\n".join(lines), encoding="utf-8")
info(f"Tables dump saved: {tables_path}")
# ── POINT OF NO RETURN ──
print(f"\n{Y}{'=' * 60}")
print(f" POINT OF NO RETURN")
print(f" The next steps will modify the ONT configuration.")
print(f" Backups saved to: {self.backup_dir}")
print(f"{'=' * 60}{N}")
if not confirm("Proceed with bridge mode setup?"):
info("Aborted by user")
return False
return True
# ── Step 6: Disable TR-069 ────────────────────────────────
def step6_disable_tr069(self) -> bool:
"""Disable TR-069 so the ISP can't revert bridge mode."""
step_header(6, "Disable TR-069")
if self.tr069_enabled == "0" and self.periodic_inform == "0" and self.stun_enabled == "0":
info("TR-069 already disabled — skipping")
return True
if self.dry_run:
info("[DRY RUN] Would set MgtServer: Tr069Enable=0, PeriodicInformEnable=0, STUNEnable=0")
return True
tn = self._tn()
info("Disabling TR-069...")
out = tn.run(
"sendcmd 1 DB set MgtServer 0 Tr069Enable 0",
"sendcmd 1 DB set MgtServer 0 PeriodicInformEnable 0",
"sendcmd 1 DB set MgtServer 0 STUNEnable 0",
"sendcmd 1 DB save",
)
for line in out.splitlines():
stripped = line.strip()
if stripped:
print(f" {stripped}")
info("Verifying...")
out = tn.run("sendcmd 1 DB p MgtServer")
rows = parse_sendcmd_xml(out)
if rows:
ok = (rows[0].get("Tr069Enable") == "0" and
rows[0].get("PeriodicInformEnable") == "0" and
rows[0].get("STUNEnable") == "0")
if ok:
info("TR-069 disabled successfully")
else:
warn("Some TR-069 fields may not have been set correctly")
return True
# ── Step 7: Set Bridge Mode ───────────────────────────────
def step7_set_bridge(self) -> bool:
"""Set WAN connection to bridge/passthrough mode (ConnType=4)."""
step_header(7, "Set Bridge Mode")
if self.conn_type == "4":
info("Already in bridge mode (ConnType=4) — skipping")
return True
if self.dry_run:
info("[DRY RUN] Would set WANC: ConnType=4, IsNAT=0, WANCName=1_INTERNET_B_VID_, ServList=1")
return True
tn = self._tn()
info("Setting bridge mode (ConnType=4)...")
out = tn.run(
"sendcmd 1 DB set WANC 0 ConnType 4",
"sendcmd 1 DB set WANC 0 IsNAT 0",
"sendcmd 1 DB set WANC 0 WANCName 1_INTERNET_B_VID_",
"sendcmd 1 DB set WANC 0 ServList 1",
"sendcmd 1 DB save",
)
for line in out.splitlines():
stripped = line.strip()
if stripped:
print(f" {stripped}")
info("Verifying...")
out = tn.run("sendcmd 1 DB p WANC")
rows = parse_sendcmd_xml(out)
if rows and rows[0].get("ConnType") == "4":
info("Bridge mode set successfully (ConnType=4)")
return True
error("Failed to verify ConnType=4")
return False
# ── Step 8: Configure PortBinding ─────────────────────────
def step8_port_binding(self) -> bool:
"""Configure PortBinding entries for LAN→WAN forwarding."""
step_header(8, "Configure PortBinding")
has_eth1 = any(
"WCD1" in r.get("WANViewName", "") and "ETH1" in r.get("LANViewName", "")
for r in self.port_binding_rows)
has_eth2 = any(
"WCD1" in r.get("WANViewName", "") and "ETH2" in r.get("LANViewName", "")
for r in self.port_binding_rows)
if has_eth1 and has_eth2:
info("PortBinding already has ETH1+ETH2 → WCD1 entries — skipping")
return True
if self.dry_run:
info("[DRY RUN] Would create PortBinding entries:")
info(" IGD.Binding1: IGD.WD1.WCD1.WCPPP1 <-> IGD.LD1.ETH1")
info(" IGD.Binding2: IGD.WD1.WCD1.WCPPP1 <-> IGD.LD1.ETH2")
return True
tn = self._tn()
existing_count = len(self.port_binding_rows)
if existing_count > 0:
info(f"Clearing {existing_count} existing PortBinding entries...")
for i in range(existing_count - 1, -1, -1):
tn.run(f"sendcmd 1 DB delr PortBinding {i}")
info("Creating PortBinding entries...")
out = tn.run(
"sendcmd 1 DB addr PortBinding",
"sendcmd 1 DB set PortBinding 0 ViewName IGD.Binding1",
"sendcmd 1 DB set PortBinding 0 WANViewName IGD.WD1.WCD1.WCPPP1",
"sendcmd 1 DB set PortBinding 0 LANViewName IGD.LD1.ETH1",
"sendcmd 1 DB addr PortBinding",
"sendcmd 1 DB set PortBinding 1 ViewName IGD.Binding2",
"sendcmd 1 DB set PortBinding 1 WANViewName IGD.WD1.WCD1.WCPPP1",
"sendcmd 1 DB set PortBinding 1 LANViewName IGD.LD1.ETH2",
"sendcmd 1 DB save",
)
for line in out.splitlines():
stripped = line.strip()
if stripped:
print(f" {stripped}")
info("Verifying...")
out = tn.run("sendcmd 1 DB p PortBinding")
rows = parse_sendcmd_xml(out)
if len(rows) >= 2:
info(f"PortBinding has {len(rows)} entries:")
for r in rows:
print(f" {r.get('WANViewName', '?')} <-> {r.get('LANViewName', '?')}")
else:
warn(f"Expected 2 PortBinding entries, got {len(rows)}")
return True
# ── Step 9: Inject Persistent MAC Fix ─────────────────────
def step9_inject_mac_fix(self) -> bool:
"""Inject persistent boot script to fix the eth0/nbif0 MAC collision."""
step_header(9, "Inject Persistent MAC Fix Boot Script")
if not self.wan_mac:
error("WAN MAC not known — cannot create MAC fix script")
return False
if not self.base_mac:
self.base_mac = mac_decrement(self.wan_mac)
wan_mac_lower = self.wan_mac.lower()
base_mac_lower = self.base_mac.lower()
info(f"WAN MAC (nbif0): {wan_mac_lower}")
info(f"Fix MAC (eth0/br0): {base_mac_lower}")
fix_script = f"""#!/bin/sh
# fix-eth0-mac.sh — Fix MAC collision between eth0 and nbif0
# Generated by excitel-bridge-setup.py on {datetime.now().isoformat()}
#
# Problem: eth0 and nbif0 share MAC {wan_mac_lower}. The kernel bridge
# merges their FDB entries, causing frames to loop back on nbif0.
# Fix: Set eth0+br0 to {base_mac_lower} (WAN MAC - 1).
LOG="/var/tmp/fix-mac-boot.log"
echo "$(date) fix-eth0-mac.sh starting" >> "$LOG"
# Wait for nbif0 to join br0
MAX_WAIT=240
WAITED=0
while [ $WAITED -lt $MAX_WAIT ]; do
if brctl show br0 2>/dev/null | grep -q nbif0; then
echo "$(date) nbif0 joined br0 after ${{WAITED}}s" >> "$LOG"
break
fi
sleep 2
WAITED=$((WAITED + 2))
done
if [ $WAITED -ge $MAX_WAIT ]; then
echo "$(date) TIMEOUT waiting for nbif0 in br0" >> "$LOG"
exit 1
fi
ip link set dev eth0 address {base_mac_lower}
ip link set dev br0 address {base_mac_lower}
echo "$(date) Set eth0+br0 MAC to {base_mac_lower}" >> "$LOG"
bridge fdb del {wan_mac_lower} dev nbif0 master 2>/dev/null
echo "$(date) Deleted FDB entry for {wan_mac_lower} on nbif0" >> "$LOG"
echo 0 > /proc/sys/net/bridge/bridge-nf-call-iptables 2>/dev/null
echo "$(date) Disabled bridge-nf-call-iptables" >> "$LOG"
echo "$(date) fix-eth0-mac.sh completed" >> "$LOG"
"""
boot_trigger = """#!/bin/sh
# S98fixmac — Trigger the MAC fix script from /userconfig at boot
/userconfig/fix-eth0-mac.sh &
"""
if self.dry_run:
info("[DRY RUN] Would write /userconfig/fix-eth0-mac.sh:")
for line in fix_script.splitlines()[:5]:
print(f" {line}")
print(f" ... ({len(fix_script.splitlines())} lines total)")
info("[DRY RUN] Would mount mtdblock9 and write /etc/rcS.d/S98fixmac")
return True
tn = self._tn()
info("Writing /userconfig/fix-eth0-mac.sh...")
if not tn.write_file("/userconfig/fix-eth0-mac.sh", fix_script, 0o755):
error("Failed to write fix script")
return False
verify = tn.run("ls -la /userconfig/fix-eth0-mac.sh")
if "fix-eth0-mac.sh" in verify:
info("Fix script written successfully")
print(f" {verify.strip()}")
else:
error("Fix script verification failed")
return False
info("Mounting secondary rootfs (mtdblock9) to inject boot trigger...")
tn.run(
"mkdir -p /var/tmp/rootmnt",
"mount -t jffs2 /dev/mtdblock9 /var/tmp/rootmnt",
"mount -o remount,rw /var/tmp/rootmnt",
)
info("Writing S98fixmac boot trigger...")
if not tn.write_file("/var/tmp/rootmnt/etc/rcS.d/S98fixmac",
boot_trigger, 0o755):
error("Failed to write boot trigger")
tn.run("umount /var/tmp/rootmnt 2>/dev/null")
return False
verify = tn.run("ls -la /var/tmp/rootmnt/etc/rcS.d/S98fixmac")
if "S98fixmac" in verify:
info("Boot trigger written successfully")
print(f" {verify.strip()}")
else:
warn("Could not verify boot trigger — it may still work")
tn.run("sync", "mount -o remount,ro /var/tmp/rootmnt")
info("MAC fix boot script injected successfully")
return True
# ── Step 10: Reboot ───────────────────────────────────────
def step10_reboot(self) -> bool:
"""Reboot the ONT to apply all changes."""
step_header(10, "Reboot ONT")
if self.no_reboot:
info("Skipping reboot (--no-reboot flag)")
info(f" Reboot manually: telnet {self.ont_ip} → manu → reboot")
return True
if self.dry_run:
info("[DRY RUN] Would reboot the ONT and wait for it to come back")
return True
if not confirm("Reboot the ONT now? (it will be offline for ~90s)"):
info("Skipping reboot — remember to reboot manually!")
return True
tn = self._tn()
info("Sending reboot command...")
tn.run("reboot")
info("Waiting for ONT to go down...")
time.sleep(5)
info("Waiting for ONT to come back (up to 225s)...")
if wait_for_port(self.ont_ip, 80, timeout=225, interval=5):
info("ONT is back online!")
else:
warn("ONT did not respond within 225s — check manually")
return True
# ── Step 11: Show Router Config ───────────────────────────
def step11_show_config(self) -> bool:
"""Display downstream router configuration."""
step_header(11, "Downstream Router Configuration")
print(f"""
{B}Configure your downstream router with these settings:{N}
{B}PPPoE Connection{N}
{'─' * 40}""")
kv("PPPoE Username", self.pppoe_user or "(not extracted — check backup files)")
kv("PPPoE Password", self.pppoe_pass or "(not extracted — check backup files)")
kv("MAC to Clone", self.wan_mac or "(not extracted)")
kv("MTU", "1492")
kv("MRU", "1492")
kv("Service Name", "(leave blank / auto)")
kv("Auth Type", "PAP/CHAP/MS-CHAP (auto)")
print(f"""
{B}IPv6{N}
{'─' * 40}""")
kv("IPv6 Mode", "DHCPv6 (Prefix Delegation)")
kv("Prefix Hint", "/64")
kv("Note", "Excitel delegates a /64 via DHCPv6 PD")
print(f"""
{B}Important Notes{N}
{'─' * 40}
- You MUST clone the WAN MAC ({self.wan_mac}) on your router's WAN interface.
Without this, Excitel's OLT will not forward traffic to your router.
- MTU 1492 = 1500 - 8 bytes PPPoE overhead.
- After connecting PPPoE, verify with: ping -s 1464 -M do 8.8.8.8
(1464 + 20 IP + 8 ICMP = 1492 = MTU, should not fragment)
""")
if self.wan_mac:
print(f""" {B}UniFi Dream Machine Users{N}
{'─' * 40}
- MAC Clone: Settings → Internet → Primary (WAN1) → Advanced → MAC Clone
Set to: {self.wan_mac}
- To access ONT after bridge mode: add a secondary IP on the UDM LAN:
Settings → Networks → Default → Add static IP: 192.168.0.2/24
Then browse to http://{self.ont_ip}
""")
if not self.dry_run and not self.no_reboot and port_open(self.ont_ip, 23, timeout=3):
info("Checking MAC fix status on ONT...")
tn = self._tn()
log = tn.run("cat /var/tmp/fix-mac-boot.log 2>/dev/null")
if "completed" in log:
info("MAC fix boot script ran successfully!")
for line in log.strip().splitlines():
print(f" {line}")
elif log.strip():
warn("MAC fix log exists but may not have completed:")
for line in log.strip().splitlines():
print(f" {line}")
else:
warn("MAC fix log not found yet — check in a minute")
return True
# ── Revert Steps ─────────────────────────────────────────
def _decrypt_bin_backup(self, path: Path) -> str:
"""Decrypt an encrypted .bin config backup to XML string."""
if not _HAS_AES:
warn("pycryptodome not installed — cannot decrypt .bin backup")
return ""
try:
raw = path.read_bytes()
xml_bytes = decrypt_web_backup(raw)
xml_str = xml_bytes.decode("utf-8", errors="replace")
info(f"Decrypted {path.name}: {len(xml_str)} chars")
return xml_str
except Exception as e:
warn(f"Failed to decrypt {path.name}: {e}")
return ""
def _load_backup_values(self) -> dict[str, str]:
"""Load original ONT values from a backup file.
Searches only in the script's own backup directory (backup_dir),
which is where step 5 saves backups before making changes.
Order:
1. Explicit --backup-file path (XML or .bin)
2. Most recent ont_backup_*_full.xml in backup_dir
3. Excitel route-mode defaults
"""
xml_content = ""
# Try explicit backup file
if self.backup_file:
p = Path(self.backup_file)
if p.exists():
info(f"Loading backup from: {p}")
if p.suffix == ".bin":
xml_content = self._decrypt_bin_backup(p)
else:
xml_content = p.read_text(encoding="utf-8", errors="replace")
else:
warn(f"Backup file not found: {p}")
# Auto-detect from backup_dir only (where step 5 saves)
if not xml_content:
files = sorted(
_glob.glob(str(self.backup_dir / "ont_backup_*_full.xml")),
key=lambda f: Path(f).stat().st_mtime,
reverse=True,
)
if files:
info(f"Found backup: {files[0]}")
xml_content = Path(files[0]).read_text(
encoding="utf-8", errors="replace")
if not xml_content:
warn(f"No backup file found in {self.backup_dir}")
warn("Use --backup-file to specify one, or Excitel defaults will be used")
return dict(self.EXCITEL_ROUTE_DEFAULTS)
vals: dict[str, str] = {}
# Extract WANC values
wanc_rows = parse_config_xml_table(xml_content, "WANC")
if wanc_rows:
r = wanc_rows[0]
vals["ConnType"] = r.get("ConnType", "2")
vals["IsNAT"] = r.get("IsNAT", "1")
vals["WANCName"] = r.get("WANCName", "1_INTERNET_R_VID_")
vals["ServList"] = r.get("ServList", "3")
# Extract MgtServer values
mgt_rows = parse_config_xml_table(xml_content, "MgtServer")
if mgt_rows:
r = mgt_rows[0]
vals["Tr069Enable"] = r.get("Tr069Enable", "1")
vals["PeriodicInformEnable"] = r.get("PeriodicInformEnable", "1")
vals["STUNEnable"] = r.get("STUNEnable", "1")
# Fill missing values with defaults
for k, v in self.EXCITEL_ROUTE_DEFAULTS.items():
vals.setdefault(k, v)
return vals
def revert_step4_read_and_plan(self) -> bool:
"""Read current config and determine what needs reverting."""
step_header(4, "Read Current Config & Load Original Values")
if self.dry_run:
info("[DRY RUN] Would read current config and load backup values")
self._original_values = dict(self.EXCITEL_ROUTE_DEFAULTS)
return True
tn = self._tn()
# Read current WANC state
info("Reading current WANC...")
out = tn.run("sendcmd 1 DB p WANC")
rows = parse_sendcmd_xml(out)
if rows:
r = rows[0]
self.conn_type = r.get("ConnType", "")
self.wan_name = r.get("WANCName", "")
self.serv_list = r.get("ServList", "")
self.is_nat = r.get("IsNAT", "")
self.wan_mac = r.get("WorkIFMac", "")
ct_names = {"1": "Route/DHCP", "2": "Route/PPPoE", "4": "Bridge"}
kv("ConnType (current)", f"{self.conn_type} ({ct_names.get(self.conn_type, '?')})")
kv("WANCName (current)", self.wan_name)
kv("ServList (current)", self.serv_list)
kv("IsNAT (current)", self.is_nat)
kv("WAN MAC", self.wan_mac)
if self.wan_mac:
self.base_mac = mac_decrement(self.wan_mac)
# Read current MgtServer state
info("Reading current MgtServer (TR-069)...")
out = tn.run("sendcmd 1 DB p MgtServer")
rows = parse_sendcmd_xml(out)
if rows:
self.tr069_enabled = rows[0].get("Tr069Enable", "")
self.periodic_inform = rows[0].get("PeriodicInformEnable", "")
self.stun_enabled = rows[0].get("STUNEnable", "")
kv("TR-069 (current)", self.tr069_enabled)
kv("Periodic Inform (current)", self.periodic_inform)
kv("STUN (current)", self.stun_enabled)
# Read current PortBinding state
info("Reading current PortBinding...")
out = tn.run("sendcmd 1 DB p PortBinding")
self.port_binding_rows = parse_sendcmd_xml(out)
kv("PortBinding entries", str(len(self.port_binding_rows)))
for r in self.port_binding_rows:
print(f" {r.get('WANViewName', '?')} <-> {r.get('LANViewName', '?')}")
# Check for MAC fix scripts
info("Checking for MAC fix scripts...")
out = tn.run("ls -la /userconfig/fix-eth0-mac.sh 2>/dev/null")
has_fix_script = "fix-eth0-mac.sh" in out
kv("fix-eth0-mac.sh", "present" if has_fix_script else "not found")
out = tn.run(
"mkdir -p /var/tmp/rootmnt",
"mount -t jffs2 /dev/mtdblock9 /var/tmp/rootmnt 2>/dev/null",
"ls -la /var/tmp/rootmnt/etc/rcS.d/S98fixmac 2>/dev/null",
)
has_boot_trigger = "S98fixmac" in out
kv("S98fixmac", "present" if has_boot_trigger else "not found")
tn.run("umount /var/tmp/rootmnt 2>/dev/null")
# Load original values
self._original_values = self._load_backup_values()
# Show what will change
print(f"\n{B} Planned revert actions:{N}")
changes: list[str] = []
orig_ct = self._original_values.get("ConnType", "2")
if self.conn_type != orig_ct:
ct_names = {"1": "Route/DHCP", "2": "Route/PPPoE", "4": "Bridge"}
changes.append(
f"ConnType: {self.conn_type} ({ct_names.get(self.conn_type, '?')}) "
f"-> {orig_ct} ({ct_names.get(orig_ct, '?')})")
orig_nat = self._original_values.get("IsNAT", "1")
if self.is_nat != orig_nat:
changes.append(f"IsNAT: {self.is_nat} -> {orig_nat}")
orig_name = self._original_values.get("WANCName", "")
if orig_name and self.wan_name != orig_name:
changes.append(f"WANCName: {self.wan_name} -> {orig_name}")
orig_sl = self._original_values.get("ServList", "3")
if self.serv_list != orig_sl:
changes.append(f"ServList: {self.serv_list} -> {orig_sl}")
if self.tr069_enabled != self._original_values.get("Tr069Enable", "1"):
changes.append(f"Tr069Enable: {self.tr069_enabled} -> {self._original_values['Tr069Enable']}")
if self.periodic_inform != self._original_values.get("PeriodicInformEnable", "1"):
changes.append(f"PeriodicInformEnable: {self.periodic_inform} -> {self._original_values['PeriodicInformEnable']}")
if self.stun_enabled != self._original_values.get("STUNEnable", "1"):
changes.append(f"STUNEnable: {self.stun_enabled} -> {self._original_values['STUNEnable']}")
if self.port_binding_rows:
changes.append(f"PortBinding: remove {len(self.port_binding_rows)} entries")
if has_fix_script:
changes.append("Remove /userconfig/fix-eth0-mac.sh")
if has_boot_trigger:
changes.append("Remove /etc/rcS.d/S98fixmac from rootfs overlay")
if not changes:
info("Nothing to revert — ONT appears to be in factory state already")
return False
for ch in changes:
print(f" - {ch}")
print()
if not confirm("Proceed with revert?"):
info("Aborted by user")
return False
return True
def revert_step5_remove_mac_fix(self) -> bool:
"""Remove the MAC fix boot scripts from the ONT.
Removes:
- /userconfig/fix-eth0-mac.sh (jffs2 on mtdblock5, always writable)
- /etc/rcS.d/S98fixmac on the secondary rootfs overlay (mtdblock9)
"""
step_header(5, "Remove MAC Fix Boot Scripts")
if self.dry_run:
info("[DRY RUN] Would remove /userconfig/fix-eth0-mac.sh")
info("[DRY RUN] Would mount mtdblock9 and remove /etc/rcS.d/S98fixmac")
return True
tn = self._tn()
# Remove the fix script from /userconfig
info("Removing /userconfig/fix-eth0-mac.sh...")
out = tn.run("rm -f /userconfig/fix-eth0-mac.sh")
verify = tn.run("ls /userconfig/fix-eth0-mac.sh 2>&1")
if "No such file" in verify or "fix-eth0-mac.sh" not in verify:
info("fix-eth0-mac.sh removed")
else:
warn("fix-eth0-mac.sh may still exist — check manually")
# Remove boot trigger from rootfs overlay
info("Mounting secondary rootfs (mtdblock9)...")
tn.run(
"mkdir -p /var/tmp/rootmnt",
"mount -t jffs2 /dev/mtdblock9 /var/tmp/rootmnt 2>/dev/null",
"mount -o remount,rw /var/tmp/rootmnt",
)
info("Removing S98fixmac boot trigger...")
tn.run("rm -f /var/tmp/rootmnt/etc/rcS.d/S98fixmac")
verify = tn.run("ls /var/tmp/rootmnt/etc/rcS.d/S98fixmac 2>&1")
if "No such file" in verify or "S98fixmac" not in verify:
info("S98fixmac removed")
else:
warn("S98fixmac may still exist — check manually")
tn.run("sync", "mount -o remount,ro /var/tmp/rootmnt")
# Clean up any leftover MAC fix log
tn.run("rm -f /var/tmp/fix-mac-boot.log")
info("MAC fix boot scripts removed")
return True
def revert_step6_remove_port_binding(self) -> bool:
"""Remove PortBinding entries added by the setup script.
In factory route mode, the PortBinding table is empty (0 rows).
Bridge mode added entries binding ETH1+ETH2 to WCD1.WCPPP1.
"""
step_header(6, "Remove PortBinding Entries")
if not self.port_binding_rows:
info("No PortBinding entries to remove — already empty")
return True
if self.dry_run:
info(f"[DRY RUN] Would delete {len(self.port_binding_rows)} PortBinding entries")
return True
tn = self._tn()
count = len(self.port_binding_rows)
info(f"Deleting {count} PortBinding entries...")
# Delete in reverse order to keep indices stable
for i in range(count - 1, -1, -1):
tn.run(f"sendcmd 1 DB delr PortBinding {i}")
tn.run("sendcmd 1 DB save")
info("Verifying...")
out = tn.run("sendcmd 1 DB p PortBinding")
rows = parse_sendcmd_xml(out)
if len(rows) == 0:
info("PortBinding table is now empty")
else:
warn(f"PortBinding still has {len(rows)} entries — may need manual cleanup")
return True
def revert_step7_restore_route_mode(self) -> bool:
"""Restore WAN connection to route/PPPoE mode (ConnType=2).
Reverses step 7 of the setup. Restores:
- ConnType: 4 (bridge) -> 2 (route/PPPoE)
- IsNAT: 0 -> 1
- WANCName: 1_INTERNET_B_VID_ -> 1_INTERNET_R_VID_
- ServList: 1 -> 3 (INTERNET + TR069)
"""
step_header(7, "Restore Route Mode")
orig = self._original_values
ct = orig.get("ConnType", "2")
nat = orig.get("IsNAT", "1")
name = orig.get("WANCName", "1_INTERNET_R_VID_")
sl = orig.get("ServList", "3")
# Check if already in the target state
if (self.conn_type == ct and self.is_nat == nat
and self.wan_name == name and self.serv_list == sl):
info("WAN connection already in target state — skipping")
return True
if self.dry_run:
info(f"[DRY RUN] Would set WANC: ConnType={ct}, IsNAT={nat}, "
f"WANCName={name}, ServList={sl}")
return True
tn = self._tn()
info(f"Restoring route mode (ConnType={ct})...")
out = tn.run(
f"sendcmd 1 DB set WANC 0 ConnType {ct}",
f"sendcmd 1 DB set WANC 0 IsNAT {nat}",
f"sendcmd 1 DB set WANC 0 WANCName {name}",
f"sendcmd 1 DB set WANC 0 ServList {sl}",
"sendcmd 1 DB save",
)
for line in out.splitlines():
stripped = line.strip()
if stripped:
print(f" {stripped}")
info("Verifying...")
out = tn.run("sendcmd 1 DB p WANC")
rows = parse_sendcmd_xml(out)
if rows and rows[0].get("ConnType") == ct:
ct_names = {"1": "Route/DHCP", "2": "Route/PPPoE", "4": "Bridge"}
info(f"Route mode restored (ConnType={ct}, {ct_names.get(ct, '?')})")
return True
error(f"Failed to verify ConnType={ct}")
return False
def revert_step8_restore_tr069(self) -> bool:
"""Re-enable TR-069 management.
Reverses step 6 of the setup. The ISP uses TR-069 (CWMP) to manage
the ONT remotely. Re-enabling it restores normal ISP management.
"""
step_header(8, "Re-enable TR-069")
orig = self._original_values
tr069 = orig.get("Tr069Enable", "1")
periodic = orig.get("PeriodicInformEnable", "1")
stun = orig.get("STUNEnable", "1")
if (self.tr069_enabled == tr069 and self.periodic_inform == periodic
and self.stun_enabled == stun):
info("TR-069 settings already in target state — skipping")
return True
if self.dry_run:
info(f"[DRY RUN] Would set MgtServer: Tr069Enable={tr069}, "
f"PeriodicInformEnable={periodic}, STUNEnable={stun}")
return True
tn = self._tn()
info("Re-enabling TR-069...")
out = tn.run(
f"sendcmd 1 DB set MgtServer 0 Tr069Enable {tr069}",
f"sendcmd 1 DB set MgtServer 0 PeriodicInformEnable {periodic}",
f"sendcmd 1 DB set MgtServer 0 STUNEnable {stun}",
"sendcmd 1 DB save",
)
for line in out.splitlines():
stripped = line.strip()
if stripped:
print(f" {stripped}")
info("Verifying...")
out = tn.run("sendcmd 1 DB p MgtServer")
rows = parse_sendcmd_xml(out)
if rows:
ok = (rows[0].get("Tr069Enable") == tr069 and
rows[0].get("PeriodicInformEnable") == periodic and
rows[0].get("STUNEnable") == stun)
if ok:
info("TR-069 re-enabled successfully")
else:
warn("Some TR-069 fields may not have been set correctly")
return True
def revert_step9_reboot(self) -> bool:
"""Reboot the ONT to apply the reverted configuration."""
step_header(9, "Reboot ONT")
if self.no_reboot:
info("Skipping reboot (--no-reboot flag)")
info(f" Reboot manually: telnet {self.ont_ip} → manu → reboot")
return True
if self.dry_run:
info("[DRY RUN] Would reboot the ONT and wait for it to come back")
return True
if not confirm("Reboot the ONT now? (it will be offline for ~90s)"):
info("Skipping reboot — remember to reboot manually!")
return True
tn = self._tn()
info("Sending reboot command...")
tn.run("reboot")
info("Waiting for ONT to go down...")
time.sleep(5)
info("Waiting for ONT to come back (up to 225s)...")
if wait_for_port(self.ont_ip, 80, timeout=225, interval=5):
info("ONT is back online!")
else:
warn("ONT did not respond within 225s — check manually")
return True
def run_revert(self) -> bool:
"""Run the full revert process to undo bridge mode changes."""
header("Excitel ONT Bridge Mode — REVERT")
print(f" ONT IP: {self.ont_ip}")
print(f" Dry Run: {self.dry_run}")
print(f" No Reboot: {self.no_reboot}")
if self.backup_file:
print(f" Backup File: {self.backup_file}")
if not _HAS_AES:
warn("pycryptodome not installed — config backup decrypt will be unavailable")
if self.dry_run:
print(f"\n {Y}DRY RUN MODE — no changes will be made{N}")
print(f"\n {R}This will revert ALL bridge mode changes and restore the ONT")
print(f" to factory route mode. The ONT will handle PPPoE directly.{N}")
revert_steps: list[tuple[int, str, callable]] = [
(1, "Web UI Login", self.step1_web_login),
(2, "Enable Telnet", self.step2_enable_telnet),
(3, "Derive manu Credentials", self.step3_derive_manu),
(4, "Read Config & Plan Revert", self.revert_step4_read_and_plan),
(5, "Remove MAC Fix Scripts", self.revert_step5_remove_mac_fix),
(6, "Remove PortBinding", self.revert_step6_remove_port_binding),
(7, "Restore Route Mode", self.revert_step7_restore_route_mode),
(8, "Re-enable TR-069", self.revert_step8_restore_tr069),
(9, "Reboot", self.revert_step9_reboot),
]
for num, name, func in revert_steps:
if not func():
error(f"Revert step {num} ({name}) failed — aborting")
return False
header("Revert Complete!")
if self.dry_run:
info("This was a dry run — no changes were made")
info("Run without --dry-run to apply revert")
else:
info("Your ONT has been reverted to route mode")
info("The ONT will now handle PPPoE directly (no downstream router needed)")
info("If you had a downstream router doing PPPoE, disconnect it and")
info("connect your devices directly to the ONT's LAN ports")
return True
# ── Main Run ──────────────────────────────────────────────
def run(self) -> bool:
header("Excitel ONT Bridge Mode Auto-Setup")
print(f" ONT IP: {self.ont_ip}")
print(f" Dry Run: {self.dry_run}")
print(f" Skip To: Step {self.skip_to}")
print(f" No Reboot: {self.no_reboot}")
if not _HAS_AES:
warn("pycryptodome not installed — config backup decrypt will be unavailable")
warn("Install with: pip install pycryptodome")
if self.dry_run:
print(f"\n {Y}DRY RUN MODE — no changes will be made{N}")
steps: list[tuple[int, str, callable]] = [
(1, "Web UI Login", self.step1_web_login),
(2, "Enable Telnet", self.step2_enable_telnet),
(3, "Derive manu Credentials", self.step3_derive_manu),
(4, "Extract Config", self.step4_extract_config),
(5, "Backup Config", self.step5_backup),
(6, "Disable TR-069", self.step6_disable_tr069),
(7, "Set Bridge Mode", self.step7_set_bridge),
(8, "Configure PortBinding", self.step8_port_binding),
(9, "Inject MAC Fix Script", self.step9_inject_mac_fix),
(10, "Reboot", self.step10_reboot),
(11, "Show Router Config", self.step11_show_config),
]
for num, name, func in steps:
if not self.should_run(num):
info(f"Skipping step {num}: {name}")
continue
if not func():
error(f"Step {num} ({name}) failed — aborting")
return False
header("Setup Complete!")
if self.dry_run:
info("This was a dry run — no changes were made")
info("Run without --dry-run to apply changes")
else:
info("Your ONT is now in bridge mode")
info("Configure your downstream router with the settings shown above")
info("If anything goes wrong, restore from the backup files saved in Step 5")
return True
# ── CLI ───────────────────────────────────────────────────────
def main() -> None:
parser = argparse.ArgumentParser(
description="Excitel ONT Bridge Mode Auto-Setup for ZTE ZXIC 2K05X",
formatter_class=argparse.RawDescriptionHelpFormatter,
epilog="""\
Examples:
%(prog)s Full automated setup
%(prog)s --dry-run Preview without making changes
%(prog)s --skip-to 6 Resume from step 6
%(prog)s --no-reboot Skip the reboot step
%(prog)s --ont-ip 10.0.0.1 Custom ONT IP address
%(prog)s --revert Revert all changes back to route mode
%(prog)s --revert --dry-run Preview what revert would do
%(prog)s --revert --backup-file ~/ont_backup_full.xml
""",
)
parser.add_argument("--ont-ip", default="192.168.0.1",
help="ONT IP address (default: 192.168.0.1)")
parser.add_argument("--dry-run", action="store_true",
help="Show what would happen without making changes")
parser.add_argument("--skip-to", type=int, default=1, metavar="N",
help="Resume from step N (1-11)")
parser.add_argument("--no-reboot", action="store_true",
help="Skip the reboot step (step 10)")
parser.add_argument("--revert", action="store_true",
help="Revert ALL changes — restore ONT to route mode")
parser.add_argument("--backup-file", metavar="FILE",
help="Decrypted XML backup file to restore original values from "
"(auto-detected if not specified)")
args = parser.parse_args()
if not args.revert and args.skip_to < 1 or args.skip_to > 11:
parser.error("--skip-to must be between 1 and 11")
if args.backup_file and not args.revert:
parser.error("--backup-file only makes sense with --revert")
setup = ExcitelBridgeSetup(
ont_ip=args.ont_ip,
dry_run=args.dry_run,
skip_to=args.skip_to,
no_reboot=args.no_reboot,
revert=args.revert,
backup_file=args.backup_file,
)
try:
if args.revert:
success = setup.run_revert()
else:
success = setup.run()
except KeyboardInterrupt:
print()
error("Interrupted by user")
success = False
sys.exit(0 if success else 1)
if __name__ == "__main__":
main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment