- Simple pipelines should be trivially readable
- Topology is visible at a glance (no hunting through nested YAML)
- Braces for blocks, not indentation (no YAML footguns)
- Bloblang stays as-is for expressions and mappings
- Every construct maps to a visual node/edge
- Progressive disclosure: complexity is opt-in
| Syntax | Meaning |
|---|---|
| `->` | data flows to next step |
| `{ }` | config blocks and inline processors |
| `[ ]` | fan-out (parallel outputs) |
| `\|` | fan-in (merge inputs) |
| `switch { }` | conditional routing |
| `batch(...)` | mid-pipeline batching operator |
| `let name = ...` | named node for reuse and DAGs |
| `resource` | reusable parameterized component |
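The linear `->` form is deliberately easy to machine-parse. As a rough illustration (all names here are hypothetical, not part of any implementation), a one-line chain of `component(key: value)` calls can be split into steps in a few lines of Python — a real parser would also need to handle `{ }` blocks, nesting, and strings containing commas or arrows:

```python
# Hypothetical sketch: parse a one-line "->" chain into steps.
import re
from dataclasses import dataclass, field

@dataclass
class Step:
    component: str              # e.g. "kafka"
    args: dict = field(default_factory=dict)

def parse_chain(src: str) -> list[Step]:
    """Split on '->' and parse each component(key: value, ...) call."""
    steps = []
    for part in (p.strip() for p in src.split("->")):
        m = re.fullmatch(r"(\w+)\((.*)\)", part)
        if m is None:
            raise ValueError(f"cannot parse step: {part!r}")
        args = {}
        for pair in filter(None, (a.strip() for a in m.group(2).split(","))):
            key, value = pair.split(":", 1)
            args[key.strip()] = value.strip().strip('"')
        steps.append(Step(m.group(1), args))
    return steps

chain = parse_chain(
    'kafka(topic: "events") -> jmespath(query: "users[?active]") -> aws_s3(bucket: "archive")'
)
# chain → Step("kafka", {"topic": "events"}), Step("jmespath", ...), Step("aws_s3", ...)
```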
input:
  kafka:
    addresses: [ localhost:9092 ]
    topics: [ events ]
    consumer_group: grp
pipeline:
  processors:
    - mapping: |
        root.body = this.content.uppercase()
output:
  aws_s3:
    bucket: archive
    path: '${! json("id") }.json'

kafka(topic: "events", group: "grp")
-> mapping { root.body = this.content.uppercase() }
-> aws_s3(bucket: "archive", path: "${! json(\"id\") }.json")
input:
  kafka:
    addresses: [ localhost:9092 ]
    topics: [ events ]
    consumer_group: grp
pipeline:
  processors:
    - mapping: |
        root = this
        root.timestamp = now()
        root.body = this.content.uppercase()
        root.word_count = this.content.split(" ").length()
output:
  aws_s3:
    bucket: archive

kafka(topic: "events", group: "grp")
-> mapping {
  root = this
  root.timestamp = now()
  root.body = this.content.uppercase()
  root.word_count = this.content.split(" ").length()
}
-> aws_s3(bucket: "archive")
input:
  kafka:
    addresses: [ localhost:9092 ]
    topics: [ events ]
pipeline:
  processors:
    - mapping: |
        root = this.content.parse_json()
    - jmespath:
        query: "users[?active]"
    - mapping: |
        root.processed_at = now()
output:
  kafka:
    addresses: [ localhost:9092 ]
    topic: processed

kafka(topic: "events")
-> mapping { root = this.content.parse_json() }
-> jmespath(query: "users[?active]")
-> mapping { root.processed_at = now() }
-> kafka(topic: "processed")
input:
  kafka:
    addresses:
      - broker1:9092
      - broker2:9092
    topics: [ events ]
    consumer_group: processors
    tls:
      enabled: true
      root_cas_file: /certs/ca.crt
      cert_file: /certs/client.crt
      key_file: /certs/client.key
pipeline:
  processors:
    - mapping: "root = this"
output:
  aws_s3:
    bucket: output
    path: '${! meta("kafka_topic") }/${! json("id") }.json'
    content_type: application/json
    max_in_flight: 64

kafka {
  addresses: ["broker1:9092", "broker2:9092"]
  topic: "events"
  group: "processors"
  tls {
    enabled: true
    cert_file: "/certs/client.crt"
    key_file: "/certs/client.key"
  }
}
-> mapping { root = this }
-> aws_s3 {
  bucket: "output"
  path: "${! meta(\"kafka_topic\") }/${! json(\"id\") }.json"
  content_type: "application/json"
  max_in_flight: 64
}
input:
  kafka:
    addresses: [ localhost:9092 ]
    topics: [ events ]
pipeline:
  processors:
    - mapping: |
        root = this.content.parse_json()
output:
  broker:
    pattern: fan_out
    outputs:
      - aws_s3:
          bucket: archive
      - kafka:
          addresses: [ localhost:9092 ]
          topic: processed
      - http_client:
          url: http://webhook.example.com/notify
          verb: POST

kafka(topic: "events")
-> mapping { root = this.content.parse_json() }
-> [
  aws_s3(bucket: "archive"),
  kafka(topic: "processed"),
  http(url: "http://webhook.example.com/notify", verb: POST),
]
input:
  kafka:
    addresses: [ localhost:9092 ]
    topics: [ events ]
pipeline:
  processors:
    - mapping: |
        root = this.content.parse_json()
output:
  switch:
    cases:
      - check: this.type == "order"
        output:
          kafka:
            addresses: [ localhost:9092 ]
            topic: orders
      - check: this.type == "click"
        output:
          kafka:
            addresses: [ localhost:9092 ]
            topic: clicks
      - check: this.type == "payment"
        output:
          kafka:
            addresses: [ localhost:9092 ]
            topic: payments
      - output:
          kafka:
            addresses: [ localhost:9092 ]
            topic: other

kafka(topic: "events")
-> mapping { root = this.content.parse_json() }
-> switch {
  this.type == "order" -> kafka(topic: "orders")
  this.type == "click" -> kafka(topic: "clicks")
  this.type == "payment" -> kafka(topic: "payments")
  _ -> kafka(topic: "other")
}
input:
  kafka:
    addresses: [ localhost:9092 ]
    topics: [ events ]
output:
  switch:
    cases:
      - check: this.priority == "high"
        continue: true
        output:
          http_client:
            url: http://pagerduty.com/alert
            verb: POST
      - check: this.region == "eu"
        continue: true
        output:
          kafka:
            addresses: [ localhost:9092 ]
            topic: eu-events
      - check: this.region == "us"
        continue: true
        output:
          kafka:
            addresses: [ localhost:9092 ]
            topic: us-events
      - output:
          kafka:
            addresses: [ localhost:9092 ]
            topic: all-events

kafka(topic: "events")
-> switch(continue: true) {
  this.priority == "high" -> http(url: "http://pagerduty.com/alert", verb: POST)
  this.region == "eu" -> kafka(topic: "eu-events")
  this.region == "us" -> kafka(topic: "us-events")
  _ -> kafka(topic: "all-events")
}
input:
  kafka:
    addresses: [ localhost:9092 ]
    topics: [ events ]
output:
  broker:
    pattern: round_robin
    outputs:
      - http_client:
          url: http://worker-1:8080/process
          verb: POST
      - http_client:
          url: http://worker-2:8080/process
          verb: POST
      - http_client:
          url: http://worker-3:8080/process
          verb: POST

kafka(topic: "events")
-> round_robin [
  http(url: "http://worker-1:8080/process", verb: POST),
  http(url: "http://worker-2:8080/process", verb: POST),
  http(url: "http://worker-3:8080/process", verb: POST),
]
input:
  kafka:
    addresses: [ localhost:9092 ]
    topics: [ events ]
output:
  broker:
    pattern: try
    outputs:
      - aws_s3:
          bucket: primary-bucket
      - aws_s3:
          bucket: fallback-bucket
          region: us-west-2
      - file:
          path: '/var/spool/dead-letters/${! count("msgs") }.json'

kafka(topic: "events")
-> try [
  aws_s3(bucket: "primary-bucket"),
  aws_s3(bucket: "fallback-bucket", region: "us-west-2"),
  file(path: "/var/spool/dead-letters/${! count(\"msgs\") }.json"),
]
input:
  broker:
    inputs:
      - kafka:
          addresses: [ localhost:9092 ]
          topics: [ orders ]
          consumer_group: pipeline
      - kafka:
          addresses: [ localhost:9092 ]
          topics: [ returns ]
          consumer_group: pipeline
pipeline:
  processors:
    - mapping: |
        root = this
        root.source = meta("kafka_topic")
output:
  sql_insert:
    driver: postgres
    dsn: "postgres://localhost/warehouse"
    table: events
    columns: [ source, payload ]
    args_mapping: |
      root = [ this.source, this.string() ]

(kafka(topic: "orders", group: "pipeline") | kafka(topic: "returns", group: "pipeline"))
-> mapping {
  root = this
  root.source = meta("kafka_topic")
}
-> postgresql(dsn: "postgres://localhost/warehouse", table: "events")
input:
  kafka:
    addresses: [ localhost:9092 ]
    topics: [ events ]
    batching:
      count: 100
pipeline:
  processors:
    - mapping: "root = this"
output:
  aws_s3:
    bucket: output
    batching:
      count: 100

kafka(topic: "events")
-> batch(count: 100)
-> mapping { root = this }
-> aws_s3(bucket: "output")
input:
  kafka:
    addresses: [ localhost:9092 ]
    topics: [ events ]
pipeline:
  processors:
    - mapping: "root = this"
output:
  aws_s3:
    bucket: output
    batching:
      count: 100
      byte_size: 5000000
      period: 30s
      processors:
        - mapping: |
            root.items = batch_contents()
            root.count = batch_size()
            root.flushed_at = now()

kafka(topic: "events")
-> batch(count: 100, size: "5MB", period: "30s")
-> mapping {
  root.items = batch_contents()
  root.count = batch_size()
  root.flushed_at = now()
}
-> aws_s3(bucket: "output")
input:
  kafka:
    addresses: [ localhost:9092 ]
    topics: [ events ]
output:
  aws_s3:
    bucket: output
    batching:
      count: 100
      processors:
        - mapping: |
            root = batch_contents().sort_by(v -> v.timestamp)
        - dedupe:
            key: '${! json("id") }'

kafka(topic: "events")
-> batch(count: 100) {
  mapping { root = batch_contents().sort_by(v -> v.timestamp) }
  deduplicate(key: "${! json(\"id\") }")
}
-> aws_s3(bucket: "output")
Note: after the batch block the batch is dissolved, and the output consumes its messages individually. The batch block is therefore the place where batch-level operations happen, before that dissolution.
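To make that dissolution concrete, here is a minimal sketch (class and method names are illustrative, not the Redpanda Connect implementation) of a count-based batcher: batch-level processors run once per full batch, and the result is then handed downstream message by message:

```python
# Minimal sketch of batch(count: N) { ... } semantics (hypothetical names).
from typing import Any, Callable

class Batcher:
    def __init__(self, count: int, batch_procs: list[Callable[[list], list]]):
        self.count = count
        self.batch_procs = batch_procs  # each runs once per full batch
        self.buffer: list = []

    def push(self, msg: Any) -> list:
        """Buffer a message; when the batch fills, run the batch-level
        processors, then return the messages individually (the batch is
        dissolved before the output sees them)."""
        self.buffer.append(msg)
        if len(self.buffer) < self.count:
            return []
        batch, self.buffer = self.buffer, []
        for proc in self.batch_procs:
            batch = proc(batch)
        return batch  # the downstream output consumes these one by one

# Batch-level operation: sort the whole batch by timestamp.
sort_by_ts = lambda batch: sorted(batch, key=lambda m: m["timestamp"])

b = Batcher(3, [sort_by_ts])
out = []
for msg in [{"timestamp": 3}, {"timestamp": 1}, {"timestamp": 2}]:
    out.extend(b.push(msg))
# → out holds timestamps 1, 2, 3 in order
```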
Requires separate stream configs or deeply nested brokers. No single-file equivalent exists for this topology. The closest approximation uses the workflow processor, but it only applies to the processor stage and cannot express multi-input fan-in.
# Stream 1: orders pipeline (separate file)
input:
  kafka:
    addresses: [ localhost:9092 ]
    topics: [ orders ]
    consumer_group: pipeline
pipeline:
  processors:
    - mapping: |
        root = this
        root.source = meta("kafka_topic")
    - http:
        url: http://enrichment-api/v1/enrich
        verb: POST
output:
  broker:
    pattern: fan_out
    outputs:
      - sql_insert:
          driver: postgres
          dsn: "postgres://localhost/db"
          table: events
          columns: [ source, payload ]
          args_mapping: 'root = [ this.source, this.string() ]'
      - kafka:
          addresses: [ localhost:9092 ]
          topic: enriched-events

# Stream 2: returns pipeline (separate file, mostly duplicated)
# ... same structure with topic: returns ...

let orders = kafka(topic: "orders", group: "pipeline")
let returns = kafka(topic: "returns", group: "pipeline")
let enriched = (orders | returns)
-> mapping {
  root = this
  root.source = meta("kafka_topic")
}
-> http(url: "http://enrichment-api/v1/enrich", verb: POST)

enriched -> [
  postgresql(dsn: "postgres://localhost/db", table: "events"),
  kafka(topic: "enriched-events"),
]
input:
  kafka:
    addresses: [ localhost:9092 ]
    topics: [ documents ]
    consumer_group: enrichment
pipeline:
  processors:
    - workflow:
        meta_path: meta.workflow
        order: [ [ nlp, sentiment, entities ] ]
        branches:
          nlp:
            request_map: 'root = this.content'
            processors:
              - http:
                  url: http://nlp-service/analyze
                  verb: POST
            result_map: 'root.nlp = this'
          sentiment:
            request_map: 'root = this.content'
            processors:
              - http:
                  url: http://sentiment-service/score
                  verb: POST
            result_map: 'root.sentiment = this'
          entities:
            request_map: 'root = this.content'
            processors:
              - http:
                  url: http://ner-service/extract
                  verb: POST
            result_map: 'root.entities = this'
output:
  broker:
    pattern: fan_out
    outputs:
      - elasticsearch:
          urls: [ http://localhost:9200 ]
          index: documents
      - kafka:
          addresses: [ localhost:9092 ]
          topic: enriched-docs

let input = kafka(topic: "documents", group: "enrichment")
let nlp = input -> http(url: "http://nlp-service/analyze", verb: POST)
let sentiment = input -> http(url: "http://sentiment-service/score", verb: POST)
let entities = input -> http(url: "http://ner-service/extract", verb: POST)

join(nlp, sentiment, entities)
-> mapping {
  root = this
  root.nlp = nlp.result
  root.sentiment = sentiment.result
  root.entities = entities.result
}
-> [
  elasticsearch(index: "documents"),
  kafka(topic: "enriched-docs"),
]
input:
  kafka:
    addresses: [ localhost:9092 ]
    topics: [ events ]
pipeline:
  processors:
    - workflow:
        meta_path: meta.workflow
        order: [ [ geo, device ], [ weather ], [ final ] ]
        branches:
          geo:
            request_map: 'root = this.ip'
            processors:
              - http:
                  url: http://geo-service/lookup
                  verb: POST
            result_map: 'root.geo = this'
          device:
            request_map: 'root = this.user_agent'
            processors:
              - http:
                  url: http://device-service/info
                  verb: POST
            result_map: 'root.device = this'
          weather:
            request_map: 'root = this.geo.location'
            processors:
              - http:
                  url: http://weather-api/current
                  verb: POST
            result_map: 'root.weather = this'
output:
  kafka:
    addresses: [ localhost:9092 ]
    topic: enriched

let input = kafka(topic: "events")

// Stage 1: geo and device run in parallel
let geo = input -> http(url: "http://geo-service/lookup")
let device = input -> http(url: "http://device-service/info")

// Stage 2: weather depends on geo (inferred from reference)
let weather = geo -> http(url: "http://weather-api/current")

// Stage 3: join all results
join(geo, device, weather)
-> mapping {
  root.geo = geo.result
  root.device = device.result
  root.weather = weather.result
}
-> kafka(topic: "enriched")
The runtime infers execution order from the dependency graph:

- `geo` and `device` run in parallel
- `weather` waits for `geo`
- The join waits for all three
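That inference is essentially a staged topological sort over `let` references. A sketch in Python (the `deps` map is assumed to come from the parser; here it is written out by hand):

```python
# Sketch of staged scheduling inferred from a dependency graph.
def stages(deps: dict[str, set[str]]) -> list[set[str]]:
    """Group nodes into stages so every node runs after its dependencies."""
    remaining = dict(deps)
    order: list[set[str]] = []
    while remaining:
        # Ready = nodes with no unscheduled dependencies left
        ready = {n for n, d in remaining.items() if not d & remaining.keys()}
        if not ready:
            raise ValueError("dependency cycle")
        order.append(ready)
        for n in ready:
            del remaining[n]
    return order

# The enrichment DAG from the example above:
deps = {
    "geo": set(),
    "device": set(),
    "weather": {"geo"},
    "join": {"geo", "device", "weather"},
}
result = stages(deps)  # → [{'geo', 'device'}, {'weather'}, {'join'}]
```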
# resources section (or separate files)
output_resources:
  - label: dead_letter
    kafka:
      addresses: [ localhost:9092 ]
      topic: dead-letters

input:
  kafka:
    addresses: [ localhost:9092 ]
    topics: [ events ]
pipeline:
  processors:
    - try:
        - http:
            url: http://api.example.com/process
            verb: POST
            timeout: 5s
            retries: 5
            backoff:
              initial_interval: 1s
              max_interval: 30s
    - catch:
        - mapping: |
            root = this
            root.error = error()
        - resource: dead_letter
output:
  kafka:
    addresses: [ localhost:9092 ]
    topic: processed

resource dead_letter = kafka(topic: "dead-letters")

resource enrich_http(url) = http {
  url: url
  verb: POST
  timeout: "5s"
  retry: { max: 5, backoff: "1s..30s" }
  on_error -> dead_letter
}

kafka(topic: "events")
-> enrich_http(url: "http://api.example.com/process")
-> kafka(topic: "processed")
# Pipeline 1 (file: orders.yaml)
input:
  kafka:
    addresses: [ localhost:9092 ]
    topics: [ orders ]
pipeline:
  processors:
    - mapping: |
        root = this.content.parse_json()
    - schema_registry_encode:
        url: http://schema-registry:8081
        subject: orders-v2
    - mapping: |
        root = this
        root.normalized_at = now()
output:
  sql_insert:
    driver: postgres
    dsn: "postgres://localhost/db"
    table: orders
    columns: [ data ]
    args_mapping: 'root = [ this.string() ]'

# Pipeline 2 (file: payments.yaml)
# Nearly identical, just different topic, schema, and table
input:
  kafka:
    addresses: [ localhost:9092 ]
    topics: [ payments ]
pipeline:
  processors:
    - mapping: |
        root = this.content.parse_json()
    - schema_registry_encode:
        url: http://schema-registry:8081
        subject: payments-v1
    - mapping: |
        root = this
        root.normalized_at = now()
output:
  sql_insert:
    driver: postgres
    dsn: "postgres://localhost/db"
    table: payments
    columns: [ data ]
    args_mapping: 'root = [ this.string() ]'

resource normalize(schema) =
  mapping { root = this.content.parse_json() }
  -> schema_registry(schema: schema, action: "validate")
  -> mapping {
    root = this
    root.normalized_at = now()
  }

kafka(topic: "orders")
-> normalize(schema: "orders-v2")
-> postgresql(table: "orders")

kafka(topic: "payments")
-> normalize(schema: "payments-v1")
-> postgresql(table: "payments")
input:
  kafka:
    addresses: [ localhost:9092 ]
    topics: [ events ]
    consumer_group: grp
pipeline:
  threads: 4
  processors:
    - mapping: |
        root = this.content.uppercase()
output:
  aws_s3:
    bucket: output
metrics:
  prometheus:
    prefix: rpcn
tracer:
  open_telemetry_collector:
    grpc:
      address: jaeger:4317
logger:
  level: INFO
  format: json

config {
  threads: 4
  metrics {
    prometheus(prefix: "rpcn")
  }
  tracing {
    otel(endpoint: "http://jaeger:4317")
  }
  logger {
    level: "info"
    format: "json"
  }
}

kafka(topic: "events", group: "grp")
-> mapping { root = this.content.uppercase() }
-> aws_s3(bucket: "output")
input:
  kafka:
    addresses: [ localhost:9092 ]
    topics: [ events ]
pipeline:
  processors:
    - try:
        - mapping: |
            root = this.content.parse_json()
    - catch:
        - mapping: "root = deleted()"
    - try:
        - http:
            url: http://api/process
            verb: POST
            retries: 3
            backoff:
              initial_interval: 1s
              max_interval: 10s
output:
  kafka:
    addresses: [ localhost:9092 ]
    topic: processed

kafka(topic: "events")
-> mapping { root = this.content.parse_json() }
   on_error: drop
-> http(url: "http://api/process", verb: POST)
   on_error: retry(max: 3, backoff: "1s..10s")
-> kafka(topic: "processed")
input:
  kafka:
    addresses: [ localhost:9092 ]
    topics: [ events ]
pipeline:
  processors:
    - try:
        - mapping: |
            root = this.content.parse_json()
    - catch:
        - mapping: |
            root.original = this
            root.error = error()
            root.stage = "parse"
        - resource: parse_error_output
    - try:
        - http:
            url: http://api/process
            verb: POST
    - catch:
        - mapping: |
            root.original = this
            root.error = error()
            root.stage = "api"
        - resource: api_error_output
output:
  kafka:
    addresses: [ localhost:9092 ]
    topic: processed
output_resources:
  - label: parse_error_output
    kafka:
      addresses: [ localhost:9092 ]
      topic: parse-errors
  - label: api_error_output
    kafka:
      addresses: [ localhost:9092 ]
      topic: api-errors

kafka(topic: "events")
-> mapping { root = this.content.parse_json() }
   on_error -> kafka(topic: "parse-errors")
-> http(url: "http://api/process", verb: POST)
   on_error -> kafka(topic: "api-errors")
-> kafka(topic: "processed")
input:
  kafka:
    addresses: [ localhost:9092 ]
    topics: [ events ]
pipeline:
  processors:
    - try:
        - mapping: |
            root = this.content.parse_json()
        - http:
            url: http://api/process
            verb: POST
    - catch:
        - mapping: |
            root.original = this
            root.error = error()
            root.failed_at = now()
        - resource: dead_letter
output:
  kafka:
    addresses: [ localhost:9092 ]
    topic: processed
output_resources:
  - label: dead_letter
    kafka:
      addresses: [ localhost:9092 ]
      topic: dead-letters

kafka(topic: "events")
-> mapping { root = this.content.parse_json() }
-> http(url: "http://api/process", verb: POST)
-> kafka(topic: "processed")
on_error -> mapping {
  root.original = this
  root.error = error()
  root.failed_at = now()
}
-> kafka(topic: "dead-letters")
Every DSL construct maps directly to a visual element:

| DSL construct | Visual element |
|---|---|
| `component(...)` | Rectangle node with type + label |
| `->` | Directed edge (arrow) |
| `[a, b, c]` | Fan-out node (one input, N outputs) |
| `(a \| b)` | Fan-in node (N inputs, one output) |
| `switch { }` | Diamond decision node with branches |
| `batch(...)` | Accumulator node (shows policy) |
| `join(...)` | Scatter-gather join node |
| `let name = ...` | Named group / subgraph |
| `resource` | Collapsed reusable subgraph |
| `on_error ->` | Red dashed edge to error handler |
| `config { }` | Side panel / metadata (not a node) |
- **`merge` vs `join` semantics**: `|` is merge (interleaves independent messages), `join()` is scatter-gather (correlates by original message ID). Both are needed. The current proposal uses `|` for merge and `join()` for correlation.
- **Imports**: `import "./common/retry.rpcn"` for cross-file resources?
- **String interpolation**: keep Bloblang's `${! ... }` or simplify?
- **Comments**: `//` line, `/* */` block?
- **File extension**: `.rpcn`? `.rpf`?
- **Migration**: an `rpcn convert` tool to translate existing YAML?
- **Multi-pipeline**: multiple pipelines in one file via named blocks?
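On the merge-versus-join question, the behavioral difference can be sketched in a few lines (illustrative Python, not a specification — the `id` correlation key is an assumption):

```python
# `|` merges independent streams; join() correlates branch results that
# originate from the same message, keyed here by an "id" field.
import itertools

def merge(*streams):
    """(a | b): interleave messages until every stream is exhausted."""
    missing = object()
    for group in itertools.zip_longest(*streams, fillvalue=missing):
        yield from (m for m in group if m is not missing)

def join(*branches):
    """join(a, b, ...): group branch outputs sharing an originating id."""
    merged: dict = {}
    for name, results in branches:
        for r in results:
            merged.setdefault(r["id"], {})[name] = r["value"]
    return merged

interleaved = list(merge([1, 3], [2, 4]))  # → [1, 2, 3, 4]
correlated = join(
    ("geo", [{"id": 7, "value": "DE"}]),
    ("device", [{"id": 7, "value": "ios"}]),
)
# → {7: {"geo": "DE", "device": "ios"}}
```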