Created
February 27, 2026 08:42
-
-
Save tknv/6deee5b68b8de0bd7e4c2df971002402 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# ecp.py
# Minimal External Captive Portal (ECP) for ExtremeCloud IQ Controller
# Mode:
#   - RADIUS final authority (unsigned redirect)   -> SIGNED_ECP_FINAL_AUTH = False
#   - ECP final authority (signed redirect, AWSv4) -> SIGNED_ECP_FINAL_AUTH = True
#
# Login: username=demo, password=P@ssw0rd!
import hashlib
import hmac
import ipaddress
from datetime import datetime, timedelta, timezone
from urllib.parse import urlencode, quote

from flask import Flask, request, render_template_string, redirect, abort
# WSGI application object; the route decorators in this file register on it.
app = Flask(__name__)
# ----------------------------
# Configuration
# ----------------------------
# Address and port passed to app.run() when this file is executed directly.
LISTEN_HOST = "0.0.0.0"
LISTEN_PORT = 8080

# Hardcoded user store (demo only): the single account accepted by the login form.
HARDCODED_USER = "demo"
HARDCODED_PASS = "P@ssw0rd!"

# Case switch:
#   False = RADIUS is the final authority (unsigned redirect)
#   True  = ECP is the final authority (signed redirect)
SIGNED_ECP_FINAL_AUTH = False

# For signed mode (ECP final authority):
# These MUST match XIQ-Controller's ECP config (Identity / Shared Secret).
ECP_IDENTITY = "BigAuthInc"            # example identity
ECP_SECRET = "SuperSecretSharedKey"    # example shared secret
AWS_REGION = "world"                   # fixed per spec
AWS_SERVICE = "ecp"                    # fixed per spec
SIGNED_HEADERS = "host"                # fixed per spec
AMZ_ALGO = "AWS4-HMAC-SHA256"
AMZ_EXPIRES = 60  # seconds to trust the signed redirect URL on controller side

# Default role & session timeout (when ECP is final authority)
ASSIGNED_ROLE = "Guest_Access"
SESSION_TIMEOUT_SECONDS = 36000  # opt27 equivalent (10 hours)
# Very small login page template (Jinja syntax), rendered by the /login route.
# Variables used: err, default_user, default_pass, token, wlan, dest, hwc_ip,
# hwc_port. The hidden inputs carry the controller-supplied context through
# the form POST; the <pre> block echoes the same values for debugging.
LOGIN_HTML = """
<!doctype html>
<title>Simple ECP Login</title>
<h2>External Captive Portal (Demo)</h2>
{% if err %}<p style="color:red">{{err}}</p>{% endif %}
<form method="post">
<label>Username: <input name="username" value="{{default_user}}" /></label><br/>
<label>Password: <input name="password" type="password" value="{{default_pass}}" /></label><br/>
<!-- Hidden context from controller -->
<input type="hidden" name="token" value="{{token}}">
<input type="hidden" name="wlan" value="{{wlan}}">
<input type="hidden" name="dest" value="{{dest}}">
<input type="hidden" name="hwc_ip" value="{{hwc_ip}}">
<input type="hidden" name="hwc_port" value="{{hwc_port}}">
<button type="submit">Login</button>
</form>
<pre style="background:#f6f8fa;padding:8px">
Received context:
token={{token}}
wlan={{wlan}}
hwc_ip={{hwc_ip}}
hwc_port={{hwc_port}}
dest={{dest}}
</pre>
"""
def require_params(params: dict, keys):
    """Abort the request with HTTP 400 unless every key has a truthy value.

    Args:
        params: Mapping to inspect; only its ``.get`` method is used.
        keys: Iterable of parameter names that must be present and non-empty.

    A name counts as missing when ``params.get(name)`` is falsy, so an empty
    string is rejected just like an absent key. The first missing name (in
    ``keys`` order) is reported in the 400 message. Returns ``None`` when
    nothing is missing.
    """
    for k in keys:
        if not params.get(k):
            abort(400, f"Missing required parameter: {k}")
# ----------------------------
# AWSv4 signing helpers (query presign style)
# ----------------------------
def sha256_hex(s: str) -> str:
    """Return the lowercase hex SHA-256 digest of *s* encoded as UTF-8."""
    return hashlib.sha256(s.encode("utf-8")).hexdigest()
def hmac_sha256(key: bytes, msg: str) -> bytes:
    """Return the raw 32-byte HMAC-SHA256 of *msg* (UTF-8 encoded) under *key*."""
    return hmac.new(key, msg.encode("utf-8"), hashlib.sha256).digest()
def aws4_signing_key(secret: str, date_yyyymmdd: str, region: str, service: str) -> bytes:
    """Derive the AWS Signature V4 signing key as raw bytes.

    The key is an HMAC-SHA256 chain seeded with ``"AWS4" + secret``:
    kDate = HMAC(seed, date), kRegion = HMAC(kDate, region),
    kService = HMAC(kRegion, service), kSigning = HMAC(kService, "aws4_request").

    Args:
        secret: Shared secret.
        date_yyyymmdd: Date string used in the credential scope.
        region: Region component of the credential scope.
        service: Service component of the credential scope.

    Returns:
        The 32-byte kSigning value.
    """
    key = ("AWS4" + secret).encode("utf-8")
    # Each step's digest becomes the key for the next step.
    for part in (date_yyyymmdd, region, service, "aws4_request"):
        key = hmac.new(key, part.encode("utf-8"), hashlib.sha256).digest()
    return key
def build_signed_redirect(base_url: str, host_for_header: str, user_params: dict) -> str:
    """Build an AWSv4-style presigned redirect URL.

    Args:
        base_url: Full URL with scheme, host and path only (no query string),
            e.g. ``https://10.10.21.6/ext_approval.php``.
        host_for_header: Value signed as the ``host`` header in the canonical
            request (no port if 443/80).
        user_params: Query parameters to sign: token, wlan, username and
            optionally dest, role, opt27. The dict is copied, not mutated.

    Returns:
        ``base_url`` followed by the canonical query string and a trailing
        ``X-Amz-Signature`` parameter.

    The timestamp is taken from the current UTC time, so the result differs
    from call to call.
    """
    # Local import: urlparse is not among the names imported at file level.
    from urllib.parse import urlparse

    # Timestamp in UTC (YYYYMMDDThhmmssZ) and the date-only form for the scope.
    now = datetime.now(timezone.utc)
    amz_date = now.strftime("%Y%m%dT%H%M%SZ")
    short_date = now.strftime("%Y%m%d")

    # Scope & credential
    credential_scope = f"{short_date}/{AWS_REGION}/{AWS_SERVICE}/aws4_request"

    # Caller parameters first, then the mandatory X-Amz-* parameters (which
    # win if the caller supplied a key of the same name).
    query = dict(user_params)
    query.update({
        "X-Amz-Algorithm": AMZ_ALGO,
        "X-Amz-Credential": f"{ECP_IDENTITY}/{credential_scope}",
        "X-Amz-Date": amz_date,
        "X-Amz-Expires": str(AMZ_EXPIRES),
        "X-Amz-SignedHeaders": SIGNED_HEADERS,
    })

    # Canonical query string: keys sorted, keys and values percent-encoded
    # with only the unreserved characters left as-is.
    canonical_q = "&".join(
        f"{quote(k, safe='-_.~')}={quote(str(query[k]), safe='-_.~')}"
        for k in sorted(query)
    )

    # Canonical request layout:
    #   GET \n <path> \n <canonical_query> \n host:<host>\n \n host \n UNSIGNED-PAYLOAD
    # NOTE(review): the payload marker is the literal "UNSIGNED-PAYLOAD";
    # confirm the controller expects this rather than the SHA-256 of an empty body.
    canonical_uri = urlparse(base_url).path or "/"
    canonical_request = "\n".join([
        "GET",
        canonical_uri,
        canonical_q,
        f"host:{host_for_header}\n",  # header block ends with its own newline
        SIGNED_HEADERS,
        "UNSIGNED-PAYLOAD",
    ])

    # String to sign
    sts = "\n".join([AMZ_ALGO, amz_date, credential_scope, sha256_hex(canonical_request)])

    # Signing key and signature
    sk = aws4_signing_key(ECP_SECRET, short_date, AWS_REGION, AWS_SERVICE)
    signature = hmac.new(sk, sts.encode("utf-8"), hashlib.sha256).hexdigest()

    # The signature is appended last and is not itself part of the signed query.
    return f"{base_url}?{canonical_q}&X-Amz-Signature={signature}"
# ----------------------------
# Routes
# ----------------------------
def _controller_endpoint(hwc_ip: str, hwc_port: str) -> tuple:
    """Return ``(base_url, host_header)`` for the controller's ext_approval.php.

    ``hwc_ip`` and ``hwc_port`` arrive through hidden form fields that the
    client can edit, so they are validated before being placed in a URL:
    ``hwc_ip`` must parse as an IP address literal and ``hwc_port``, when
    given, must be a decimal TCP port.

    Raises:
        ValueError: if either value is malformed.
    """
    try:
        addr = ipaddress.ip_address(hwc_ip.strip())
    except ValueError:
        raise ValueError("hwc_ip is not a valid IP address") from None
    # IPv6 literals must be bracketed inside a URL authority.
    host = f"[{addr}]" if addr.version == 6 else str(addr)

    port = hwc_port.strip()
    if port:
        if not (port.isascii() and port.isdigit() and 0 < int(port) <= 65535):
            raise ValueError("hwc_port is not a valid TCP port")
        port = str(int(port))  # normalise e.g. "0443" -> "443"

    scheme = "https" if port == "443" else "http"
    # The port appears in the authority (and therefore in the Host header the
    # browser sends) only when it is not the default 80/443.
    netloc = f"{host}:{port}" if port and port not in ("80", "443") else host
    return f"{scheme}://{netloc}/ext_approval.php", netloc


@app.route("/login", methods=["GET", "POST"])
def login():
    """Serve the login form (GET) and process submitted credentials (POST).

    GET: requires ``token`` and ``wlan`` query parameters; ``dest``,
    ``hwc_ip`` and ``hwc_port`` are optional. Renders LOGIN_HTML with those
    values carried in hidden fields for the return trip.

    POST: requires ``username``, ``password``, ``token`` and ``wlan``. On a
    credential mismatch the form is re-rendered with status 401. On success
    the client is redirected (302) to ``ext_approval.php`` on the controller:
    a signed URL when SIGNED_ECP_FINAL_AUTH is true, otherwise an unsigned
    URL carrying username and password.

    Aborts with 400 when a required parameter is missing, or when
    ``hwc_ip``/``hwc_port`` are absent or malformed.
    """
    if request.method == "GET":
        # Controller -> ECP redirect arrives with many query params.
        # We keep the essentials for the return trip.
        q = request.args
        # Required: token, wlan; optional: dest, hwc_ip/hwc_port.
        require_params(q, ["token", "wlan"])
        return render_template_string(
            LOGIN_HTML,
            err=None,
            default_user=HARDCODED_USER,
            default_pass=HARDCODED_PASS,
            token=q.get("token"),
            wlan=q.get("wlan"),
            dest=q.get("dest", ""),
            hwc_ip=q.get("hwc_ip", ""),      # controller IP for the return trip
            hwc_port=q.get("hwc_port", ""),  # controller port (usually 80 or 443)
        )

    # POST (user submits credentials)
    form = request.form
    require_params(form, ["username", "password", "token", "wlan"])
    username = form["username"]
    password = form["password"]
    token = form["token"]
    wlan = form["wlan"]
    dest = form.get("dest", "")
    hwc_ip = form.get("hwc_ip", "")
    hwc_port = form.get("hwc_port", "")

    # Constant-time comparison against the hardcoded account. Both checks are
    # always evaluated (no short-circuit); bytes are used because
    # compare_digest rejects non-ASCII str.
    user_ok = hmac.compare_digest(username.encode("utf-8"), HARDCODED_USER.encode("utf-8"))
    pass_ok = hmac.compare_digest(password.encode("utf-8"), HARDCODED_PASS.encode("utf-8"))
    if not (user_ok and pass_ok):
        return render_template_string(
            LOGIN_HTML,
            err="Invalid username or password",
            default_user=username,
            default_pass="",
            token=token, wlan=wlan, dest=dest, hwc_ip=hwc_ip, hwc_port=hwc_port,
        ), 401

    # Build return URL for the Controller's ext_approval.php
    if not hwc_ip:
        abort(400, "hwc_ip (controller IP) was not provided by the Controller redirect")
    try:
        base, host_for_header = _controller_endpoint(hwc_ip, hwc_port)
    except ValueError as exc:
        abort(400, str(exc))

    if SIGNED_ECP_FINAL_AUTH:
        # ECP is final authority: signed URL with role/opt27 (session timeout)
        user_params = {
            "token": token,
            "wlan": wlan,
            "username": username,
        }
        # optional fields
        if dest:
            user_params["dest"] = dest
        if ASSIGNED_ROLE:
            user_params["role"] = ASSIGNED_ROLE
        if SESSION_TIMEOUT_SECONDS:
            user_params["opt27"] = str(SESSION_TIMEOUT_SECONDS)
        # host_for_header is the URL authority: bare host for 80/443,
        # host:port otherwise, so the signed value matches the request's Host.
        signed_url = build_signed_redirect(base, host_for_header, user_params)
        return redirect(signed_url, code=302)

    # RADIUS final authority: unsigned redirect with username/password.
    # Controller will use configured RADIUS servers to authenticate.
    ret_params = {
        "token": token,
        "wlan": wlan,
        "username": username,
        "password": password,
    }
    if dest:
        ret_params["dest"] = dest  # if controller is set to go to original destination
    return redirect(f"{base}?{urlencode(ret_params)}", code=302)
@app.route("/")
def index():
    """Return a plain-text status message for the root URL."""
    return "ECP is running. Use /login via Controller redirect."
# Start the Flask server on the configured address when run as a script.
if __name__ == "__main__":
    app.run(host=LISTEN_HOST, port=LISTEN_PORT)
| `` |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment