Created
March 6, 2026 02:10
-
-
Save brandonhimpfen/191ee8ea404103ec34e61243ed2bfc36 to your computer and use it in GitHub Desktop.
Stream-transform JSONL/NDJSON in Node.js with filter/map style processing, stdin to stdout, and proper backpressure handling (no deps).
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| #!/usr/bin/env node | |
| /** | |
| * Stream-transform JSONL (NDJSON) from stdin to stdout with backpressure. | |
| * | |
| * Features: | |
| * - Reads JSONL from stdin | |
| * - Parses one JSON object per line | |
| * - Applies transform/filter logic | |
| * - Writes JSONL to stdout | |
| * - Respects stdout backpressure | |
| * - No dependencies | |
| * | |
| * Usage: | |
| * cat input.jsonl | node node-transform-jsonl-stream.js > output.jsonl | |
| * | |
| * Example: | |
| * cat input.jsonl | node node-transform-jsonl-stream.js | |
| */ | |
| const readline = require("readline"); | |
| const { once } = require("events"); | |
| async function writeLine(line) { | |
| if (!process.stdout.write(line + "\n")) { | |
| await once(process.stdout, "drain"); | |
| } | |
| } | |
| async function transformRecord(record) { | |
| // Example filter: | |
| // Skip records marked inactive | |
| if (record.active === false) { | |
| return null; | |
| } | |
| // Example map: | |
| return { | |
| ...record, | |
| processed_at: new Date().toISOString(), | |
| }; | |
| } | |
| async function main() { | |
| const rl = readline.createInterface({ | |
| input: process.stdin, | |
| crlfDelay: Infinity, | |
| }); | |
| let lineNum = 0; | |
| try { | |
| for await (const line of rl) { | |
| lineNum += 1; | |
| // Skip blank lines | |
| if (!line.trim()) { | |
| continue; | |
| } | |
| let record; | |
| try { | |
| record = JSON.parse(line); | |
| } catch (err) { | |
| throw new Error(`Invalid JSON on line ${lineNum}: ${err.message}`); | |
| } | |
| const transformed = await transformRecord(record); | |
| // Null means "filtered out" | |
| if (transformed === null || transformed === undefined) { | |
| continue; | |
| } | |
| await writeLine(JSON.stringify(transformed)); | |
| } | |
| } finally { | |
| rl.close(); | |
| } | |
| } | |
| main().catch((err) => { | |
| console.error("ERROR:", err && err.stack ? err.stack : err); | |
| process.exit(1); | |
| }); |
Author
Author
Example output:
{"id":1,"name":"Alice","active":true,"processed_at":"2026-03-06T00:00:00.000Z"}
{"id":3,"name":"Charlie","active":true,"processed_at":"2026-03-06T00:00:00.000Z"}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Optional example input: