-
-
Save jeremysmithco/bca2e13eb42dd0c97db4689e3136aaa4 to your computer and use it in GitHub Desktop.
# SolidFlow: a small DSL for describing multi-step job workflows on top of
# SolidQueue job batches. Each `run` outside `in_parallel` is its own step;
# all `run`s inside one `in_parallel` block form a single step whose jobs are
# enqueued together. The next step is enqueued from the on_success callback
# of the previous step's batch.
module SolidFlow
  # Base class for workflow jobs. Subclasses implement `build(*args)` and call
  # `run` / `in_parallel` inside it to declare their steps.
  class WorkflowJob < ActiveJob::Base
    delegate :run, :in_parallel, to: :workflow

    queue_as :default

    # Builds the workflow for +args+ and enqueues its first step.
    #
    # `batch` is the job batch this job is running in; presumably nil for a
    # top-level workflow (StepExecutor only nests under it when it is present).
    # When present, this workflow's step batches are enqueued within it.
    def perform(*args)
      prepare_workflow(*args)
      StepExecutor.new(workflow.serialized, 0, batch).execute
    end

    # Returns the serialized steps for +args+ without enqueuing anything.
    def serialized(*args)
      prepare_workflow(*args)
      workflow.serialized
    end

    private

    def workflow
      @workflow ||= Workflow.new
    end

    # Starts from a fresh Workflow every time, so calling `perform` and/or
    # `serialized` more than once on the same instance cannot accumulate
    # duplicate steps.
    def prepare_workflow(*args)
      @workflow = Workflow.new
      build(*args)
    end

    # Hook for subclasses: declare the workflow's steps with `run` and
    # `in_parallel`.
    def build(*args)
      raise NotImplementedError, "Subclasses must implement build"
    end
  end

  # on_success callback for a step's batch: enqueues the following step.
  class CompletionJob < ActiveJob::Base
    queue_as :default

    # The batch callback passes the finished batch as an extra first argument,
    # in addition to the three arguments given to CompletionJob.new. It is not
    # needed here.
    def perform(_finished_batch, workflow_data, step_index, parent_batch)
      StepExecutor.new(workflow_data, step_index, parent_batch).execute
    end
  end

  # Collects the steps declared in a WorkflowJob#build.
  #
  # `serial_steps` holds one entry per step: either a single job
  # (`[JobClass, *args]`) or a parallel group (an array of such entries).
  class Workflow
    attr_accessor :serial_steps, :parallel_steps

    def initialize
      @serial_steps = []
    end

    # Declares a job. The first argument is the job class; the rest are passed
    # to its `perform_later`. Inside `in_parallel` the job joins the open
    # group; otherwise it becomes its own serial step.
    #
    # @raise [ArgumentError] if the first argument is not a class. This is
    #   caught here rather than later, when the step is serialized or enqueued.
    def run(*step)
      unless step.first.is_a?(Class)
        raise ArgumentError, "run expects a job class as its first argument, got #{step.first.inspect}"
      end

      (parallel_steps || serial_steps) << step
    end

    # Groups every `run` made inside the block into one step whose jobs are
    # enqueued together. A group with no jobs is dropped.
    #
    # @return [Array] serial_steps
    # @raise [ArgumentError] if called while another in_parallel group is open
    def in_parallel
      # Nesting would discard the outer group's jobs, so reject it before
      # touching the open group.
      raise ArgumentError, "in_parallel cannot be nested" if parallel_steps

      begin
        self.parallel_steps = []
        yield
        serial_steps << parallel_steps if parallel_steps.any?
      ensure
        # Always close the group, even if the block raises.
        self.parallel_steps = nil
      end

      # Returned explicitly: the value of an ensure clause is discarded.
      serial_steps
    end

    # Returns the steps as JSON-friendly data for ActiveJob. A single job
    # becomes {"klass" => class name, "args" => args}; a parallel group becomes
    # an array of those hashes. Recomputed on every call, so steps added after
    # an earlier call are included.
    def serialized
      serial_steps.map do |step|
        if step.first.is_a?(Array)
          step.map { |job_step| serialize_job(job_step) }
        else
          serialize_job(step)
        end
      end
    end

    private

    # Converts one [JobClass, *args] entry into its serialized hash.
    def serialize_job(job_step)
      job, *args = job_step
      {"klass" => job.name, "args" => args}
    end
  end

  # Enqueues one step of a serialized workflow in its own job batch. The
  # batch's on_success callback (a CompletionJob) enqueues the following step.
  class StepExecutor
    # @param workflow_data [Array] output of Workflow#serialized
    # @param step_index [Integer] index of the step to enqueue
    # @param parent_batch batch to enqueue the step's batch within, or nil
    def initialize(workflow_data, step_index, parent_batch)
      @workflow_data = workflow_data
      @step_index = step_index
      @parent_batch = parent_batch
    end

    # Enqueues the step at step_index. Does nothing once past the last step.
    #
    # @raise [RuntimeError] if the workflow has no steps at all
    def execute
      raise "Empty workflow" if workflow_data.empty?
      return if step_index >= workflow_data.length

      # A single-job step is one Hash; a parallel step is an Array of Hashes.
      step_jobs = Array.wrap(workflow_data[step_index])

      conditionally_within_batch(parent_batch) do
        next_step = CompletionJob.new(workflow_data, step_index + 1, parent_batch)
        SolidQueue::JobBatch.enqueue(on_success: next_step) do
          step_jobs.each do |job|
            # "klass" names are produced by Workflow#serialized.
            Object.const_get(job["klass"]).perform_later(*job["args"])
          end
        end
      end
    end

    private

    attr_reader :workflow_data, :step_index, :parent_batch

    # Runs the block inside `batch.enqueue` when a batch is given; otherwise
    # runs it directly.
    def conditionally_within_batch(batch)
      return yield unless batch

      batch.enqueue { yield }
    end
  end
end
# Example workflow:
#   1. SetupJob
#   2. in parallel: three EmailJobs, a nested ChildWorkflowJob, three SlackJobs
#   3. CleanupJob
class ParentWorkflowJob < SolidFlow::WorkflowJob
  def build(id)
    fanout_ids = [id, id + 1, id + 2]

    run SetupJob, id

    in_parallel do
      fanout_ids.each { |email_id| run EmailJob, email_id }
      run ChildWorkflowJob, id
      fanout_ids.each { |slack_id| run SlackJob, slack_id }
    end

    run CleanupJob, id
  end
end
# Example nested workflow (launched as one of ParentWorkflowJob's parallel
# jobs): setup, then one EmailJob and one SlackJob in parallel, then cleanup.
class ChildWorkflowJob < SolidFlow::WorkflowJob
  def build(id)
    bookend_id = id + 1000
    notify_id = id + 2000

    run SetupJob, bookend_id
    in_parallel do
      [EmailJob, SlackJob].each { |job_class| run job_class, notify_id }
    end
    run CleanupJob, bookend_id
  end
end
# Demo workflow step: sleeps 2 seconds, then logs its batch ids and +id+.
class SetupJob < ApplicationJob
  def perform(id)
    sleep 2
    # `batch` is presumably nil when the job runs outside a job batch (e.g. a
    # direct perform_now); `&.` keeps the log line from raising in that case.
    Rails.logger.info "[Test Workflow] SetupJob (batch #{batch&.id}, parent batch #{batch&.parent_job_batch_id}) for id: #{id}"
  end
end
# Demo workflow step: sleeps 4 seconds, then logs its batch ids and +id+.
class EmailJob < ApplicationJob
  def perform(id)
    sleep 4
    # `batch` is presumably nil when the job runs outside a job batch (e.g. a
    # direct perform_now); `&.` keeps the log line from raising in that case.
    Rails.logger.info "[Test Workflow] EmailJob (batch #{batch&.id}, parent batch #{batch&.parent_job_batch_id}) for id: #{id}"
  end
end
# Demo workflow step: sleeps 2 seconds, then logs its batch ids and +id+.
class SlackJob < ApplicationJob
  def perform(id)
    sleep 2
    # `batch` is presumably nil when the job runs outside a job batch (e.g. a
    # direct perform_now); `&.` keeps the log line from raising in that case.
    Rails.logger.info "[Test Workflow] SlackJob (batch #{batch&.id}, parent batch #{batch&.parent_job_batch_id}) for id: #{id}"
  end
end
# Demo workflow step: logs its batch ids and +id+ immediately.
class CleanupJob < ApplicationJob
  def perform(id)
    # `batch` is presumably nil when the job runs outside a job batch (e.g. a
    # direct perform_now); `&.` keeps the log line from raising in that case.
    Rails.logger.info "[Test Workflow] CleanupJob (batch #{batch&.id}, parent batch #{batch&.parent_job_batch_id}) for id: #{id}"
  end
end
@kaspth Hey, thanks again for this! :) This gave me several ideas. The `on_success: self` was quite interesting, and I like the `.next` on `Queue` too.
What's `batch` here versus `parent_batch`? I don't see where `batch` was passed to the job.
Yeah, I still needed to figure this out. In my original implementation, when I set `CompletionJob` as the `on_success` callback, I got an error when the job actually ran: 4 arguments were given but only 3 were expected. When I looked into it, the batch was coming in as an extra argument.
> @kaspth Hey, thanks again for this! :) This gave me several ideas. The `on_success: self` was quite interesting, and I like the `.next` on `Queue` too.
Awesome!
> Yeah, I still needed to figure this out. In my original implementation, when I set `CompletionJob` for the `on_success` callback, when the job actually ran I got an error where 4 arguments were given but only 3 were expected, and when I looked at it the batch was coming in as an extra argument.
Hmm, I find the `JobBatch` API somewhat intrusive. I'm not sure what I'd want it to look like, though.
It's funny: a few years ago I had been wanting to look into job orchestration and planning, sort of à la this, but I never got that far with it. This did remind me of one experiment I did: https://github.com/kaspth/active_job-inlined
Basically, it reinterprets `perform_later` to actually send a different message (here `perform_now`). I think that could potentially be interesting for this too. That's essentially what `JobBatch.enqueue(on_success:, &)` does, and why we have to call `perform_later` within that block.

Anyway, interesting stuff! I played around with the internals a bit, and here's where I ended up: