Last active
July 28, 2025 14:12
-
-
Save jeremysmithco/bca2e13eb42dd0c97db4689e3136aaa4 to your computer and use it in GitHub Desktop.
SolidFlow
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| module SolidFlow | |
| class WorkflowJob < ActiveJob::Base | |
| delegate :run, :in_parallel, to: :workflow | |
| queue_as :default | |
| def perform(*args) | |
| build(*args) | |
| StepExecutor.new(workflow.serialized, 0, batch).execute | |
| end | |
| def serialized(*args) | |
| build(*args) | |
| workflow.serialized | |
| end | |
| private | |
| def workflow | |
| @workflow ||= Workflow.new | |
| end | |
| def build(*args) | |
| raise NotImplementedError, "Subclasses must implement build" | |
| end | |
| end | |
| class CompletionJob < ActiveJob::Base | |
| queue_as :default | |
| def perform(batch, workflow_data, step_index, parent_batch) | |
| StepExecutor.new(workflow_data, step_index, parent_batch).execute | |
| end | |
| end | |
| class Workflow | |
| attr_accessor :serial_steps, :parallel_steps | |
| def initialize | |
| @serial_steps = [] | |
| end | |
| def run(*step) | |
| current_steps = parallel_steps || serial_steps | |
| current_steps << step | |
| end | |
| def in_parallel | |
| @parallel_steps = [] | |
| yield | |
| serial_steps << @parallel_steps if @parallel_steps.any? | |
| ensure | |
| @parallel_steps = nil | |
| serial_steps | |
| end | |
| def serialized | |
| @serialized ||= serial_steps.map do |step| | |
| case step[0] | |
| when Array | |
| step.map do |(job, *args)| | |
| {"klass" => job.name, "args" => args} | |
| end | |
| when Class | |
| job, *args = step | |
| {"klass" => job.name, "args" => args} | |
| end | |
| end | |
| end | |
| end | |
| class StepExecutor | |
| def initialize(workflow_data, step_index, parent_batch) | |
| @workflow_data = workflow_data | |
| @step_index = step_index | |
| @parent_batch = parent_batch | |
| end | |
| def execute | |
| raise "Empty workflow" if workflow_data.empty? | |
| return if step_index >= workflow_data.length | |
| step_jobs = Array.wrap(workflow_data.at(step_index)) | |
| conditionally_within_batch(parent_batch) do | |
| SolidQueue::JobBatch.enqueue(on_success: CompletionJob.new(workflow_data, step_index + 1, parent_batch)) do | |
| step_jobs.each do |job| | |
| Object.const_get(job["klass"]).perform_later(*job["args"]) | |
| end | |
| end | |
| end | |
| end | |
| private | |
| attr_reader :workflow_data, :step_index, :parent_batch | |
| def conditionally_within_batch(batch) | |
| if batch | |
| batch.enqueue do | |
| yield | |
| end | |
| else | |
| yield | |
| end | |
| end | |
| end | |
| end |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| class ParentWorkflowJob < SolidFlow::WorkflowJob | |
| def build(id) | |
| run SetupJob, id | |
| in_parallel do | |
| run EmailJob, id | |
| run EmailJob, id + 1 | |
| run EmailJob, id + 2 | |
| run ChildWorkflowJob, id | |
| run SlackJob, id | |
| run SlackJob, id + 1 | |
| run SlackJob, id + 2 | |
| end | |
| run CleanupJob, id | |
| end | |
| end | |
| class ChildWorkflowJob < SolidFlow::WorkflowJob | |
| def build(id) | |
| run SetupJob, id + 1000 | |
| in_parallel do | |
| run EmailJob, id + 2000 | |
| run SlackJob, id + 2000 | |
| end | |
| run CleanupJob, id + 1000 | |
| end | |
| end | |
| class SetupJob < ApplicationJob | |
| def perform(id) | |
| sleep 2 | |
| Rails.logger.info "[Test Workflow] SetupJob (batch #{batch.id}, parent batch #{batch.parent_job_batch_id}) for id: #{id}" | |
| end | |
| end | |
| class EmailJob < ApplicationJob | |
| def perform(id) | |
| sleep 4 | |
| Rails.logger.info "[Test Workflow] EmailJob (batch #{batch.id}, parent batch #{batch.parent_job_batch_id}) for id: #{id}" | |
| end | |
| end | |
| class SlackJob < ApplicationJob | |
| def perform(id) | |
| sleep 2 | |
| Rails.logger.info "[Test Workflow] SlackJob (batch #{batch.id}, parent batch #{batch.parent_job_batch_id}) for id: #{id}" | |
| end | |
| end | |
| class CleanupJob < ApplicationJob | |
| def perform(id) | |
| Rails.logger.info "[Test Workflow] CleanupJob (batch #{batch.id}, parent batch #{batch.parent_job_batch_id}) for id: #{id}" | |
| end | |
| end |
Author
@kaspth Hey, thanks again for this! :) This gave me several ideas. The
on_success: selfwas quite interesting, and I like the.nextonQueuetoo.
Awesome!
Yeah, I still needed to figure this out. In my original implementation, when I set
CompletionJobfor theon_successcallback, when the job actually ran I go an error where 4 arguments were given but only 3 were expected, and when I looked at it the batch was coming in as an extra argument.
Hmm, I find the JobBatch API somewhat intrusive. I'm not sure what I'd want it to look like though.
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
@kaspth Hey, thanks again for this! :) This gave me several ideas. The
on_success: selfwas quite interesting, and I like the.nextonQueuetoo.Yeah, I still needed to figure this out. In my original implementation, when I set
CompletionJobfor theon_successcallback, when the job actually ran I go an error where 4 arguments were given but only 3 were expected, and when I looked at it the batch was coming in as an extra argument.