Skip to content

Instantly share code, notes, and snippets.

@Dhravya
Last active February 25, 2026 00:29
Show Gist options
  • Select an option

  • Save Dhravya/0db35e5d377f51cbc208591b3e04e328 to your computer and use it in GitHub Desktop.

Select an option

Save Dhravya/0db35e5d377f51cbc208591b3e04e328 to your computer and use it in GitHub Desktop.
import type { WorkflowParams2 } from "@repo/types"
import * as Effect from "effect/Effect"
import * as Layer from "effect/Layer"
import { BackgroundTaskService } from "../backgroundTasks"
import { EnvironmentService } from "../environment"
import { CloudflareContentExtractionLayer } from "../extraction/service"
import { runWorkflowEffect } from "."
import {
NonRetryableErrorService,
WorkflowStepManager,
WorkflowTriggerService,
} from "./types"
/**
 * NonRetryableErrorService for direct mode.
 * There is no workflow engine to notify here, so `throw` just fails the
 * effect with the error it was given.
 */
export const DirectNonRetryableErrorLayer = Layer.succeed(
  NonRetryableErrorService,
  {
    throw(error: Error) {
      return Effect.fail(error)
    },
  },
)
/**
 * Direct WorkflowStepManager implementation.
 * Runs steps immediately, in the calling fiber, without any special workflow
 * infrastructure (no persistence, no retries; `_config` is ignored).
 */
const makeDirectWorkflowStepManager = (): WorkflowStepManager<unknown> => ({
  /**
   * Executes the step produced by `fn` and returns its result.
   *
   * @param name - Step name, used only for logging.
   * @param _config - Step configuration; unused in direct mode.
   * @param fn - Factory for the step effect.
   * @returns An effect with the same success and error types as the step.
   */
  do: <A, E>(
    name: string,
    _config: unknown,
    fn: () => Effect.Effect<A, E | never, never>,
  ) =>
    Effect.gen(function* () {
      console.log(`[Direct Workflow] Executing step: ${name}`)
      // Run the step inline rather than round-tripping through
      // Effect.runPromise + Effect.promise: that round trip turned every typed
      // failure `E` into a defect, so callers' catchAll handlers never saw
      // step failures. Yielding the effect keeps the error channel intact.
      const result = yield* fn()
      console.log(`[Direct Workflow] Step ${name} completed successfully`)
      return result
    }),
  /**
   * Suspends the workflow for `duration`.
   *
   * @param _name - Sleep step name; unused in direct mode.
   * @param duration - A number passed to Effect.sleep as-is, or a string
   *   such as "5 minutes" that is converted by parseDuration (strings it does
   *   not understand result in a 0 ms sleep).
   */
  sleep: (_name: string, duration: number | string) =>
    Effect.gen(function* () {
      const ms =
        typeof duration === "number" ? duration : parseDuration(duration)
      yield* Effect.log(`[Direct Workflow] Sleeping for ${ms}ms`)
      yield* Effect.sleep(ms)
    }),
})
/**
 * Converts a human-readable duration such as "30 seconds" or "2 hours" into
 * milliseconds.
 *
 * Accepted form: a non-negative integer, optional whitespace, then one of
 * second / minute / hour / day / week with an optional plural "s".
 * Surrounding whitespace is ignored and the unit is matched
 * case-insensitively, so " 5 Minutes " parses the same as "5 minutes".
 *
 * @param duration - The duration text to parse.
 * @returns The duration in milliseconds, or 0 when the text is not understood.
 */
const parseDuration = (duration: string): number => {
  const match = /^(\d+)\s*(second|minute|hour|day|week)s?$/i.exec(
    duration.trim(),
  )
  if (!match) return 0
  const [, amount, unit] = match
  if (!amount || !unit) {
    return 0
  }
  const multipliers: Record<string, number> = {
    second: 1000,
    minute: 60 * 1000,
    hour: 60 * 60 * 1000,
    day: 24 * 60 * 60 * 1000,
    week: 7 * 24 * 60 * 60 * 1000,
  }
  // The table keys are lowercase, so normalise the matched unit before lookup.
  return Number.parseInt(amount, 10) * (multipliers[unit.toLowerCase()] ?? 0)
}
/**
 * Direct implementation of WorkflowTriggerService for Node.js/testing.
 *
 * Each workflow is started immediately with Effect.runPromise and left to run
 * in the background; no Cloudflare Workflows infrastructure is involved.
 * Workflow state lives in an in-memory Map owned by the returned service, so
 * it is lost when the process exits and entries are never evicted.
 *
 * @param _backgroundTasks - Currently unused; kept so the factory signature
 *   stays stable for callers.
 * @param envService - Environment made available to every workflow run.
 * @returns A WorkflowTriggerService backed by in-process execution.
 */
export const makeDirectWorkflowTriggerService = (
  _backgroundTasks: BackgroundTaskService,
  envService: EnvironmentService,
): WorkflowTriggerService => {
  type WorkflowRecord = {
    id: string
    status: "running" | "complete" | "failed" | "queued"
    output?: unknown
    error?: string
    promise: Promise<unknown>
  }

  const workflows = new Map<string, WorkflowRecord>()

  // Generates an id for workflows created without an explicit one.
  const generateWorkflowId = (): string =>
    `workflow-${Date.now()}-${Math.random().toString(36).substring(7)}`

  // Builds the caller-facing handle for a tracked workflow.
  const toInstance = <Result>(record: WorkflowRecord) => ({
    id: record.id,
    status: async () => ({
      id: record.id,
      status: record.status,
      output: record.output,
      error: record.error,
    }),
    wait: async () => (await record.promise) as Result,
  })

  // Registers a workflow, starts it in the background and returns its handle.
  // `label` is the subject used in log lines ("Workflow", "Batch workflow <id>").
  const launchWorkflow = <Result>(
    id: string,
    params: WorkflowParams2,
    label: string,
  ) => {
    let resolveWorkflow!: (result: unknown) => void
    let rejectWorkflow!: (error: Error) => void
    const promise = new Promise<unknown>((resolve, reject) => {
      resolveWorkflow = resolve
      rejectWorkflow = reject
    })
    // Callers are free to never call wait(). Mark the promise as handled so a
    // failed workflow does not surface as an unhandled rejection; wait() still
    // observes the rejection because it awaits this same promise.
    void promise.catch(() => {})

    const record: WorkflowRecord = { id, status: "running", promise }
    workflows.set(id, record)

    // Both handlers mutate the captured record (not a fresh Map lookup), so a
    // later workflow registered under the same id cannot be settled by this
    // one. The status guard makes settling idempotent.
    const markComplete = (result: unknown): void => {
      if (record.status !== "running") return
      record.status = "complete"
      record.output = result
      resolveWorkflow(result)
      console.log(`[Direct Workflow] ${label} completed successfully:`, result)
    }
    const markFailed = (error: unknown): void => {
      if (record.status !== "running") return
      const failure = error instanceof Error ? error : new Error(String(error))
      record.status = "failed"
      record.error = failure.message
      rejectWorkflow(failure)
      console.error(`[Direct Workflow] ${label} failed:`, error)
    }

    const workflowEffect = runWorkflowEffect(params).pipe(
      Effect.provide(
        Layer.mergeAll(
          Layer.succeed(WorkflowStepManager, makeDirectWorkflowStepManager()),
          Layer.succeed(EnvironmentService, envService),
          CloudflareContentExtractionLayer(),
          DirectNonRetryableErrorLayer,
        ),
      ),
      Effect.tap((result) => Effect.sync(() => markComplete(result))),
      // Typed failures...
      Effect.catchAll((error: unknown) =>
        Effect.sync(() => markFailed(error)),
      ),
      // ...and defects (thrown exceptions, rejected promises) must both settle
      // the workflow; otherwise it stays "running" and wait() never returns.
      Effect.catchAllDefect((defect) =>
        Effect.sync(() => markFailed(defect)),
      ),
    )

    // Start immediately but do not await: the workflow runs in the background.
    void Effect.runPromise(workflowEffect).catch((error: unknown) => {
      // Only reachable when the run ends without either handler above having
      // recovered it (e.g. interruption).
      if (record.status === "running") {
        markFailed(error)
      } else {
        console.error(`[Direct Workflow] ${label} failed:`, error)
      }
    })

    return toInstance<Result>(record)
  }

  return {
    /**
     * Starts one workflow. Supports both `create(id, params)` and
     * `create(params)`; in the latter case an id is generated.
     */
    create: <_Params, Result = unknown>(
      idOrParams: string | WorkflowParams2,
      params?: WorkflowParams2,
    ) =>
      Effect.sync(() => {
        if (typeof idOrParams === "string") {
          // biome-ignore lint/style/noNonNullAssertion: params exist
          return launchWorkflow<Result>(idOrParams, params!, "Workflow")
        }
        return launchWorkflow<Result>(
          generateWorkflowId(),
          idOrParams,
          "Workflow",
        )
      }),
    /**
     * Starts one workflow per batch item, each under a generated id, and
     * returns the handles in the same order as the input.
     */
    createBatch: <_Params, Result = unknown>(
      batch: Array<{ params: WorkflowParams2 }>,
    ) =>
      Effect.sync(() =>
        batch.map((item) => {
          const id = generateWorkflowId()
          return launchWorkflow<Result>(
            id,
            item.params,
            `Batch workflow ${id}`,
          )
        }),
      ),
    /**
     * Looks up a workflow previously started through this service instance.
     * Yields null when the id is not known.
     */
    get: <Result = unknown>(id: string) =>
      Effect.sync(() => {
        const record = workflows.get(id)
        return record ? toInstance<Result>(record) : null
      }),
  }
}
/**
 * Layer providing WorkflowTriggerService via the direct (in-process)
 * implementation. BackgroundTaskService and EnvironmentService are read from
 * the context and handed to makeDirectWorkflowTriggerService.
 */
export const DirectWorkflowTriggerLayer = Layer.effect(
  WorkflowTriggerService,
  Effect.all([BackgroundTaskService, EnvironmentService]).pipe(
    Effect.map(([backgroundTasks, envService]) =>
      makeDirectWorkflowTriggerService(backgroundTasks, envService),
    ),
  ),
)
/**
 * Self-contained variant of DirectWorkflowTriggerLayer that supplies its own
 * dependencies, for simple testing without providing any services:
 * - BackgroundTaskService: starts each task with Effect.runPromise and only
 *   logs the outcome, without awaiting it.
 * - EnvironmentService: returns process.env, cast to Env.
 */
export const StandaloneDirectWorkflowTriggerLayer =
  DirectWorkflowTriggerLayer.pipe(
    Layer.provide(
      Layer.mergeAll(
        Layer.succeed(BackgroundTaskService, {
          scheduleTask: (effect) =>
            Effect.sync(() => {
              // Fire and forget: the task runs in the background
              void (async () => {
                try {
                  await Effect.runPromise(effect)
                  console.log(
                    "[Standalone Workflow] Task completed successfully",
                  )
                } catch (error) {
                  console.error("[Standalone Workflow] Task failed:", error)
                }
              })()
            }),
        }),
        Layer.succeed(EnvironmentService, {
          getEnv: () => process.env as unknown as Env,
        }),
      ),
    ),
  )
import type { WorkflowParams2 } from "@repo/types"
import * as Context from "effect/Context"
import type * as Effect from "effect/Effect"
// ===========================
// NonRetryableError Service
// ===========================
/**
* Service for raising an error from inside a workflow.
* Presumably the error is meant to be treated as non-retryable by the
* workflow engine (going by the service name); how that is signalled is up
* to each implementation.
*/
export interface NonRetryableErrorService {
/** Returns an effect that never succeeds and fails with an `Error`. */
readonly throw: (error: Error) => Effect.Effect<never, Error>
}
/** Context tag used to provide and look up a NonRetryableErrorService. */
export const NonRetryableErrorService =
Context.GenericTag<NonRetryableErrorService>("NonRetryableErrorService")
// ===========================
// Workflow Step Manager
// ===========================
/**
* Abstraction over whatever executes the individual steps of a workflow.
* `Config` is the type of the per-step configuration value passed to `do`.
*/
export interface WorkflowStepManager<Config = unknown> {
/**
* Runs the effect produced by `fn` as a step identified by `name`.
* The returned effect has the same success type `A` and error type `E`
* as the step effect; the step must not require any context.
*/
readonly do: <A, E = Error>(
name: string,
config: Config,
fn: () => Effect.Effect<A, E, never>,
) => Effect.Effect<A, E, never>
/**
* Pauses the workflow. `duration` is either a number or a string; the
* units and accepted string formats are defined by the implementation.
* The returned effect cannot fail.
*/
readonly sleep: (
name: string,
duration: number | string,
) => Effect.Effect<void, never, never>
}
/** Context tag for the step manager; the config type is fixed to `unknown`. */
export const WorkflowStepManager = Context.GenericTag<
WorkflowStepManager<unknown>
>("WorkflowStepManager")
// ===========================
// Workflow Trigger Service
// ===========================
/**
* Handle to a single workflow run.
* `Result` is the type that `wait` resolves with.
*/
export interface WorkflowInstance<Result = unknown> {
/** Identifier of the workflow run. */
readonly id: string
/** Resolves with a snapshot of the run's current status. */
readonly status: () => Promise<WorkflowStatus>
/** Resolves with the run's result. */
readonly wait: () => Promise<Result>
}
/** Snapshot of a workflow run as reported by `WorkflowInstance.status`. */
export interface WorkflowStatus {
/** Identifier of the workflow run this snapshot describes. */
readonly id: string
/** Lifecycle state of the run. */
readonly status: "running" | "complete" | "failed" | "queued" | "unknown"
/** Error message, when one is available. */
readonly error?: string
/** Workflow output, when one is available. */
readonly output?: unknown
}
/**
* Service interface for workflow triggers: starting workflow runs and
* looking them up again by id. None of the returned effects can fail or
* require context.
*/
export interface WorkflowTriggerService {
/**
* Starts one workflow. The first argument is either a workflow id (with the
* parameters given as the second argument) or the parameters themselves.
* NOTE(review): the `_Params` type parameter is not referenced by the
* signature.
*/
readonly create: <_Params extends WorkflowParams2, Result = unknown>(
idOrParams: string | WorkflowParams2,
params?: WorkflowParams2,
) => Effect.Effect<WorkflowInstance<Result>, never, never>
/** Starts one workflow per batch item and yields their handles. */
readonly createBatch: <Params extends WorkflowParams2, Result = unknown>(
batch: Array<{ params: Params }>,
) => Effect.Effect<Array<WorkflowInstance<Result>>, never, never>
/** Yields the handle for `id`, or null when there is none. */
readonly get: <Result = unknown>(
id: string,
) => Effect.Effect<WorkflowInstance<Result> | null, never, never>
}
// Create the service tag
/** Context tag used to provide and look up a WorkflowTriggerService. */
export const WorkflowTriggerService =
Context.GenericTag<WorkflowTriggerService>("WorkflowTriggerService")
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment