
@jeremysmithco
Last active July 28, 2025 14:12
SolidFlow
module SolidFlow
  # Base class for workflow definitions. Subclasses describe their steps in
  # #build using the run / in_parallel DSL, which is delegated to Workflow.
  class WorkflowJob < ActiveJob::Base
    delegate :run, :in_parallel, to: :workflow

    queue_as :default

    def perform(*args)
      build(*args)
      StepExecutor.new(workflow.serialized, 0, batch).execute
    end

    def serialized(*args)
      build(*args)
      workflow.serialized
    end

    private

    def workflow
      @workflow ||= Workflow.new
    end

    def build(*args)
      raise NotImplementedError, "Subclasses must implement build"
    end
  end

  # Runs as the on_success callback of each step's batch and kicks off the
  # next step in the workflow.
  class CompletionJob < ActiveJob::Base
    queue_as :default

    def perform(batch, workflow_data, step_index, parent_batch)
      StepExecutor.new(workflow_data, step_index, parent_batch).execute
    end
  end

  # Collects steps into an ordered list; steps added inside an in_parallel
  # block are grouped into a nested array so they run concurrently.
  class Workflow
    attr_accessor :serial_steps, :parallel_steps

    def initialize
      @serial_steps = []
    end

    def run(*step)
      current_steps = parallel_steps || serial_steps
      current_steps << step
    end

    def in_parallel
      @parallel_steps = []
      yield
      serial_steps << @parallel_steps if @parallel_steps.any?
    ensure
      @parallel_steps = nil
      serial_steps
    end

    def serialized
      @serialized ||= serial_steps.map do |step|
        case step[0]
        when Array
          step.map do |(job, *args)|
            {"klass" => job.name, "args" => args}
          end
        when Class
          job, *args = step
          {"klass" => job.name, "args" => args}
        end
      end
    end
  end

  # Enqueues the jobs of a single step as a SolidQueue::JobBatch, with a
  # CompletionJob set to advance to the following step on success.
  class StepExecutor
    def initialize(workflow_data, step_index, parent_batch)
      @workflow_data = workflow_data
      @step_index = step_index
      @parent_batch = parent_batch
    end

    def execute
      raise "Empty workflow" if workflow_data.empty?
      return if step_index >= workflow_data.length

      step_jobs = Array.wrap(workflow_data.at(step_index))

      conditionally_within_batch(parent_batch) do
        SolidQueue::JobBatch.enqueue(on_success: CompletionJob.new(workflow_data, step_index + 1, parent_batch)) do
          step_jobs.each do |job|
            Object.const_get(job["klass"]).perform_later(*job["args"])
          end
        end
      end
    end

    private

    attr_reader :workflow_data, :step_index, :parent_batch

    def conditionally_within_batch(batch)
      if batch
        batch.enqueue do
          yield
        end
      else
        yield
      end
    end
  end
end

class ParentWorkflowJob < SolidFlow::WorkflowJob
  def build(id)
    run SetupJob, id

    in_parallel do
      run EmailJob, id
      run EmailJob, id + 1
      run EmailJob, id + 2
      run ChildWorkflowJob, id
      run SlackJob, id
      run SlackJob, id + 1
      run SlackJob, id + 2
    end

    run CleanupJob, id
  end
end

class ChildWorkflowJob < SolidFlow::WorkflowJob
  def build(id)
    run SetupJob, id + 1000

    in_parallel do
      run EmailJob, id + 2000
      run SlackJob, id + 2000
    end

    run CleanupJob, id + 1000
  end
end

class SetupJob < ApplicationJob
  def perform(id)
    sleep 2
    Rails.logger.info "[Test Workflow] SetupJob (batch #{batch.id}, parent batch #{batch.parent_job_batch_id}) for id: #{id}"
  end
end

class EmailJob < ApplicationJob
  def perform(id)
    sleep 4
    Rails.logger.info "[Test Workflow] EmailJob (batch #{batch.id}, parent batch #{batch.parent_job_batch_id}) for id: #{id}"
  end
end

class SlackJob < ApplicationJob
  def perform(id)
    sleep 2
    Rails.logger.info "[Test Workflow] SlackJob (batch #{batch.id}, parent batch #{batch.parent_job_batch_id}) for id: #{id}"
  end
end

class CleanupJob < ApplicationJob
  def perform(id)
    Rails.logger.info "[Test Workflow] CleanupJob (batch #{batch.id}, parent batch #{batch.parent_job_batch_id}) for id: #{id}"
  end
end
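For reference, the run / in_parallel bookkeeping can be exercised standalone, with no Rails or SolidQueue required. A minimal sketch using symbolic step names in place of job classes (MiniWorkflow is a stand-in, not part of SolidFlow):

```ruby
# Standalone model of the Workflow DSL's step bookkeeping: serial steps
# are appended directly; steps added inside in_parallel are grouped.
class MiniWorkflow
  attr_reader :serial_steps

  def initialize
    @serial_steps = []
  end

  def run(*step)
    # Inside an in_parallel block, append to the parallel group instead.
    (@parallel_steps || @serial_steps) << step
  end

  def in_parallel
    @parallel_steps = []
    yield
    @serial_steps << @parallel_steps if @parallel_steps.any?
  ensure
    @parallel_steps = nil
  end
end

w = MiniWorkflow.new
w.run :setup, 1
w.in_parallel do
  w.run :email, 1
  w.run :slack, 1
end
w.run :cleanup, 1

w.serial_steps
# => [[:setup, 1], [[:email, 1], [:slack, 1]], [:cleanup, 1]]
```

The nested array is what lets StepExecutor tell a parallel group (Array first element) apart from a single serial step (Class first element) when serializing.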
kaspth commented Jul 25, 2025

It's funny, I had been wanting to look into job orchestration and planning sort of à la this a few years ago, but I never got that far with it. This did remind me of one experiment I did: https://github.com/kaspth/active_job-inlined

Basically reinterpreting perform_later to actually send a different message (here perform_now). I think that could potentially be interesting for this too. That's essentially what JobBatch.enqueue(on_success:, &) does and why we have to call perform_later within that block.
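A minimal plain-Ruby sketch of that reinterpretation (InlineJob and GreetJob are hypothetical stand-ins; the real active_job-inlined gem hooks into ActiveJob rather than defining its own base class):

```ruby
# Hypothetical base class whose perform_later runs the job inline
# instead of enqueuing it -- the "send a different message" idea.
class InlineJob
  def self.perform_later(*args)
    new.perform(*args) # "enqueue" becomes an immediate perform
  end
end

class GreetJob < InlineJob
  def perform(name)
    "hello #{name}"
  end
end

GreetJob.perform_later("world") # => "hello world"
```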

Anyway, interesting stuff! I played around with the internals a bit and here's where I ended up:

module SolidFlow
  class WorkflowJob < ActiveJob::Base
    queue_as :default

    # What if Workflows just have to respond to perform? Then they could potentially use perform callbacks like usual?
    # …although if that's not the intention maybe workflows shouldn't be jobs at all? Instead, they're solely schedulers for other jobs,
    # which could be processed in a background job, but users don't control that job.
    def perform(...)
      build(...)
    end
    after_perform :start

    private
      def workflow = @workflow ||= Workflow.new
      delegate :run, :in_parallel, to: :workflow

      def start
        workflow.start(batch)
      end
  
      def build(*args)
        raise NotImplementedError, "Subclasses must implement build"
      end
  end

  class Workflow
    def initialize
      @steps = @container = []
    end

    def start(batch)
      raise "Empty workflow" if @steps.empty?
      Queue.new(batch, @steps, 0).next
    end

    def run(job, *args)
      @container << { "class" => job.name, "args" => args }
    end

    def in_parallel
      previous_container, @container = @container, []
      yield
      @steps << @container unless @container.empty?
    ensure
      @container = previous_container
    end

    class Queue < Data.define(:batch, :job_data, :index)
      class Job < ActiveJob::Base
        queue_as :default

        # What's `batch` here versus `parent_batch`? I don't see where `batch` was passed to the Job?
        def perform(batch, parent_batch, job_data, index)
          Workflow::Queue.new(parent_batch, job_data, index).next
        end

        def enqueue_after(within_batch:, &)
          if within_batch
            within_batch.enqueue { SolidQueue::JobBatch.enqueue(on_success: self, &) }
          else
            SolidQueue::JobBatch.enqueue(on_success: self, &)
          end
        end
      end

      def next
        next_step_job.enqueue_after(within_batch: batch) { enqueue_step } if step
      end

      private
        def step = job_data[index]
        def next_step_job = Job.new(batch, job_data, index.succ)

        def enqueue_step = Array(step).each do |data|
          klass, args = data.fetch_values("class", "args")
          klass.constantize.perform_later(*args)
        end
    end
  end
end

@jeremysmithco (Author)
@kaspth Hey, thanks again for this! :) This gave me several ideas. The on_success: self was quite interesting, and I like the .next on Queue too.

What's batch here versus parent_batch? I don't see where batch was passed to the Job?

Yeah, I still needed to figure this out. In my original implementation, when I set CompletionJob as the on_success callback, the job raised an error when it actually ran (4 arguments were given but only 3 were expected), and when I looked into it, the batch was coming in as an extra argument.
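A tiny model of that observed behavior, assuming (as described above) that the batch prepends itself as an extra first argument when invoking the on_success job. Batch and CompletionJob here are illustrative stand-ins, not SolidQueue's actual classes:

```ruby
# Model only: the callback job must accept the batch as its first
# parameter, on top of the three workflow arguments it was built with,
# or it raises the "4 arguments given, 3 expected" ArgumentError.
Batch = Struct.new(:id)

class CompletionJob
  def perform(batch, workflow_data, step_index, parent_batch)
    "batch #{batch.id}, advancing to step #{step_index}"
  end
end

CompletionJob.new.perform(Batch.new(42), [], 1, nil)
# => "batch 42, advancing to step 1"
```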

kaspth commented Jul 28, 2025

@kaspth Hey, thanks again for this! :) This gave me several ideas. The on_success: self was quite interesting, and I like the .next on Queue too.

Awesome!

Yeah, I still needed to figure this out. In my original implementation, when I set CompletionJob as the on_success callback, the job raised an error when it actually ran (4 arguments were given but only 3 were expected), and when I looked into it, the batch was coming in as an extra argument.

Hmm, I find the JobBatch API somewhat intrusive. I'm not sure what I'd want it to look like though.
