import {
  all,
  createScope,
  resource,
  sleep,
  suspend,
  until,
  withResolvers,
  type Operation,
} from "effection";
|
|
|
// ============================================
// INTERNAL: Connection as a proper resource
// ============================================
|
|
|
/**
 * A single database connection handed out by the pool.
 */
interface Connection {
  /** Identifier chosen by whoever created the connection. */
  id: string;

  /**
   * Runs `query` on this connection.
   *
   * @returns An Effection operation that yields the query's result.
   */
  execute(query: string): Operation<QueryResult>;
}
|
|
|
/**
 * Result of executing one query.
 */
interface QueryResult {
  /** Rows produced by the query; their shape is not constrained here. */
  rows: unknown[];
}
|
|
|
/**
 * Placeholder for the actual database connection.
 *
 * @returns A promise for a socket-like handle whose `close()` does nothing.
 */
async function connectToDatabase(): Promise<{ close(): void }> {
  return {
    close() {
      // Nothing to release in the placeholder implementation.
    },
  };
}
|
|
|
/**
 * Opens a connection whose lifecycle is managed by `resource()`.
 *
 * The socket is opened before the connection is provided and is closed in a
 * `finally` block once `provide()` stops, whether that happens through normal
 * scope exit, an error, or a halt.
 *
 * @param id Identifier stored on the returned connection.
 * @returns An operation yielding the ready-to-use connection.
 */
function connection(id: string): Operation<Connection> {
  return resource(function* (provide) {
    // Setup: everything before provide() runs while the resource is acquired.
    const socket = yield* until(connectToDatabase());

    const conn: Connection = {
      id,
      *execute(_query: string) {
        // ... query implementation
        return { rows: [] };
      },
    };

    try {
      // The connection is available to the caller from this point on.
      yield* provide(conn);
    } finally {
      // Cleanup: always release the socket when the resource goes away.
      socket.close();
    }
  });
}
|
|
|
// ============================================
// The pool itself as a resource
// ============================================
|
|
|
/**
 * Internal view of a running pool.
 */
interface PoolState {
  /** Every connection created for the pool. */
  connections: Connection[];

  /** Connections that are currently idle and can be acquired. */
  available: Connection[];

  /** Yields an idle connection, waiting until one becomes available. */
  acquire(): Operation<Connection>;

  /** Puts `conn` back into the set of idle connections. */
  release(conn: Connection): void;
}
|
|
|
/**
 * Creates a pool of connections as a single resource.
 *
 * Each connection is itself a resource created inside this one, so the
 * connections are torn down together with the pool.
 *
 * @param maxConnections Number of connections to open; must be a finite
 *   number greater than 0.
 * @returns An operation yielding the pool state.
 * @throws RangeError (when the operation runs) if `maxConnections` would
 *   produce an empty or unbounded pool.
 */
function connectionPool(maxConnections: number): Operation<PoolState> {
  return resource(function* (provide) {
    // An empty pool would make acquire() poll forever, and an infinite one
    // would never finish setup, so fail fast with a descriptive error instead.
    if (!Number.isFinite(maxConnections) || maxConnections <= 0) {
      throw new RangeError(
        `maxConnections must be a finite number greater than 0, got ${maxConnections}`,
      );
    }

    const connections: Connection[] = [];
    const available: Connection[] = [];

    // Setup: create all connections up front (they're resources too).
    for (let i = 0; i < maxConnections; i++) {
      const conn = yield* connection(`conn-${i}`);
      connections.push(conn);
      available.push(conn);
    }

    const state: PoolState = {
      connections,
      available,
      *acquire() {
        // Poll every 50 ms until some connection has been released.
        while (available.length === 0) {
          yield* sleep(50);
        }
        // The loop above guarantees the array is non-empty here.
        return available.pop()!;
      },
      release(conn) {
        available.push(conn);
      },
    };

    yield* provide(state);
  });
}
|
|
|
// ============================================
// PUBLIC: Promise-based API
// ============================================
|
|
|
/**
 * Promise-based facade over the pool.
 *
 * Extends `AsyncDisposable`, so a pool can be bound with `await using`.
 */
export interface ConnectionPool extends AsyncDisposable {
  /** Runs a single query and resolves with its result. */
  query(sql: string): Promise<QueryResult>;

  /** Runs several queries and resolves with their results in input order. */
  queryAll(queries: string[]): Promise<QueryResult[]>;

  /** Shuts the pool down; resolves once teardown has finished. */
  shutdown(): Promise<void>;
}
|
|
|
/**
 * Creates a connection pool and exposes it through a Promise-based API.
 *
 * The pool lives inside its own Effection scope. Initialization starts
 * immediately; queries issued before it completes wait for the pool to become
 * ready. If initialization fails, those queries reject with the
 * initialization error.
 *
 * @param options.maxConnections Number of connections the pool opens.
 * @returns The pool facade. After `shutdown()` has been called, `query` and
 *   `queryAll` reject with `Error("Pool is shut down")`.
 */
export function createPool(
  options: { maxConnections: number },
): ConnectionPool {
  // createScope returns [Scope, () => Future<void>]
  const [scope, destroy] = createScope();

  // withResolvers gives us an operation plus resolve/reject functions, which
  // lets queries wait for initialization without racing against it.
  const { operation: poolReady, resolve, reject } = withResolvers<PoolState>();

  // Set as soon as shutdown starts; checked before any new work is scheduled.
  let closed = false;

  // Memoized so that every shutdown() caller awaits the same teardown.
  let shutdownPromise: Promise<void> | undefined;

  scope.run(function* () {
    try {
      const pool = yield* connectionPool(options.maxConnections);
      resolve(pool); // Signal: pool is ready for use
      yield* suspend(); // Keep the scope (and the pool) alive until destroy()
    } catch (e) {
      // Normalize non-Error throwables before handing them to waiters.
      reject(e instanceof Error ? e : new Error(String(e)));
    }
  });

  // Internal operation: waits for the pool, then runs `sql` on a borrowed
  // connection that is always released afterwards.
  function* executeQuery(sql: string): Operation<QueryResult> {
    // Guard against shutdown that started after this operation was scheduled.
    if (closed) {
      throw new Error("Pool is shut down");
    }

    const pool = yield* poolReady; // Blocks until initialization settles

    const conn = yield* pool.acquire();
    try {
      return yield* conn.execute(sql);
    } finally {
      pool.release(conn);
    }
  }

  // Bridges an operation into a Promise, refusing new work after shutdown
  // without touching the (possibly destroyed) scope at all.
  function run<T>(operation: () => Operation<T>): Promise<T> {
    if (closed) {
      return Promise.reject(new Error("Pool is shut down"));
    }
    return scope.run(operation);
  }

  // Idempotent shutdown: the first call destroys the scope, and every call
  // (including concurrent ones) resolves only once that teardown has finished.
  function shutdown(): Promise<void> {
    closed = true;
    shutdownPromise ??= (async () => {
      // destroy() returns a Future<void>; awaiting it waits for the
      // resources' finally blocks to run.
      await destroy();
    })();
    return shutdownPromise;
  }

  return {
    query(sql: string): Promise<QueryResult> {
      return run(() => executeQuery(sql));
    },

    queryAll(queries: string[]): Promise<QueryResult[]> {
      return run(function* () {
        return yield* all(queries.map((sql) => executeQuery(sql)));
      });
    },

    shutdown,

    [Symbol.asyncDispose](): Promise<void> {
      return shutdown();
    },
  };
}
|
|
|
// ============================================
// CLIENT USAGE (Traditional Async/Await)
// ============================================
|
|
|
/**
 * Option 1: manual cleanup.
 *
 * Runs a single query and a batch of queries, then shuts the pool down. The
 * shutdown sits in a `finally` block so the connections are closed even when
 * one of the queries rejects.
 */
async function main(): Promise<void> {
  const pool = createPool({ maxConnections: 10 });

  try {
    const result = await pool.query("SELECT * FROM users");
    console.log(result.rows);

    // Parallel queries - Effection handles structured concurrency internally
    const [users, orders, products] = await pool.queryAll([
      "SELECT * FROM users",
      "SELECT * FROM orders",
      "SELECT * FROM products",
    ]);
  } finally {
    // Graceful shutdown - all connections properly closed
    await pool.shutdown();
  }
}
|
|
|
/**
 * Option 2: automatic cleanup with `await using`.
 *
 * The pool's `Symbol.asyncDispose` method is invoked when `pool` goes out of
 * scope, so no explicit `shutdown()` call is needed.
 */
async function mainWithAutoDispose(): Promise<void> {
  await using pool = createPool({ maxConnections: 10 });

  const result = await pool.query("SELECT * FROM users");
  console.log(result.rows);

  const [users, orders, products] = await pool.queryAll([
    "SELECT * FROM users",
    "SELECT * FROM orders",
    "SELECT * FROM products",
  ]);

  // No need to call shutdown() - the pool is disposed when this block exits.
}
|
|
|
// Entry point. `void` marks the promise as intentionally not awaited; a
// rejection still surfaces as an unhandled rejection, exactly as before.
void main();