Prefect's JSON deserializer executes arbitrary callables when it encounters a dictionary containing an __exc_type__ key. An attacker who controls an external data source that a Prefect task fetches can achieve remote code execution on any system that later deserializes the task result, and needs no Prefect access to do so.
src/prefect/serializers.py:
def prefect_json_object_decoder(result: dict[str, Any]) -> Any:
    if "__class__" in result:
        # ...
    elif "__exc_type__" in result:
        return from_qualified_name(result["__exc_type__"])(result["message"])
    else:
        return result
from_qualified_name imports any Python module/attribute. There is no validation on the __exc_type__ value. The object_hook fires on every dict in the JSON tree, so the payload can be nested arbitrarily deep.
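To illustrate the nesting point, here is a minimal, self-contained sketch that does not import Prefect. The from_qualified_name below is a simplified stand-in written for this example (an assumption, not Prefect's actual implementation), and decoder reproduces only the __exc_type__ branch quoted above. The payload names the harmless builtins.len; a real attack would name something like os.system.

```python
import importlib
import json
from typing import Any


def from_qualified_name(name: str) -> Any:
    # Simplified stand-in (assumption): import the module part of the
    # dotted name, then look up the final attribute on it.
    module_name, attr_name = name.rsplit(".", 1)
    return getattr(importlib.import_module(module_name), attr_name)


def decoder(result: dict[str, Any]) -> Any:
    # Mirrors only the __exc_type__ branch of the decoder shown above.
    if "__exc_type__" in result:
        return from_qualified_name(result["__exc_type__"])(result["message"])
    return result


# Benign payload buried three dicts deep inside otherwise ordinary data.
payload = json.dumps(
    {"rows": [{"meta": {"deep": {"__exc_type__": "builtins.len", "message": "abcd"}}}]}
)

# json.loads calls object_hook on every decoded dict, innermost first,
# so the nested dict is replaced by the return value of len("abcd").
decoded = json.loads(payload, object_hook=decoder)
print(decoded)  # {'rows': [{'meta': {'deep': 4}}]}
```

The innermost dictionary is replaced by the callable's return value, which shows the call happened even though the payload sat well below the top level of the document.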
Prefect tasks run arbitrary code without a sandbox, so a malicious task author gaining RCE is not a meaningful finding. The issue is that the deserializer treats data as code, which creates privilege escalation across trust boundaries:
- An attacker poisons an external data source (API, database, message queue, user-submitted input).
- A legitimate task fetches and returns this data. Setting result_serializer="json" (recommended by Prefect for Pydantic compatibility) auto-enables result persistence.
- The poisoned result detonates when deserialized in a different security context: a developer's laptop via state.result(), a more-privileged worker on cache hit, or a downstream task.
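The three steps above can be sketched without Prefect as a plain JSON round trip. Everything here is illustrative: fetch_data is a hypothetical task body, decode_hook is a simplified stand-in for the vulnerable branch, and the sketch assumes a plain dict is written out unchanged, as the standard json.dumps does. The payload uses the harmless builtins.sorted in place of a dangerous callable.

```python
import importlib
import json
from typing import Any


def decode_hook(obj: dict[str, Any]) -> Any:
    # Stand-in for the vulnerable branch (assumption: simplified name resolution).
    if "__exc_type__" in obj:
        module_name, attr_name = obj["__exc_type__"].rsplit(".", 1)
        target = getattr(importlib.import_module(module_name), attr_name)
        return target(obj["message"])
    return obj


def fetch_data() -> dict[str, Any]:
    # Hypothetical task body: it returns whatever the external source sent.
    # The "profile" value is attacker-controlled but is just a dict here.
    return {
        "user": "alice",
        "profile": {"__exc_type__": "builtins.sorted", "message": "cba"},
    }


# Step 1 and 2: the producer fetches the data and persists it. Nothing is called.
fetched = fetch_data()
persisted = json.dumps(fetched)

# Step 3: a different context loads the persisted result. The call happens here.
restored = json.loads(persisted, object_hook=decode_hook)

print(restored)  # {'user': 'alice', 'profile': ['a', 'b', 'c']}
```

The producer never executes anything unusual, and the stored text is inert. Only the consumer, which may run with different privileges, triggers the call.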
The attacker doesn't need Prefect access, as the framework converts data from an untrusted source into code execution on a separate, potentially more-privileged machine.
See attached file. Sample output:
Task run 'fetch_data-b0d' - Finished in state Completed()
Task run 'fetch_data-325' - Finished in state Cached(type=COMPLETED)
Task run 'fetch_data-119' - Finished in state Completed()
[1] Cache hit RCE: uid=1000(devbox) gid=1001(devbox) groups=1001(devbox),27(sudo),1000(docker)
[2] state.result() RCE: uid=1000(devbox) gid=1001(devbox) groups=1001(devbox),27(sudo),1000(docker)
Validate __exc_type__ against BaseException subclasses:
elif "__exc_type__" in result:
    cls = from_qualified_name(result["__exc_type__"])
    if not (isinstance(cls, type) and issubclass(cls, BaseException)):
        raise ValueError(f"Invalid exception type: {result['__exc_type__']}")
    return cls(result["message"])
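A rough way to exercise the proposed check in isolation is sketched below. The resolve helper is a simplified stand-in for from_qualified_name and checked_decoder is a hypothetical name for a decoder containing only the patched branch; neither is Prefect's actual code.

```python
import importlib
import json
from typing import Any


def resolve(name: str) -> Any:
    # Simplified stand-in for from_qualified_name (assumption).
    # In this sketch the module is still imported before any check runs.
    module_name, attr_name = name.rsplit(".", 1)
    return getattr(importlib.import_module(module_name), attr_name)


def checked_decoder(result: dict[str, Any]) -> Any:
    if "__exc_type__" in result:
        cls = resolve(result["__exc_type__"])
        # Only classes deriving from BaseException may be instantiated.
        if not (isinstance(cls, type) and issubclass(cls, BaseException)):
            raise ValueError(f"Invalid exception type: {result['__exc_type__']}")
        return cls(result["message"])
    return result


# A genuine exception type is still reconstructed.
ok = json.loads(
    '{"__exc_type__": "builtins.KeyError", "message": "missing"}',
    object_hook=checked_decoder,
)

# A non-exception callable is rejected instead of being called.
try:
    json.loads(
        '{"__exc_type__": "builtins.len", "message": "abcd"}',
        object_hook=checked_decoder,
    )
    rejected = False
except ValueError:
    rejected = True

print(type(ok).__name__, rejected)  # KeyError True
```

The isinstance(cls, type) test runs first so that issubclass is never handed a non-class object such as a function, which would otherwise raise TypeError.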