@AndhikaWB
Created September 12, 2025 14:23
Prefect Lazy Materialize

Intro

A decorator to lazily materialize assets based on the task's input and/or output values. The usage is very similar to the original materialize decorator; here's an example:

@lazy_materialize('{{out_dir}}', asset_deps = ['{{data_dir}}/raw'], output_as = 'out_dir')
def preprocess_data(data_dir: str, preproc_folder: str):
    print(f'Using data from "{data_dir}/raw"')
    return f'{data_dir}/{preproc_folder}'

@flow(name = 'Dummy Test')
def main():
    data_dir = preprocess_data('data', 'processed')
    # Other task...

In the example above, the task output is simply aliased as out_dir and materialized as an asset. The data_dir asset dependency is resolved similarly, but its value is taken from the input argument instead.
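
For comparison, the stock materialize decorator needs asset keys that are already known at import time, so the same task would look roughly like the sketch below (the hard-coded file:// paths are made-up placeholders, not values derived from the code above):

from prefect.assets import materialize

@materialize(
    'file:///path/to/data/processed',
    asset_deps = ['file:///path/to/data/raw']
)
def preprocess_data(data_dir: str, preproc_folder: str):
    print(f'Using data from "{data_dir}/raw"')
    return f'{data_dir}/{preproc_folder}'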

While it can be used like the above, my main motivation for creating this decorator is to simply monkey patch all my code and make it run on Prefect without changing anything (not even writing a custom flow). Here's an example of my actual flow (ETL for ML training):

from prefect import task, flow
from my_package.utils.prefect import lazy_materialize as lazym
# My original code somewhere on a different file
import my_package.preprocess as p


# Monkey patch all functions that will be called by the main function
# All {{names}} here are part of the task input argument, or the task output
p.pull_data = lazym('{{local_dir}}', asset_deps = ['{{remote_dir}}'], _f = p.pull_data)
p.preproc_data = lazym('{{out_dir}}', _f = p.preproc_data)
p.purge_remote_data = task(p.purge_remote_data) # Don't commit yet ;)
p.upload_data = task(p.upload_data)
p.commit_data = lazym(
    's3://{{repo_id}}/{{commit_id}}', output_as = 'commit_id', _f = p.commit_data
)

@flow(name = 'Data Preprocessing')
def main():
    # The main function that will call all the patched functions above
    # Due to Python quirks, it must be contained within another function
    # If this is called directly, the patches above won't take effect
    return p.main()

I think this is great for people who need to write orchestration code quickly, but want to keep their original code clean (untouched).

Obviously, there are some downsides to patching the code this way (e.g. you can't rely on Secret.load if you want the code to also work without Prefect). However, I decided to share this because it feels pretty satisfying to use, especially for simple/personal projects.

Code

Tested on Python 3.12 and Prefect 3.4.14.

Don't use this in production unless you know what you're doing; I consider it a prototype.

import re
import jinja2
import inspect
import functools
from pathlib import Path

from typing import Any, Callable, Iterable, Unpack, overload
from prefect.tasks import Task, MaterializingTask, TaskOptions

from prefect.assets import Asset
from prefect.context import AssetContext


class Template(Asset):
    # Prevent early key validation for template
    key: str


def validate_templates(
    known_vars: dict[str, Any], templates: Iterable[str | Template] | None,
    extra_vars: dict[str, Any] | None = None
) -> list[Asset]:
    """Validate asset templates with Jinja and ensure that each asset has a valid URI
    (will fallback to `file://` URI if there's no URI).

    Args:
        known_vars (dict[str, Any]): Known variables and their values (e.g. from the
            task input arguments). None values will be filtered.
        templates (Iterable[str | Template] | None): The asset templates that will be
            filled by the variable values. If the type is a `Template`, the "key" will
            be used instead.
        extra_vars (dict[str, Any], optional): Extra variables (e.g. for holding
            fallback values) that can be used in the templates. None values will be
            filtered. Defaults to None.

    Returns:
        list[Asset]: List of validated assets.
    """

    if not templates:
        return []

    if isinstance(templates, str):
        raise ValueError(
            # Avoid looping over string characters, which generates incorrect assets
            'Asset templates should be in a list or tuple, not a string directly'
        )

    # Raise error instead of using empty string on undefined variable
    jinja_env = jinja2.Environment(undefined = jinja2.StrictUndefined)
    # None values are not undefined and still need to be filtered manually
    known_vars = {k: v for k, v in known_vars.items() if v is not None}
    if extra_vars:
        extra_vars = {k: v for k, v in extra_vars.items() if v is not None}

    # Use set to ensure no duplicate asset
    valid_assets: set[Asset] = set()

    for template in templates:
        try:
            asset = jinja_env.from_string(
                template.key if isinstance(template, Template) else template,
                globals = extra_vars
            )

            # Fill the asset template with the real asset value
            asset = asset.render(**known_vars)
        except jinja2.UndefinedError:
            asset = None

        if asset:
            # Check if the asset has a valid URI (e.g. s3://)
            # If not, assume as local file:// and give absolute path
            if not re.search(r'^[a-z0-9]+://', asset):
                asset = Path(asset).resolve().as_uri()

            valid_assets.add(
                Asset.model_validate(
                    dict(template) | {'key': asset} if isinstance(template, Template)
                    else {'key': asset}
                )
            )

    return list(valid_assets)


@overload
def lazy_materialize[**P, R](
    *assets: str | Template, by: str | None = ...,
    asset_deps: list[str | Template] | None = ..., output_as: str | None = ...,
    extra_vars: dict[str, Any] | None = ..., _f: Callable[P, R],
    **task_kwargs: Unpack[TaskOptions]
) -> Callable[P, R]: ...


@overload
def lazy_materialize[**P, R](
    *assets: str | Template, by: str | None = ...,
    asset_deps: list[str | Template] | None = ..., output_as: str | None = ...,
    extra_vars: dict[str, Any] | None = ..., _f: None = ...,
    **task_kwargs: Unpack[TaskOptions]
) -> Callable[[Callable[P, R]], Callable[P, R]]: ...


def lazy_materialize[**P, R](
    *assets: str | Template, by: str | None = None,
    asset_deps: list[str | Template] | None = None, output_as: str | None = None,
    extra_vars: dict[str, Any] | None = None, _f: Callable[P, R] | None = None,
    **task_kwargs: Unpack[TaskOptions]
) -> Callable[P, R] | Callable[[Callable[P, R]], Callable[P, R]]:
    """Lazy `@materialize` for Prefect, by taking the asset values from the original
    task input arguments, and/or task output result.

    For example, if your task is `upload_to_s3(local_dir, remote_dir)`, then you can
    simply use the `@lazy_materialize("{{remote_dir}}")` decorator to make `remote_dir`
    as your asset. If `remote_dir` is a nested attribute (e.g. in a Pydantic model), you
    can use `@lazy_materialize("{{remote_dir.target_attr}}")` instead.

    Other examples:
    - Multiple variables: `@lazy_materialize("s3://{{bucket}}/{{remote_dir}}")`
    - Multiple assets: `@lazy_materialize("s3://{{my_bucket}}", "s3://{{other_bucket}}")`
    - Asset dependency: `@lazy_materialize(asset_deps = ["{{local_dir}}"])`
    - Extra variable: `@lazy_materialize("{{fn(dir)}}", extra_vars = {"fn": func})`
    - Task output as asset: `@lazy_materialize("{{output}}", output_as = "output")`

    All variables must be part of the original task input arguments (assets with unknown
    variable will be skipped). If you need extra variables, use the `extra_vars` and/or
    the `output_as` parameter as shown above.

    Args:
        *assets (str | Template, optional): Templates containing variable names (e.g.
            from the task input arguments) that will be used as assets, with the format
            `"{{name}}"` instead of `"name"` directly. If the type is a `Template`, the
            "key" will be used instead.
        by (str, optional): The tool name that materialized the assets. Defaults to
            None.
        asset_deps (list[str | Template], optional): Similar as `assets` but used for
            declaring upstream assets (dependency) rather than downstream assets
            (material). Defaults to None.
        output_as (str, optional): Save the task output result as variable too (if the
            result is not None). You can use any name to refer to it, allowing you to
            use the name in the `assets` templates (but not in `asset_deps`). Defaults
            to None. The output assets will be added late, so some operations (e.g.
            adding metadata) may not work.
        extra_vars (dict[str, Any], optional): Extra variables that are needed in the
            templates, or for holding fallback value(s) in case the original argument(s)
            is missing (undefined). None values will be filtered. Defaults to None.
        **task_kwargs (Any, optional): Other arguments that can be passed to Prefect
            `Task` class.
    """

    def decorator(f: Callable[P, R]) -> Callable[P, R]:
        def wrapper(*args: P.args, **kwargs: P.kwargs) -> R:
            # Convert all task input arguments as kwargs dict
            known_vars = inspect.getcallargs(f, *args, **kwargs)

            if output_as and output_as in known_vars:
                raise KeyError(f'Variable already used as input: "{output_as}"')

            # Get the real asset and dependency values from the task input arguments
            assets_val = validate_templates(known_vars, assets, extra_vars)
            deps_val = validate_templates(known_vars, asset_deps, extra_vars) or None

            @functools.wraps(f)
            def f_inject(*args: P.args, **kwargs: P.kwargs) -> R:
                output = f(*args, **kwargs)
                
                if output_as and output is not None:
                    # Save the output result as known variable too
                    known_vars[output_as] = output

                    if context := AssetContext.get():
                        # Get current assets directly from the asset context
                        cur_assets = [i.key for i in context.downstream_assets]
                        # Get potential new assets from the task output
                        assets_val = validate_templates(known_vars, assets, extra_vars)

                        # Inject new assets right before ending the task
                        for asset in assets_val:
                            if asset.key not in cur_assets:
                                context.downstream_assets.add(asset)
                        context.update_tracked_assets()

                return output

            prefect_decorator = (
                MaterializingTask(
                    fn = f_inject if output_as else f, assets = assets_val,
                    materialized_by = by, asset_deps = deps_val, **task_kwargs
                ) if assets_val else Task(
                    fn = f_inject if output_as else f, asset_deps = deps_val,
                    **task_kwargs
                )
            )

            return prefect_decorator(*args, **kwargs)
        return wrapper
    return decorator if _f is None else decorator(_f)
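
As a quick sanity check, the validate_templates helper above can also be exercised on its own, outside of any flow or task. A minimal sketch, assuming the code is saved as my_package/utils/prefect.py (matching the import earlier) and using made-up variable values:

from my_package.utils.prefect import Template, validate_templates

known_vars = {'bucket': 'my-bucket', 'remote_dir': 'raw', 'missing': None}

assets = validate_templates(
    known_vars,
    [
        's3://{{bucket}}/{{remote_dir}}',   # Rendered as s3://my-bucket/raw
        '{{local_dir}}/raw',                # Skipped, local_dir is undefined
        Template(key = '{{missing}}/raw'),  # Skipped, None values are filtered
    ],
)

print([asset.key for asset in assets])  # ['s3://my-bucket/raw']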

Explanation

A Python decorator that accepts arguments is basically composed of this structure:

def decorator_with_args(dec_args):
    # Access or modify dec_args

    def decorator(f):
        # Access or modify dec_args, f

        def wrapper(f_args):
            # Access or modify dec_args, f, f_args

            return f(f_args)
        return wrapper
    return decorator

And the decorator is then used like this:

@decorator_with_args(dec_args)
def my_function(f_params):
    pass

my_function(f_args)

# Or like this, without the @decorator_with_args syntax
my_patched_function = decorator_with_args(dec_args)(my_function)
result = my_patched_function(f_args)
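
As a concrete, runnable version of this pattern, here's a tiny decorator that just logs its own argument before calling the wrapped function (the names are purely illustrative):

import functools

def tag(label):  # decorator_with_args, receives dec_args
    def decorator(f):  # receives the wrapped function
        @functools.wraps(f)
        def wrapper(*args, **kwargs):  # receives f_args on every call
            print(f'[{label}] calling {f.__name__}{args}')
            return f(*args, **kwargs)
        return wrapper
    return decorator

@tag('debug')
def add(a, b):
    return a + b

add(1, 2)  # Prints "[debug] calling add(1, 2)" and returns 3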

Now let's see how Prefect's materialize decorator works:

def materialize(dec_args):

    def decorator(f):

        # This function doesn't actually exist inside the decorator
        # This is the MaterializingTask.__call__ function
        def __call__(f_args): # wrapper function

            return f(f_args)
        # MaterializingTask.__init__ call
        return MaterializingTask(f, dec_args)
    return decorator

# ----------

@materialize('s3://whatever') # dec_args here
def my_function(remote_dir): # f_params here
    pass

my_function(remote_dir = 's3://whatever') # f_args here

If I want to modify it to behave like this:

@materialize('{{remote_dir}}') # dec_args here
def my_function(remote_dir): # f_params here
    pass

my_function(remote_dir = 's3://whatever') # f_args here

That means I have to access f_args, but the problem is that f_args lives inside the MaterializingTask.__call__ method, so I have to either:

  1. Modify/subclass the MaterializingTask class (at least the __init__ and __call__ methods)
  2. Initialize the MaterializingTask instance inside the wrapper function instead of decorator function (delayed initialization)

Obviously, I chose option 2, since option 1 would likely need to be maintained more often.

def materialize(dec_args):

    def decorator(f):

        def wrapper(f_args):

            return MaterializingTask(f, dec_args)(f_args)
        return wrapper # Changed
    return decorator
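
Inside that wrapper, resolving the template boils down to inspect.getcallargs plus a Jinja render, which is essentially what validate_templates in the Code section does. A minimal sketch:

import inspect
import jinja2

def my_function(remote_dir):
    pass

# What the wrapper sees at call time (f_args converted to a dict)
known_vars = inspect.getcallargs(my_function, remote_dir = 's3://whatever')

# Fill the '{{remote_dir}}' template with the real value
key = jinja2.Template('{{remote_dir}}').render(**known_vars)
print(key)  # s3://whatever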

The downside of this option is that, since the decorator now returns the wrapper function instead of a MaterializingTask instance directly, we won't be able to use with_options or other properties/methods associated with MaterializingTask.

@materialize('{{oops_wrong_name}}') # dec_args here
def my_function(remote_dir): # f_params here
    pass

# No longer possible
my_function.with_options(['{{remote_dir}}'])(remote_dir = 's3://whatever')
# Still possible, of course
my_function(remote_dir = 's3://whatever')

I may be able to fix this by implementing option 1, but I currently have no plan to maintain this.
