A complete guide to reactive state management with Effect
- Introduction & Core Concepts
- Core API Deep Dive
- Advanced Features
- Framework Integrations
- Architecture & Data Flow
- Practical Examples
- Advanced Patterns & Best Practices
Effect-Rx is a reactive state management library built on top of Effect that provides fine-grained reactive primitives for managing application state. It combines the power of Effect's functional programming model with a reactive programming paradigm, offering automatic dependency tracking, lazy evaluation, and sophisticated lifecycle management.
Key Design Principles:
- Reactive by Design: All state is reactive and automatically propagates changes to dependent computations
- Lazy Evaluation: Computations are only performed when their values are actually needed
- Automatic Cleanup: Unused reactive nodes are automatically disposed to prevent memory leaks
- Effect Integration: Seamless integration with Effect's error handling, concurrency, and resource management
- Framework Agnostic: Core library is framework-independent with dedicated bindings for React and Vue
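The lazy-evaluation principle can be illustrated without the library. The block below is a minimal, dependency-free sketch, not effect-rx's implementation; `LazyCell`, `lazyCell`, `invalidate`, and `evaluations` are hypothetical names introduced only for this example.

```typescript
// Hypothetical helper (not part of effect-rx): a value that is computed on
// first access, cached, and recomputed only after it has been invalidated.
interface LazyCell<A> {
  readonly get: () => A
  readonly invalidate: () => void
  readonly evaluations: () => number
}

function lazyCell<A>(compute: () => A): LazyCell<A> {
  let cached: { readonly value: A } | undefined
  let count = 0
  return {
    get: () => {
      if (cached !== undefined) return cached.value
      count += 1
      const value = compute()
      cached = { value }
      return value
    },
    invalidate: () => {
      cached = undefined
    },
    evaluations: () => count
  }
}

const total = lazyCell(() => [1, 2, 3].reduce((sum, n) => sum + n, 0))
const before = total.evaluations() // no work has been done yet
const first = total.get() // computes and caches
const second = total.get() // served from the cache
```

Creating the cell does no work; the computation runs on the first `get()` and again only after `invalidate()`.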
graph TB
subgraph "Effect-Rx Architecture"
Registry[Registry<br/>Subscription Management]
subgraph "Reactive Primitives"
Rx[Rx<A><br/>Reactive Container]
Writable[Writable<R,W><br/>Mutable State]
Result[Result<A,E><br/>Async State]
end
subgraph "Integration Layer"
RxRuntime[RxRuntime<br/>Effect Runtime]
RxRef[RxRef<br/>Simple References]
end
subgraph "Framework Bindings"
React[React Hooks]
Vue[Vue Composables]
end
Registry --> Rx
Registry --> Writable
Registry --> Result
Rx --> RxRuntime
Writable --> RxRef
React --> Registry
Vue --> Registry
end
The Rx<A> type is the fundamental building block of effect-rx. It represents a reactive container that holds a value of type A and can notify subscribers when that value changes.
// Basic Rx creation
const countRx = Rx.make(0) // Rx<number>
const nameRx = Rx.make("Alice") // Rx<string>
// Derived Rx from function
const doubleCountRx = Rx.make((get) => get(countRx) * 2)
Writable<R, W> extends Rx<R> with the ability to be updated. It reads values of type R and accepts writes of type W.
const stateRx = Rx.make(0) // Writable<number, number>
const toggleRx = Rx.make(false) // Writable<boolean, boolean>
// Update via registry
registry.set(stateRx, 42)
registry.update(stateRx, n => n + 1)
Result<A, E> is a discriminated union that represents the state of asynchronous operations:
type Result<A, E> =
| Initial<A, E> // Not yet started
| Success<A, E> // Completed successfully
| Failure<A, E> // Failed with error
// Rx with async Effect
const userRx = Rx.make(
Effect.succeed({ id: 1, name: "Alice" })
) // Rx<Result<User, never>>
The Registry manages all Rx instances, their subscriptions, and automatic cleanup:
const registry = Registry.make({
scheduleTask: (f) => scheduler.scheduleTask(f, 0),
defaultIdleTTL: 400 // Auto-cleanup after 400ms of inactivity
})
// Subscribe to changes
const unsubscribe = registry.subscribe(countRx, (value) => {
console.log("Count changed:", value)
})
The Context provides access to other Rx values and utilities within reactive computations:
const derivedRx = Rx.make((get: Rx.Context) => {
const count = get(countRx) // Read another Rx
const name = get(nameRx) // Read multiple Rx values
get.mount(someOtherRx) // Mount dependencies
get.addFinalizer(() => cleanup()) // Register cleanup
return `${name}: ${count}`
})
Effect-Rx implements sophisticated lifecycle management:
- Lazy Initialization: Rx values are only computed when first accessed
- Dependency Tracking: Automatic tracking of which Rx values depend on others
- Reference Counting: Tracks active subscriptions and component usage
- Automatic Disposal: Unused Rx nodes are cleaned up based on configurable TTL
- Finalizer Support: Cleanup callbacks for resource management
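Reference counting combined with an idle TTL can be sketched as follows. This is an illustrative model under stated assumptions, not the Registry's actual algorithm; `refCounted`, `Schedule`, `Cancel`, and `Lease` are hypothetical names, and the scheduler is injected so that timing can be controlled in tests.

```typescript
// Hypothetical sketch (not the effect-rx Registry): a resource that is created
// on first use and disposed once it has had no references for `idleTTL` ms.
type Cancel = () => void
type Schedule = (task: () => void, delayMs: number) => Cancel

// Default scheduler backed by setTimeout.
const timeoutSchedule: Schedule = (task, delayMs) => {
  const handle = setTimeout(task, delayMs)
  return () => clearTimeout(handle)
}

interface Lease<A> {
  readonly value: A
  readonly release: () => void
}

function refCounted<A>(
  create: () => { value: A; dispose: () => void },
  idleTTL: number,
  schedule: Schedule = timeoutSchedule
) {
  let refs = 0
  let resource: { value: A; dispose: () => void } | undefined
  let cancelDisposal: Cancel | undefined

  const acquire = (): Lease<A> => {
    // A new reference cancels any pending disposal.
    if (cancelDisposal !== undefined) {
      cancelDisposal()
      cancelDisposal = undefined
    }
    const current = resource ?? create()
    resource = current
    refs += 1
    let released = false
    return {
      value: current.value,
      release: () => {
        if (released) return // releasing twice is a no-op
        released = true
        refs -= 1
        if (refs > 0) return
        // Last reference gone: dispose after the idle TTL elapses.
        cancelDisposal = schedule(() => {
          cancelDisposal = undefined
          current.dispose()
          resource = undefined
        }, idleTTL)
      }
    }
  }

  return { acquire, isAlive: () => resource !== undefined }
}
```

Acquiring a lease again before the TTL elapses cancels the pending disposal, which is the behaviour the idle TTL is meant to provide: short gaps between subscribers do not tear the resource down.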
const resourceRx = Rx.make((get) => {
// Setup expensive resource
const resource = createExpensiveResource()
// Register cleanup
get.addFinalizer(() => resource.dispose())
return resource.data
})
// Keep alive to prevent cleanup
const persistentRx = Rx.make(initialData).pipe(Rx.keepAlive)
Effect-Rx leverages Effect's robust error handling:
const apiCallRx = Rx.make(
Effect.gen(function* () {
const response = yield* HttpClient.get("/api/users")
return yield* response.json
}).pipe(
Effect.retry(Schedule.exponential("100 millis")),
Effect.timeout("30 seconds")
)
) // Rx<Result<User[], HttpError | TimeoutError>>
The Rx<A> type is the cornerstone of effect-rx, representing a reactive container that holds a value of type A. It provides automatic dependency tracking, lazy evaluation, and change propagation.
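To make dependency tracking and change propagation concrete, here is a small dependency-free sketch. It is a simplified model under stated assumptions and does not reflect how effect-rx is implemented; `Cell`, `Getter`, `source`, `derived`, `read`, `write`, and `invalidate` are hypothetical names.

```typescript
// Hypothetical sketch of automatic dependency tracking (not effect-rx code).
type Getter = <A>(cell: Cell<A>) => A

interface Cell<A> {
  readonly compute: (get: Getter) => A
  cache: { readonly value: A } | undefined
  readonly dependents: Set<Cell<any>>
}

function source<A>(initial: A): Cell<A> {
  return { compute: () => initial, cache: { value: initial }, dependents: new Set<Cell<any>>() }
}

function derived<A>(compute: (get: Getter) => A): Cell<A> {
  return { compute, cache: undefined, dependents: new Set<Cell<any>>() }
}

function read<A>(cell: Cell<A>): A {
  const cached = cell.cache
  if (cached !== undefined) return cached.value
  // Each get(dep) call made while computing records `cell` as a dependent of `dep`.
  const get: Getter = (dep) => {
    dep.dependents.add(cell)
    return read(dep)
  }
  const value = cell.compute(get)
  cell.cache = { value }
  return value
}

function invalidate(cell: Cell<any>): void {
  const dependents = Array.from(cell.dependents)
  cell.dependents.clear() // dependents register again on their next read
  for (const dependent of dependents) {
    dependent.cache = undefined
    invalidate(dependent)
  }
}

function write<A>(cell: Cell<A>, value: A): void {
  cell.cache = { value }
  invalidate(cell)
}

const count = source(2)
let runs = 0
const doubled = derived((get) => {
  runs += 1
  return get(count) * 2
})
const a = read(doubled) // first read runs the computation
const b = read(doubled) // cached: the computation does not run again
write(count, 5) // clears the cache of every dependent
const c = read(doubled) // recomputed from the new count
```

Dependents are cleared on invalidation and re-registered on the next read, so a computation that stops reading a cell also stops depending on it.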
interface Rx<A> extends Pipeable, Inspectable {
readonly [TypeId]: TypeId
readonly keepAlive: boolean
readonly lazy: boolean
readonly read: (get: Context) => A
readonly refresh?: (f: <A>(rx: Rx<A>) => void) => void
readonly label?: readonly [name: string, stack: string]
readonly idleTTL?: number
}
1. Simple Values
// Create from primitive values
const numberRx = Rx.make(42) // Rx<number>
const stringRx = Rx.make("hello") // Rx<string>
const boolRx = Rx.make(true) // Rx<boolean>
// Create from objects
const userRx = Rx.make({
id: 1,
name: "Alice",
email: "alice@example.com"
}) // Rx<User>
2. Computed Values (Derived State)
const countRx = Rx.make(0)
const doubleCountRx = Rx.make((get) => get(countRx) * 2)
// Multiple dependencies
const fullNameRx = Rx.make((get) =>
`${get(firstNameRx)} ${get(lastNameRx)}`
)
// Complex computations
const statisticsRx = Rx.make((get) => {
const users = get(usersRx)
const activeUsers = users.filter(u => u.active)
return {
total: users.length,
active: activeUsers.length,
percentage: users.length === 0 ? 0 : (activeUsers.length / users.length) * 100 // avoid NaN for an empty list
}
})
3. Effect-Based Rx
// Simple Effect
const greetingRx = Rx.make(Effect.succeed("Hello, World!"))
// Effect with dependencies
const userProfileRx = Rx.make((get) =>
Effect.gen(function* () {
const userId = get(currentUserIdRx)
const user = yield* UserService.findById(userId)
const preferences = yield* PreferenceService.getByUserId(userId)
return { user, preferences }
})
)
// Stream-based Rx (emits latest value)
const realtimeDataRx = Rx.make(
Stream.fromSchedule(Schedule.spaced("1 second")).pipe(
Stream.map(() => Math.random())
)
)
Keep Alive
By default, Rx values are cleaned up when no longer referenced. Use keepAlive to persist them:
const persistentCountRx = Rx.make(0).pipe(Rx.keepAlive)
// Or using the combinator
const globalStateRx = Rx.keepAlive(Rx.make(initialState))
Lazy Evaluation
Control when Rx values are computed:
// Eager evaluation (computed immediately when accessed)
const eagerRx = Rx.make(expensiveComputation).pipe(Rx.setLazy(false))
// Lazy evaluation (default - computed on first access)
const lazyRx = Rx.make(expensiveComputation).pipe(Rx.setLazy(true))
Idle TTL (Time To Live)
Set custom cleanup timeouts:
// Clean up after 1 minute of inactivity
const cacheRx = Rx.make(fetchDataFromAPI).pipe(
Rx.setIdleTTL("1 minute")
)
// Using Duration helpers
const shortLivedRx = Rx.make(temporaryData).pipe(
Rx.setIdleTTL(Duration.seconds(30))
)
Labeling for Debugging
Add labels to help with debugging and development:
const debugRx = Rx.make(complexComputation).pipe(
Rx.withLabel("UserStatistics")
)
// Labels include stack traces for debugging
// Appears in dev tools and error messages
Mapping Values
const countRx = Rx.make(0)
// Transform the value
const doubledRx = Rx.map(countRx, n => n * 2)
// Transform Results specifically
const userRx = Rx.make(Effect.succeed({ name: "Alice", age: 30 }))
const nameRx = Rx.mapResult(userRx, user => user.name)
Transforming with Context
const transformedRx = Rx.transform(countRx, (get) => {
const count = get(countRx)
const multiplier = get(multiplierRx)
const otherValue = get(otherRx)
return count * multiplier + otherValue
})
Fallback Values
const primaryDataRx = Rx.make(fetchPrimaryData)
const fallbackDataRx = Rx.make(fetchFallbackData)
const dataWithFallbackRx = Rx.withFallback(primaryDataRx, fallbackDataRx)
// Uses fallbackDataRx if primaryDataRx is in Initial state
Debouncing
const searchTermRx = Rx.make("")
// Debounce rapid changes
const debouncedSearchRx = Rx.debounce(searchTermRx, "300 millis")
// Useful for API calls triggered by user input
const searchResultsRx = Rx.make((get) =>
Effect.gen(function* () {
const term = get(debouncedSearchRx)
if (term.length < 2) return []
return yield* SearchService.search(term)
})
)
Initial Values
// Provide an initial value while async operation loads
const [userRx, defaultUser] = Rx.initialValue(
Rx.make(fetchCurrentUser),
{ id: 0, name: "Loading...", email: "" }
)
Refresh Support
Make Rx values refreshable:
const refreshableDataRx = Rx.refreshable(
Rx.make(fetchApiData)
)
// Later, trigger refresh via registry
registry.refresh(refreshableDataRx)
Custom Refresh Logic
const customRefreshRx = Rx.readable(
(get) => get(dataSourceRx),
(refresh) => {
// Custom refresh logic
refresh(dataSourceRx)
refresh(dependentDataRx)
}
)
Mounting Dependencies
const compositeRx = Rx.make((get) => {
// Mount ensures the dependency stays active
get.mount(backgroundTaskRx)
const data = get(dataRx)
const status = get(statusRx)
return { data, status }
})
One-time Access
const optimizedRx = Rx.make((get) => {
// Regular access - creates dependency
const always = get(alwaysNeededRx)
// One-time access - no dependency tracking
const initial = get.once(initialValueRx)
return computeResult(always, initial)
})
// Extract types from Rx
type UserType = Rx.Infer<typeof userRx>
type UserSuccess = Rx.InferSuccess<typeof userResultRx>
type UserError = Rx.InferFailure<typeof userResultRx>
// Use in generic functions
function createCache<T extends Rx<any>>(rx: T): Cache<Rx.Infer<T>> {
// Implementation
}
Family Pattern for Dynamic Rx Creation
// Create Rx instances dynamically based on parameters
const userByIdRx = Rx.family((userId: string) =>
Rx.make(
Effect.gen(function* () {
const user = yield* UserService.findById(userId)
return user
})
)
)
// Usage
const user1Rx = userByIdRx("user-1") // Rx<Result<User, Error>>
const user2Rx = userByIdRx("user-2") // Different instance
// Same ID returns same Rx instance (stable references)
const sameUser1Rx = userByIdRx("user-1") // === user1Rx
Conditional Rx Values
const conditionalRx = Rx.make((get) => {
const condition = get(conditionRx)
if (condition) {
return get(expensiveComputationRx)
} else {
return get(fallbackValueRx)
}
})
Resource Management
const resourceRx = Rx.make((get) => {
// Create expensive resource
const resource = createResource()
// Register cleanup
get.addFinalizer(() => {
console.log("Cleaning up resource")
resource.dispose()
})
return resource.data
})
Writable<R, W> extends Rx<R> with the ability to update values. It's the primary interface for mutable state in effect-rx, where R is the read type and W is the write type (often the same, but can differ).
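The distinction between the read type R and the write type W can be shown with a small stand-alone sketch. It does not use the effect-rx API; `SimpleWritable`, `state`, `appendOnlyLog`, and `LogEntry` are hypothetical names, and the injected `now` clock is an assumption made to keep the example deterministic.

```typescript
// Hypothetical stand-in for a writable reactive value (not the effect-rx API).
interface SimpleWritable<R, W = R> {
  readonly read: () => R
  readonly write: (value: W) => void
}

// Plain state: the read type and the write type are the same.
function state<A>(initial: A): SimpleWritable<A> {
  let current = initial
  return {
    read: () => current,
    write: (value) => {
      current = value
    }
  }
}

interface LogEntry {
  readonly message: string
  readonly timestamp: number
}

// Reads the full list of entries (R) but accepts one message per write (W).
function appendOnlyLog(
  entries: SimpleWritable<ReadonlyArray<LogEntry>>,
  now: () => number = Date.now
): SimpleWritable<ReadonlyArray<LogEntry>, string> {
  return {
    read: entries.read,
    write: (message) =>
      entries.write([...entries.read(), { message, timestamp: now() }])
  }
}

const entries = state<ReadonlyArray<LogEntry>>([])
const log = appendOnlyLog(entries, () => 0) // fixed clock for a repeatable result
log.write("started")
log.write("finished")
const messages = log.read().map((entry) => entry.message)
```

Because the log writes through to the underlying `entries` state rather than to itself, every write sees the entries appended by earlier writes.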
interface Writable<R, W = R> extends Rx<R> {
readonly [WritableTypeId]: WritableTypeId
readonly write: (ctx: WriteContext<R>, value: W) => void
}
interface WriteContext<A> {
readonly get: <A>(rx: Rx<A>) => A
readonly refreshSelf: () => void
readonly setSelf: (a: A) => void
readonly set: <R, W>(rx: Writable<R, W>, value: W) => void
}
Basic Writable Creation
// Simple state values (R = W)
const countRx = Rx.make(0) // Writable<number, number>
const nameRx = Rx.make("Alice") // Writable<string, string>
const toggleRx = Rx.make(false) // Writable<boolean, boolean>
// Complex state objects
const userRx = Rx.make({
id: 1,
name: "Alice",
email: "alice@example.com",
preferences: { theme: "dark", notifications: true }
}) // Writable<User, User>
Custom Read/Write Types
// Different read and write types
const temperatureRx = Rx.writable(
(get) => get.get(celsiusRx), // Read as number
(ctx, fahrenheit: number) => { // Write as fahrenheit
const celsius = (fahrenheit - 32) * 5/9
ctx.set(celsiusRx, celsius)
}
) // Writable<number, number> but with conversion logic
// Append-only log
const logRx = Rx.writable(
(get) => get.get(messagesRx), // Read array
(ctx, message: string) => { // Write single message
const current = ctx.get(messagesRx)
ctx.set(messagesRx, [...current, { // write through to messagesRx so later writes see earlier entries
timestamp: Date.now(),
message
}])
}
) // Writable<Message[], string>
Via Registry
const registry = Registry.make()
const countRx = Rx.make(0)
// Direct assignment
registry.set(countRx, 42)
// Functional update
registry.update(countRx, n => n + 1)
// Update with return value
const newValue = registry.modify(countRx, n => [n * 2, n + 10])
// Returns n * 2, sets value to n + 10
Via Framework Hooks
// React
const [count, setCount] = useRx(countRx)
setCount(42) // Direct value
setCount(prev => prev + 1) // Functional update
// Vue
const [count, setCount] = useRx(countRx)
setCount(42)
Computed Writable Values
// Writable derived from other writables
const userNameRx = Rx.writable(
(get) => get(userRx).name,
(ctx, newName: string) => {
const user = ctx.get(userRx)
ctx.set(userRx, { ...user, name: newName })
}
)
// Bidirectional computed values
const fullNameRx = Rx.writable(
(get) => `${get(firstNameRx)} ${get(lastNameRx)}`,
(ctx, fullName: string) => {
const [first, ...lastParts] = fullName.split(" ")
ctx.set(firstNameRx, first || "")
ctx.set(lastNameRx, lastParts.join(" "))
}
)
State Validation & Constraints
const emailRx = Rx.writable(
(get) => get(rawEmailRx),
(ctx, email: string) => {
// Validation before setting
if (isValidEmail(email)) {
ctx.setSelf(email.toLowerCase().trim())
ctx.set(emailErrorRx, null)
} else {
ctx.set(emailErrorRx, "Invalid email format")
// Don't update the actual email value
}
}
)
const ageRx = Rx.writable(
(get) => get(rawAgeRx),
(ctx, age: number) => {
// Constrain values
const constrainedAge = Math.max(0, Math.min(120, age))
ctx.setSelf(constrainedAge)
}
)
Undo/Redo State Management
interface UndoRedoState<T> {
current: T
history: T[]
future: T[]
}
const createUndoRedoRx = <T>(initialValue: T) => {
const stateRx = Rx.make<UndoRedoState<T>>({
current: initialValue,
history: [],
future: []
})
const currentRx = Rx.writable(
(get) => get(stateRx).current,
(ctx, newValue: T) => {
const state = ctx.get(stateRx)
ctx.set(stateRx, {
current: newValue,
history: [state.current, ...state.history],
future: [] // Clear redo stack on new change
})
}
)
const undoRx = Rx.writable(
(get) => get(stateRx).history.length > 0,
(ctx, _: void) => {
const state = ctx.get(stateRx)
if (state.history.length === 0) return
const [previous, ...restHistory] = state.history
ctx.set(stateRx, {
current: previous,
history: restHistory,
future: [state.current, ...state.future]
})
}
)
const redoRx = Rx.writable(
(get) => get(stateRx).future.length > 0,
(ctx, _: void) => {
const state = ctx.get(stateRx)
if (state.future.length === 0) return
const [next, ...restFuture] = state.future
ctx.set(stateRx, {
current: next,
history: [state.current, ...state.history],
future: restFuture
})
}
)
return { currentRx, undoRx, redoRx }
}
// Usage
const { currentRx: textRx, undoRx, redoRx } = createUndoRedoRx("")
// In component
const [text, setText] = useRx(textRx)
const [canUndo, undo] = useRx(undoRx)
const [canRedo, redo] = useRx(redoRx)
Cross-Tab State Sync
const syncedStateRx = Rx.writable(
(get) => get(localStateRx),
(ctx, value: any) => {
// Update local state
ctx.setSelf(value)
// Sync to localStorage
localStorage.setItem('syncedState', JSON.stringify(value))
// Broadcast to other tabs
ctx.get(broadcastChannelRx).postMessage({
type: 'STATE_UPDATE',
value
})
}
)
// Listen for external updates
const broadcastListenerRx = Rx.make((get) => {
const channel = get(broadcastChannelRx)
get.addFinalizer(() => {
channel.close()
})
channel.addEventListener('message', (event) => {
if (event.data.type === 'STATE_UPDATE') {
// Update local state without triggering broadcast
get.set(localStateRx, event.data.value)
}
})
return channel
})
Optimistic Updates
const optimisticUserRx = Rx.writable(
(get) => {
const user = get(serverUserRx)
const pending = get(pendingUpdateRx)
// Apply pending changes optimistically
return pending ? { ...user, ...pending } : user
},
(ctx, updates: Partial<User>) => {
// Set pending updates immediately (optimistic)
ctx.set(pendingUpdateRx, updates)
// Start async update
ctx.set(updateUserRx, updates)
}
)
const updateUserRx = Rx.fn(
  Effect.fnUntraced(function* (updates: Partial<User>) {
    const registry = yield* Registry.RxRegistry
    // Effect failures do not surface as thrown exceptions inside the generator,
    // so the revert is attached with Effect.tapErrorCause instead of try/catch
    const updatedUser = yield* UserService.update(updates).pipe(
      Effect.tapErrorCause(() =>
        // Revert optimistic update on error
        Effect.sync(() => registry.set(pendingUpdateRx, null))
      )
    )
    // Clear pending and update server state
    registry.set(pendingUpdateRx, null)
    registry.set(serverUserRx, updatedUser)
    return updatedUser
  })
)
Field-Level Validation
interface FieldState<T> {
value: T
error: string | null
touched: boolean
dirty: boolean
}
const createFieldRx = <T>(
initialValue: T,
validator?: (value: T) => string | null
) => {
const stateRx = Rx.make<FieldState<T>>({
value: initialValue,
error: null,
touched: false,
dirty: false
})
const valueRx = Rx.writable(
(get) => get(stateRx).value,
(ctx, newValue: T) => {
const state = ctx.get(stateRx)
const error = validator ? validator(newValue) : null
ctx.set(stateRx, {
value: newValue,
error,
touched: true,
dirty: newValue !== initialValue
})
}
)
const blurRx = Rx.writable(
(get) => get(stateRx).touched,
(ctx, _: void) => {
const state = ctx.get(stateRx)
ctx.set(stateRx, { ...state, touched: true })
}
)
const resetRx = Rx.writable(
(get) => null,
(ctx, _: void) => {
ctx.set(stateRx, {
value: initialValue,
error: null,
touched: false,
dirty: false
})
}
)
return { stateRx, valueRx, blurRx, resetRx }
}
// Form composition
const emailField = createFieldRx("", validateEmail)
const passwordField = createFieldRx("", validatePassword)
const formValidRx = Rx.make((get) => {
const emailState = get(emailField.stateRx)
const passwordState = get(passwordField.stateRx)
return !emailState.error &&
!passwordState.error &&
emailState.value.length > 0 &&
passwordState.value.length > 0
})
Async Validation
const usernameFieldRx = Rx.writable(
(get) => get(usernameStateRx),
(ctx, username: string) => {
// Set immediate state
ctx.set(usernameStateRx, {
value: username,
validating: true,
error: null
})
// Trigger async validation
ctx.set(validateUsernameRx, username)
}
)
const validateUsernameRx = Rx.fn(
  Effect.fnUntraced(function* (username: string) {
    const registry = yield* Registry.RxRegistry
    if (username.length < 3) {
      yield* Effect.sync(() => {
        registry.set(usernameStateRx, {
          value: username,
          validating: false,
          error: "Username must be at least 3 characters"
        })
      })
      return
    }
    const isAvailable = yield* UserService.checkUsernameAvailability(username)
    yield* Effect.sync(() => {
      registry.set(usernameStateRx, {
        value: username,
        validating: false,
        error: isAvailable ? null : "Username is already taken"
      })
    })
  })
)
// Effect has no debounce combinator. To limit API calls, debounce the input
// upstream instead, for example with Rx.debounce on the Rx that holds the raw username.
The Result<A, E> type is effect-rx's solution for representing the lifecycle of asynchronous operations. It's a discriminated union that captures all possible states of an async computation, providing type-safe handling of loading, success, and error states.
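The three states and the `waiting` flag can be modelled with a plain discriminated union. The sketch below is a simplified stand-in, not the library's `Result` module: it stores the error directly and uses `A | undefined` for the previous value, and `SimpleResult`, `initial`, `success`, `failure`, and `render` are hypothetical names.

```typescript
// Simplified stand-in for Result<A, E>: the error is stored directly and the
// previous value is `A | undefined` rather than a Cause and an Option.
type SimpleResult<A, E> =
  | { readonly _tag: "Initial"; readonly waiting: boolean }
  | { readonly _tag: "Success"; readonly value: A; readonly waiting: boolean }
  | {
      readonly _tag: "Failure"
      readonly error: E
      readonly previousValue: A | undefined
      readonly waiting: boolean
    }

const initial = <A, E>(waiting = false): SimpleResult<A, E> => ({ _tag: "Initial", waiting })

const success = <A, E>(value: A, waiting = false): SimpleResult<A, E> => ({
  _tag: "Success",
  value,
  waiting
})

// A failure remembers the last successful value, if there was one.
function failure<A, E>(previous: SimpleResult<A, E>, error: E): SimpleResult<A, E> {
  const previousValue =
    previous._tag === "Success"
      ? previous.value
      : previous._tag === "Failure"
        ? previous.previousValue
        : undefined
  return { _tag: "Failure", error, previousValue, waiting: false }
}

// Exhaustive handling: the compiler reports an error if a case is missing.
function render(result: SimpleResult<string, Error>): string {
  switch (result._tag) {
    case "Initial":
      return result.waiting ? "Loading..." : "Not loaded"
    case "Success":
      return result.waiting ? `${result.value} (refreshing)` : result.value
    case "Failure":
      return result.previousValue === undefined
        ? `Error: ${result.error.message}`
        : `Error: ${result.error.message} (last known: ${result.previousValue})`
  }
}

const loaded = success<string, Error>("Alice")
const failed = failure(loaded, new Error("timeout"))
```

Keeping the previous value on failure is what lets a UI keep showing stale data alongside the error instead of going blank.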
type Result<A, E = never> =
| Initial<A, E> // Operation not yet started or no data
| Success<A, E> // Operation completed successfully
| Failure<A, E> // Operation failed with error
interface Initial<A, E> {
readonly _tag: "Initial"
readonly waiting: boolean
}
interface Success<A, E> {
readonly _tag: "Success"
readonly value: A
readonly waiting: boolean // True if refresh in progress
}
interface Failure<A, E> {
readonly _tag: "Failure"
readonly cause: Cause.Cause<E>
readonly previousValue: Option.Option<A>
readonly waiting: boolean // True if retry in progress
}
Simple Effect Results
// Basic Effect - returns Result<string, never>
const greetingRx = Rx.make(Effect.succeed("Hello, World!"))
// Effect with potential failure - returns Result<User, NotFoundError>
const userRx = Rx.make(
Effect.gen(function* () {
const user = yield* UserService.findById("user-123")
return user
})
)
// Effect with timeout and retries
const apiDataRx = Rx.make(
Effect.gen(function* () {
const response = yield* HttpClient.get("/api/data")
return yield* response.json
}).pipe(
Effect.timeout("10 seconds"),
Effect.retry(Schedule.exponential("100 millis", 2.0))
)
)
Stream Results
// Stream emitting latest values - Result<number, never>
const counterStreamRx = Rx.make(
Stream.fromSchedule(Schedule.spaced("1 second")).pipe(
Stream.scan(0, (acc, _) => acc + 1)
)
)
// WebSocket stream with error handling
const websocketRx = Rx.make(
Stream.fromEffect(
Effect.gen(function* () {
const ws = yield* WebSocket.connect("ws://localhost:8080")
return Stream.fromAsyncIterable(ws.messages, (error) =>
new WebSocketError({ cause: error })
)
})
).pipe(Stream.flatten)
)
Pattern Matching
// Full pattern matching
const UserProfile = () => {
const result = useRxValue(userRx)
return Result.match(result, {
onInitial: () => <div>Click to load user...</div>,
onFailure: (failure) => (
<div>
Error: {Cause.pretty(failure.cause)}
{Option.isSome(failure.previousValue) && (
<div>Last known: {failure.previousValue.value.name}</div>
)}
</div>
),
onSuccess: (success) => (
<div>
Welcome, {success.value.name}!
{success.waiting && <span>Refreshing...</span>}
</div>
)
})
}
// Error-specific pattern matching
const handleResult = Result.matchWithError(userResult, {
onInitial: () => "Not loaded",
onError: (error) => `Business error: ${error.message}`,
onDefect: (defect) => `System error: ${defect}`,
onSuccess: (success) => `Loaded: ${success.value.name}`
})
// With waiting state consideration
const withLoadingStates = Result.matchWithWaiting(result, {
onWaiting: () => <Spinner />,
onError: (error) => <ErrorDisplay error={error} />,
onDefect: (defect) => <SystemError error={defect} />,
onSuccess: (success) => <UserData user={success.value} />
})
Extracting Values
// Safe value extraction
const userName = Result.getOrElse(userResult, () => "Anonymous")
// Get value or throw (for when you know it's Success)
const userData = Result.getOrThrow(userResult)
// Extract the value as an Option: the current value on Success, the previous value on Failure
const lastKnownUser = Result.value(userResult) // Option<User>
// Convert to Exit for Effect integration
const exit = Result.toExit(userResult) // Exit<User, E | NoSuchElementException>
Mapping Values
// Transform successful values
const userNameRx = Rx.mapResult(userRx, user => user.name)
// Result<User, E> -> Result<string, E>
// Chain transformations
const userDisplayRx = Rx.mapResult(
userRx,
user => `${user.name} (${user.email})`
)
// Using regular map (transforms entire Result)
const resultWithMetaRx = Rx.map(userRx, result => ({
result,
timestamp: Date.now(),
requestId: generateId()
}))
Fallback Handling
const primaryDataRx = Rx.make(fetchPrimaryData)
const fallbackDataRx = Rx.make(fetchFallbackData)
// Use fallback when primary is Initial
const dataWithFallbackRx = Rx.withFallback(primaryDataRx, fallbackDataRx)
// Custom fallback logic
const customFallbackRx = Rx.make((get) => {
const primary = get(primaryDataRx)
if (primary._tag === "Initial") {
return get(fallbackDataRx)
}
if (primary._tag === "Failure" && isRetryableError(primary.cause)) {
return get(fallbackDataRx)
}
return primary
})
Dependent Async Operations
const userProfileRx = Rx.make((get) =>
Effect.gen(function* () {
// Get user ID from another Rx
const userId = get(currentUserIdRx)
// Fetch user data
const user = yield* UserService.findById(userId)
// Fetch related data in parallel
const [preferences, notifications, activity] = yield* Effect.all([
PreferenceService.getByUserId(userId),
NotificationService.getByUserId(userId),
ActivityService.getRecentByUserId(userId)
])
return {
user,
preferences,
notifications,
recentActivity: activity
}
})
)
Optimistic Updates with Rollback
const updateUserRx = Rx.fn(
  Effect.fnUntraced(function* (updates: Partial<User>) {
    const registry = yield* Registry.RxRegistry
    const currentUser = registry.get(userRx)
    // Only proceed if we have current user data
    if (currentUser._tag !== "Success") {
      return yield* Effect.fail(new Error("No current user data"))
    }
    // Apply optimistic update
    const optimisticUser = { ...currentUser.value, ...updates }
    registry.set(localUserRx, Result.success(optimisticUser))
    // Perform actual update. Effect failures do not reach try/catch inside the
    // generator, so the rollback is attached with Effect.tapErrorCause
    const updatedUser = yield* UserService.update(updates).pipe(
      Effect.tapErrorCause(() =>
        // Rollback on error
        Effect.sync(() => registry.set(localUserRx, currentUser))
      )
    )
    // Update with server response
    registry.set(localUserRx, Result.success(updatedUser))
    return updatedUser
  })
)
Polling and Real-time Updates
// Polling with exponential backoff on errors
const pollingDataRx = Rx.make(
Effect.gen(function* () {
return yield* Effect.repeat(
fetchLatestData,
Schedule.spaced("5 seconds").pipe(
Schedule.intersect(
Schedule.exponential("1 second").pipe(
Schedule.whileInput(isRetryableError)
)
)
)
)
})
)
// Combining polling with WebSocket updates
const realtimeDataRx = Rx.make((get) => {
const polling = get(pollingDataRx)
const websocket = get(websocketUpdatesRx)
// Use WebSocket if connected, fallback to polling
return websocket._tag === "Success" ? websocket : polling
})
Granular Error Recovery
const resilientApiCallRx = Rx.make(
Effect.gen(function* () {
return yield* apiCall.pipe(
// Retry on network errors
Effect.retry(
Schedule.exponential("100 millis").pipe(
Schedule.whileInput((error) => error._tag === "NetworkError"),
Schedule.take(3)
)
),
// Timeout for slow responses
Effect.timeout("30 seconds"),
// Fallback for specific errors
Effect.catchSome((error) => {
if (error._tag === "RateLimitError") {
return Option.some(
Effect.delay(fallbackApiCall, error.retryAfter)
)
}
return Option.none()
}),
// Final error transformation
Effect.mapError((error) => new ApiError({
originalError: error,
timestamp: Date.now(),
endpoint: "/api/data"
}))
)
})
)
Cascading Fallbacks
const dataWithMultipleFallbacksRx = Rx.make(
Effect.gen(function* () {
// Try primary source
const primary = yield* primaryDataSource.pipe(
Effect.timeout("5 seconds"),
Effect.option
)
if (Option.isSome(primary)) {
return primary.value
}
// Try secondary source
const secondary = yield* secondaryDataSource.pipe(
Effect.timeout("3 seconds"),
Effect.option
)
if (Option.isSome(secondary)) {
return secondary.value
}
// Try cache
const cached = yield* cacheService.get("data").pipe(
Effect.option
)
if (Option.isSome(cached)) {
return cached.value
}
// All sources failed
return yield* Effect.fail(new Error("All data sources unavailable"))
})
)
Using Result in Effect Computations
const processUserDataRx = Rx.make((get) =>
Effect.gen(function* () {
// Extract successful value or fail the Effect
const user = yield* get.result(userRx)
const preferences = yield* get.result(preferencesRx)
// Process the data
const processed = yield* processUserData(user, preferences)
return processed
})
)
// One-time result extraction (no dependency tracking)
const oneTimeCheckRx = Rx.make((get) =>
Effect.gen(function* () {
// Get current state without creating dependency
const currentUser = yield* get.resultOnce(userRx)
// Perform one-time computation
return yield* computeBasedOnCurrentUser(currentUser)
})
)
Suspending on Waiting States
const suspendableRx = Rx.make((get) =>
Effect.gen(function* () {
// Suspend Effect execution if still waiting
const data = yield* get.result(dataRx, { suspendOnWaiting: true })
// This only runs once dataRx has succeeded; a Failure fails this Effect instead
return yield* processData(data)
})
)
Converting Results to Streams
// Stream of successful values only
const successStreamRx = Rx.make((get) =>
get.streamResult(dataRx) // Stream<A, E>
)
// Stream with error handling
const processedStreamRx = Rx.make((get) =>
get.streamResult(dataRx).pipe(
Stream.catchAll((error) =>
Stream.succeed(getDefaultValue(error))
)
)
)
Working with Optional Results
const optionalDataRx = Rx.make(
Effect.gen(function* () {
const maybeData = yield* fetchOptionalData
if (Option.isNone(maybeData)) {
// Return successful empty result
return Option.none()
}
return maybeData
})
)
// Extract from Option Result
const extractedRx = Rx.make((get) =>
Effect.gen(function* () {
const optionData = yield* get.some(optionalDataRx)
// Fails if Option is None
return yield* processData(optionData)
})
)
The Registry is the central coordination system in effect-rx that manages all Rx instances, their subscriptions, dependency tracking, and automatic cleanup. It acts as both a subscription manager and a memory management system, ensuring that reactive computations are efficiently maintained and disposed of when no longer needed.
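The subscription-manager role can be pictured with a small store keyed by object identity. This is a deliberately reduced sketch, not the Registry itself: it omits derived values, dependency tracking, and cleanup, and `Key`, `Listener`, and `makeStore` are hypothetical names.

```typescript
// Hypothetical, simplified store. The real Registry also handles derived
// values, dependency tracking, and automatic cleanup.
interface Key<A> {
  readonly initial: A
}

type Listener<A> = (value: A) => void

function makeStore() {
  const values = new Map<Key<any>, unknown>()
  const listeners = new Map<Key<any>, Set<Listener<any>>>()

  function get<A>(key: Key<A>): A {
    return values.has(key) ? (values.get(key) as A) : key.initial
  }

  function set<A>(key: Key<A>, value: A): void {
    values.set(key, value)
    const group = listeners.get(key)
    if (group !== undefined) group.forEach((listener) => listener(value))
  }

  function update<A>(key: Key<A>, f: (current: A) => A): void {
    set(key, f(get(key)))
  }

  // Returns the first tuple element and stores the second.
  function modify<A, B>(key: Key<A>, f: (current: A) => readonly [B, A]): B {
    const [returnValue, nextValue] = f(get(key))
    set(key, nextValue)
    return returnValue
  }

  function subscribe<A>(
    key: Key<A>,
    listener: Listener<A>,
    options?: { readonly immediate?: boolean }
  ): () => void {
    const group = listeners.get(key) ?? new Set<Listener<any>>()
    listeners.set(key, group)
    group.add(listener)
    if (options !== undefined && options.immediate === true) listener(get(key))
    return () => {
      group.delete(listener)
    }
  }

  return { get, set, update, modify, subscribe }
}

const store = makeStore()
const count: Key<number> = { initial: 1 }
const seen: number[] = []
const unsubscribe = store.subscribe(count, (n) => { seen.push(n) }, { immediate: true })
const doubled = store.modify(count, (n) => [n * 2, n + 10])
unsubscribe()
store.set(count, 0) // no longer observed by the listener
```

Here `modify` mirrors the tuple convention used in this guide: the first element is returned to the caller and the second becomes the stored value.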
interface Registry {
readonly [TypeId]: TypeId
readonly get: <A>(rx: Rx<A>) => A
readonly mount: <A>(rx: Rx<A>) => () => void
readonly refresh: <A>(rx: Rx<A> & Refreshable) => void
readonly set: <R, W>(rx: Writable<R, W>, value: W) => void
readonly modify: <R, W, A>(rx: Writable<R, W>, f: (_: R) => [returnValue: A, nextValue: W]) => A
readonly update: <R, W>(rx: Writable<R, W>, f: (_: R) => W) => void
readonly subscribe: <A>(rx: Rx<A>, f: (_: A) => void, options?: {
readonly immediate?: boolean
}) => () => void
readonly reset: () => void
readonly dispose: () => void
}
Basic Registry Creation
// Default registry with standard settings
const registry = Registry.make()
// Registry with custom configuration
const customRegistry = Registry.make({
// Custom task scheduler (default: queueMicrotask)
scheduleTask: (task) => setTimeout(task, 0),
// Time resolution for timeout management (default: 1000ms)
timeoutResolution: 500,
// Default idle TTL for Rx cleanup (default: undefined = no auto-cleanup)
defaultIdleTTL: 2000,
// Initial values for Rx instances
initialValues: [
[userRx, { id: 1, name: "Default User" }],
[settingsRx, { theme: "light", language: "en" }]
]
})
Integration with Effect Layers
// Create registry as Effect Layer
const registryLayer = Registry.layerOptions({
defaultIdleTTL: 1000,
scheduleTask: (f) => scheduler.scheduleTask(f, 0)
})
// Use in Effect context
const program = Effect.gen(function* () {
const registry = yield* Registry.RxRegistry
const userData = registry.get(userRx)
return userData
}).pipe(
Effect.provide(registryLayer)
)
Reading Values
const registry = Registry.make()
// Get current value (triggers computation if not cached)
const currentCount = registry.get(countRx)
// Get with initial value if not yet computed
const userName = registry.get(userNameRx) // Triggers fetch if Initial
Modifying State
// Direct value assignment
registry.set(countRx, 42)
// Functional update
registry.update(countRx, count => count + 1)
// Update with return value
const doubledValue = registry.modify(countRx, count => [count * 2, count + 5])
// Returns count * 2, sets value to count + 5
Mounting and Subscriptions
// Mount an Rx (keeps it alive, prevents cleanup)
const unmount = registry.mount(backgroundTaskRx)
// Unmount when no longer needed
unmount()
// Subscribe to value changes
const unsubscribe = registry.subscribe(userRx, (user) => {
console.log("User changed:", user)
}, { immediate: true }) // immediate: true calls listener with current value
// Cleanup subscription
unsubscribe()
Automatic Dependency Detection
// Dependencies are automatically tracked
const derivedRx = Rx.make((get) => {
const user = get(userRx) // Creates dependency on userRx
const settings = get(settingsRx) // Creates dependency on settingsRx
return `${user.name} prefers ${settings.theme} theme`
})
// When userRx or settingsRx changes, derivedRx recomputes automatically
registry.set(userRx, { name: "Alice", id: 1 })
// derivedRx will recalculate
Manual Lifecycle Management
// Mount dependencies explicitly
const compositeRx = Rx.make((get) => {
// Ensure backgroundTaskRx stays active even if not directly used
get.mount(backgroundTaskRx)
const mainData = get(dataRx)
return processData(mainData)
})
// Reset registry (clears all cached values, keeps subscriptions)
registry.reset()
// Full cleanup (disposes all resources)
registry.dispose()
Idle TTL Configuration
// Global default TTL
const registry = Registry.make({
defaultIdleTTL: 5000 // 5 seconds
})
// Per-Rx TTL override
const shortLivedRx = Rx.make(expensiveComputation).pipe(
Rx.setIdleTTL("1 second")
)
const longLivedRx = Rx.make(cacheData).pipe(
Rx.setIdleTTL("5 minutes")
)
// Persistent Rx (never cleaned up)
const persistentRx = Rx.make(globalState).pipe(
Rx.keepAlive
)
Reference Counting
const registry = Registry.make()
// Registry tracks references automatically
const subscription1 = registry.subscribe(dataRx, console.log)
const subscription2 = registry.subscribe(dataRx, saveToStorage)
// Rx stays alive while subscriptions exist
subscription1() // Unsubscribe
// Rx is still alive (subscription2 active)
subscription2() // Unsubscribe
// Now Rx may be cleaned up (if TTL configured)
Manual Refresh Operations
// Make Rx refreshable
const refreshableDataRx = Rx.refreshable(
Rx.make(fetchApiData)
)
// Trigger refresh manually
registry.refresh(refreshableDataRx)
// Custom refresh logic
const customRefreshRx = Rx.readable(
(get) => get(sourceDataRx),
(refresh) => {
// Custom refresh behavior
refresh(sourceDataRx)
refresh(relatedDataRx)
// Clear cache
localStorage.removeItem('cache-key')
}
)
Scoped Registries
// Application-level registry
const globalRegistry = Registry.make({
defaultIdleTTL: 30000 // 30 seconds
})
// Component-level registry for temporary state
const createComponentRegistry = () => Registry.make({
defaultIdleTTL: 1000 // 1 second - quick cleanup
})
// Test registry with immediate cleanup
const testRegistry = Registry.make({
defaultIdleTTL: 0 // Immediate cleanup for testing
})
Batch Operations
// Batch multiple updates for performance
Rx.batch(() => {
registry.set(userRx, newUser)
registry.set(settingsRx, newSettings)
registry.set(preferencesRx, newPreferences)
}) // All dependent Rx values update only once at the end
Registry Monitoring and Debugging
const createDebugRegistry = () => {
const registry = Registry.make()
const originalGet = registry.get
const originalSet = registry.set
// Wrap methods for logging
registry.get = function<A>(rx: Rx<A>): A {
console.log('Registry.get:', rx.label || 'unlabeled')
return originalGet.call(this, rx)
}
registry.set = function<R, W>(rx: Writable<R, W>, value: W): void {
console.log('Registry.set:', rx.label || 'unlabeled', value)
return originalSet.call(this, rx, value)
}
return registry
}
With React Context
const RegistryContext = React.createContext<Registry>(
Registry.make({ defaultIdleTTL: 5000 })
)
export const RegistryProvider = ({ children }: { children: React.ReactNode }) => {
const registry = React.useMemo(() => Registry.make({
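// unstable_scheduleCallback takes (priority, callback), so wrap it to match (f) => void
scheduleTask: (f) => Scheduler.unstable_scheduleCallback(Scheduler.unstable_LowPriority, f),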
defaultIdleTTL: 5000
}), [])
React.useEffect(() => () => {
registry.dispose()
}, [registry])
return (
<RegistryContext.Provider value={registry}>
{children}
</RegistryContext.Provider>
)
}
export const useRegistry = () => React.useContext(RegistryContext)
With Vue Injection
import { provide, inject, onBeforeUnmount } from 'vue'
const registryKey = Symbol('registry')
export const provideRegistry = (registry?: Registry) => {
const reg = registry ?? Registry.make({ defaultIdleTTL: 5000 })
provide(registryKey, reg)
// Cleanup on unmount
onBeforeUnmount(() => {
reg.dispose()
})
return reg
}
export const injectRegistry = (): Registry => {
const registry = inject(registryKey)
if (!registry) {
throw new Error('Registry not provided')
}
return registry
}
Server-Side Rendering (SSR)
// Server-side registry with initial data
const createSSRRegistry = (initialData: Map<Rx<any>, any>) => {
return Registry.make({
initialValues: Array.from(initialData.entries()),
scheduleTask: (f) => setImmediate(f), // Node.js scheduler
defaultIdleTTL: undefined // No cleanup on server
})
}
// Hydration on client
const hydrateRegistry = (serverState: any) => {
const registry = Registry.make({
initialValues: Object.entries(serverState).map(([key, value]) =>
[rxLookup[key], value]
),
scheduleTask: (f) => queueMicrotask(f),
defaultIdleTTL: 5000
})
return registry
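}
Subscription Optimization
// Shared queue that the scheduler below drains once per microtask
const taskQueue: Array<() => void> = []
const optimizedRegistry = Registry.make({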
// Batch updates for better performance
scheduleTask: (f) => {
// Collect tasks and run in next tick
if (!taskQueue.length) {
queueMicrotask(() => {
const tasks = taskQueue.splice(0)
tasks.forEach(task => task())
})
}
taskQueue.push(f)
}
})
// Efficient subscription patterns
const useOptimizedSubscription = <A>(rx: Rx<A>) => {
const [value, setValue] = React.useState(() => registry.get(rx))
React.useEffect(() => {
// Use immediate: false to avoid duplicate initial call
return registry.subscribe(rx, setValue, { immediate: false })
}, [rx])
return value
}
Memory Leak Prevention
// Automatic cleanup registry
const createAutoCleanupRegistry = () => {
const registry = Registry.make({
defaultIdleTTL: 10000, // 10 second default
timeoutResolution: 1000 // Check every second
})
// Periodic cleanup of unused resources
const cleanupInterval = setInterval(() => {
// Force garbage collection of eligible nodes
registry.reset()
}, 60000) // Every minute
const originalDispose = registry.dispose
registry.dispose = function() {
clearInterval(cleanupInterval)
return originalDispose.call(this)
}
return registry
}
The Context API is the dependency injection system in effect-rx that provides access to other Rx values, utilities, and registry operations within reactive computations. It's the primary interface for building reactive computations that depend on other reactive values.
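To make the `get`-based dependency tracking concrete, the following is a minimal sketch in plain TypeScript that does not use effect-rx at all. `Cell`, `derive`, and `GetFn` are hypothetical names invented for this illustration, and the model recomputes eagerly, whereas the library evaluates lazily. It only shows the core idea: each call to `get` during a computation records a dependency, and the dependency set is rebuilt on every run.

```typescript
// Hypothetical model of `get`-based dependency tracking (not the effect-rx implementation).
type Listener = () => void

class Cell<A> {
  private listeners = new Set<Listener>()
  constructor(private value: A) {}
  read(): A {
    return this.value
  }
  write(next: A): void {
    if (Object.is(next, this.value)) return
    this.value = next
    // Copy before iterating: listeners may unsubscribe during notification
    for (const listener of [...this.listeners]) listener()
  }
  subscribe(listener: Listener): () => void {
    this.listeners.add(listener)
    return () => {
      this.listeners.delete(listener)
    }
  }
}

type GetFn = <A>(cell: Cell<A>) => A

interface Derived<A> {
  readonly current: () => A
  readonly dependencyCount: () => number
}

function derive<A>(compute: (get: GetFn) => A): Derived<A> {
  let cached!: A
  let unsubscribers: Array<() => void> = []

  const recompute = (): void => {
    // Drop the old dependency set; it is rebuilt from the reads of this run
    for (const unsubscribe of unsubscribers) unsubscribe()
    unsubscribers = []
    const seen = new Set<Cell<unknown>>()
    const get: GetFn = (cell) => {
      const key = cell as Cell<unknown>
      if (!seen.has(key)) {
        seen.add(key)
        unsubscribers.push(cell.subscribe(recompute))
      }
      return cell.read()
    }
    cached = compute(get)
  }

  recompute()
  return {
    current: () => cached,
    dependencyCount: () => unsubscribers.length
  }
}

const mode = new Cell<"online" | "offline">("online")
const live = new Cell("live-1")
const cachedData = new Cell("cached-1")

// Only the branch that actually runs contributes dependencies
const view = derive((get) =>
  get(mode) === "online" ? get(live) : get(cachedData)
)
```

After `mode.write("offline")`, this model no longer reacts to writes on `live`, which mirrors the conditional-dependency behaviour described for the real `get`.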
interface Context {
<A>(rx: Rx<A>): A
readonly get: <A>(rx: Rx<A>) => A
readonly result: <A, E>(rx: Rx<Result.Result<A, E>>, options?: {
readonly suspendOnWaiting?: boolean | undefined
}) => Effect.Effect<A, E>
readonly resultOnce: <A, E>(rx: Rx<Result.Result<A, E>>, options?: {
readonly suspendOnWaiting?: boolean | undefined
}) => Effect.Effect<A, E>
readonly once: <A>(rx: Rx<A>) => A
readonly addFinalizer: (f: () => void) => void
readonly mount: <A>(rx: Rx<A>) => void
readonly refresh: <A>(rx: Rx<A> & Refreshable) => void
readonly refreshSelf: () => void
readonly self: <A>() => Option.Option<A>
readonly setSelf: <A>(a: A) => void
readonly set: <R, W>(rx: Writable<R, W>, value: W) => void
readonly some: <A>(rx: Rx<Option.Option<A>>) => Effect.Effect<A>
readonly someOnce: <A>(rx: Rx<Option.Option<A>>) => Effect.Effect<A>
readonly stream: <A>(rx: Rx<A>, options?: {
readonly withoutInitialValue?: boolean
readonly bufferSize?: number
}) => Stream.Stream<A>
readonly streamResult: <A, E>(rx: Rx<Result.Result<A, E>>, options?: {
readonly withoutInitialValue?: boolean
readonly bufferSize?: number
}) => Stream.Stream<A, E>
readonly subscribe: <A>(rx: Rx<A>, f: (_: A) => void, options?: {
readonly immediate?: boolean
}) => void
readonly registry: Registry.Registry
}
Reading Values
const userDisplayRx = Rx.make((get) => {
// Basic value access - creates dependency
const user = get(userRx)
const settings = get(settingsRx)
// Alternative syntax (equivalent)
const preferences = get.get(preferencesRx)
return {
name: user.name,
theme: settings.theme,
language: preferences.language
}
})
One-time Access (No Dependency Tracking)
const optimizedRx = Rx.make((get) => {
// Creates dependency - will re-run when userRx changes
const user = get(userRx)
// One-time access - no dependency created
const initialConfig = get.once(configRx)
// Use initial config but don't re-run when it changes
return processUser(user, initialConfig)
})
Extracting Successful Values
const processedDataRx = Rx.make((get) =>
Effect.gen(function* () {
// Extract successful value or fail the Effect
const userData = yield* get.result(userRx)
const apiData = yield* get.result(apiDataRx)
// Process successful values
return combineData(userData, apiData)
})
)
// With suspend option
const suspendableRx = Rx.make((get) =>
Effect.gen(function* () {
// Suspend if still waiting/loading
const data = yield* get.result(asyncDataRx, {
suspendOnWaiting: true
})
return processImmediately(data)
})
)
Conditional Dependencies
const conditionalRx = Rx.make((get) => {
const mode = get(modeRx)
if (mode === 'online') {
// This creates dependency only when online
const liveData = get(liveDataRx)
return { mode, data: liveData }
} else {
// This creates dependency only when offline
const cachedData = get(cachedDataRx)
return { mode, data: cachedData }
}
})
Lifecycle Management
const resourceRx = Rx.make((get) => {
// Create resource
const ws = new WebSocket('ws://localhost:8080')
// Register cleanup
get.addFinalizer(() => {
console.log('Cleaning up WebSocket')
ws.close()
})
// Mount dependencies to keep them alive
get.mount(backgroundTaskRx)
return ws
})
RxRuntime provides deep integration with Effect's runtime system, enabling seamless use of Effect Services, Layers, and advanced async patterns within reactive computations.
interface RxRuntime<R, ER> extends Rx<Result<Runtime.Runtime<R>, ER>> {
readonly layer: Rx<Layer.Layer<R, ER>>
readonly rx: {
<A, E>(create: (get: Context) => Effect.Effect<A, E, Scope.Scope | R | RxRegistry>, options?: {
readonly initialValue?: A
}): Rx<Result<A, E | ER>>
<A, E>(effect: Effect.Effect<A, E, Scope.Scope | R>, options?: {
readonly initialValue?: A
}): Rx<Result<A, E | ER>>
}
readonly fn: {
<Arg, E, A>(fn: (arg: Arg, get: FnContext) => Effect.Effect<A, E, Scope.Scope | RxRegistry | R>, options?: {
readonly initialValue?: A
}): RxResultFn<RxResultFn.ArgToVoid<Arg>, A, E | ER>
}
readonly pull: <A, E>(
create: ((get: Context) => Stream.Stream<A, E, R | RxRegistry>) | Stream.Stream<A, E, R | RxRegistry>,
options?: {
readonly disableAccumulation?: boolean
readonly initialValue?: ReadonlyArray<A>
}
) => Writable<PullResult<A, E | ER>, void>
}
Basic Runtime Creation
// Create runtime from Layer (Layer.mergeAll combines independent layers into one)
const AppServices = Layer.mergeAll(
UserService.Default,
DatabaseService.Default,
HttpService.Default
)
const appRuntime = Rx.runtime(AppServices)
// Runtime factory with global layers
const runtimeFactory = Rx.context()
// Add global layers (e.g., for logging, config)
runtimeFactory.addGlobalLayer(
Layer.setConfigProvider(ConfigProvider.fromJson(import.meta.env))
)
runtimeFactory.addGlobalLayer(
Logger.minimumLogLevel(LogLevel.Info)
)
// Create runtime with global layers applied
const servicesRuntime = runtimeFactory(AppServices)
Runtime Dependencies
class UserService extends Effect.Service<UserService>()("UserService", {
effect: Effect.gen(function* () {
const database = yield* DatabaseService
const findById = (id: string) =>
Effect.gen(function* () {
const user = yield* database.query("SELECT * FROM users WHERE id = ?", [id])
return user
})
const create = (userData: CreateUserRequest) =>
Effect.gen(function* () {
const id = yield* database.insert("users", userData)
return { id, ...userData }
})
return { findById, create } as const
})
}) {}
const userRuntime = Rx.runtime(UserService.Default)
Effect-based Rx Creation
// Create Rx with service dependencies
const currentUserRx = userRuntime.rx((get) =>
Effect.gen(function* () {
const userId = get(currentUserIdRx)
const userService = yield* UserService
return yield* userService.findById(userId)
})
)
// Direct Effect (no dependencies on other Rx)
const allUsersRx = userRuntime.rx(
Effect.gen(function* () {
const userService = yield* UserService
return yield* userService.findAll()
})
)
Function Creation with Runtime
// Create reactive functions with service access
const createUserRx = userRuntime.fn(
Effect.fnUntraced(function* (userData: CreateUserRequest) {
const userService = yield* UserService
const newUser = yield* userService.create(userData)
// Update reactive state
// RxRegistry is the service tag that appears in the RxRuntime signatures
const registry = yield* Registry.RxRegistry
const currentUsers = registry.get(allUsersRx)
if (currentUsers._tag === "Success") {
registry.set(allUsersRx, Result.success([...currentUsers.value, newUser]))
}
return newUser
})
)
// Usage in components
const CreateUserForm = () => {
const createUser = useRxSetPromise(createUserRx)
const handleSubmit = async (userData: CreateUserRequest) => {
const exit = await createUser(userData)
if (Exit.isSuccess(exit)) {
console.log("User created:", exit.value)
} else {
console.error("Failed to create user:", Cause.pretty(exit.cause))
}
}
return <form onSubmit={handleSubmit}>...</form>
}
Pull-based Stream Processing
const messagesRuntime = Rx.runtime(MessageService.Default)
// Create pull-based message loader
const messagesPullRx = messagesRuntime.pull(
// pull expects a Stream, so unwrap the Effect that produces one
Stream.unwrap(
Effect.gen(function* () {
const messageService = yield* MessageService
return yield* messageService.getMessagesStream()
})
),
{
disableAccumulation: false, // Accumulate messages
initialValue: [] // Start with empty array
}
)
// Component usage
const MessageList = () => {
const [messagesResult, loadMore] = useRx(messagesPullRx)
return Result.match(messagesResult, {
onInitial: () => <div>No messages loaded</div>,
onSuccess: (success) => (
<div>
{success.value.items.map(message => (
<div key={message.id}>{message.content}</div>
))}
<button
onClick={() => loadMore()}
disabled={success.waiting}
>
{success.waiting ? "Loading..." : "Load More"}
</button>
</div>
),
onFailure: (failure) => <div>Error: {Cause.pretty(failure.cause)}</div>
})
}
Multi-Service Integration
// Compose multiple services (Layer.mergeAll combines independent layers into one)
const AppRuntime = Layer.mergeAll(
UserService.Default,
MessageService.Default,
NotificationService.Default,
EmailService.Default
)
const appRuntime = Rx.runtime(AppRuntime)
// Cross-service reactive computation
const userNotificationsRx = appRuntime.rx((get) =>
Effect.gen(function* () {
const userId = get(currentUserIdRx)
const [user, notifications, unreadCount] = yield* Effect.all([
UserService.pipe(Effect.flatMap(s => s.findById(userId))),
NotificationService.pipe(Effect.flatMap(s => s.getByUserId(userId))),
NotificationService.pipe(Effect.flatMap(s => s.getUnreadCount(userId)))
])
return {
user,
notifications,
unreadCount,
lastUpdated: Date.now()
}
})
)
Scoped Runtime Operations
// Runtime with resource management
const databaseRuntime = Rx.runtime(
Layer.scoped(
DatabaseService,
Effect.gen(function* () {
const pool = yield* Effect.acquireRelease(
createConnectionPool(),
(pool) => Effect.promise(() => pool.close())
)
return {
query: (sql: string, params: any[]) =>
Effect.promise(() => pool.query(sql, params)),
transaction: <A>(effect: Effect.Effect<A, any, any>) =>
Effect.scoped(
Effect.gen(function* () {
const conn = yield* Effect.acquireRelease(
Effect.promise(() => pool.getConnection()),
(conn) => Effect.promise(() => conn.release())
)
yield* Effect.promise(() => conn.beginTransaction())
// try/catch cannot observe Effect failures, so roll back through the error channel
const result = yield* effect.pipe(
Effect.tapErrorCause(() => Effect.promise(() => conn.rollback()))
)
yield* Effect.promise(() => conn.commit())
return result
})
)
} as const
})
)
)
Effect-Rx provides sophisticated integration with Effect Streams, supporting both push-based and pull-based reactive patterns.
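The difference between the two patterns is who decides when the next value is produced. In a push-based stream the source emits on its own schedule; in a pull-based one nothing is read until the consumer asks. The sketch below models only the pull side in plain TypeScript, using a synchronous iterator as a stand-in for a stream. `makePull` and `PullState` are hypothetical names for this illustration and are not part of effect-rx; the `disableAccumulation` flag is modelled on the option of the same name.

```typescript
// Hypothetical pull-based accumulator (illustrative only, not the effect-rx API).
interface PullState<A> {
  readonly items: ReadonlyArray<A>
  readonly done: boolean
}

function makePull<A>(
  source: Iterator<ReadonlyArray<A>>,
  options: { readonly disableAccumulation?: boolean } = {}
) {
  let state: PullState<A> = { items: [], done: false }
  return {
    current: (): PullState<A> => state,
    // Nothing is read from the source until the consumer asks for more
    pull: (): PullState<A> => {
      if (state.done) return state
      const next = source.next()
      if (next.done === true) {
        state = { items: state.items, done: true }
      } else {
        const items = options.disableAccumulation
          ? next.value
          : [...state.items, ...next.value]
        state = { items, done: false }
      }
      return state
    }
  }
}

// Two pages of data standing in for a paginated source
function* pages(): Generator<ReadonlyArray<number>> {
  yield [1, 2]
  yield [3, 4]
}

const accumulated = makePull(pages())
const latestOnly = makePull(pages(), { disableAccumulation: true })
```

With accumulation enabled, each `pull()` appends the next page to the items already held; with it disabled, only the most recent page is kept.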
Real-time Data Streams
// WebSocket stream integration
const realtimeDataRx = Rx.make(
Stream.fromEffect(
Effect.gen(function* () {
const ws = yield* Effect.promise(() =>
new Promise<WebSocket>((resolve, reject) => {
const socket = new WebSocket('ws://localhost:8080/data')
socket.onopen = () => resolve(socket)
socket.onerror = reject
})
)
return Stream.fromAsyncIterable(
{
[Symbol.asyncIterator]: () => ({
async next() {
return new Promise((resolve) => {
ws.onmessage = (event) => {
resolve({ value: JSON.parse(event.data), done: false })
}
ws.onclose = () => {
resolve({ value: undefined, done: true })
}
})
}
})
},
(error) => new Error(`WebSocket error: ${error}`)
)
})
).pipe(Stream.flatten)
)
Polling Streams with Backoff
const pollingDataRx = Rx.make(
Stream.fromSchedule(Schedule.spaced("5 seconds")).pipe(
Stream.mapEffect(() =>
Effect.gen(function* () {
const response = yield* HttpClient.get("/api/data")
return yield* response.json
}).pipe(
Effect.retry(
Schedule.exponential("1 second").pipe(
Schedule.intersect(Schedule.recurs(3))
)
),
Effect.timeout("10 seconds")
)
),
Stream.catchAll((error) =>
// Log the error, then emit nothing (Stream.drain discards the log's output)
Stream.fromEffect(Effect.log(`Polling error: ${error}`)).pipe(
Stream.drain
)
)
)
)
Infinite Scroll Implementation
const infiniteScrollRx = Rx.pull(
Stream.unfoldEffect(0, (page) =>
Effect.gen(function* () {
const response = yield* HttpClient.get(`/api/items?page=${page}&limit=20`)
const data = yield* response.json
if (data.items.length === 0) {
return Option.none() // End of stream
}
return Option.some([data.items, page + 1])
}).pipe(
Effect.catchAll(() => Effect.succeed(Option.none()))
)
).pipe(
Stream.flatMap(items => Stream.fromIterable(items))
),
{
disableAccumulation: false // Keep accumulating items
}
)
// Component usage
const InfiniteList = () => {
const [result, loadMore] = useRx(infiniteScrollRx)
return Result.match(result, {
onSuccess: (success) => (
<div>
<div className="items">
{success.value.items.map(item => (
<ItemCard key={item.id} item={item} />
))}
</div>
{!success.value.done && (
<button
onClick={() => loadMore()}
disabled={success.waiting}
>
{success.waiting ? "Loading..." : "Load More"}
</button>
)}
</div>
),
onFailure: (failure) => <div>Error loading items</div>,
onInitial: () => <div>Click to load items</div>
})
}
Chunked Data Processing
const batchProcessorRx = Rx.pull(
Stream.fromIterable(largeDataSet).pipe(
Stream.grouped(100), // Group items into chunks of 100
Stream.mapEffect((chunk) =>
Effect.gen(function* () {
// Process chunk with potential side effects
const processed = yield* Effect.forEach(
chunk,
(item) => processItem(item),
{ concurrency: 5 } // Process 5 items concurrently
)
// Report progress
yield* Effect.log(`Processed ${processed.length} items`)
return processed
})
)
),
{
disableAccumulation: true // Don't accumulate, just show current chunk
}
)
Rx to Stream Conversion
// Convert Rx to Stream for further processing
const userStreamRx = Rx.make((get) =>
get.stream(userRx, {
withoutInitialValue: false,
bufferSize: 10
}).pipe(
Stream.filter(user => user.active),
Stream.map(user => ({ ...user, lastSeen: Date.now() })),
Stream.debounce("1 second")
)
)
// Result streams for error handling
const processedDataStreamRx = Rx.make((get) =>
get.streamResult(apiDataRx).pipe(
Stream.mapEffect((data) =>
Effect.gen(function* () {
const processed = yield* processData(data)
yield* Effect.log(`Processed data: ${processed.id}`)
return processed
})
),
Stream.catchAll((error) =>
Stream.fromEffect(
Effect.log(`Processing error: ${error}`).pipe(
Effect.as(fallbackData)
)
)
)
)
)
Effect-Rx provides reactive function abstractions that enable async operations while maintaining reactivity.
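One way to picture a reactive function is as a piece of state whose "setter" is a function call: writing an argument runs the function, and the outcome becomes the new value that subscribers see. The sketch below is a simplified synchronous model in plain TypeScript. `makeFnState` and `FnResult` are hypothetical names for this illustration, and the three-state result is a reduced stand-in for the library's `Result` type (there is no `waiting` flag here).

```typescript
// Hypothetical model of a function-backed reactive value (not the effect-rx API).
type FnResult<A> =
  | { readonly _tag: "Initial" }
  | { readonly _tag: "Success"; readonly value: A }
  | { readonly _tag: "Failure"; readonly error: unknown }

function makeFnState<Arg, A>(f: (arg: Arg) => A) {
  let result: FnResult<A> = { _tag: "Initial" }
  const listeners = new Set<(result: FnResult<A>) => void>()
  return {
    current: (): FnResult<A> => result,
    subscribe: (listener: (result: FnResult<A>) => void): (() => void) => {
      listeners.add(listener)
      return () => {
        listeners.delete(listener)
      }
    },
    // "Setting" the value means calling the function with a new argument
    run: (arg: Arg): void => {
      try {
        result = { _tag: "Success", value: f(arg) }
      } catch (error) {
        result = { _tag: "Failure", error }
      }
      for (const listener of [...listeners]) listener(result)
    }
  }
}

// Example function: parse a TCP port, throwing on invalid input
const parsePort = makeFnState((input: string) => {
  const port = Number(input)
  if (!Number.isInteger(port) || port < 1 || port > 65535) {
    throw new Error(`Invalid port: ${input}`)
  }
  return port
})

// Record the tag of every result that subscribers are notified about
const seen: Array<string> = []
parsePort.subscribe((result) => {
  seen.push(result._tag)
})
```

The state starts as `Initial` and only changes when `run` is called, which is why components typically render a "load" action for the initial case.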
Basic Function Creation
// Simple async function
const fetchUserRx = Rx.fn(
Effect.fnUntraced(function* (userId: string) {
const response = yield* HttpClient.get(`/api/users/${userId}`)
return yield* response.json
})
)
// Usage with Result handling
const UserProfile = ({ userId }: { userId: string }) => {
const [result, fetchUser] = useRx(fetchUserRx)
React.useEffect(() => {
fetchUser(userId)
}, [userId, fetchUser])
return Result.match(result, {
onInitial: () => <button onClick={() => fetchUser(userId)}>Load User</button>,
onSuccess: (success) => <div>Welcome, {success.value.name}!</div>,
onFailure: (error) => <div>Error: {Cause.pretty(error.cause)}</div>
})
}
Functions with Service Dependencies
const userRuntime = Rx.runtime(UserService.Default)
const updateUserRx = userRuntime.fn(
Effect.fnUntraced(function* (updates: Partial<User>) {
const userService = yield* UserService
// RxRegistry is the service tag that appears in the RxRuntime signatures
const registry = yield* Registry.RxRegistry
// Get current user
const currentResult = registry.get(currentUserRx)
if (currentResult._tag !== "Success") {
return yield* Effect.fail(new Error("No current user"))
}
// Update user
const updatedUser = yield* userService.update(currentResult.value.id, updates)
// Update reactive state
registry.set(currentUserRx, Result.success(updatedUser))
return updatedUser
})
)
Immediate Computations
// Sync function with optional initial value
const calculateTotalRx = Rx.fnSync((items: CartItem[]) =>
items.reduce((total, item) => total + (item.price * item.quantity), 0)
)
// With initial value
const formatCurrencyRx = Rx.fnSync(
(amount: number) => new Intl.NumberFormat('en-US', {
style: 'currency',
currency: 'USD'
}).format(amount),
{ initialValue: "$0.00" }
)
// Usage
const ShoppingCart = () => {
const [items, setItems] = useRx(cartItemsRx)
const [total, calculateTotal] = useRx(calculateTotalRx)
const [formattedTotal, formatTotal] = useRx(formatCurrencyRx)
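// The setter returns nothing; the computed total arrives through `total`
React.useEffect(() => {
calculateTotal(items)
}, [items, calculateTotal])
// Without an initialValue, `total` is an Option until the function has run once
React.useEffect(() => {
if (Option.isSome(total)) {
formatTotal(total.value)
}
}, [total, formatTotal])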
return (
<div>
<div>Items: {items.length}</div>
<div>Total: {formattedTotal}</div>
</div>
)
}
Form Computation Functions
// Field validation function
const validateEmailRx = Rx.fnSync((email: string) => {
if (!email) return { valid: false, error: "Email required" }
if (!/^[^\s@]+@[^\s@]+\.[^\s@]+$/.test(email)) {
return { valid: false, error: "Invalid email format" }
}
return { valid: true, error: null }
})
// Complex form computation
const calculateShippingRx = Rx.fnSync(
(orderData: { items: CartItem[], address: Address, method: ShippingMethod }) => {
const { items, address, method } = orderData
const weight = items.reduce((w, item) => w + item.weight, 0)
const distance = calculateDistance(address.zipCode)
return {
cost: method.baseRate + (weight * method.weightRate) + (distance * method.distanceRate),
estimatedDays: method.baseDays + Math.ceil(distance / 100)
}
},
{ initialValue: { cost: 0, estimatedDays: 0 } }
)
Effect-Rx provides seamless integration with various storage mechanisms for persistent and synchronized state.
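Storage-backed state generally comes down to three ingredients: a key, a way to validate what comes back from storage, and a default for when the stored value is missing or invalid. The following sketch shows that shape in plain TypeScript. `makePersisted`, `KeyValueStore`, and `makeMemoryStore` are hypothetical names for this illustration; the in-memory store stands in for `localStorage`, and a type guard stands in for a schema.

```typescript
// Hypothetical key-value-backed state with validation and a default (not Rx.kvs itself).
interface KeyValueStore {
  getItem(key: string): string | null
  setItem(key: string, value: string): void
}

// In-memory stand-in for localStorage so the sketch runs anywhere
function makeMemoryStore(): KeyValueStore {
  const data = new Map<string, string>()
  return {
    getItem: (key) => data.get(key) ?? null,
    setItem: (key, value) => {
      data.set(key, value)
    }
  }
}

function makePersisted<A>(options: {
  readonly store: KeyValueStore
  readonly key: string
  readonly isValid: (u: unknown) => u is A
  readonly defaultValue: () => A
}) {
  const read = (): A => {
    const raw = options.store.getItem(options.key)
    if (raw === null) return options.defaultValue()
    try {
      const parsed: unknown = JSON.parse(raw)
      // Stored data is untrusted: fall back to the default when validation fails
      return options.isValid(parsed) ? parsed : options.defaultValue()
    } catch {
      // Unparseable JSON is treated the same as a missing value
      return options.defaultValue()
    }
  }
  const write = (value: A): void => {
    options.store.setItem(options.key, JSON.stringify(value))
  }
  return { read, write }
}

type Theme = "light" | "dark" | "system"
const isTheme = (u: unknown): u is Theme =>
  u === "light" || u === "dark" || u === "system"

const store = makeMemoryStore()
const theme = makePersisted<Theme>({
  store,
  key: "user-theme",
  isValid: isTheme,
  defaultValue: () => "system"
})
```

Validating on read matters because another tab, an older app version, or the user can put arbitrary strings under the same key.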
Browser LocalStorage
import { BrowserKeyValueStore } from "@effect/platform-browser"
// Create KVS runtime
const storageRuntime = Rx.runtime(BrowserKeyValueStore.layerLocalStorage)
// Persistent theme setting
const themeRx = Rx.kvs({
runtime: storageRuntime,
key: "user-theme",
schema: Schema.Literal("light", "dark", "system"),
defaultValue: () => "system" as const
})
// Persistent user preferences
const userPreferencesRx = Rx.kvs({
runtime: storageRuntime,
key: "user-preferences",
schema: Schema.Struct({
language: Schema.String,
notifications: Schema.Boolean,
autoSave: Schema.Boolean
}),
defaultValue: () => ({
language: "en",
notifications: true,
autoSave: true
})
})
// Component usage
const ThemeSelector = () => {
const [theme, setTheme] = useRx(themeRx)
return (
<select value={theme} onChange={(e) => setTheme(e.target.value as any)}>
<option value="light">Light</option>
<option value="dark">Dark</option>
<option value="system">System</option>
</select>
)
}
Cross-tab Synchronization
const syncedStateRx = Rx.kvs({
runtime: storageRuntime,
key: "shared-state",
schema: Schema.Struct({
lastAction: Schema.String,
timestamp: Schema.Number,
data: Schema.Unknown
}),
defaultValue: () => ({
lastAction: "init",
timestamp: Date.now(),
data: null
})
})
// Listen for storage events from other tabs
const crossTabSyncRx = Rx.make((get) => {
const handleStorageChange = (event: StorageEvent) => {
if (event.key === "shared-state" && event.newValue) {
try {
const newState = JSON.parse(event.newValue)
get.set(syncedStateRx, newState)
} catch (error) {
console.warn("Failed to sync cross-tab state:", error)
}
}
}
window.addEventListener("storage", handleStorageChange)
get.addFinalizer(() => {
window.removeEventListener("storage", handleStorageChange)
})
return get(syncedStateRx)
})
Simple URL State
// Simple search param
const searchQueryRx = Rx.searchParam("q")
// Typed search param with schema
const pageNumberRx = Rx.searchParam("page", {
schema: Schema.NumberFromString
})
const sortOrderRx = Rx.searchParam("sort", {
schema: Schema.Literal("asc", "desc")
})
// Component that syncs with URL
const SearchResults = () => {
const [query, setQuery] = useRx(searchQueryRx)
const [page, setPage] = useRx(pageNumberRx)
const [sort, setSort] = useRx(sortOrderRx)
return (
<div>
<input
value={query}
onChange={(e) => setQuery(e.target.value)}
placeholder="Search..."
/>
<select
value={Option.getOrElse(sort, () => "asc")}
onChange={(e) => setSort(Option.some(e.target.value as "asc" | "desc"))}
>
<option value="asc">Ascending</option>
<option value="desc">Descending</option>
</select>
<div>
Page: {Option.getOrElse(page, () => 1)}
<button onClick={() => setPage(Option.map(page, p => p - 1))}>
Previous
</button>
<button onClick={() => setPage(Option.map(page, p => p + 1))}>
Next
</button>
</div>
</div>
)
}
Complex URL State Management
// Composite URL state
const searchFiltersRx = Rx.make((get) => {
const query = get(searchQueryRx)
const category = get(Rx.searchParam("category"))
const minPrice = get(Rx.searchParam("minPrice", { schema: Schema.NumberFromString }))
const maxPrice = get(Rx.searchParam("maxPrice", { schema: Schema.NumberFromString }))
const inStock = get(Rx.searchParam("inStock", { schema: Schema.BooleanFromString }))
return {
query,
category,
priceRange: {
min: Option.getOrElse(minPrice, () => 0),
max: Option.getOrElse(maxPrice, () => Infinity)
},
inStock: Option.getOrElse(inStock, () => false)
}
})
// Derived search results
const searchResultsRx = Rx.make((get) =>
Effect.gen(function* () {
const filters = get(searchFiltersRx)
const response = yield* HttpClient.get("/api/search", {
urlParams: {
q: filters.query,
category: filters.category,
minPrice: filters.priceRange.min.toString(),
maxPrice: filters.priceRange.max === Infinity ? undefined : filters.priceRange.max.toString(),
inStock: filters.inStock.toString()
}
})
return yield* response.json
})
)
RxRef provides a lightweight alternative to full Rx for simple reactive values that don't need the full Effect integration.
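As a rough mental model, a reference of this kind is a value with a subscriber list, plus two ways of deriving new references: `map` for read-only transformations and `prop` for a writable view of one property. The sketch below is a plain-TypeScript illustration of that idea and not the RxRef implementation; `makeRef`, `fromAccessors`, and `readonlyRef` are hypothetical names, and the property refs here notify on every parent change rather than only when the property itself changes.

```typescript
// Hypothetical minimal reactive reference with `map` and `prop` (not the RxRef implementation).
interface ReadonlyRef<A> {
  readonly value: () => A
  readonly subscribe: (f: (a: A) => void) => () => void
  readonly map: <B>(f: (a: A) => B) => ReadonlyRef<B>
}

interface Ref<A> extends ReadonlyRef<A> {
  readonly set: (a: A) => void
  readonly prop: <K extends keyof A>(key: K) => Ref<A[K]>
}

function readonlyRef<A>(
  value: () => A,
  subscribe: (f: (a: A) => void) => () => void
): ReadonlyRef<A> {
  return {
    value,
    subscribe,
    // A mapped ref recomputes from its parent on every read and notification
    map: (f) =>
      readonlyRef(
        () => f(value()),
        (g) => subscribe((a) => g(f(a)))
      )
  }
}

function fromAccessors<A>(
  get: () => A,
  set: (a: A) => void,
  subscribe: (f: (a: A) => void) => () => void
): Ref<A> {
  return {
    ...readonlyRef(get, subscribe),
    set,
    // A property ref reads from and writes through to its parent
    prop: (key) =>
      fromAccessors(
        () => get()[key],
        (next) => set({ ...get(), [key]: next }),
        (f) => subscribe((a) => f(a[key]))
      )
  }
}

function makeRef<A>(initial: A): Ref<A> {
  let current = initial
  const listeners = new Set<(a: A) => void>()
  return fromAccessors(
    () => current,
    (next) => {
      if (Object.is(next, current)) return
      current = next
      for (const listener of [...listeners]) listener(next)
    },
    (f) => {
      listeners.add(f)
      return () => {
        listeners.delete(f)
      }
    }
  )
}

const userRef = makeRef({ name: "Alice", preferences: { theme: "dark" } })
const themeRef = userRef.prop("preferences").prop("theme")
const greetingRef = userRef.prop("name").map((name) => `Hello, ${name}!`)
```

Writing through `themeRef` produces a new root object with only the `preferences.theme` path replaced, so sibling properties such as `name` are left untouched.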
import { RxRef, useRxRef } from "@effect-rx/rx-react"
// Create simple reactive references
const countRef = RxRef.make(0)
const nameRef = RxRef.make("Alice")
// Derived references
const doubleCountRef = countRef.map(n => n * 2)
const greetingRef = nameRef.map(name => `Hello, ${name}!`)
// Component usage
const SimpleCounter = () => {
const count = useRxRef(countRef)
const doubleCount = useRxRef(doubleCountRef)
return (
<div>
<p>Count: {count}</p>
<p>Double: {doubleCount}</p>
<button onClick={() => countRef.set(count + 1)}>
Increment
</button>
</div>
)
}
Object Property Access
const userRef = RxRef.make({
name: "Alice",
email: "alice@example.com",
preferences: {
theme: "dark",
notifications: true
}
})
// Create property references
const nameRef = userRef.prop("name")
const emailRef = userRef.prop("email")
const themeRef = userRef.prop("preferences").prop("theme")
// Component with property bindings
const UserSettings = () => {
const name = useRxRef(nameRef)
const email = useRxRef(emailRef)
const theme = useRxRef(themeRef)
return (
<form>
<input
value={name}
onChange={(e) => nameRef.set(e.target.value)}
/>
<input
value={email}
onChange={(e) => emailRef.set(e.target.value)}
/>
<select
value={theme}
onChange={(e) => themeRef.set(e.target.value)}
>
<option value="light">Light</option>
<option value="dark">Dark</option>
</select>
</form>
)
}
Reactive Arrays
const todosRef = RxRef.collection([
{ id: 1, text: "Learn Effect-Rx", completed: false },
{ id: 2, text: "Build awesome app", completed: false }
])
// Component with collection operations
const TodoList = () => {
const todos = useRxRef(todosRef)
const addTodo = (text: string) => {
todosRef.push({
id: Date.now(),
text,
completed: false
})
}
const removeTodo = (todoRef: RxRef.RxRef<Todo>) => {
todosRef.remove(todoRef)
}
return (
<div>
{todos.map((todoRef) => (
<TodoItem
key={todoRef.value.id}
todoRef={todoRef}
onRemove={() => removeTodo(todoRef)}
/>
))}
<AddTodoForm onAdd={addTodo} />
</div>
)
}
const TodoItem = ({
todoRef,
onRemove
}: {
todoRef: RxRef.RxRef<Todo>
onRemove: () => void
}) => {
const todo = useRxRef(todoRef)
const textRef = useRxRefProp(todoRef, "text")
const completedRef = useRxRefProp(todoRef, "completed")
return (
<div className={`todo ${todo.completed ? "completed" : ""}`}>
<input
type="checkbox"
checked={todo.completed}
onChange={(e) => completedRef.set(e.target.checked)}
/>
<input
value={todo.text}
onChange={(e) => textRef.set(e.target.value)}
/>
<button onClick={onRemove}>Remove</button>
</div>
)
}
Effect-rx provides comprehensive React integration through the @effect-rx/rx-react package, offering hooks that seamlessly connect reactive state to React components.
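Connecting an external store to React usually means supplying two functions: one that subscribes to changes and one that returns the current snapshot, which is the contract of React's `useSyncExternalStore`. The sketch below builds that pair over a tiny store in plain TypeScript, without importing React. `MiniStore`, `makeMiniStore`, and `toExternalStore` are hypothetical names for this illustration; whether rx-react is implemented this way is an assumption, not something this guide states.

```typescript
// Hypothetical adapter from a small store to the (subscribe, getSnapshot) pair
// that React's useSyncExternalStore expects. Not the rx-react implementation.
interface MiniStore<A> {
  get(): A
  set(value: A): void
  subscribe(listener: (value: A) => void): () => void
}

function makeMiniStore<A>(initial: A): MiniStore<A> {
  let current = initial
  const listeners = new Set<(value: A) => void>()
  return {
    get: () => current,
    set: (value) => {
      // Skip notifications when nothing changed to avoid needless re-renders
      if (Object.is(value, current)) return
      current = value
      for (const listener of [...listeners]) listener(value)
    },
    subscribe: (listener) => {
      listeners.add(listener)
      return () => {
        listeners.delete(listener)
      }
    }
  }
}

function toExternalStore<A>(store: MiniStore<A>) {
  return {
    // React calls this with a callback that schedules a re-render
    subscribe: (onStoreChange: () => void): (() => void) =>
      store.subscribe(() => onStoreChange()),
    // Must return the same reference until the value actually changes
    getSnapshot: (): A => store.get()
  }
}

const counter = makeMiniStore(0)
const external = toExternalStore(counter)
// In a component:
// const count = React.useSyncExternalStore(external.subscribe, external.getSnapshot)
```

Returning the unsubscribe function from `subscribe` is what lets React detach the listener when the component unmounts.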
// Basic value subscription
const useRxValue: <A>(rx: Rx<A>) => A
// Read and write state
const useRx: <R, W>(rx: Writable<R, W>) => [value: R, setter: (value: W) => void]
// Write-only access (mounts the Rx)
const useRxSet: <R, W>(rx: Writable<R, W>) => (value: W) => void
// Promise-based setter for Result Rx values
const useRxSetPromise: <A, E, W>(
rx: Writable<Result<A, E>, W>
) => (value: W) => Promise<Exit<A, E>>
// Suspense integration
const useRxSuspense: <A, E>(
rx: Rx<Result<A, E>>,
options?: { suspendOnWaiting?: boolean }
) => Result.Success<A, E> | Result.Failure<A, E>
Counter with Derived State
import { Rx, useRx, useRxValue } from "@effect-rx/rx-react"
// State definition
const countRx = Rx.make(0).pipe(Rx.keepAlive)
const doubleCountRx = Rx.map(countRx, n => n * 2)
const isEvenRx = Rx.map(countRx, n => n % 2 === 0)
function Counter() {
const [count, setCount] = useRx(countRx)
const doubleCount = useRxValue(doubleCountRx)
const isEven = useRxValue(isEvenRx)
return (
<div>
<p>Count: {count} (Double: {doubleCount})</p>
<p>Is Even: {isEven ? "Yes" : "No"}</p>
<button onClick={() => setCount(prev => prev + 1)}>
Increment
</button>
<button onClick={() => setCount(0)}>Reset</button>
</div>
)
}
Effect-rx provides Vue 3 integration through the @effect-rx/rx-vue package, offering composables that connect reactive state to Vue's reactivity system.
// Read-only value subscription
const useRxValue: <A>(rx: Rx<A>) => Readonly<Ref<A>>
// Read and write state
const useRx: <R, W>(rx: Writable<R, W>) => readonly [Readonly<Ref<R>>, (value: W) => void]
// Write-only access (mounts the Rx)
const useRxSet: <R, W>(rx: Writable<R, W>) => (value: W) => void
// RxRef integration
const useRxRef: <A>(rxRef: RxRef.ReadonlyRef<A>) => Readonly<Ref<A>>
import { provide, inject } from "vue"
import { Registry, registryKey, injectRegistry } from "@effect-rx/rx-vue"
// In app setup
const registry = Registry.make()
app.provide(registryKey, registry)
// In components
const registry = injectRegistry() // Falls back to default registry
Reactive Counter Component
<template>
<div>
<p>Count: {{ count }} (Double: {{ doubleCount }})</p>
<p>Is Even: {{ isEven ? "Yes" : "No" }}</p>
<button @click="increment">Increment</button>
<button @click="setCount(0)">Reset</button>
</div>
</template>
<script setup lang="ts">
import { Rx, useRx, useRxValue } from "@effect-rx/rx-vue"
// State definition
const countRx = Rx.make(0).pipe(Rx.keepAlive)
const doubleCountRx = Rx.map(countRx, n => n * 2)
const isEvenRx = Rx.map(countRx, n => n % 2 === 0)
// Component reactive state
const [count, setCount] = useRx(countRx)
const doubleCount = useRxValue(doubleCountRx)
const isEven = useRxValue(isEvenRx)
const increment = () => setCount(count.value + 1)
</script>
Todo List with RxRef
<template>
<div>
<form @submit.prevent="addTodo">
<input v-model="newTodoText" placeholder="Add todo..." />
<button type="submit">Add</button>
</form>
<div v-for="todoRef in todos" :key="todoRef.key">
<TodoItem :todoRef="todoRef" @remove="removeTodo(todoRef)" />
</div>
</div>
</template>
<script setup lang="ts">
import { ref } from "vue"
import { RxRef, useRxRef } from "@effect-rx/rx-vue"
interface Todo {
id: number
text: string
completed: boolean
}
// Reactive collection
const todosRef = RxRef.collection<Todo>([])
const todos = useRxRef(todosRef)
const newTodoText = ref("")
const addTodo = () => {
if (newTodoText.value.trim()) {
todosRef.push({
id: Date.now(),
text: newTodoText.value.trim(),
completed: false
})
newTodoText.value = ""
}
}
const removeTodo = (todoRef: RxRef.RxRef<Todo>) => {
todosRef.remove(todoRef)
}
</script>
TodoItem Component
<template>
<div :class="{ completed: todo.completed }">
<input
type="checkbox"
:checked="todo.completed"
@change="toggleCompleted"
/>
<input
:value="todo.text"
@input="updateText"
/>
<button @click="$emit('remove')">Remove</button>
</div>
</template>
<script setup lang="ts">
import { RxRef, useRxRef } from "@effect-rx/rx-vue"
interface Todo {
id: number
text: string
completed: boolean
}
const props = defineProps<{
todoRef: RxRef.RxRef<Todo>
}>()
defineEmits<{
remove: []
}>()
const todo = useRxRef(props.todoRef)
const toggleCompleted = () => {
props.todoRef.prop("completed").set(!todo.value.completed)
}
const updateText = (event: Event) => {
const target = event.target as HTMLInputElement
props.todoRef.prop("text").set(target.value)
}
</script>
Async Data with Result Type
<template>
<div>
<div v-if="userResult._tag === 'Initial'">
Loading user...
</div>
<div v-else-if="userResult._tag === 'Failure'">
Error: {{ userResult.error }}
<button @click="refresh">Retry</button>
</div>
<div v-else-if="userResult._tag === 'Success'">
<h2>{{ userResult.value.name }}</h2>
<p>{{ userResult.value.email }}</p>
<button @click="refresh">Refresh</button>
</div>
</div>
</template>
<script setup lang="ts">
import { watch } from "vue"
import { Rx, useRxValue, useRxSet } from "@effect-rx/rx-vue"
import { Effect, Schedule } from "effect"
const props = defineProps<{
userId: string
}>()
// User data with async loading
const userRx = Rx.fn<{ userId: string }>()(({ userId }) =>
// fetch returns a Promise, so lift it into an Effect before retrying
Effect.tryPromise(() =>
fetch(`/api/users/${userId}`).then((response) => response.json())
).pipe(
// Retry up to 3 times, waiting 1 second between attempts
Effect.retry({ times: 3, schedule: Schedule.spaced("1 second") })
)
)
// Reactive state
const userResult = useRxValue(userRx)
const setUser = useRxSet(userRx)
const refresh = () => {
setUser({ userId: props.userId })
}
// Load on mount and reload whenever userId changes
watch(() => props.userId, (newUserId) => {
setUser({ userId: newUserId })
}, { immediate: true })
</script>
Composition API Integration
// composables/useUserDashboard.ts
import { computed } from "vue"
import { Rx, Result, useRxValue } from "@effect-rx/rx-vue"
export const useUserDashboard = (userId: string) => {
// Rx definitions
const userRx = Rx.fn<{ userId: string }>()(/* ... */)
const notificationsRx = Rx.fn<{ userId: string }>()(/* ... */)
// Derived state
const summaryRx = Rx.make((get) => {
const user = get(userRx)
const notifications = get(notificationsRx)
if (Result.isSuccess(user) && Result.isSuccess(notifications)) {
return {
userName: user.value.name,
unreadCount: notifications.value.filter(n => !n.read).length,
lastLoginAt: user.value.lastLoginAt
}
}
return null
})
// Reactive values
const userResult = useRxValue(userRx)
const notificationsResult = useRxValue(notificationsRx)
const summary = useRxValue(summaryRx)
// Computed values
const isLoading = computed(() =>
userResult.value._tag === "Initial" ||
notificationsResult.value._tag === "Initial"
)
const hasError = computed(() =>
userResult.value._tag === "Failure" ||
notificationsResult.value._tag === "Failure"
)
return {
userResult,
notificationsResult,
summary,
isLoading,
hasError
}
}
Effect-rx implements several data flow patterns that optimize performance and maintain consistency across complex applications.
sequenceDiagram
participant UI as UI Component
participant Registry as Registry
participant RxNode as Rx Node
participant Effect as Effect Runtime
participant API as External API
Note over UI,API: Initial Subscription
UI->>Registry: subscribe(rx, callback)
Registry->>RxNode: ensureNode(rx)
RxNode->>Effect: execute read function
Effect->>API: fetch data
API-->>Effect: response
Effect-->>RxNode: result value
RxNode->>Registry: setValue(result)
Registry->>UI: callback(result)
Note over UI,API: State Update
UI->>Registry: set(rx, newValue)
Registry->>RxNode: setValue(newValue)
RxNode->>RxNode: invalidateChildren()
RxNode->>Registry: notify subscribers
Registry->>UI: callback(newValue)
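To make the two phases of the sequence diagram concrete, here is a minimal, library-independent sketch in TypeScript. The names `TinyRegistry` and `Cell` are hypothetical and invented for illustration; this is not the effect-rx API, and it leaves out Effect execution, async results, and lifecycle handling.

```typescript
// Hypothetical model of the subscribe/set flow; not the effect-rx API.
type Listener<A> = (value: A) => void

interface Cell<A> {
  readonly initial: A
}

class TinyRegistry {
  private readonly values = new Map<Cell<unknown>, unknown>()
  private readonly listeners = new Map<Cell<unknown>, Set<Listener<unknown>>>()

  // Lazily create the node: the initial value is stored on first access only.
  get<A>(cell: Cell<A>): A {
    if (!this.values.has(cell)) {
      this.values.set(cell, cell.initial)
    }
    return this.values.get(cell) as A
  }

  // "Initial Subscription": ensure the node exists, then deliver its value.
  subscribe<A>(cell: Cell<A>, listener: Listener<A>): () => void {
    const existing = this.listeners.get(cell)
    const set = existing ?? new Set<Listener<unknown>>()
    if (existing === undefined) {
      this.listeners.set(cell, set)
    }
    set.add(listener as Listener<unknown>)
    listener(this.get(cell))
    return () => {
      set.delete(listener as Listener<unknown>)
    }
  }

  // "State Update": store the value and notify subscribers, skipping no-ops.
  set<A>(cell: Cell<A>, value: A): void {
    if (Object.is(this.get(cell), value)) return
    this.values.set(cell, value)
    this.listeners.get(cell)?.forEach((listener) => listener(value))
  }
}

const counter: Cell<number> = { initial: 0 }
const registry = new TinyRegistry()
const seen: number[] = []
const unsubscribe = registry.subscribe(counter, (n) => {
  seen.push(n)
})
registry.set(counter, 1)
registry.set(counter, 1) // equal value: no notification
unsubscribe()
registry.set(counter, 2) // no longer subscribed
console.log(seen) // [0, 1]
```

The subscriber receives the current value once on subscription and then only real changes, which is the behaviour the two diagram phases describe.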
stateDiagram-v2
[*] --> Uninitialized: Node Created
Uninitialized --> Computing: First Access
Computing --> Valid: Value Computed
Valid --> Stale: Dependency Changed
Stale --> Computing: Re-computation Needed
Valid --> Disposed: No Subscribers + TTL Expired
Stale --> Disposed: No Subscribers + TTL Expired
Disposed --> [*]: Node Removed
Valid --> Valid: Equal Value Update (No Change)
Computing --> Computing: Dependencies Loading
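The lifecycle diagram can be read as a small state machine. The sketch below encodes its edges as a transition table; the event names are hypothetical labels derived from the diagram's edge captions, not identifiers from effect-rx.

```typescript
// Hypothetical encoding of the lifecycle diagram; state names follow the diagram.
type NodeState = "Uninitialized" | "Computing" | "Valid" | "Stale" | "Disposed"
type NodeEvent =
  | "FirstAccess"
  | "ValueComputed"
  | "DependencyChanged"
  | "RecomputationNeeded"
  | "IdleTtlExpired"
  | "EqualValueUpdate"
  | "DependenciesLoading"

const transitions: Record<NodeState, Partial<Record<NodeEvent, NodeState>>> = {
  Uninitialized: { FirstAccess: "Computing" },
  Computing: { ValueComputed: "Valid", DependenciesLoading: "Computing" },
  Valid: {
    DependencyChanged: "Stale",
    IdleTtlExpired: "Disposed",
    EqualValueUpdate: "Valid"
  },
  Stale: { RecomputationNeeded: "Computing", IdleTtlExpired: "Disposed" },
  Disposed: {}
}

// Returns the next state, or throws if the diagram has no such edge.
function step(state: NodeState, event: NodeEvent): NodeState {
  const next = transitions[state][event]
  if (next === undefined) {
    throw new Error(`No transition from ${state} on ${event}`)
  }
  return next
}

const events: NodeEvent[] = [
  "FirstAccess",
  "ValueComputed",
  "DependencyChanged",
  "RecomputationNeeded",
  "ValueComputed",
  "IdleTtlExpired"
]
const finalState = events.reduce<NodeState>(
  (state, event) => step(state, event),
  "Uninitialized"
)
console.log(finalState) // "Disposed"
```

Note that the diagram has no edge out of `Computing` to `Disposed`, so in this model a node is only disposed once it holds a value or is stale.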
graph TD
A[User Input Rx] --> B[Validation Rx]
A --> C[Formatting Rx]
B --> D[Form State Rx]
C --> D
D --> E[Submit Button State]
D --> F[Error Display]
G[API Response Rx] --> H[User Data Rx]
G --> I[Loading State Rx]
H --> J[Display Name Rx]
H --> K[Profile Image Rx]
I --> L[Spinner Component]
subgraph "Memory Management"
M[Registry] --> N[TTL Cleanup]
M --> O[Reference Counting]
N --> P[Dispose Unused Nodes]
O --> P
end
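As a rough illustration of how a change propagates through the dependency graph above, the following sketch walks the diagram's edges breadth-first and lists every downstream node exactly once. The `dependents` table and the `invalidated` function are hypothetical; effect-rx tracks dependencies automatically rather than from a static table like this.

```typescript
// Edges copied from the first part of the diagram: source -> direct dependents.
const dependents: Record<string, string[]> = {
  "User Input Rx": ["Validation Rx", "Formatting Rx"],
  "Validation Rx": ["Form State Rx"],
  "Formatting Rx": ["Form State Rx"],
  "Form State Rx": ["Submit Button State", "Error Display"],
  "API Response Rx": ["User Data Rx", "Loading State Rx"],
  "User Data Rx": ["Display Name Rx", "Profile Image Rx"],
  "Loading State Rx": ["Spinner Component"]
}

// Collect every node downstream of `changed`, visiting each node once.
function invalidated(changed: string): string[] {
  const seen = new Set<string>()
  const queue: string[] = [...(dependents[changed] ?? [])]
  while (queue.length > 0) {
    const node = queue.shift() as string
    if (seen.has(node)) continue
    seen.add(node)
    queue.push(...(dependents[node] ?? []))
  }
  return Array.from(seen)
}

console.log(invalidated("User Input Rx"))
// ["Validation Rx", "Formatting Rx", "Form State Rx", "Submit Button State", "Error Display"]
```

"Form State Rx" is reachable through both the validation and formatting branches but appears once, which is the property that keeps a diamond-shaped graph from recomputing the same node twice per change.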
This example demonstrates a complete user dashboard with async data fetching, error handling, and derived state.
import { Rx, Result, useRx, useRxValue } from "@effect-rx/rx-react"
import { Cause, Effect, Layer, Schedule } from "effect"
import { HttpClient } from "@effect/platform"
// Service definitions
class UserService extends Effect.Service<UserService>()("UserService", {
effect: Effect.gen(function* () {
const findById = (id: string) =>
Effect.gen(function* () {
const response = yield* HttpClient.get(`/api/users/${id}`)
return yield* response.json
}).pipe(
Effect.retry(Schedule.exponential("100 millis"))
)
return { findById } as const
})
}) {}
class NotificationService extends Effect.Service<NotificationService>()("NotificationService", {
effect: Effect.gen(function* () {
const getByUserId = (userId: string) =>
Effect.gen(function* () {
const response = yield* HttpClient.get(`/api/notifications/${userId}`)
return yield* response.json
})
return { getByUserId } as const
})
}) {}
// Runtime setup
// Merge the layers so that both services are available to the runtime
const AppLayer = Layer.mergeAll(
UserService.Default,
NotificationService.Default
)
const appRuntime = Rx.runtime(AppLayer)
// State management
const currentUserIdRx = Rx.make("user-123").pipe(Rx.keepAlive)
const userRx = appRuntime.rx((get) =>
Effect.gen(function* () {
const userId = get(currentUserIdRx)
const userService = yield* UserService
return yield* userService.findById(userId)
})
)
const notificationsRx = appRuntime.rx((get) =>
Effect.gen(function* () {
const userId = get(currentUserIdRx)
const notificationService = yield* NotificationService
return yield* notificationService.getByUserId(userId)
})
)
// Derived state
const unreadCountRx = Rx.mapResult(
notificationsRx,
notifications => notifications.filter(n => !n.read).length
)
const userDisplayNameRx = Rx.mapResult(
userRx,
user => `${user.firstName} ${user.lastName}`
)
// Components
function UserProfile() {
const userResult = useRxValue(userRx)
return Result.match(userResult, {
onInitial: () => <div>Loading user...</div>,
onFailure: (failure) => (
<div className="error">
Failed to load user: {Cause.pretty(failure.cause)}
{failure.waiting && <span>Retrying...</span>}
</div>
),
onSuccess: (success) => (
<div className="user-profile">
<h2>{success.value.firstName} {success.value.lastName}</h2>
<p>Email: {success.value.email}</p>
<p>Role: {success.value.role}</p>
{success.waiting && <span className="refreshing">Refreshing...</span>}
</div>
)
})
}
function NotificationBadge() {
const unreadResult = useRxValue(unreadCountRx)
return Result.match(unreadResult, {
onInitial: () => null,
onFailure: () => <span className="badge error">!</span>,
onSuccess: (success) =>
success.value > 0 ? (
<span className="badge">{success.value}</span>
) : null
})
}
function UserDashboard() {
const [userId, setUserId] = useRx(currentUserIdRx)
return (
<div className="dashboard">
<header>
<h1>User Dashboard</h1>
<NotificationBadge />
</header>
<div className="user-selector">
<input
value={userId}
onChange={(e) => setUserId(e.target.value)}
placeholder="Enter user ID"
/>
</div>
<UserProfile />
</div>
)
}
This example shows a real-time chat application with WebSocket integration and optimistic updates.
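The optimistic-update idea itself is independent of the library and can be sketched as two pure functions over a message list. The names `addOptimistic` and `settle` and the `ChatMessage` type are hypothetical and exist only for this illustration.

```typescript
// Hypothetical pure helpers; the names are illustrative and not part of effect-rx.
type Status = "sending" | "sent" | "failed"

interface ChatMessage {
  id: string
  content: string
  status: Status
}

// Step 1: show the message immediately under a temporary id.
const addOptimistic = (
  list: readonly ChatMessage[],
  tempId: string,
  content: string
): ChatMessage[] => [...list, { id: tempId, content, status: "sending" }]

// Step 2: once the send settles, record the outcome on that same message.
const settle = (
  list: readonly ChatMessage[],
  tempId: string,
  ok: boolean
): ChatMessage[] =>
  list.map((m): ChatMessage =>
    m.id === tempId ? { ...m, status: ok ? "sent" : "failed" } : m
  )

let messages: ChatMessage[] = []
messages = addOptimistic(messages, "temp-1", "hello")
messages = addOptimistic(messages, "temp-2", "world")
messages = settle(messages, "temp-1", true)
messages = settle(messages, "temp-2", false)
console.log(messages.map((m) => m.status)) // ["sent", "failed"]
```

Keeping these steps pure makes the status transitions easy to test in isolation; the full example below performs the same two steps against reactive state.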
import * as React from "react"
import { Registry, Rx, Result, useRx, useRxValue, useRxSet } from "@effect-rx/rx-react"
import { Effect, Stream, Schedule } from "effect"
// Message types
interface Message {
id: string
userId: string
content: string
timestamp: number
status: 'sending' | 'sent' | 'failed'
}
interface User {
id: string
name: string
status: 'online' | 'offline'
}
// WebSocket service
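class ChatService extends Effect.Service<ChatService>()("ChatService", {
effect: Effect.gen(function* () {
// Open the socket; a connection error goes to the Effect error channel
const ws = yield* Effect.async<WebSocket, Error>((resume) => {
const socket = new WebSocket('ws://localhost:8080/chat')
socket.onopen = () => resume(Effect.succeed(socket))
socket.onerror = () => resume(Effect.fail(new Error('WebSocket connection failed')))
})
const sendMessage = (content: string) =>
Effect.try({
try: () => ws.send(JSON.stringify({ type: 'message', content })),
catch: (error) => new Error(`WebSocket send failed: ${error}`)
})
// WebSocket has no async-iterable `messages` property, so bridge its events into a Stream
const messageStream = Stream.async<Message, Error>((emit) => {
ws.addEventListener('message', (event) => {
void emit.single(JSON.parse(event.data) as Message)
})
ws.addEventListener('error', () => {
void emit.fail(new Error('WebSocket error'))
})
ws.addEventListener('close', () => {
void emit.end()
})
})
return { sendMessage, messageStream, ws } as const
})
}) {}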
const chatRuntime = Rx.runtime(ChatService.Default)
// State management
const messagesRx = Rx.make<Message[]>([]).pipe(Rx.keepAlive)
const currentUserRx = Rx.make<User>({
id: 'user-1',
name: 'Alice',
status: 'online'
}).pipe(Rx.keepAlive)
// WebSocket message stream: the Rx holds the latest message as a Result
const incomingMessagesRx = chatRuntime.rx(
Stream.unwrap(
Effect.map(ChatService, (chatService) => chatService.messageStream)
)
)
// Process incoming messages
const messageProcessorRx = Rx.make((get) => {
const incomingResult = get(incomingMessagesRx)
if (incomingResult._tag === 'Success') {
// Read without subscribing, so the write below does not re-trigger this Rx
const messages = get.once(messagesRx)
const newMessage = incomingResult.value
// Update messages list
get.set(messagesRx, [...messages, {
...newMessage,
status: 'sent'
}])
}
return incomingResult
})
// Send message function
const sendMessageRx = chatRuntime.fn(
Effect.fnUntraced(function* (content: string) {
const chatService = yield* ChatService
const registry = yield* Registry.Registry
const currentUser = registry.get(currentUserRx)
const messages = registry.get(messagesRx)
// Create optimistic message
const optimisticMessage: Message = {
id: `temp-${Date.now()}`,
userId: currentUser.id,
content,
timestamp: Date.now(),
status: 'sending'
}
// Add optimistic message immediately
registry.set(messagesRx, [...messages, optimisticMessage])
// Update the status of the optimistic message in place
const markStatus = (status: Message['status']) =>
Effect.sync(() =>
registry.set(messagesRx, registry.get(messagesRx).map(msg =>
msg.id === optimisticMessage.id ? { ...msg, status } : msg
))
)
// Effect failures are not thrown into the generator, so they are handled in the
// error channel rather than with try/catch; the failure still propagates afterwards
yield* chatService.sendMessage(content).pipe(
Effect.tapError(() => markStatus('failed'))
)
// Mark as sent (the real message will come via WebSocket)
yield* markStatus('sent')
})
)
// Components
function MessageList() {
const messages = useRxValue(messagesRx)
return (
<div className="message-list">
{messages.map(message => (
<div
key={message.id}
className={`message ${message.status}`}
>
<div className="message-content">{message.content}</div>
<div className="message-meta">
{new Date(message.timestamp).toLocaleTimeString()}
{message.status === 'sending' && ' (sending...)'}
{message.status === 'failed' && ' (failed)'}
</div>
</div>
))}
</div>
)
}
function MessageInput() {
const [content, setContent] = React.useState('')
const sendMessage = useRxSet(sendMessageRx)
const handleSubmit = (e: React.FormEvent) => {
e.preventDefault()
if (!content.trim()) return
// The setter is fire-and-forget here; a delivery failure is meant to surface
// through the message's 'failed' status rather than a thrown error
sendMessage(content)
setContent('')
}
return (
<form onSubmit={handleSubmit} className="message-input">
<input
value={content}
onChange={(e) => setContent(e.target.value)}
placeholder="Type a message..."
/>
<button type="submit">Send</button>
</form>
)
}
function ChatApp() {
// Initialize message processor
useRxValue(messageProcessorRx)
return (
<div className="chat-app">
<MessageList />
<MessageInput />
</div>
)
}
This example demonstrates sophisticated form management with field-level validation, async validation, and error handling.
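Before the full example, the core of synchronous field-level validation can be sketched without any library: a field is checked against an ordered list of rules and reports the first failure. The `Rule` type and `validate` function are hypothetical; the rule messages mirror the password rules used in the example.

```typescript
// Hypothetical rule-based validator; the full example expresses these rules with Schema.
interface Rule {
  readonly test: (value: string) => boolean
  readonly message: string
}

const passwordRules: readonly Rule[] = [
  { test: (v) => v.length >= 8, message: "Password must be at least 8 characters" },
  { test: (v) => /[A-Z]/.test(v), message: "Password must contain uppercase letter" },
  { test: (v) => /[0-9]/.test(v), message: "Password must contain number" }
]

// Returns the first failing rule's message, or null when every rule passes.
const validate = (value: string, rules: readonly Rule[]): string | null => {
  for (const rule of rules) {
    if (!rule.test(value)) return rule.message
  }
  return null
}

console.log(validate("short", passwordRules)) // "Password must be at least 8 characters"
console.log(validate("longenough", passwordRules)) // "Password must contain uppercase letter"
console.log(validate("Longenough1", passwordRules)) // null
```

Returning `string | null` matches the `error` field of the `FieldState` interface in the example, so a result like this could be stored directly in field state.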
import * as React from "react"
import { Registry, Rx, useRx, useRxSet, useRxValue } from "@effect-rx/rx-react"
import { Effect, Schedule, Schema } from "effect"
// Validation schemas
const EmailSchema = Schema.String.pipe(
Schema.filter(email => /^[^\s@]+@[^\s@]+\.[^\s@]+$/.test(email), {
message: () => "Invalid email format"
})
)
const PasswordSchema = Schema.String.pipe(
Schema.minLength(8, { message: () => "Password must be at least 8 characters" }),
Schema.filter(pwd => /[A-Z]/.test(pwd), {
message: () => "Password must contain uppercase letter"
}),
Schema.filter(pwd => /[0-9]/.test(pwd), {
message: () => "Password must contain number"
})
)
// Field state interface
interface FieldState<T> {
value: T
error: string | null
touched: boolean
validating: boolean
}
// Validation service
class ValidationService extends Effect.Service<ValidationService>()("ValidationService", {
effect: Effect.gen(function* () {
const checkEmailAvailability = (email: string) =>
Effect.gen(function* () {
// Simulate API call
yield* Effect.sleep("500 millis")
// Mock unavailable emails
const unavailable = ['admin@example.com', 'test@example.com']
return !unavailable.includes(email)
})
return { checkEmailAvailability } as const
})
}) {}
const validationRuntime = Rx.runtime(ValidationService.Default)
// Field factory
const createField = <T>(
initialValue: T,
schema?: Schema.Schema<T, any>,
asyncValidator?: (value: T) => Effect.Effect<boolean, string, ValidationService>
) => {
const stateRx = Rx.make<FieldState<T>>({
value: initialValue,
error: null,
touched: false,
validating: false
}).pipe(Rx.keepAlive)
const valueRx = Rx.writable(
(get) => get(stateRx).value,
(ctx, newValue: T) => {
const currentState = ctx.get(stateRx)
// Sync validation
let error: string | null = null
if (schema) {
const result = Schema.decodeUnknownEither(schema)(newValue)
if (result._tag === 'Left') {
error = result.left.message
}
}
// Update state
ctx.set(stateRx, {
value: newValue,
error: error,
touched: true,
validating: !!asyncValidator && !error
})
// Trigger async validation if needed
if (asyncValidator && !error) {
ctx.set(asyncValidationTriggerRx, newValue)
}
}
)
// Async validation
const asyncValidationTriggerRx = Rx.make<T>(initialValue)
const asyncValidationRx = validationRuntime.fn(
Effect.fnUntraced(function* (value: T) {
if (!asyncValidator) return
// Debounce-like delay: wait before validating (assumes a newer call interrupts this run)
yield* Effect.sleep("300 millis")
const isValid = yield* asyncValidator(value)
const registry = yield* Registry.Registry
const currentState = registry.get(stateRx)
registry.set(stateRx, {
...currentState,
error: isValid ? null : "Value is not valid",
validating: false
})
})
)
// Wire up async validation
const validationWiringRx = Rx.make((get) => {
const trigger = get(asyncValidationTriggerRx)
get.set(asyncValidationRx, trigger)
return trigger
})
const blurRx = Rx.writable(
(get) => get(stateRx).touched,
(ctx, _: void) => {
const state = ctx.get(stateRx)
ctx.set(stateRx, { ...state, touched: true })
}
)
return {
stateRx,
valueRx,
blurRx,
validationWiringRx
}
}
// Form fields
const emailField = createField(
"",
EmailSchema,
(email) =>
Effect.gen(function* () {
const service = yield* ValidationService
return yield* service.checkEmailAvailability(email)
})
)
const passwordField = createField("", PasswordSchema)
const confirmPasswordField = createField("")
// Form validation
const formValidRx = Rx.make((get) => {
const emailState = get(emailField.stateRx)
const passwordState = get(passwordField.stateRx)
const confirmState = get(confirmPasswordField.stateRx)
const emailValid = !emailState.error && !emailState.validating && emailState.value.length > 0
const passwordValid = !passwordState.error && passwordState.value.length > 0
const confirmValid = confirmState.value === passwordState.value
return {
isValid: emailValid && passwordValid && confirmValid,
errors: {
email: emailState.error,
password: passwordState.error,
confirmPassword: confirmValid ? null : "Passwords don't match"
}
}
})
// Submit handler
const submitFormRx = Rx.fn(
Effect.fnUntraced(function* (formData: { email: string; password: string }) {
// Simulate form submission
yield* Effect.sleep("1 second")
// Mock submission logic
if (Math.random() > 0.8) {
// Fail through the error channel; a thrown exception would become a defect
return yield* Effect.fail(new Error("Submission failed"))
}
return { success: true, message: "Account created successfully" }
})
)
// Components
function FormField({
label,
field,
type = "text"
}: {
label: string
field: ReturnType<typeof createField<string>>
type?: string
}) {
const [value, setValue] = useRx(field.valueRx)
const [, blur] = useRx(field.blurRx)
const state = useRxValue(field.stateRx)
// Wire up validation
useRxValue(field.validationWiringRx)
return (
<div className="form-field">
<label>{label}</label>
<input
type={type}
value={value}
onChange={(e) => setValue(e.target.value)}
onBlur={() => blur()}
className={state.error && state.touched ? 'error' : ''}
/>
{state.validating && <span className="validating">Validating...</span>}
{state.error && state.touched && (
<span className="error-message">{state.error}</span>
)}
</div>
)
}
function RegistrationForm() {
const formValidation = useRxValue(formValidRx)
const submitForm = useRxSet(submitFormRx)
// Read the current field values through hooks; an Rx has no zero-argument read()
const email = useRxValue(emailField.valueRx)
const password = useRxValue(passwordField.valueRx)
const handleSubmit = (e: React.FormEvent) => {
e.preventDefault()
if (!formValidation.isValid) return
// The outcome is exposed as a Result on submitFormRx (observe it with useRxValue)
submitForm({ email, password })
}
return (
<form onSubmit={handleSubmit} className="registration-form">
<FormField label="Email" field={emailField} type="email" />
<FormField label="Password" field={passwordField} type="password" />
<FormField label="Confirm Password" field={confirmPasswordField} type="password" />
<button
type="submit"
disabled={!formValidation.isValid}
className="submit-button"
>
Create Account
</button>
{Object.entries(formValidation.errors).map(([field, error]) =>
error && (
<div key={field} className="form-error">
{error}
</div>
)
)}
</form>
)
}
Minimizing Re-computations
// Use once() for expensive one-time calculations
const expensiveRx = Rx.make((get) => {
const config = get.once(configRx) // No dependency on config changes
const data = get(dataRx) // Re-runs when data changes
return expensiveProcessing(data, config)
})
// Conditional dependencies to avoid unnecessary work
const conditionalRx = Rx.make((get) => {
const enabled = get(enabledRx)
if (!enabled) {
return null // Don't access expensive data when disabled
}
return get(expensiveDataRx)
})
Batching Updates
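As a mental model of what batching buys, the sketch below defers notifications while a batch is open and de-duplicates them per key when the outermost batch closes. It is a hypothetical, simplified model with invented names (`batch`, `setValue`, `notify`); it does not reproduce how effect-rx implements `Rx.batch`.

```typescript
// Hypothetical batching model; not the effect-rx implementation.
let batchDepth = 0
const pending = new Set<string>()
const notifications: string[] = []
const store = new Map<string, unknown>()

const notify = (key: string): void => {
  notifications.push(key)
}

function setValue(key: string, value: unknown): void {
  store.set(key, value)
  if (batchDepth > 0) {
    pending.add(key) // defer and de-duplicate while a batch is open
  } else {
    notify(key)
  }
}

function batch(run: () => void): void {
  batchDepth++
  try {
    run()
  } finally {
    batchDepth--
    // Flush only when the outermost batch closes, so nested batches coalesce too.
    if (batchDepth === 0) {
      const keys = Array.from(pending)
      pending.clear()
      keys.forEach((key) => notify(key))
    }
  }
}

batch(() => {
  setValue("user", { name: "Alice" })
  setValue("settings", { theme: "dark" })
  setValue("user", { name: "Bob" }) // same key again: still one notification
})
console.log(notifications) // ["user", "settings"]
```

Three writes produce two notifications, and subscribers only ever observe the final value of each key. The library call for the same idea follows.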
// Batch multiple state updates for efficiency
const updateMultipleStates = () => {
Rx.batch(() => {
registry.set(userRx, newUser)
registry.set(settingsRx, newSettings)
registry.set(preferencesRx, newPreferences)
}) // All dependent Rx values update only once
}
Proper Cleanup Patterns
// Resource management with finalizers
const resourceRx = Rx.make((get) => {
const connection = createConnection()
get.addFinalizer(() => {
connection.close()
})
return connection
})
// Use keepAlive judiciously
const globalStateRx = Rx.make(initialState).pipe(
Rx.keepAlive // Only for truly global state
)
// Set appropriate TTLs for temporary data
const cacheRx = Rx.make(fetchData).pipe(
Rx.setIdleTTL("5 minutes") // Auto-cleanup cached data
)
Graceful Degradation
const resilientDataRx = Rx.make((get) => {
try {
return get(primaryDataRx)
} catch (error) {
console.warn('Primary data failed, using fallback:', error)
return get(fallbackDataRx)
}
})
// Result-based error handling
const safeApiCallRx = Rx.make(
Effect.gen(function* () {
return yield* apiCall.pipe(
Effect.retry(Schedule.exponential("100 millis")),
Effect.timeout("30 seconds"),
Effect.catchAll((error) =>
Effect.succeed({ error: error.message, data: null })
)
)
})
)
Test-Friendly Registry Setup
const createTestRegistry = (initialValues: Map<Rx<any>, any> = new Map()) => {
return Registry.make({
initialValues: Array.from(initialValues.entries()),
defaultIdleTTL: 0, // Immediate cleanup for tests
scheduleTask: (f) => f() // Synchronous execution
})
}
// Test with mock data
const testRegistry = createTestRegistry(new Map([
[userRx, { id: '1', name: 'Test User' }],
[settingsRx, { theme: 'light' }]
]))
const result = testRegistry.get(derivedRx)
expect(result).toEqual(expectedValue)
Effect-Rx provides a powerful and type-safe reactive state management solution that seamlessly integrates with the Effect ecosystem. Its key strengths include:
- Type Safety: Full TypeScript support with precise type inference
- Automatic Cleanup: Sophisticated memory management prevents leaks
- Effect Integration: Native support for async operations, error handling, and resource management
- Fine-grained Reactivity: Efficient dependency tracking and minimal re-computations
- Framework Agnostic: Core library works with React, Vue, and other frameworks
Effect-Rx is particularly well-suited for:
- Complex Applications: Applications with significant async data flow and interdependent state
- Effect-based Projects: Projects already using Effect for business logic
- Real-time Applications: Applications requiring WebSocket integration and optimistic updates
- Form-heavy Applications: Complex forms with validation and async operations
- Data-intensive Applications: Applications with significant derived state and caching needs
- Layer Structure: Use Effect Layers to organize services and dependencies
- State Organization: Group related state in composed Rx values
- Error Boundaries: Implement proper error handling at appropriate levels
- Performance: Use keepAlive, TTLs, and batching strategically
- Testing: Design with testability in mind using registry patterns
Effect-Rx integrates seamlessly with:
- Effect: Native support for Effects, Services, Layers, and Streams
- React: Comprehensive hooks for component integration
- Vue: Composables for Vue 3 Composition API
- Schema: Type-safe validation and serialization
- Platform Libraries: File system, HTTP, and database integrations
This overview covers the essential concepts and patterns for building applications with Effect-Rx. For additional topics not covered in detail here, refer to:
- RxRuntime: Advanced Effect runtime integration patterns
- Stream Integration: Working with Effect Streams and pull-based patterns
- RxRef: Lightweight reactive references for simple state
- Function Abstractions: Rx.fn and Rx.fnSync for reactive functions
- Storage Integration: Key-value stores and URL search parameters
- Vue Integration: Detailed Vue composables and patterns
The Effect-Rx ecosystem continues to evolve, providing increasingly sophisticated tools for reactive state management in TypeScript applications.
This technical overview demonstrates Effect-Rx's capabilities through practical examples and comprehensive API coverage. The library's integration with Effect's functional programming paradigm makes it a powerful choice for building robust, type-safe reactive applications.