Current implementation in wes.py:
`_build_task_logs()` (lines 886-913): Loads ALL WorkflowInvocationSteps into memory, then all related Jobs. Used in 3 places:
- `get_run_tasks()` (line 701) - loads all, slices in Python
- `get_run_task()` (line 742) - loads all to find one
- `_invocation_to_run_log()` (line 868) - populates a deprecated field
This is inefficient for large workflows with hundreds of steps or collection jobs with thousands of jobs.
From lib/galaxy/schema/wes/__init__.py lines 244-246:
task_logs: Optional[List[Union[Log, TaskLog]]] = Field(
None,
description="... This field is deprecated and the `task_logs_url` should be used
to retrieve a paginated list of steps from the workflow run. This field will be
removed in the next major version of the specification (2.0.0)",
)

Conclusion: We can and should set `task_logs=None` in RunLog responses and rely entirely on `task_logs_url` for pagination. This is the spec-recommended approach.
Remove _build_task_logs() entirely and implement database-level pagination:
- `get_run_tasks()` - paginated query with UNION for all task types
- `get_run_task()` - direct lookup by parsing the task_id (no full scan)
- `_invocation_to_run_log()` - set `task_logs=None` (already has `task_logs_url`)
Benefits:
- Only fetch needed objects (O(page_size) instead of O(total_tasks))
- Minimize memory usage
- Minimize database queries (batch loading)
- Spec-compliant (follow deprecation guidance)
From lib/galaxy/model/__init__.py:
class WorkflowInvocationStep:
id: int (primary key)
workflow_invocation_id: int (FK)
job_id: Optional[int] (FK to Job) - for single jobs
implicit_collection_jobs_id: Optional[int] (FK) - for collection jobs
order_index: int (column_property from WorkflowStep)
workflow_step: WorkflowStep (relationship)
class Job:
id: int (primary key)
create_time, update_time, command_line, exit_code: various fields
class ImplicitCollectionJobs:
id: int (primary key)
jobs: List[ImplicitCollectionJobsJobAssociation] (relationship)
class ImplicitCollectionJobsJobAssociation:
id: int (primary key)
implicit_collection_jobs_id: int (FK)
job_id: int (FK to Job)
order_index: int - ordering within the collection
    job: Job (relationship)

Each WorkflowInvocationStep generates one or more TaskLog entries with a specific ID format:
- Single job step (`job_id` not NULL):
  - Task ID: `"{step_id}"` (e.g., `"123"`)
- Collection jobs step (`implicit_collection_jobs_id` not NULL):
  - Task IDs: `"{step_id}.{job_index}"` (e.g., `"123.0"`, `"123.1"`, `"123.2"`)
  - Ordered by `ImplicitCollectionJobsJobAssociation.order_index`
- No job step (both NULL):
  - Task ID: `"{step_id}"` (e.g., `"456"`)
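The ID scheme above can be captured in two small helpers (hypothetical names for illustration, not code from wes.py):

```python
from typing import Optional, Tuple


def format_task_id(step_id: int, job_index: Optional[int] = None) -> str:
    """Build a WES task ID: "{step_id}" for single-job and no-job steps,
    "{step_id}.{job_index}" for one job inside a collection step."""
    if job_index is None:
        return str(step_id)
    return f"{step_id}.{job_index}"


def parse_task_id(task_id: str) -> Tuple[int, Optional[int]]:
    """Split a task ID back into (step_id, job_index); job_index is None
    for single-job and no-job steps. Raises ValueError on malformed input."""
    parts = task_id.split(".")
    if len(parts) > 2:
        raise ValueError(f"Invalid task_id format: {task_id}")
    step_id = int(parts[0])
    job_index = int(parts[1]) if len(parts) == 2 else None
    return step_id, job_index
```

Round-tripping is the key property: `parse_task_id(format_task_id(123, 2))` gives back `(123, 2)`.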
Use UNION ALL to create a virtual table of all task rows, then paginate:
-- Part 1: Steps with single jobs
SELECT
wis.id as step_id,
wis.order_index as step_order,
'single' as task_type,
wis.job_id as job_id,
0 as job_index
FROM workflow_invocation_step wis
WHERE wis.workflow_invocation_id = :invocation_id
AND wis.job_id IS NOT NULL
UNION ALL
-- Part 2: Steps with collection jobs (expanded per job)
SELECT
wis.id as step_id,
wis.order_index as step_order,
'collection' as task_type,
icjja.job_id as job_id,
icjja.order_index as job_index
FROM workflow_invocation_step wis
JOIN implicit_collection_jobs icj
ON wis.implicit_collection_jobs_id = icj.id
JOIN implicit_collection_jobs_job_association icjja
ON icj.id = icjja.implicit_collection_jobs_id
WHERE wis.workflow_invocation_id = :invocation_id
AND wis.implicit_collection_jobs_id IS NOT NULL
UNION ALL
-- Part 3: Steps with no jobs
SELECT
wis.id as step_id,
wis.order_index as step_order,
'no_job' as task_type,
NULL as job_id,
0 as job_index
FROM workflow_invocation_step wis
WHERE wis.workflow_invocation_id = :invocation_id
AND wis.job_id IS NULL
AND wis.implicit_collection_jobs_id IS NULL
ORDER BY step_order, job_index
LIMIT :limit OFFSET :offset

Method: `_build_task_rows_query(invocation_id: int) -> Select`
Returns SQLAlchemy Select that produces columns:
- step_id: int
- step_order: int
- task_type: Literal['single', 'collection', 'no_job']
- job_id: Optional[int]
- job_index: int
def _build_task_rows_query(self, invocation_id: int) -> Select:
"""Build UNION query for all task rows in an invocation."""
# Subquery 1: Single job steps
single_jobs = (
select(
WorkflowInvocationStep.id.label('step_id'),
WorkflowInvocationStep.order_index.label('step_order'),
literal('single').label('task_type'),
WorkflowInvocationStep.job_id.label('job_id'),
literal(0).label('job_index'),
)
.where(
WorkflowInvocationStep.workflow_invocation_id == invocation_id,
WorkflowInvocationStep.job_id.isnot(None),
)
)
# Subquery 2: Collection job steps (expanded)
collection_jobs = (
select(
WorkflowInvocationStep.id.label('step_id'),
WorkflowInvocationStep.order_index.label('step_order'),
literal('collection').label('task_type'),
ImplicitCollectionJobsJobAssociation.job_id.label('job_id'),
ImplicitCollectionJobsJobAssociation.order_index.label('job_index'),
)
.join(
ImplicitCollectionJobs,
WorkflowInvocationStep.implicit_collection_jobs_id == ImplicitCollectionJobs.id
)
.join(
ImplicitCollectionJobsJobAssociation,
ImplicitCollectionJobs.id == ImplicitCollectionJobsJobAssociation.implicit_collection_jobs_id
)
.where(
WorkflowInvocationStep.workflow_invocation_id == invocation_id,
WorkflowInvocationStep.implicit_collection_jobs_id.isnot(None),
)
)
# Subquery 3: No-job steps
no_jobs = (
select(
WorkflowInvocationStep.id.label('step_id'),
WorkflowInvocationStep.order_index.label('step_order'),
literal('no_job').label('task_type'),
literal(None).label('job_id'),
literal(0).label('job_index'),
)
.where(
WorkflowInvocationStep.workflow_invocation_id == invocation_id,
WorkflowInvocationStep.job_id.is_(None),
WorkflowInvocationStep.implicit_collection_jobs_id.is_(None),
)
)
    return union_all(single_jobs, collection_jobs, no_jobs)

Note on order_index: WorkflowInvocationStep.order_index is a column_property with a scalar subquery. May need to join WorkflowStep explicitly if this causes issues in the UNION query. Test first.
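Before wiring the SQLAlchemy version into wes.py, the UNION ALL shape can be sanity-checked with stdlib sqlite3 on a toy schema. Table and column names mirror the real ones, but this is a self-contained sketch, not Galaxy code; for brevity it joins the association table directly rather than going through implicit_collection_jobs:

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript(
    """
    CREATE TABLE workflow_invocation_step (
        id INTEGER PRIMARY KEY, workflow_invocation_id INTEGER,
        order_index INTEGER, job_id INTEGER, implicit_collection_jobs_id INTEGER);
    CREATE TABLE implicit_collection_jobs_job_association (
        implicit_collection_jobs_id INTEGER, job_id INTEGER, order_index INTEGER);
    -- invocation 1: single-job step, collection step with 2 jobs, no-job step
    INSERT INTO workflow_invocation_step VALUES
        (10, 1, 0, 100, NULL), (11, 1, 1, NULL, 7), (12, 1, 2, NULL, NULL);
    INSERT INTO implicit_collection_jobs_job_association VALUES (7, 200, 0), (7, 201, 1);
    """
)
page = conn.execute(
    """
    SELECT wis.id, wis.order_index, 'single', wis.job_id, 0
    FROM workflow_invocation_step wis
    WHERE wis.workflow_invocation_id = :inv AND wis.job_id IS NOT NULL
    UNION ALL
    SELECT wis.id, wis.order_index, 'collection', icjja.job_id, icjja.order_index
    FROM workflow_invocation_step wis
    JOIN implicit_collection_jobs_job_association icjja
      ON wis.implicit_collection_jobs_id = icjja.implicit_collection_jobs_id
    WHERE wis.workflow_invocation_id = :inv
    UNION ALL
    SELECT wis.id, wis.order_index, 'no_job', NULL, 0
    FROM workflow_invocation_step wis
    WHERE wis.workflow_invocation_id = :inv
      AND wis.job_id IS NULL AND wis.implicit_collection_jobs_id IS NULL
    ORDER BY 2, 5  -- step_order, job_index (positional terms in a compound SELECT)
    LIMIT :limit OFFSET :offset
    """,
    {"inv": 1, "limit": 3, "offset": 0},
).fetchall()
# First page: the single-job step, then the collection's jobs in order_index order
```

The first page of 3 contains the single-job step followed by both collection jobs; the no-job step lands on the next page.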
Method: _get_paginated_task_rows(trans, invocation_id, offset, limit) -> List[dict]
Executes the UNION query with ordering and pagination.
def _get_paginated_task_rows(
self,
trans: ProvidesUserContext,
invocation_id: int,
offset: int,
limit: int,
) -> List[dict]:
"""Fetch paginated task rows from database.
Returns list of dicts with keys: step_id, step_order, task_type, job_id, job_index
"""
# Build UNION subquery
task_rows_subquery = self._build_task_rows_query(invocation_id).subquery()
# Apply ordering and pagination
stmt = (
select(
task_rows_subquery.c.step_id,
task_rows_subquery.c.step_order,
task_rows_subquery.c.task_type,
task_rows_subquery.c.job_id,
task_rows_subquery.c.job_index,
)
.order_by(
task_rows_subquery.c.step_order,
task_rows_subquery.c.job_index,
)
.offset(offset)
.limit(limit + 1) # Fetch one extra to detect more results
)
result = trans.sa_session.execute(stmt)
    return [dict(row._mapping) for row in result]

Method: `_load_task_objects(trans, task_rows) -> tuple[dict, dict]`
Given task rows, batch load all needed WorkflowInvocationSteps and Jobs.
def _load_task_objects(
self,
trans: ProvidesUserContext,
task_rows: List[dict],
) -> tuple[dict[int, WorkflowInvocationStep], dict[int, Job]]:
"""Batch load Step and Job objects for task rows.
Returns:
- steps_by_id: {step_id: WorkflowInvocationStep}
- jobs_by_id: {job_id: Job}
"""
if not task_rows:
return {}, {}
# Extract unique IDs
step_ids = list(set(row['step_id'] for row in task_rows))
job_ids = list(set(row['job_id'] for row in task_rows if row['job_id'] is not None))
# Batch load steps with workflow_step relationship
steps = (
trans.sa_session.query(WorkflowInvocationStep)
.options(joinedload(WorkflowInvocationStep.workflow_step))
.filter(WorkflowInvocationStep.id.in_(step_ids))
.all()
)
steps_by_id = {step.id: step for step in steps}
# Batch load jobs
jobs_by_id = {}
if job_ids:
jobs = (
trans.sa_session.query(Job)
.filter(Job.id.in_(job_ids))
.all()
)
jobs_by_id = {job.id: job for job in jobs}
    return steps_by_id, jobs_by_id

Method: `_task_row_to_task_log(task_row, steps_by_id, jobs_by_id) -> TaskLog`
def _task_row_to_task_log(
self,
task_row: dict,
steps_by_id: dict[int, WorkflowInvocationStep],
jobs_by_id: dict[int, Job],
) -> TaskLog:
"""Convert a task row dict to a TaskLog object."""
step_id = task_row['step_id']
job_id = task_row['job_id']
job_index = task_row['job_index']
task_type = task_row['task_type']
step = steps_by_id[step_id]
workflow_step = step.workflow_step
# Generate task ID
if task_type == 'collection':
task_id = f"{step_id}.{job_index}"
else:
task_id = str(step_id)
# Get step name
step_name = workflow_step.label or workflow_step.tool_id or f"step_{step.order_index}"
# Build TaskLog with or without job details
if job_id is not None:
job = jobs_by_id[job_id]
return TaskLog(
id=task_id,
name=step_name,
start_time=job.create_time.isoformat() if job.create_time else None,
end_time=job.update_time.isoformat() if job.update_time else None,
stdout=f"/api/jobs/{self._security.encode_id(job.id)}/stdout",
stderr=f"/api/jobs/{self._security.encode_id(job.id)}/stderr",
exit_code=job.exit_code,
)
else:
# No job - use step-level timing
return TaskLog(
id=task_id,
name=step_name,
start_time=step.create_time.isoformat() if step.create_time else None,
end_time=step.update_time.isoformat() if step.update_time else None,
        )

Replace lines 680-718 with an efficient paginated implementation:
def get_run_tasks(
self,
trans: ProvidesUserContext,
run_id: int,
page_size: int = 10,
page_token: Optional[str] = None,
) -> TaskListResponse:
"""Get paginated list of tasks for a workflow run.
Uses database-level pagination via UNION query to avoid loading
all steps/jobs into memory.
"""
invocation = self._get_invocation(trans, run_id)
# Decode page token to offset
offset = _decode_page_token(page_token)
# Fetch paginated task rows (+1 to detect more results)
task_rows = self._get_paginated_task_rows(
trans,
invocation.id,
offset,
page_size,
)
# Check if more results exist
has_more = len(task_rows) > page_size
if has_more:
task_rows = task_rows[:page_size]
# Batch load all needed objects
steps_by_id, jobs_by_id = self._load_task_objects(trans, task_rows)
# Convert to TaskLog objects
task_logs = [
self._task_row_to_task_log(row, steps_by_id, jobs_by_id)
for row in task_rows
]
# Generate next page token
next_page_token = None
if has_more:
next_page_token = _encode_page_token(offset + page_size)
return TaskListResponse(
task_logs=task_logs if task_logs else None,
next_page_token=next_page_token,
    )

Replace lines 720-750 with an efficient direct lookup:
def get_run_task(
self,
trans: ProvidesUserContext,
run_id: int,
task_id: str,
) -> TaskLog:
"""Get details for a specific task by direct lookup.
Parses task_id to extract step_id and optional job_index,
then directly queries for that specific step/job.
"""
invocation = self._get_invocation(trans, run_id)
    # Parse task_id: either "{step_id}" or "{step_id}.{job_index}"
    parts = task_id.split('.')
    try:
        if len(parts) > 2:
            raise ValueError(task_id)
        step_id = int(parts[0])
        job_index = int(parts[1]) if len(parts) == 2 else None
    except ValueError:
        raise exceptions.ObjectNotFound(f"Invalid task_id format: {task_id}")
# Fetch the specific step
step = (
trans.sa_session.query(WorkflowInvocationStep)
.options(joinedload(WorkflowInvocationStep.workflow_step))
.filter(
WorkflowInvocationStep.id == step_id,
WorkflowInvocationStep.workflow_invocation_id == invocation.id,
)
.one_or_none()
)
if not step:
raise exceptions.ObjectNotFound(f"Task {task_id} not found in run {run_id}")
# Get step name
workflow_step = step.workflow_step
step_name = workflow_step.label or workflow_step.tool_id or f"step_{step.order_index}"
# Handle different step types
if step.job_id:
# Single job step
if job_index is not None:
raise exceptions.ObjectNotFound(
f"Task {task_id} specifies job_index but step has single job"
)
job = trans.sa_session.query(Job).filter(Job.id == step.job_id).one()
return TaskLog(
id=str(step_id),
name=step_name,
start_time=job.create_time.isoformat() if job.create_time else None,
end_time=job.update_time.isoformat() if job.update_time else None,
stdout=f"/api/jobs/{self._security.encode_id(job.id)}/stdout",
stderr=f"/api/jobs/{self._security.encode_id(job.id)}/stderr",
exit_code=job.exit_code,
)
elif step.implicit_collection_jobs_id:
# Collection jobs step
if job_index is None:
raise exceptions.ObjectNotFound(
f"Task {task_id} missing job_index for collection job step"
)
# Fetch specific job from collection by order_index
job_assoc = (
trans.sa_session.query(ImplicitCollectionJobsJobAssociation)
.filter(
ImplicitCollectionJobsJobAssociation.implicit_collection_jobs_id == step.implicit_collection_jobs_id,
ImplicitCollectionJobsJobAssociation.order_index == job_index,
)
.one_or_none()
)
if not job_assoc:
raise exceptions.ObjectNotFound(f"Task {task_id} job not found at index {job_index}")
job = trans.sa_session.query(Job).filter(Job.id == job_assoc.job_id).one()
return TaskLog(
id=task_id,
name=step_name,
start_time=job.create_time.isoformat() if job.create_time else None,
end_time=job.update_time.isoformat() if job.update_time else None,
stdout=f"/api/jobs/{self._security.encode_id(job.id)}/stdout",
stderr=f"/api/jobs/{self._security.encode_id(job.id)}/stderr",
exit_code=job.exit_code,
)
else:
# No job step
if job_index is not None:
raise exceptions.ObjectNotFound(
f"Task {task_id} specifies job_index but step has no jobs"
)
return TaskLog(
id=str(step_id),
name=step_name,
start_time=step.create_time.isoformat() if step.create_time else None,
end_time=step.update_time.isoformat() if step.update_time else None,
        )

Change line 868 from:

task_logs = self._build_task_logs(trans, invocation)

To:

# task_logs field is deprecated per WES spec - use task_logs_url instead
task_logs = None

And on line 881, keep:

task_logs=task_logs if task_logs else None,  # Will always be None now

Or simplify to:

task_logs=None,  # Deprecated field - use task_logs_url

Remove methods entirely (lines 886-976):
- `_build_task_logs()`
- `_step_to_task_log()`
These are replaced by the new paginated and direct-lookup approaches.
Add to imports section:
from sqlalchemy import (
# existing imports...
union_all,
literal,
)
from sqlalchemy.orm import joinedload

All existing tests should continue to pass since the API contracts are unchanged:
- GET /runs/{run_id}: Returns RunLog with `task_logs=None` and `task_logs_url` populated
- GET /runs/{run_id}/tasks: Returns paginated TaskListResponse
- GET /runs/{run_id}/tasks/{task_id}: Returns single TaskLog
Additional test cases to add:
def test_wes_task_pagination_large_workflow():
"""Test pagination with 100+ step workflow."""
# Submit large workflow, verify pagination works correctly
def test_wes_task_direct_lookup_single_job():
"""Test get_run_task() for step with single job."""
# Verify task_id="{step_id}" returns correct TaskLog
def test_wes_task_direct_lookup_collection_jobs():
"""Test get_run_task() for collection job step."""
# Verify task_id="{step_id}.{job_index}" returns correct TaskLog
def test_wes_task_direct_lookup_no_job():
"""Test get_run_task() for step without job."""
# Verify minimal TaskLog returned
def test_wes_task_direct_lookup_invalid_format():
"""Test get_run_task() with malformed task_id."""
# Verify 404 error for invalid task_id
def test_wes_run_log_no_inline_tasks():
"""Test GET /runs/{run_id} returns task_logs=None."""
    # Verify deprecated field is None, task_logs_url present

Before (current implementation):
- Query 1: Load all WorkflowInvocationSteps for invocation
- Query 2-N: Load each step's job (N+1 query problem)
- Query N+1-M: Load each collection's jobs
- Memory: O(total_tasks) - all tasks in memory
After (with pagination):
- Query 1: UNION query with LIMIT/OFFSET (returns page_size task rows)
- Query 2: Batch load steps (single query with IN clause)
- Query 3: Batch load jobs (single query with IN clause)
- Memory: O(page_size) - only current page in memory
For workflow with 1000 steps, fetching page of 10:
- Before: ~1001+ queries, load 1000+ objects
- After: 3 queries, load ~10 objects
- Write new tests for pagination and direct lookup
- Implement `_build_task_rows_query()`
- Implement `_get_paginated_task_rows()`
- Implement `_load_task_objects()`
- Implement `_task_row_to_task_log()`
- Refactor `get_run_tasks()` to use new helpers
- Refactor `get_run_task()` with direct lookup
- Update `_invocation_to_run_log()` to set `task_logs=None`
- Delete `_build_task_logs()` and `_step_to_task_log()`
- Run all tests - verify pass
- Performance test with large workflow

lib/galaxy/webapps/galaxy/services/wes.py:
- Add 4 new helper methods
- Refactor 2 existing methods (`get_run_tasks`, `get_run_task`)
- Modify `_invocation_to_run_log()` to set `task_logs=None`
- Delete 2 old methods (`_build_task_logs`, `_step_to_task_log`)

lib/galaxy_test/api/test_workflows.py:
- Add 6 new test methods
WorkflowInvocationStep.order_index is a column_property with scalar subquery (line 10067-10068):
order_index: Mapped[int] = column_property(
select(WorkflowStep.order_index).where(WorkflowStep.id == workflow_step_id).scalar_subquery()
)

Solution: If this causes issues in the UNION query, explicitly join WorkflowStep in each subquery and use WorkflowStep.order_index directly.
Line 944 has a bug:

jobs = [icj.job for icj in step.implicit_collection_jobs]

Should be:

jobs = [assoc.job for assoc in step.implicit_collection_jobs.jobs]

Solution: Fixed in the new implementation by querying through ImplicitCollectionJobsJobAssociation directly.
- ✅ All existing tests pass
- ✅ GET /runs/{run_id} returns `task_logs=None` (following spec deprecation)
- ✅ GET /runs/{run_id}/tasks paginates correctly
- ✅ GET /runs/{run_id}/tasks/{task_id} returns a single task efficiently
- ✅ Query count is O(1) for pagination (3 queries regardless of workflow size)
- ✅ Memory usage is O(page_size) instead of O(total_tasks)
- ✅ Performance improvement measurable for workflows with 100+ tasks
- ✅ Spec-compliant (follows WES deprecation guidance)