Skip to content

Instantly share code, notes, and snippets.

@jeremysmithco
Last active July 28, 2025 14:12
Show Gist options
  • Select an option

  • Save jeremysmithco/bca2e13eb42dd0c97db4689e3136aaa4 to your computer and use it in GitHub Desktop.

Select an option

Save jeremysmithco/bca2e13eb42dd0c97db4689e3136aaa4 to your computer and use it in GitHub Desktop.
SolidFlow
module SolidFlow
class WorkflowJob < ActiveJob::Base
delegate :run, :in_parallel, to: :workflow
queue_as :default
def perform(*args)
build(*args)
StepExecutor.new(workflow.serialized, 0, batch).execute
end
def serialized(*args)
build(*args)
workflow.serialized
end
private
def workflow
@workflow ||= Workflow.new
end
def build(*args)
raise NotImplementedError, "Subclasses must implement build"
end
end
class CompletionJob < ActiveJob::Base
queue_as :default
def perform(batch, workflow_data, step_index, parent_batch)
StepExecutor.new(workflow_data, step_index, parent_batch).execute
end
end
class Workflow
attr_accessor :serial_steps, :parallel_steps
def initialize
@serial_steps = []
end
def run(*step)
current_steps = parallel_steps || serial_steps
current_steps << step
end
def in_parallel
@parallel_steps = []
yield
serial_steps << @parallel_steps if @parallel_steps.any?
ensure
@parallel_steps = nil
serial_steps
end
def serialized
@serialized ||= serial_steps.map do |step|
case step[0]
when Array
step.map do |(job, *args)|
{"klass" => job.name, "args" => args}
end
when Class
job, *args = step
{"klass" => job.name, "args" => args}
end
end
end
end
class StepExecutor
def initialize(workflow_data, step_index, parent_batch)
@workflow_data = workflow_data
@step_index = step_index
@parent_batch = parent_batch
end
def execute
raise "Empty workflow" if workflow_data.empty?
return if step_index >= workflow_data.length
step_jobs = Array.wrap(workflow_data.at(step_index))
conditionally_within_batch(parent_batch) do
SolidQueue::JobBatch.enqueue(on_success: CompletionJob.new(workflow_data, step_index + 1, parent_batch)) do
step_jobs.each do |job|
Object.const_get(job["klass"]).perform_later(*job["args"])
end
end
end
end
private
attr_reader :workflow_data, :step_index, :parent_batch
def conditionally_within_batch(batch)
if batch
batch.enqueue do
yield
end
else
yield
end
end
end
end
class ParentWorkflowJob < SolidFlow::WorkflowJob
def build(id)
run SetupJob, id
in_parallel do
run EmailJob, id
run EmailJob, id + 1
run EmailJob, id + 2
run ChildWorkflowJob, id
run SlackJob, id
run SlackJob, id + 1
run SlackJob, id + 2
end
run CleanupJob, id
end
end
class ChildWorkflowJob < SolidFlow::WorkflowJob
def build(id)
run SetupJob, id + 1000
in_parallel do
run EmailJob, id + 2000
run SlackJob, id + 2000
end
run CleanupJob, id + 1000
end
end
class SetupJob < ApplicationJob
def perform(id)
sleep 2
Rails.logger.info "[Test Workflow] SetupJob (batch #{batch.id}, parent batch #{batch.parent_job_batch_id}) for id: #{id}"
end
end
class EmailJob < ApplicationJob
def perform(id)
sleep 4
Rails.logger.info "[Test Workflow] EmailJob (batch #{batch.id}, parent batch #{batch.parent_job_batch_id}) for id: #{id}"
end
end
class SlackJob < ApplicationJob
def perform(id)
sleep 2
Rails.logger.info "[Test Workflow] SlackJob (batch #{batch.id}, parent batch #{batch.parent_job_batch_id}) for id: #{id}"
end
end
class CleanupJob < ApplicationJob
def perform(id)
Rails.logger.info "[Test Workflow] CleanupJob (batch #{batch.id}, parent batch #{batch.parent_job_batch_id}) for id: #{id}"
end
end
@jeremysmithco
Copy link
Author

@kaspth Hey, thanks again for this! :) This gave me several ideas. The on_success: self was quite interesting, and I like the .next on Queue too.

What's batch here versus parent_batch? I don't see where batch was passed to the Job?

Yeah, I still needed to figure this out. In my original implementation, when I set CompletionJob for the on_success callback, when the job actually ran I go an error where 4 arguments were given but only 3 were expected, and when I looked at it the batch was coming in as an extra argument.

@kaspth
Copy link

kaspth commented Jul 28, 2025

@kaspth Hey, thanks again for this! :) This gave me several ideas. The on_success: self was quite interesting, and I like the .next on Queue too.

Awesome!

Yeah, I still needed to figure this out. In my original implementation, when I set CompletionJob for the on_success callback, when the job actually ran I go an error where 4 arguments were given but only 3 were expected, and when I looked at it the batch was coming in as an extra argument.

Hmm, I find the JobBatch API somewhat intrusive. I'm not sure what I'd want it to look like though.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment