WES Task Logs Pagination - Database-Level Implementation Plan

Problem Statement

Current implementation in wes.py:

  • _build_task_logs() (lines 886-913): Loads ALL WorkflowInvocationSteps into memory, then all related Jobs
  • Used in 3 places:
    1. get_run_tasks() (line 701) - loads all, slices in Python
    2. get_run_task() (line 742) - loads all to find one
    3. _invocation_to_run_log() (line 868) - populates deprecated field

This is inefficient for large workflows with hundreds of steps or collection jobs with thousands of jobs.

Key Finding: task_logs Field is Deprecated

From lib/galaxy/schema/wes/__init__.py lines 244-246:

task_logs: Optional[List[Union[Log, TaskLog]]] = Field(
    None,
    description="... This field is deprecated and the `task_logs_url` should be used "
    "to retrieve a paginated list of steps from the workflow run. This field will be "
    "removed in the next major version of the specification (2.0.0)",
)

Conclusion: We can and should set task_logs=None in RunLog responses and rely entirely on task_logs_url for pagination. This is the spec-recommended approach.

Goal

Remove _build_task_logs() entirely and implement database-level pagination:

  1. get_run_tasks() - Paginated query with UNION for all task types
  2. get_run_task() - Direct lookup by parsing task_id (no full scan)
  3. _invocation_to_run_log() - Set task_logs=None (already has task_logs_url)

Benefits:

  • Only fetch needed objects (O(page_size) instead of O(total_tasks))
  • Minimize memory usage
  • Minimize database queries (batch loading)
  • Spec-compliant (follow deprecation guidance)

Model Relationships

From lib/galaxy/model/__init__.py:

class WorkflowInvocationStep:
    id: int (primary key)
    workflow_invocation_id: int (FK)
    job_id: Optional[int] (FK to Job) - for single jobs
    implicit_collection_jobs_id: Optional[int] (FK) - for collection jobs
    order_index: int (column_property from WorkflowStep)
    workflow_step: WorkflowStep (relationship)

class Job:
    id: int (primary key)
    create_time, update_time, command_line, exit_code: various fields

class ImplicitCollectionJobs:
    id: int (primary key)
    jobs: List[ImplicitCollectionJobsJobAssociation] (relationship)

class ImplicitCollectionJobsJobAssociation:
    id: int (primary key)
    implicit_collection_jobs_id: int (FK)
    job_id: int (FK to Job)
    order_index: int - ordering within the collection
    job: Job (relationship)

Task ID Format

Each WorkflowInvocationStep generates one or more TaskLog entries with a specific ID format (illustrative helpers are sketched after this list):

  1. Single job step (job_id not NULL):
    • Task ID: "{step_id}" (e.g., "123")
  2. Collection jobs step (implicit_collection_jobs_id not NULL):
    • Task IDs: "{step_id}.{job_index}" (e.g., "123.0", "123.1", "123.2")
    • Ordered by ImplicitCollectionJobsJobAssociation.order_index
  3. No job step (both NULL):
    • Task ID: "{step_id}" (e.g., "456")
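
For illustration, the two directions of this mapping can be captured in a pair of hypothetical helpers (names and placement are placeholders, not part of the current code):

from typing import Optional, Tuple

def format_task_id(step_id: int, job_index: Optional[int] = None) -> str:
    """Render a WES task ID: "{step_id}" or "{step_id}.{job_index}"."""
    return f"{step_id}.{job_index}" if job_index is not None else str(step_id)

def parse_task_id(task_id: str) -> Tuple[int, Optional[int]]:
    """Split a WES task ID into (step_id, job_index); raises ValueError on bad input."""
    parts = task_id.split(".")
    if len(parts) > 2:
        raise ValueError(f"Invalid task_id: {task_id}")
    return int(parts[0]), (int(parts[1]) if len(parts) > 1 else None)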

SQL Strategy for get_run_tasks()

Use UNION ALL to create a virtual table of all task rows, then paginate:

-- Part 1: Steps with single jobs
SELECT
    wis.id as step_id,
    wis.order_index as step_order,
    'single' as task_type,
    wis.job_id as job_id,
    0 as job_index
FROM workflow_invocation_step wis
WHERE wis.workflow_invocation_id = :invocation_id
  AND wis.job_id IS NOT NULL

UNION ALL

-- Part 2: Steps with collection jobs (expanded per job)
SELECT
    wis.id as step_id,
    wis.order_index as step_order,
    'collection' as task_type,
    icjja.job_id as job_id,
    icjja.order_index as job_index
FROM workflow_invocation_step wis
JOIN implicit_collection_jobs icj
    ON wis.implicit_collection_jobs_id = icj.id
JOIN implicit_collection_jobs_job_association icjja
    ON icj.id = icjja.implicit_collection_jobs_id
WHERE wis.workflow_invocation_id = :invocation_id
  AND wis.implicit_collection_jobs_id IS NOT NULL

UNION ALL

-- Part 3: Steps with no jobs
SELECT
    wis.id as step_id,
    wis.order_index as step_order,
    'no_job' as task_type,
    NULL as job_id,
    0 as job_index
FROM workflow_invocation_step wis
WHERE wis.workflow_invocation_id = :invocation_id
  AND wis.job_id IS NULL
  AND wis.implicit_collection_jobs_id IS NULL

ORDER BY step_order, job_index
LIMIT :limit OFFSET :offset
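
As a concrete example with hypothetical IDs: an invocation whose step 11 ran single job 501, whose step 12 fanned out over a three-job collection (jobs 502-504), and whose step 13 ran no job would produce this virtual table before LIMIT/OFFSET:

step_id | step_order | task_type  | job_id | job_index
--------+------------+------------+--------+----------
11      | 0          | single     | 501    | 0
12      | 1          | collection | 502    | 0
12      | 1          | collection | 503    | 1
12      | 1          | collection | 504    | 2
13      | 2          | no_job     | NULL   | 0

Rows are already in presentation order, so the corresponding task IDs are "11", "12.0", "12.1", "12.2", "13".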

Implementation Details

1. Helper: Build UNION Query

Method: _build_task_rows_query(invocation_id: int) -> Select

Returns a SQLAlchemy selectable that produces the following columns:

  • step_id: int
  • step_order: int
  • task_type: Literal['single', 'collection', 'no_job']
  • job_id: Optional[int]
  • job_index: int

def _build_task_rows_query(self, invocation_id: int) -> Select:
    """Build UNION query for all task rows in an invocation."""

    # Subquery 1: Single job steps
    single_jobs = (
        select(
            WorkflowInvocationStep.id.label('step_id'),
            WorkflowInvocationStep.order_index.label('step_order'),
            literal('single').label('task_type'),
            WorkflowInvocationStep.job_id.label('job_id'),
            literal(0).label('job_index'),
        )
        .where(
            WorkflowInvocationStep.workflow_invocation_id == invocation_id,
            WorkflowInvocationStep.job_id.isnot(None),
        )
    )

    # Subquery 2: Collection job steps (expanded)
    collection_jobs = (
        select(
            WorkflowInvocationStep.id.label('step_id'),
            WorkflowInvocationStep.order_index.label('step_order'),
            literal('collection').label('task_type'),
            ImplicitCollectionJobsJobAssociation.job_id.label('job_id'),
            ImplicitCollectionJobsJobAssociation.order_index.label('job_index'),
        )
        .join(
            ImplicitCollectionJobs,
            WorkflowInvocationStep.implicit_collection_jobs_id == ImplicitCollectionJobs.id
        )
        .join(
            ImplicitCollectionJobsJobAssociation,
            ImplicitCollectionJobs.id == ImplicitCollectionJobsJobAssociation.implicit_collection_jobs_id
        )
        .where(
            WorkflowInvocationStep.workflow_invocation_id == invocation_id,
            WorkflowInvocationStep.implicit_collection_jobs_id.isnot(None),
        )
    )

    # Subquery 3: No-job steps
    no_jobs = (
        select(
            WorkflowInvocationStep.id.label('step_id'),
            WorkflowInvocationStep.order_index.label('step_order'),
            literal('no_job').label('task_type'),
            null().label('job_id'),  # null() renders SQL NULL; literal(None) emits an untyped bind param
            literal(0).label('job_index'),
        )
        .where(
            WorkflowInvocationStep.workflow_invocation_id == invocation_id,
            WorkflowInvocationStep.job_id.is_(None),
            WorkflowInvocationStep.implicit_collection_jobs_id.is_(None),
        )
    )

    return union_all(single_jobs, collection_jobs, no_jobs)

Note on order_index: WorkflowInvocationStep.order_index is a column_property backed by a scalar subquery. If this causes issues inside the UNION, join WorkflowStep explicitly in each subquery (see Issue 1 below). Also note that union_all() returns a CompoundSelect rather than a plain Select; it still supports the .subquery() call used by the pagination helper. Test first.

2. Helper: Fetch Paginated Task Rows

Method: _get_paginated_task_rows(trans, invocation_id, offset, limit) -> List[dict]

Executes the UNION query with ordering and pagination.

def _get_paginated_task_rows(
    self,
    trans: ProvidesUserContext,
    invocation_id: int,
    offset: int,
    limit: int,
) -> List[dict]:
    """Fetch paginated task rows from database.

    Returns list of dicts with keys: step_id, step_order, task_type, job_id, job_index
    """
    # Build UNION subquery
    task_rows_subquery = self._build_task_rows_query(invocation_id).subquery()

    # Apply ordering and pagination
    stmt = (
        select(
            task_rows_subquery.c.step_id,
            task_rows_subquery.c.step_order,
            task_rows_subquery.c.task_type,
            task_rows_subquery.c.job_id,
            task_rows_subquery.c.job_index,
        )
        .order_by(
            task_rows_subquery.c.step_order,
            task_rows_subquery.c.job_index,
        )
        .offset(offset)
        .limit(limit + 1)  # Fetch one extra to detect more results
    )

    result = trans.sa_session.execute(stmt)
    return [dict(row._mapping) for row in result]
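
As a development aid only (not part of the planned code), the exact SQL emitted by this statement can be inspected by compiling it with literal binds:

# Debugging: render the UNION + ORDER BY + LIMIT/OFFSET statement as raw SQL
print(stmt.compile(compile_kwargs={"literal_binds": True}))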

3. Helper: Batch Load Objects

Method: _load_task_objects(trans, task_rows) -> tuple[dict, dict]

Given task rows, batch load all needed WorkflowInvocationSteps and Jobs.

def _load_task_objects(
    self,
    trans: ProvidesUserContext,
    task_rows: List[dict],
) -> tuple[dict[int, WorkflowInvocationStep], dict[int, Job]]:
    """Batch load Step and Job objects for task rows.

    Returns:
    - steps_by_id: {step_id: WorkflowInvocationStep}
    - jobs_by_id: {job_id: Job}
    """
    if not task_rows:
        return {}, {}

    # Extract unique IDs
    step_ids = list(set(row['step_id'] for row in task_rows))
    job_ids = list(set(row['job_id'] for row in task_rows if row['job_id'] is not None))

    # Batch load steps with workflow_step relationship
    steps = (
        trans.sa_session.query(WorkflowInvocationStep)
        .options(joinedload(WorkflowInvocationStep.workflow_step))
        .filter(WorkflowInvocationStep.id.in_(step_ids))
        .all()
    )
    steps_by_id = {step.id: step for step in steps}

    # Batch load jobs
    jobs_by_id = {}
    if job_ids:
        jobs = (
            trans.sa_session.query(Job)
            .filter(Job.id.in_(job_ids))
            .all()
        )
        jobs_by_id = {job.id: job for job in jobs}

    return steps_by_id, jobs_by_id

4. Helper: Convert Task Row to TaskLog

Method: _task_row_to_task_log(task_row, steps_by_id, jobs_by_id) -> TaskLog

def _task_row_to_task_log(
    self,
    task_row: dict,
    steps_by_id: dict[int, WorkflowInvocationStep],
    jobs_by_id: dict[int, Job],
) -> TaskLog:
    """Convert a task row dict to a TaskLog object."""
    step_id = task_row['step_id']
    job_id = task_row['job_id']
    job_index = task_row['job_index']
    task_type = task_row['task_type']

    step = steps_by_id[step_id]
    workflow_step = step.workflow_step

    # Generate task ID
    if task_type == 'collection':
        task_id = f"{step_id}.{job_index}"
    else:
        task_id = str(step_id)

    # Get step name
    step_name = workflow_step.label or workflow_step.tool_id or f"step_{step.order_index}"

    # Build TaskLog with or without job details
    if job_id is not None:
        job = jobs_by_id[job_id]
        return TaskLog(
            id=task_id,
            name=step_name,
            start_time=job.create_time.isoformat() if job.create_time else None,
            end_time=job.update_time.isoformat() if job.update_time else None,
            stdout=f"/api/jobs/{self._security.encode_id(job.id)}/stdout",
            stderr=f"/api/jobs/{self._security.encode_id(job.id)}/stderr",
            exit_code=job.exit_code,
        )
    else:
        # No job - use step-level timing
        return TaskLog(
            id=task_id,
            name=step_name,
            start_time=step.create_time.isoformat() if step.create_time else None,
            end_time=step.update_time.isoformat() if step.update_time else None,
        )

5. Refactor: get_run_tasks() with Pagination

Replace lines 680-718 with an efficient paginated implementation:

def get_run_tasks(
    self,
    trans: ProvidesUserContext,
    run_id: int,
    page_size: int = 10,
    page_token: Optional[str] = None,
) -> TaskListResponse:
    """Get paginated list of tasks for a workflow run.

    Uses database-level pagination via UNION query to avoid loading
    all steps/jobs into memory.
    """
    invocation = self._get_invocation(trans, run_id)

    # Decode page token to offset
    offset = _decode_page_token(page_token)

    # Fetch paginated task rows (+1 to detect more results)
    task_rows = self._get_paginated_task_rows(
        trans,
        invocation.id,
        offset,
        page_size,
    )

    # Check if more results exist
    has_more = len(task_rows) > page_size
    if has_more:
        task_rows = task_rows[:page_size]

    # Batch load all needed objects
    steps_by_id, jobs_by_id = self._load_task_objects(trans, task_rows)

    # Convert to TaskLog objects
    task_logs = [
        self._task_row_to_task_log(row, steps_by_id, jobs_by_id)
        for row in task_rows
    ]

    # Generate next page token
    next_page_token = None
    if has_more:
        next_page_token = _encode_page_token(offset + page_size)

    return TaskListResponse(
        task_logs=task_logs if task_logs else None,
        next_page_token=next_page_token,
    )
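
This assumes module-level _encode_page_token()/_decode_page_token() helpers exist in wes.py. If they still need to be written, a minimal sketch that treats the token as a base64-wrapped offset could look like this:

import base64
from typing import Optional

def _encode_page_token(offset: int) -> str:
    """Encode a numeric offset as an opaque, URL-safe page token."""
    return base64.urlsafe_b64encode(str(offset).encode()).decode()

def _decode_page_token(page_token: Optional[str]) -> int:
    """Decode a page token back to an offset; a missing token means offset 0."""
    if not page_token:
        return 0
    try:
        offset = int(base64.urlsafe_b64decode(page_token.encode()).decode())
        if offset < 0:
            raise ValueError(page_token)
    except ValueError:  # binascii.Error and UnicodeDecodeError are ValueError subclasses
        raise ValueError(f"Invalid page_token: {page_token}")  # map to a Galaxy exception at the call site
    return offset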

6. Refactor: get_run_task() with Direct Lookup

Replace lines 720-750 with an efficient direct lookup:

def get_run_task(
    self,
    trans: ProvidesUserContext,
    run_id: int,
    task_id: str,
) -> TaskLog:
    """Get details for a specific task by direct lookup.

    Parses task_id to extract step_id and optional job_index,
    then directly queries for that specific step/job.
    """
    invocation = self._get_invocation(trans, run_id)

    # Parse task_id: either "{step_id}" or "{step_id}.{job_index}"
    parts = task_id.split('.')
    try:
        if len(parts) > 2:
            raise ValueError(task_id)  # reject malformed IDs like "1.2.3"
        step_id = int(parts[0])
        job_index = int(parts[1]) if len(parts) > 1 else None
    except ValueError:
        raise exceptions.ObjectNotFound(f"Invalid task_id format: {task_id}")

    # Fetch the specific step
    step = (
        trans.sa_session.query(WorkflowInvocationStep)
        .options(joinedload(WorkflowInvocationStep.workflow_step))
        .filter(
            WorkflowInvocationStep.id == step_id,
            WorkflowInvocationStep.workflow_invocation_id == invocation.id,
        )
        .one_or_none()
    )

    if not step:
        raise exceptions.ObjectNotFound(f"Task {task_id} not found in run {run_id}")

    # Get step name
    workflow_step = step.workflow_step
    step_name = workflow_step.label or workflow_step.tool_id or f"step_{step.order_index}"

    # Handle different step types
    if step.job_id:
        # Single job step
        if job_index is not None:
            raise exceptions.ObjectNotFound(
                f"Task {task_id} specifies job_index but step has single job"
            )

        job = trans.sa_session.query(Job).filter(Job.id == step.job_id).one()

        return TaskLog(
            id=str(step_id),
            name=step_name,
            start_time=job.create_time.isoformat() if job.create_time else None,
            end_time=job.update_time.isoformat() if job.update_time else None,
            stdout=f"/api/jobs/{self._security.encode_id(job.id)}/stdout",
            stderr=f"/api/jobs/{self._security.encode_id(job.id)}/stderr",
            exit_code=job.exit_code,
        )

    elif step.implicit_collection_jobs_id:
        # Collection jobs step
        if job_index is None:
            raise exceptions.ObjectNotFound(
                f"Task {task_id} missing job_index for collection job step"
            )

        # Fetch specific job from collection by order_index
        job_assoc = (
            trans.sa_session.query(ImplicitCollectionJobsJobAssociation)
            .filter(
                ImplicitCollectionJobsJobAssociation.implicit_collection_jobs_id == step.implicit_collection_jobs_id,
                ImplicitCollectionJobsJobAssociation.order_index == job_index,
            )
            .one_or_none()
        )

        if not job_assoc:
            raise exceptions.ObjectNotFound(f"Task {task_id} job not found at index {job_index}")

        job = trans.sa_session.query(Job).filter(Job.id == job_assoc.job_id).one()

        return TaskLog(
            id=task_id,
            name=step_name,
            start_time=job.create_time.isoformat() if job.create_time else None,
            end_time=job.update_time.isoformat() if job.update_time else None,
            stdout=f"/api/jobs/{self._security.encode_id(job.id)}/stdout",
            stderr=f"/api/jobs/{self._security.encode_id(job.id)}/stderr",
            exit_code=job.exit_code,
        )

    else:
        # No job step
        if job_index is not None:
            raise exceptions.ObjectNotFound(
                f"Task {task_id} specifies job_index but step has no jobs"
            )

        return TaskLog(
            id=str(step_id),
            name=step_name,
            start_time=step.create_time.isoformat() if step.create_time else None,
            end_time=step.update_time.isoformat() if step.update_time else None,
        )
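
For reference, the two task ID shapes map onto the WES tasks endpoint like this (hypothetical IDs; base_url and run_id are placeholders for the deployment's WES mount and an actual run):

import requests

# Single-job or no-job step: task_id is the bare step id
requests.get(f"{base_url}/runs/{run_id}/tasks/123")

# Third job of a collection step: task_id is "{step_id}.{job_index}"
requests.get(f"{base_url}/runs/{run_id}/tasks/123.2")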

7. Refactor: _invocation_to_run_log() Remove Deprecated Field

Change line 868 from:

task_logs = self._build_task_logs(trans, invocation)

To:

# task_logs field is deprecated per WES spec - use task_logs_url instead
task_logs = None

And on line 881, keep:

task_logs=task_logs if task_logs else None,  # Will always be None now

Or simplify to:

task_logs=None,  # Deprecated field - use task_logs_url

8. Delete: _build_task_logs() and _step_to_task_log()

Remove methods entirely (lines 886-976):

  • _build_task_logs()
  • _step_to_task_log()

These are replaced by the new paginated and direct-lookup approaches.

Required Imports

Add to imports section:

from sqlalchemy import (
    # existing imports...
    literal,
    null,
    union_all,
)
from sqlalchemy.orm import joinedload

Testing Strategy

All existing tests should continue to pass since the API contracts are unchanged:

  1. GET /runs/{run_id}: Returns RunLog with task_logs=None and task_logs_url populated
  2. GET /runs/{run_id}/tasks: Returns paginated TaskListResponse
  3. GET /runs/{run_id}/tasks/{task_id}: Returns single TaskLog

Additional test cases to add:

def test_wes_task_pagination_large_workflow():
    """Test pagination with 100+ step workflow."""
    # Submit large workflow, verify pagination works correctly

def test_wes_task_direct_lookup_single_job():
    """Test get_run_task() for step with single job."""
    # Verify task_id="{step_id}" returns correct TaskLog

def test_wes_task_direct_lookup_collection_jobs():
    """Test get_run_task() for collection job step."""
    # Verify task_id="{step_id}.{job_index}" returns correct TaskLog

def test_wes_task_direct_lookup_no_job():
    """Test get_run_task() for step without job."""
    # Verify minimal TaskLog returned

def test_wes_task_direct_lookup_invalid_format():
    """Test get_run_task() with malformed task_id."""
    # Verify 404 error for invalid task_id

def test_wes_run_log_no_inline_tasks():
    """Test GET /runs/{run_id} returns task_logs=None."""
    # Verify deprecated field is None, task_logs_url present
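
For the pagination test, the core loop might walk every page via next_page_token and compare against the expected totals (a sketch using requests; the base URL, auth, and fixture setup are assumptions about the test harness):

import requests

def fetch_all_tasks(base_url: str, run_id: str, page_size: int = 10) -> list:
    """Walk every page of GET /runs/{run_id}/tasks and collect all task logs."""
    tasks: list = []
    page_token = None
    while True:
        params = {"page_size": page_size}
        if page_token:
            params["page_token"] = page_token
        response = requests.get(f"{base_url}/runs/{run_id}/tasks", params=params)
        response.raise_for_status()
        body = response.json()
        tasks.extend(body.get("task_logs") or [])
        page_token = body.get("next_page_token")
        if not page_token:
            return tasks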

Performance Expectations

Before (current implementation):

  • Query 1: Load all WorkflowInvocationSteps for invocation
  • Queries 2 through N: Load each step's job (N+1 query problem)
  • Queries N+1 through M: Load each collection's jobs
  • Memory: O(total_tasks) - all tasks in memory

After (with pagination):

  • Query 1: UNION query with LIMIT/OFFSET (returns page_size task rows)
  • Query 2: Batch load steps (single query with IN clause)
  • Query 3: Batch load jobs (single query with IN clause)
  • Memory: O(page_size) - only current page in memory

For a workflow with 1000 steps, fetching a page of 10 (see the query-counting sketch below):

  • Before: ~1001+ queries, load 1000+ objects
  • After: 3 queries, load ~10 objects
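
To verify the three-query claim empirically, SQLAlchemy's cursor events can count the statements issued while a page is fetched (a test-only sketch; obtaining the engine handle depends on the test harness):

from sqlalchemy import event

class QueryCounter:
    """Counts SQL statements issued on an engine while the context is active."""

    def __init__(self, engine):
        self.engine = engine
        self.count = 0

    def _on_execute(self, conn, cursor, statement, parameters, context, executemany):
        self.count += 1

    def __enter__(self):
        event.listen(self.engine, "before_cursor_execute", self._on_execute)
        return self

    def __exit__(self, *exc):
        event.remove(self.engine, "before_cursor_execute", self._on_execute)

# with QueryCounter(engine) as counter:
#     service.get_run_tasks(trans, run_id, page_size=10)
# assert counter.count <= 3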

Implementation Order

  1. Write new tests for pagination and direct lookup
  2. Implement _build_task_rows_query()
  3. Implement _get_paginated_task_rows()
  4. Implement _load_task_objects()
  5. Implement _task_row_to_task_log()
  6. Refactor get_run_tasks() to use new helpers
  7. Refactor get_run_task() with direct lookup
  8. Update _invocation_to_run_log() to set task_logs=None
  9. Delete _build_task_logs() and _step_to_task_log()
  10. Run all tests - verify pass
  11. Performance test with large workflow

Files Modified

  • lib/galaxy/webapps/galaxy/services/wes.py:
    • Add 4 new helper methods
    • Refactor 2 existing methods (get_run_tasks, get_run_task)
    • Modify _invocation_to_run_log() to set task_logs=None
    • Delete 2 old methods (_build_task_logs, _step_to_task_log)
  • lib/galaxy_test/api/test_workflows.py:
    • Add 6 new test methods

Potential Issues & Solutions

Issue 1: order_index Column Property

WorkflowInvocationStep.order_index is a column_property with a scalar subquery (lines 10067-10068):

order_index: Mapped[int] = column_property(
    select(WorkflowStep.order_index).where(WorkflowStep.id == workflow_step_id).scalar_subquery()
)

Solution: If this causes issues in the UNION query, explicitly join WorkflowStep in each subquery and use WorkflowStep.order_index directly.
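
Under that fallback, the first subquery would look roughly like this (the other two are analogous; a WorkflowStep import is assumed):

# Fallback sketch: join WorkflowStep explicitly instead of relying on the column_property
single_jobs = (
    select(
        WorkflowInvocationStep.id.label('step_id'),
        WorkflowStep.order_index.label('step_order'),
        literal('single').label('task_type'),
        WorkflowInvocationStep.job_id.label('job_id'),
        literal(0).label('job_index'),
    )
    .join(WorkflowStep, WorkflowInvocationStep.workflow_step_id == WorkflowStep.id)
    .where(
        WorkflowInvocationStep.workflow_invocation_id == invocation_id,
        WorkflowInvocationStep.job_id.isnot(None),
    )
)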

Issue 2: Existing Bug in _step_to_task_log

Line 944 has a bug:

jobs = [icj.job for icj in step.implicit_collection_jobs]

Should be:

jobs = [assoc.job for assoc in step.implicit_collection_jobs.jobs]

Solution: Fixed in the new implementation by querying through ImplicitCollectionJobsJobAssociation directly.

Success Criteria

  1. ✅ All existing tests pass
  2. ✅ GET /runs/{run_id} returns task_logs=None (following spec deprecation)
  3. ✅ GET /runs/{run_id}/tasks paginates correctly
  4. ✅ GET /runs/{run_id}/tasks/{task_id} returns single task efficiently
  5. ✅ Query count is O(1) for pagination (3 queries regardless of workflow size)
  6. ✅ Memory usage is O(page_size) instead of O(total_tasks)
  7. ✅ Performance improvement measurable for workflows with 100+ tasks
  8. ✅ Spec-compliant (follows WES deprecation guidance)