Last active
February 25, 2026 00:29
-
-
Save Dhravya/0db35e5d377f51cbc208591b3e04e328 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| import type { WorkflowParams2 } from "@repo/types" | |
| import * as Effect from "effect/Effect" | |
| import * as Layer from "effect/Layer" | |
| import { BackgroundTaskService } from "../backgroundTasks" | |
| import { EnvironmentService } from "../environment" | |
| import { CloudflareContentExtractionLayer } from "../extraction/service" | |
| import { runWorkflowEffect } from "." | |
| import { | |
| NonRetryableErrorService, | |
| WorkflowStepManager, | |
| WorkflowTriggerService, | |
| } from "./types" | |
| // Direct mode implementation - just fails normally | |
| export const DirectNonRetryableErrorLayer = Layer.succeed( | |
| NonRetryableErrorService, | |
| { | |
| throw: (error: Error) => Effect.fail(error), | |
| }, | |
| ) | |
| /** | |
| * Direct WorkflowStepManager implementation | |
| * Runs steps immediately without any special workflow infrastructure | |
| */ | |
| const makeDirectWorkflowStepManager = (): WorkflowStepManager<unknown> => ({ | |
| do: <A, E>( | |
| name: string, | |
| _config: unknown, | |
| fn: () => Effect.Effect<A, E | never, never>, | |
| ) => | |
| Effect.gen(function* () { | |
| console.log(`[Direct Workflow] Executing step: ${name}`) | |
| // Force the effect to run to completion by converting to promise and back | |
| const result = yield* Effect.promise(async () => { | |
| const stepEffect = fn() | |
| // Run the step effect as a promise to handle async operations | |
| return await Effect.runPromise(stepEffect) | |
| }) | |
| console.log(`[Direct Workflow] Step ${name} completed successfully`) | |
| return result | |
| }), | |
| sleep: (_name: string, duration: number | string) => | |
| Effect.gen(function* () { | |
| const ms = | |
| typeof duration === "number" ? duration : parseDuration(duration) | |
| yield* Effect.log(`[Direct Workflow] Sleeping for ${ms}ms`) | |
| yield* Effect.sleep(ms) | |
| }), | |
| }) | |
| const parseDuration = (duration: string): number => { | |
| const match = duration.match(/^(\d+)\s*(second|minute|hour|day|week)s?$/) | |
| if (!match) return 0 | |
| const [, amount, unit] = match | |
| if (!amount || !unit) { | |
| return 0 | |
| } | |
| const multipliers: Record<string, number> = { | |
| second: 1000, | |
| minute: 60 * 1000, | |
| hour: 60 * 60 * 1000, | |
| day: 24 * 60 * 60 * 1000, | |
| week: 7 * 24 * 60 * 60 * 1000, | |
| } | |
| return Number.parseInt(amount) * (multipliers[unit] || 0) | |
| } | |
| /** | |
| * Direct implementation of WorkflowTriggerService for Node.js/testing | |
| * Uses Effect's built-in retry and scheduling mechanisms instead of Cloudflare Workflows | |
| * Executes workflow logic immediately in the background using BackgroundTaskService | |
| */ | |
| export const makeDirectWorkflowTriggerService = ( | |
| _backgroundTasks: BackgroundTaskService, | |
| envService: EnvironmentService, | |
| ): WorkflowTriggerService => { | |
| const workflows = new Map< | |
| string, | |
| { | |
| id: string | |
| status: "running" | "complete" | "failed" | "queued" | |
| output?: unknown | |
| error?: string | |
| promise: Promise<unknown> | |
| } | |
| >() | |
| return { | |
| create: <_Params, Result = unknown>( | |
| idOrParams: string | WorkflowParams2, | |
| params?: WorkflowParams2, | |
| ) => | |
| Effect.gen(function* () { | |
| // Parse the arguments to handle both signatures | |
| const actualId = | |
| typeof idOrParams === "string" | |
| ? idOrParams | |
| : `workflow-${Date.now()}-${Math.random().toString(36).substring(7)}` | |
| const actualParams = | |
| // biome-ignore lint/style/noNonNullAssertion: params exist | |
| typeof idOrParams === "string" ? params! : idOrParams | |
| // Create a promise that will be resolved when the workflow completes | |
| let resolveWorkflow: (result: unknown) => void | |
| let rejectWorkflow: (error: Error) => void | |
| const workflowPromise = new Promise<Result>((resolve, reject) => { | |
| resolveWorkflow = resolve as (result: unknown) => void | |
| rejectWorkflow = reject | |
| }) | |
| // Store workflow in running state | |
| workflows.set(actualId, { | |
| id: actualId, | |
| status: "running", | |
| promise: workflowPromise, | |
| }) | |
| // Schedule the actual workflow execution in background | |
| // This simulates what would happen in a real Cloudflare Workflow | |
| const workflowEffect = runWorkflowEffect(actualParams).pipe( | |
| Effect.provide( | |
| Layer.mergeAll( | |
| Layer.succeed( | |
| WorkflowStepManager, | |
| makeDirectWorkflowStepManager(), | |
| ), | |
| Layer.succeed(EnvironmentService, envService), | |
| CloudflareContentExtractionLayer(), | |
| DirectNonRetryableErrorLayer, | |
| ), | |
| ), | |
| Effect.tap((result) => | |
| Effect.sync(() => { | |
| const workflow = workflows.get(actualId) | |
| if (workflow) { | |
| workflow.status = "complete" | |
| workflow.output = result | |
| resolveWorkflow(result) | |
| } | |
| }), | |
| ), | |
| Effect.catchAll((error: unknown) => | |
| Effect.sync(() => { | |
| const workflow = workflows.get(actualId) | |
| if (workflow) { | |
| workflow.status = "failed" | |
| workflow.error = | |
| error instanceof Error ? error.message : String(error) | |
| rejectWorkflow( | |
| error instanceof Error ? error : new Error(String(error)), | |
| ) | |
| } | |
| }), | |
| ), | |
| ) | |
| // Run the workflow directly without setImmediate | |
| // Start it immediately but don't await (let it run in background) | |
| Effect.runPromise(workflowEffect) | |
| .then((result) => { | |
| console.log( | |
| "[Direct Workflow] Workflow completed successfully:", | |
| result, | |
| ) | |
| }) | |
| .catch((error) => { | |
| console.error("[Direct Workflow] Workflow failed:", error) | |
| }) | |
| return { | |
| id: actualId, | |
| status: async () => { | |
| const workflow = workflows.get(actualId) | |
| if (!workflow) { | |
| return { | |
| id: actualId, | |
| status: "unknown" as const, | |
| error: "Workflow not found", | |
| } | |
| } | |
| return { | |
| id: workflow.id, | |
| status: workflow.status, | |
| output: workflow.output, | |
| error: workflow.error, | |
| } | |
| }, | |
| wait: async () => { | |
| return (await workflowPromise) as Result | |
| }, | |
| } | |
| }), | |
| createBatch: <_Params, Result = unknown>( | |
| batch: Array<{ params: WorkflowParams2 }>, | |
| ) => | |
| Effect.gen(function* () { | |
| const instances = [] | |
| // Create workflows for each item in the batch | |
| for (const item of batch) { | |
| const actualId = `workflow-${Date.now()}-${Math.random().toString(36).substring(7)}` | |
| const actualParams = item.params | |
| // Create a promise that will be resolved when the workflow completes | |
| let resolveWorkflow: (result: unknown) => void | |
| let rejectWorkflow: (error: Error) => void | |
| const workflowPromise = new Promise<Result>((resolve, reject) => { | |
| resolveWorkflow = resolve as (result: unknown) => void | |
| rejectWorkflow = reject | |
| }) | |
| // Store workflow in running state | |
| workflows.set(actualId, { | |
| id: actualId, | |
| status: "running", | |
| promise: workflowPromise, | |
| }) | |
| // Schedule the actual workflow execution in background | |
| const workflowEffect = runWorkflowEffect(actualParams).pipe( | |
| Effect.provide( | |
| Layer.mergeAll( | |
| Layer.succeed( | |
| WorkflowStepManager, | |
| makeDirectWorkflowStepManager(), | |
| ), | |
| Layer.succeed(EnvironmentService, envService), | |
| CloudflareContentExtractionLayer(), | |
| DirectNonRetryableErrorLayer, | |
| ), | |
| ), | |
| Effect.tap((result) => | |
| Effect.sync(() => { | |
| const workflow = workflows.get(actualId) | |
| if (workflow) { | |
| workflow.status = "complete" | |
| workflow.output = result | |
| resolveWorkflow(result) | |
| } | |
| }), | |
| ), | |
| Effect.catchAll((error: unknown) => | |
| Effect.sync(() => { | |
| const workflow = workflows.get(actualId) | |
| if (workflow) { | |
| workflow.status = "failed" | |
| workflow.error = | |
| error instanceof Error ? error.message : String(error) | |
| rejectWorkflow( | |
| error instanceof Error ? error : new Error(String(error)), | |
| ) | |
| } | |
| }), | |
| ), | |
| ) | |
| // Run the workflow in background | |
| Effect.runPromise(workflowEffect) | |
| .then((result) => { | |
| console.log( | |
| `[Direct Workflow] Batch workflow ${actualId} completed successfully:`, | |
| result, | |
| ) | |
| }) | |
| .catch((error) => { | |
| console.error( | |
| `[Direct Workflow] Batch workflow ${actualId} failed:`, | |
| error, | |
| ) | |
| }) | |
| instances.push({ | |
| id: actualId, | |
| status: async () => { | |
| const workflow = workflows.get(actualId) | |
| if (!workflow) { | |
| return { | |
| id: actualId, | |
| status: "unknown" as const, | |
| error: "Workflow not found", | |
| } | |
| } | |
| return { | |
| id: workflow.id, | |
| status: workflow.status, | |
| output: workflow.output, | |
| error: workflow.error, | |
| } | |
| }, | |
| wait: async () => { | |
| return (await workflowPromise) as Result | |
| }, | |
| }) | |
| } | |
| return instances | |
| }), | |
| get: <Result = unknown>(id: string) => | |
| Effect.sync(() => { | |
| const workflow = workflows.get(id) | |
| if (!workflow) return null | |
| return { | |
| id: workflow.id, | |
| status: async () => ({ | |
| id: workflow.id, | |
| status: workflow.status, | |
| output: workflow.output, | |
| error: workflow.error, | |
| }), | |
| wait: async () => (await workflow.promise) as Result, | |
| } | |
| }), | |
| } | |
| } | |
| /** | |
| * Layer that provides DirectWorkflowTriggerService | |
| * Requires BackgroundTaskService and EnvironmentService to be in context | |
| */ | |
| export const DirectWorkflowTriggerLayer = Layer.effect( | |
| WorkflowTriggerService, | |
| Effect.gen(function* () { | |
| const backgroundTasks = yield* BackgroundTaskService | |
| const envService = yield* EnvironmentService | |
| return makeDirectWorkflowTriggerService(backgroundTasks, envService) | |
| }), | |
| ) | |
| /** | |
| * Self-contained layer that includes NodeJs background tasks and mock environment | |
| * Use this for simple testing without needing to provide any services | |
| */ | |
| export const StandaloneDirectWorkflowTriggerLayer = Layer.provide( | |
| DirectWorkflowTriggerLayer, | |
| Layer.mergeAll( | |
| Layer.succeed(BackgroundTaskService, { | |
| scheduleTask: (effect) => | |
| Effect.sync(() => { | |
| // Execute in background | |
| Effect.runPromise(effect) | |
| .then(() => { | |
| console.log("[Standalone Workflow] Task completed successfully") | |
| }) | |
| .catch((error) => { | |
| console.error("[Standalone Workflow] Task failed:", error) | |
| }) | |
| }), | |
| }), | |
| Layer.succeed(EnvironmentService, { | |
| getEnv: () => process.env as unknown as Env, | |
| }), | |
| ), | |
| ) |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| import type { WorkflowParams2 } from "@repo/types" | |
| import * as Context from "effect/Context" | |
| import type * as Effect from "effect/Effect" | |
| // =========================== | |
| // NonRetryableError Service | |
| // =========================== | |
| export interface NonRetryableErrorService { | |
| readonly throw: (error: Error) => Effect.Effect<never, Error> | |
| } | |
| export const NonRetryableErrorService = | |
| Context.GenericTag<NonRetryableErrorService>("NonRetryableErrorService") | |
| // =========================== | |
| // Workflow Step Manager | |
| // =========================== | |
| export interface WorkflowStepManager<Config = unknown> { | |
| readonly do: <A, E = Error>( | |
| name: string, | |
| config: Config, | |
| fn: () => Effect.Effect<A, E, never>, | |
| ) => Effect.Effect<A, E, never> | |
| readonly sleep: ( | |
| name: string, | |
| duration: number | string, | |
| ) => Effect.Effect<void, never, never> | |
| } | |
| export const WorkflowStepManager = Context.GenericTag< | |
| WorkflowStepManager<unknown> | |
| >("WorkflowStepManager") | |
| // =========================== | |
| // Workflow Trigger Service | |
| // =========================== | |
| // Define what a workflow instance looks like | |
| export interface WorkflowInstance<Result = unknown> { | |
| readonly id: string | |
| readonly status: () => Promise<WorkflowStatus> | |
| readonly wait: () => Promise<Result> | |
| } | |
| export interface WorkflowStatus { | |
| readonly id: string | |
| readonly status: "running" | "complete" | "failed" | "queued" | "unknown" | |
| readonly error?: string | |
| readonly output?: unknown | |
| } | |
| // Service interface for workflow triggers | |
| export interface WorkflowTriggerService { | |
| readonly create: <_Params extends WorkflowParams2, Result = unknown>( | |
| idOrParams: string | WorkflowParams2, | |
| params?: WorkflowParams2, | |
| ) => Effect.Effect<WorkflowInstance<Result>, never, never> | |
| readonly createBatch: <Params extends WorkflowParams2, Result = unknown>( | |
| batch: Array<{ params: Params }>, | |
| ) => Effect.Effect<Array<WorkflowInstance<Result>>, never, never> | |
| readonly get: <Result = unknown>( | |
| id: string, | |
| ) => Effect.Effect<WorkflowInstance<Result> | null, never, never> | |
| } | |
| // Create the service tag | |
| export const WorkflowTriggerService = | |
| Context.GenericTag<WorkflowTriggerService>("WorkflowTriggerService") |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment