@mmatczuk · Last active February 19, 2026
Designing a pipeline expression language

RPCN Pipeline DSL Proposal

Design goals

  • Simple pipelines should be trivially readable
  • Topology is visible at a glance (no hunting through nested YAML)
  • Braces for blocks, not indentation (no YAML footguns)
  • Bloblang stays as-is for expressions and mappings
  • Every construct maps to a visual node/edge
  • Progressive disclosure: complexity is opt-in

Syntax primitives

->              data flows to next step
{ }             config blocks and inline processors
[ ]             fan-out (parallel outputs)
|               fan-in (merge inputs)
switch { }      conditional routing
batch(...)      mid-pipeline batching operator
let name = ...  named node for reuse and DAGs
resource        reusable parameterized component

Level 1: Simple linear pipeline

Before (YAML)

input:
  kafka:
    addresses: [ localhost:9092 ]
    topics: [ events ]
    consumer_group: grp

pipeline:
  processors:
    - mapping: |
        root.body = this.content.uppercase()

output:
  aws_s3:
    bucket: archive
    path: '${! json("id") }.json'

After (DSL)

kafka(topic: "events", group: "grp")
  -> mapping { root.body = this.content.uppercase() }
  -> aws_s3(bucket: "archive", path: "${! json(\"id\") }.json")

Level 1b: Multi-line processor

Before (YAML)

input:
  kafka:
    addresses: [ localhost:9092 ]
    topics: [ events ]
    consumer_group: grp

pipeline:
  processors:
    - mapping: |
        root = this
        root.timestamp = now()
        root.body = this.content.uppercase()
        root.word_count = this.content.split(" ").length()

output:
  aws_s3:
    bucket: archive

After (DSL)

kafka(topic: "events", group: "grp")
  -> mapping {
    root = this
    root.timestamp = now()
    root.body = this.content.uppercase()
    root.word_count = this.content.split(" ").length()
  }
  -> aws_s3(bucket: "archive")

Level 1c: Multiple processors in sequence

Before (YAML)

input:
  kafka:
    addresses: [ localhost:9092 ]
    topics: [ events ]

pipeline:
  processors:
    - mapping: |
        root = this.content.parse_json()
    - jmespath:
        query: "users[?active]"
    - mapping: |
        root.processed_at = now()

output:
  kafka:
    addresses: [ localhost:9092 ]
    topic: processed

After (DSL)

kafka(topic: "events")
  -> mapping { root = this.content.parse_json() }
  -> jmespath(query: "users[?active]")
  -> mapping { root.processed_at = now() }
  -> kafka(topic: "processed")

Level 2: Component with many parameters

Before (YAML)

input:
  kafka:
    addresses:
      - broker1:9092
      - broker2:9092
    topics: [ events ]
    consumer_group: processors
    tls:
      enabled: true
      root_cas_file: /certs/ca.crt
      cert_file: /certs/client.crt
      key_file: /certs/client.key

pipeline:
  processors:
    - mapping: "root = this"

output:
  aws_s3:
    bucket: output
    path: '${! meta("kafka_topic") }/${! json("id") }.json'
    content_type: application/json
    max_in_flight: 64

After (DSL)

kafka {
  addresses: ["broker1:9092", "broker2:9092"]
  topic: "events"
  group: "processors"
  tls {
    enabled: true
    cert_file: "/certs/client.crt"
    key_file: "/certs/client.key"
  }
}
  -> mapping { root = this }
  -> aws_s3 {
    bucket: "output"
    path: "${! meta(\"kafka_topic\") }/${! json(\"id\") }.json"
    content_type: "application/json"
    max_in_flight: 64
  }

Level 3a: Broadcast fan-out

Before (YAML)

input:
  kafka:
    addresses: [ localhost:9092 ]
    topics: [ events ]

pipeline:
  processors:
    - mapping: |
        root = this.content.parse_json()

output:
  broker:
    pattern: fan_out
    outputs:
      - aws_s3:
          bucket: archive
      - kafka:
          addresses: [ localhost:9092 ]
          topic: processed
      - http_client:
          url: http://webhook.example.com/notify
          verb: POST

After (DSL)

kafka(topic: "events")
  -> mapping { root = this.content.parse_json() }
  -> [
    aws_s3(bucket: "archive"),
    kafka(topic: "processed"),
    http(url: "http://webhook.example.com/notify", verb: POST),
  ]

Level 3b: Conditional routing (switch)

Before (YAML)

input:
  kafka:
    addresses: [ localhost:9092 ]
    topics: [ events ]

pipeline:
  processors:
    - mapping: |
        root = this.content.parse_json()

output:
  switch:
    cases:
      - check: this.type == "order"
        output:
          kafka:
            addresses: [ localhost:9092 ]
            topic: orders
      - check: this.type == "click"
        output:
          kafka:
            addresses: [ localhost:9092 ]
            topic: clicks
      - check: this.type == "payment"
        output:
          kafka:
            addresses: [ localhost:9092 ]
            topic: payments
      - output:
          kafka:
            addresses: [ localhost:9092 ]
            topic: other

After (DSL)

kafka(topic: "events")
  -> mapping { root = this.content.parse_json() }
  -> switch {
    this.type == "order"   -> kafka(topic: "orders")
    this.type == "click"   -> kafka(topic: "clicks")
    this.type == "payment" -> kafka(topic: "payments")
    _                      -> kafka(topic: "other")
  }

Level 3c: Non-exclusive routing (continue)

Before (YAML)

input:
  kafka:
    addresses: [ localhost:9092 ]
    topics: [ events ]

output:
  switch:
    cases:
      - check: this.priority == "high"
        continue: true
        output:
          http_client:
            url: http://pagerduty.com/alert
            verb: POST
      - check: this.region == "eu"
        continue: true
        output:
          kafka:
            addresses: [ localhost:9092 ]
            topic: eu-events
      - check: this.region == "us"
        continue: true
        output:
          kafka:
            addresses: [ localhost:9092 ]
            topic: us-events
      - output:
          kafka:
            addresses: [ localhost:9092 ]
            topic: all-events

After (DSL)

kafka(topic: "events")
  -> switch(continue: true) {
    this.priority == "high" -> http(url: "http://pagerduty.com/alert", verb: POST)
    this.region == "eu"     -> kafka(topic: "eu-events")
    this.region == "us"     -> kafka(topic: "us-events")
    _                       -> kafka(topic: "all-events")
  }

Level 3d: Round-robin

Before (YAML)

input:
  kafka:
    addresses: [ localhost:9092 ]
    topics: [ events ]

output:
  broker:
    pattern: round_robin
    outputs:
      - http_client:
          url: http://worker-1:8080/process
          verb: POST
      - http_client:
          url: http://worker-2:8080/process
          verb: POST
      - http_client:
          url: http://worker-3:8080/process
          verb: POST

After (DSL)

kafka(topic: "events")
  -> round_robin [
    http(url: "http://worker-1:8080/process", verb: POST),
    http(url: "http://worker-2:8080/process", verb: POST),
    http(url: "http://worker-3:8080/process", verb: POST),
  ]

Level 3e: Fallback chain (try)

Before (YAML)

input:
  kafka:
    addresses: [ localhost:9092 ]
    topics: [ events ]

output:
  broker:
    pattern: try
    outputs:
      - aws_s3:
          bucket: primary-bucket
      - aws_s3:
          bucket: fallback-bucket
          region: us-west-2
      - file:
          path: '/var/spool/dead-letters/${! count("msgs") }.json'

After (DSL)

kafka(topic: "events")
  -> try [
    aws_s3(bucket: "primary-bucket"),
    aws_s3(bucket: "fallback-bucket", region: "us-west-2"),
    file(path: "/var/spool/dead-letters/${! count(\"msgs\") }.json"),
  ]

Level 4: Fan-in (merge inputs)

Before (YAML)

input:
  broker:
    inputs:
      - kafka:
          addresses: [ localhost:9092 ]
          topics: [ orders ]
          consumer_group: pipeline
      - kafka:
          addresses: [ localhost:9092 ]
          topics: [ returns ]
          consumer_group: pipeline

pipeline:
  processors:
    - mapping: |
        root = this
        root.source = meta("kafka_topic")

output:
  sql_insert:
    driver: postgres
    dsn: "postgres://localhost/warehouse"
    table: events
    columns: [ source, payload ]
    args_mapping: |
      root = [ this.source, this.string() ]

After (DSL)

(kafka(topic: "orders", group: "pipeline") | kafka(topic: "returns", group: "pipeline"))
  -> mapping {
    root = this
    root.source = meta("kafka_topic")
  }
  -> postgresql(dsn: "postgres://localhost/warehouse", table: "events")

Level 5a: Simple batch

Before (YAML)

input:
  kafka:
    addresses: [ localhost:9092 ]
    topics: [ events ]
    batching:
      count: 100

pipeline:
  processors:
    - mapping: "root = this"

output:
  aws_s3:
    bucket: output
    batching:
      count: 100

After (DSL)

kafka(topic: "events")
  -> batch(count: 100)
  -> mapping { root = this }
  -> aws_s3(bucket: "output")

Level 5b: Batch with multiple triggers

Before (YAML)

input:
  kafka:
    addresses: [ localhost:9092 ]
    topics: [ events ]

pipeline:
  processors:
    - mapping: "root = this"

output:
  aws_s3:
    bucket: output
    batching:
      count: 100
      byte_size: 5000000
      period: 30s
      processors:
        - mapping: |
            root.items = batch_contents()
            root.count = batch_size()
            root.flushed_at = now()

After (DSL)

kafka(topic: "events")
  -> batch(count: 100, size: "5MB", period: "30s")
  -> mapping {
    root.items = batch_contents()
    root.count = batch_size()
    root.flushed_at = now()
  }
  -> aws_s3(bucket: "output")

Level 5c: Batch with inline processing

Before (YAML)

input:
  kafka:
    addresses: [ localhost:9092 ]
    topics: [ events ]

output:
  aws_s3:
    bucket: output
    batching:
      count: 100
      processors:
        - mapping: |
            root = batch_contents().sort_by(v -> v.timestamp)
        - dedupe:
            key: '${! json("id") }'

After (DSL)

kafka(topic: "events")
  -> batch(count: 100) {
    mapping { root = batch_contents().sort_by(v -> v.timestamp) }
    deduplicate(key: "${! json(\"id\") }")
  }
  -> aws_s3(bucket: "output")

Note: the output consumes messages individually; the batch is erased at the output boundary. The batch block is therefore where batch-level operations happen, before that erasure.


Level 6a: Named nodes for complex topology

Before (YAML)

Requires separate stream configs or deeply nested brokers. No single-file equivalent exists for this topology. The closest approximation uses the workflow processor, but it only applies to the processor stage and cannot express multi-input fan-in.

# Stream 1: orders pipeline (separate file)
input:
  kafka:
    addresses: [ localhost:9092 ]
    topics: [ orders ]
    consumer_group: pipeline

pipeline:
  processors:
    - mapping: |
        root = this
        root.source = meta("kafka_topic")
    - http:
        url: http://enrichment-api/v1/enrich
        verb: POST

output:
  broker:
    pattern: fan_out
    outputs:
      - sql_insert:
          driver: postgres
          dsn: "postgres://localhost/db"
          table: events
          columns: [ source, payload ]
          args_mapping: 'root = [ this.source, this.string() ]'
      - kafka:
          addresses: [ localhost:9092 ]
          topic: enriched-events

# Stream 2: returns pipeline (separate file, mostly duplicated)
# ... same structure with topic: returns ...

After (DSL)

let orders  = kafka(topic: "orders", group: "pipeline")
let returns = kafka(topic: "returns", group: "pipeline")

let enriched = (orders | returns)
  -> mapping {
    root = this
    root.source = meta("kafka_topic")
  }
  -> http(url: "http://enrichment-api/v1/enrich", verb: POST)

enriched -> [
  postgresql(dsn: "postgres://localhost/db", table: "events"),
  kafka(topic: "enriched-events"),
]

Level 6b: Scatter-gather enrichment

Before (YAML)

input:
  kafka:
    addresses: [ localhost:9092 ]
    topics: [ documents ]
    consumer_group: enrichment

pipeline:
  processors:
    - workflow:
        meta_path: meta.workflow
        order: [ [ nlp, sentiment, entities ] ]
        branches:
          nlp:
            request_map: 'root = this.content'
            processors:
              - http:
                  url: http://nlp-service/analyze
                  verb: POST
            result_map: 'root.nlp = this'
          sentiment:
            request_map: 'root = this.content'
            processors:
              - http:
                  url: http://sentiment-service/score
                  verb: POST
            result_map: 'root.sentiment = this'
          entities:
            request_map: 'root = this.content'
            processors:
              - http:
                  url: http://ner-service/extract
                  verb: POST
            result_map: 'root.entities = this'

output:
  broker:
    pattern: fan_out
    outputs:
      - elasticsearch:
          urls: [ http://localhost:9200 ]
          index: documents
      - kafka:
          addresses: [ localhost:9092 ]
          topic: enriched-docs

After (DSL)

let input = kafka(topic: "documents", group: "enrichment")

let nlp       = input -> http(url: "http://nlp-service/analyze", verb: POST)
let sentiment = input -> http(url: "http://sentiment-service/score", verb: POST)
let entities  = input -> http(url: "http://ner-service/extract", verb: POST)

join(nlp, sentiment, entities)
  -> mapping {
    root = this
    root.nlp       = nlp.result
    root.sentiment = sentiment.result
    root.entities  = entities.result
  }
  -> [
    elasticsearch(index: "documents"),
    kafka(topic: "enriched-docs"),
  ]

Level 6c: Ordered DAG (dependent branches)

Before (YAML)

input:
  kafka:
    addresses: [ localhost:9092 ]
    topics: [ events ]

pipeline:
  processors:
    - workflow:
        meta_path: meta.workflow
        order: [ [ geo, device ], [ weather ], [ final ] ]
        branches:
          geo:
            request_map: 'root = this.ip'
            processors:
              - http:
                  url: http://geo-service/lookup
                  verb: POST
            result_map: 'root.geo = this'
          device:
            request_map: 'root = this.user_agent'
            processors:
              - http:
                  url: http://device-service/info
                  verb: POST
            result_map: 'root.device = this'
          weather:
            request_map: 'root = this.geo.location'
            processors:
              - http:
                  url: http://weather-api/current
                  verb: POST
            result_map: 'root.weather = this'

output:
  kafka:
    addresses: [ localhost:9092 ]
    topic: enriched

After (DSL)

let input = kafka(topic: "events")

// Stage 1: geo and device run in parallel
let geo    = input -> http(url: "http://geo-service/lookup")
let device = input -> http(url: "http://device-service/info")

// Stage 2: weather depends on geo (inferred from reference)
let weather = geo -> http(url: "http://weather-api/current")

// Stage 3: join all results
join(geo, device, weather)
  -> mapping {
    root.geo     = geo.result
    root.device  = device.result
    root.weather = weather.result
  }
  -> kafka(topic: "enriched")

The runtime infers execution order from the dependency graph:

  1. geo and device run in parallel
  2. weather waits for geo
  3. The join waits for all three
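
This inference amounts to a topological layering of the dependency graph, computed host-side at startup. A minimal sketch in Python (node names and edge structure are illustrative, not part of the proposal):

```python
def stages(deps):
    """Group nodes into execution stages: a node runs as soon as
    all of its dependencies completed in an earlier stage."""
    done, order = set(), []
    while len(done) < len(deps):
        ready = [n for n in deps if n not in done and deps[n] <= done]
        if not ready:
            raise ValueError("dependency cycle")
        order.append(sorted(ready))
        done.update(ready)
    return order

# The Level 6c graph: weather depends on geo; geo and device are roots.
deps = {"geo": set(), "device": set(), "weather": {"geo"}}
print(stages(deps))  # [['device', 'geo'], ['weather']]
```

The join node would simply be one more entry whose dependency set is {geo, device, weather}.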

Level 7a: Reusable resources

Before (YAML)

# resources section (or separate files)
output_resources:
  - label: dead_letter
    kafka:
      addresses: [ localhost:9092 ]
      topic: dead-letters

input:
  kafka:
    addresses: [ localhost:9092 ]
    topics: [ events ]

pipeline:
  processors:
    - try:
        - http:
            url: http://api.example.com/process
            verb: POST
            timeout: 5s
            retries: 5
            backoff:
              initial_interval: 1s
              max_interval: 30s
        - catch:
            - mapping: |
                root = this
                root.error = error()
            - resource: dead_letter

output:
  kafka:
    addresses: [ localhost:9092 ]
    topic: processed

After (DSL)

resource dead_letter = kafka(topic: "dead-letters")

resource enrich_http(url) = http {
  url: url
  verb: POST
  timeout: "5s"
  retry: { max: 5, backoff: "1s..30s" }
  on_error -> dead_letter
}

kafka(topic: "events")
  -> enrich_http(url: "http://api.example.com/process")
  -> kafka(topic: "processed")

Level 7b: Parameterized sub-pipelines

Before (YAML)

# Pipeline 1 (file: orders.yaml)
input:
  kafka:
    addresses: [ localhost:9092 ]
    topics: [ orders ]

pipeline:
  processors:
    - mapping: |
        root = this.content.parse_json()
    - schema_registry_encode:
        url: http://schema-registry:8081
        subject: orders-v2
    - mapping: |
        root = this
        root.normalized_at = now()

output:
  sql_insert:
    driver: postgres
    dsn: "postgres://localhost/db"
    table: orders
    columns: [ data ]
    args_mapping: 'root = [ this.string() ]'

# Pipeline 2 (file: payments.yaml)
# Nearly identical, just different topic, schema, and table
input:
  kafka:
    addresses: [ localhost:9092 ]
    topics: [ payments ]

pipeline:
  processors:
    - mapping: |
        root = this.content.parse_json()
    - schema_registry_encode:
        url: http://schema-registry:8081
        subject: payments-v1
    - mapping: |
        root = this
        root.normalized_at = now()

output:
  sql_insert:
    driver: postgres
    dsn: "postgres://localhost/db"
    table: payments
    columns: [ data ]
    args_mapping: 'root = [ this.string() ]'

After (DSL)

resource normalize(schema) =
  mapping { root = this.content.parse_json() }
  -> schema_registry(schema: schema, action: "validate")
  -> mapping {
    root = this
    root.normalized_at = now()
  }

kafka(topic: "orders")
  -> normalize(schema: "orders-v2")
  -> postgresql(table: "orders")

kafka(topic: "payments")
  -> normalize(schema: "payments-v1")
  -> postgresql(table: "payments")

Level 8: Pipeline-level configuration

Before (YAML)

input:
  kafka:
    addresses: [ localhost:9092 ]
    topics: [ events ]
    consumer_group: grp

pipeline:
  threads: 4
  processors:
    - mapping: |
        root = this.content.uppercase()

output:
  aws_s3:
    bucket: output

metrics:
  prometheus:
    prefix: rpcn

tracer:
  open_telemetry_collector:
    grpc:
      address: jaeger:4317

logger:
  level: INFO
  format: json

After (DSL)

config {
  threads: 4
  metrics {
    prometheus(prefix: "rpcn")
  }
  tracing {
    otel(endpoint: "http://jaeger:4317")
  }
  logger {
    level: "info"
    format: "json"
  }
}

kafka(topic: "events", group: "grp")
  -> mapping { root = this.content.uppercase() }
  -> aws_s3(bucket: "output")

Level 9a: Per-step error handling

Before (YAML)

input:
  kafka:
    addresses: [ localhost:9092 ]
    topics: [ events ]

pipeline:
  processors:
    - try:
        - mapping: |
            root = this.content.parse_json()
    - catch:
        - mapping: "root = deleted()"
    - try:
        - http:
            url: http://api/process
            verb: POST
            retries: 3
            backoff:
              initial_interval: 1s
              max_interval: 10s

output:
  kafka:
    addresses: [ localhost:9092 ]
    topic: processed

After (DSL)

kafka(topic: "events")
  -> mapping { root = this.content.parse_json() }
     on_error: drop
  -> http(url: "http://api/process", verb: POST)
     on_error: retry(max: 3, backoff: "1s..10s")
  -> kafka(topic: "processed")

Level 9b: Error routing (dead letter per step)

Before (YAML)

input:
  kafka:
    addresses: [ localhost:9092 ]
    topics: [ events ]

pipeline:
  processors:
    - try:
        - mapping: |
            root = this.content.parse_json()
    - catch:
        - mapping: |
            root.original = this
            root.error = error()
            root.stage = "parse"
        - resource: parse_error_output
    - try:
        - http:
            url: http://api/process
            verb: POST
    - catch:
        - mapping: |
            root.original = this
            root.error = error()
            root.stage = "api"
        - resource: api_error_output

output:
  kafka:
    addresses: [ localhost:9092 ]
    topic: processed

output_resources:
  - label: parse_error_output
    kafka:
      addresses: [ localhost:9092 ]
      topic: parse-errors
  - label: api_error_output
    kafka:
      addresses: [ localhost:9092 ]
      topic: api-errors

After (DSL)

kafka(topic: "events")
  -> mapping { root = this.content.parse_json() }
     on_error -> kafka(topic: "parse-errors")
  -> http(url: "http://api/process", verb: POST)
     on_error -> kafka(topic: "api-errors")
  -> kafka(topic: "processed")

Level 9c: Catch-all error handler

Before (YAML)

input:
  kafka:
    addresses: [ localhost:9092 ]
    topics: [ events ]

pipeline:
  processors:
    - try:
        - mapping: |
            root = this.content.parse_json()
        - http:
            url: http://api/process
            verb: POST
    - catch:
        - mapping: |
            root.original = this
            root.error = error()
            root.failed_at = now()
        - resource: dead_letter

output:
  kafka:
    addresses: [ localhost:9092 ]
    topic: processed

output_resources:
  - label: dead_letter
    kafka:
      addresses: [ localhost:9092 ]
      topic: dead-letters

After (DSL)

kafka(topic: "events")
  -> mapping { root = this.content.parse_json() }
  -> http(url: "http://api/process", verb: POST)
  -> kafka(topic: "processed")

on_error -> mapping {
    root.original = this
    root.error = error()
    root.failed_at = now()
  }
  -> kafka(topic: "dead-letters")

Visual rendering mapping

Every DSL construct maps directly to a visual element:

DSL construct   Visual element
component(...)  Rectangle node with type + label
->              Directed edge (arrow)
[a, b, c]       Fan-out node (one input, N outputs)
(a | b)         Fan-in node (N inputs, one output)
switch { }      Diamond decision node with branches
batch(...)      Accumulator node (shows policy)
join(...)       Scatter-gather join node
let name = ...  Named group / subgraph
resource        Collapsed reusable subgraph
on_error ->     Red dashed edge to error handler
config { }      Side panel / metadata (not a node)

Open questions

  1. merge vs join semantics - | is merge (interleaving of independent messages); join() is scatter-gather (correlating results by original message ID). Both are needed; the current proposal uses | for merge and join() for correlation.

  2. Imports - import "./common/retry.rpcn" for cross-file resources?

  3. String interpolation - keep Bloblang's ${! ... } or simplify?

  4. Comments - // line, /* */ block?

  5. File extension - .rpcn? .rpf?

  6. Migration - rpcn convert tool to translate existing YAML?

  7. Multi-pipeline - multiple pipelines in one file via named blocks?

Starlark as Unified Language for RPCN

Starlark is a deterministic, hermetic dialect of Python designed for configuration; starlark-go is the canonical Go interpreter.

The shift

Previous evaluation assumed Starlark replaces only the YAML config layer, with Bloblang remaining as opaque strings. Replacing Bloblang too changes everything. Now Starlark is both:

  1. Config language - defines the pipeline topology (evaluated once at startup)
  2. Runtime language - transforms every message on the hot path (evaluated millions of times)

One language. One mental model. One set of tooling.


Comparison (Custom DSL + Bloblang vs Starlark unified)

These are the two real options. The custom DSL keeps Bloblang for runtime expressions and adds a new syntax for topology. Starlark replaces everything: topology, transformations, conditions, interpolation.

Concern                           Custom DSL + Bloblang                       Starlark unified
Languages to learn                2 (new DSL + Bloblang)                      1 (Starlark)
Simple linear pipeline            Excellent (-> arrows)                       Good (compact function calls)
Topology visible at a glance      Excellent (flow operators)                  OK (nested calls)
Transformation language           Bloblang (existing, proven)                 Starlark (Python-like)
Conditions                        Bloblang strings                            Native lambda expressions
String interpolation              ${! json("id") } (existing)                 lambda msg: msg.id
Testability of transforms         Hard (needs Bloblang runtime)               Easy (call function, assert)
IDE support: topology             Must build LSP from scratch                 starlark-lsp (forkable)
IDE support: transforms           None (Bloblang strings)                     Good (Python tooling)
Hot-path performance              Good (compiled Bloblang)                    Unknown (needs benchmark)
Fan-out syntax                    [a, b, c]                                   fan_out([a, b, c])
Fan-in syntax                     (a | b)                                     merge([a, b])
Switch routing                    switch { expr -> out }                      switch([case(lambda, out)])
Scatter-gather                    let + join() (inferred)                     workflow(branches=[...])
Reusable components               resource/import (to design)                 def + load() (built-in)
Dynamic generation                Not possible                                For loops, comprehensions
Batch operations                  Bloblang sort_by, dedupe                    Real code (sorted, for)
Visual editor round-trip          Possible (simple grammar)                   Hard (can't marshal back)
Parser effort                     High (grammar, lexer, AST, error recovery)  Zero (starlark-go)
Formatter                         Must build                                  Buildifier (ready)
Linter                            Must build                                  Buildifier (ready)
Debugger                          Must build                                  REPL + DAP possible
Error messages                    Must build (but tailored)                   Good (built-in stack traces)
Operator expressiveness           Custom (->, |, =>)                          Fixed (Python operators)
Users already know syntax         No (new language)                           Yes (Python subset)
Module system                     Must design and build                       Built-in load()
Migration from current            High (new DSL + keep Bloblang)              High (rewrite everything)
Two-language problem              Yes (DSL + Bloblang)                        No (one language)
Config-time vs runtime confusion  Yes (DSL vs Bloblang scopes)                Reduced (lambdas are explicit)
Community familiarity             Zero (brand new)                            High (Python-like)
Spec control                      Full (you own it)                           None (Google owns Starlark spec)
MCP for AI authoring              Must build everything                       Buildable (hermetic eval)
Total engineering effort          Very high (12-18 months)                    Medium (3-6 months)

Where Custom DSL wins

Custom DSL wins on visual clarity of topology. The -> operator makes data flow visible at a glance. Starlark cannot match this: -> is not part of its syntax, and the language deliberately excludes user-defined operators. For the simplest pipelines, the custom DSL is genuinely more readable.

Custom DSL also wins on visual editor round-trip. A simple, regular grammar can be parsed and regenerated by a visual editor. Starlark functions are opaque to a visual editor and cannot be reliably serialized back to text.

Custom DSL wins on spec control. You own the language. You can add features, change semantics, optimize the runtime. With Starlark, language changes require Google consensus.

Where Starlark wins

Starlark wins on engineering cost. No parser, no lexer, no AST, no error recovery, no formatter, no linter, no LSP, no module system to build. All of these exist. The custom DSL requires building all of them.

Starlark wins on one language. Users learn one syntax for topology, transformations, conditions, and interpolation. The custom DSL still has Bloblang as opaque strings with no IDE support, no testing, no composition.

Starlark wins on dynamic generation. List comprehensions and loops let users generate pipelines programmatically, e.g. five per-table pipelines from 14 lines of code. Neither YAML nor a custom DSL can do this without bolting on iteration primitives.
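
A sketch of that pattern in plain Python (which is also valid Starlark). The kafka(), postgresql(), and pipeline() builtins are hypothetical and stubbed out here so the generation pattern itself is runnable; in real RPCN they would be provided by the Go host:

```python
# Stubs standing in for hypothetical host-provided RPCN builtins.
def kafka(topic): return ("kafka", topic)
def postgresql(table): return ("postgresql", table)
def pipeline(inp, out): return {"input": inp, "output": out}

TABLES = ["orders", "payments", "returns", "users", "clicks"]

# One comprehension generates five near-identical pipelines.
pipelines = [pipeline(kafka(topic=t), postgresql(table=t)) for t in TABLES]
print(len(pipelines))  # 5
```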

Starlark wins on testability. assert enrich(test_msg) == expected works directly. Bloblang mappings require the full pipeline runtime to test.
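
Because a transform is just a function, it can be unit-tested with no pipeline runtime at all. The enrich() function below is illustrative, not part of the proposal:

```python
# A transform is a plain function over a dict-like message.
def enrich(msg):
    out = dict(msg)
    out["body"] = msg["content"].upper()
    out["word_count"] = len(msg["content"].split(" "))
    return out

# Direct assertion, no runtime involved.
test_msg = {"content": "hello world"}
expected = {"content": "hello world", "body": "HELLO WORLD", "word_count": 2}
assert enrich(test_msg) == expected
```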

Starlark wins on community familiarity. Users bring Python knowledge. The custom DSL is yet another language to learn from scratch.

The fundamental tradeoff

Custom DSL + Bloblang: better syntax, much more engineering work, still has the two-language problem.

Starlark unified: worse topology syntax, dramatically less engineering work, solves the two-language problem.


Existing tooling ecosystem

Starlark has a mature tooling ecosystem driven by the Bazel community. Most tools are reusable for non-Bazel Starlark with minimal effort.

Interpreter and Go libraries

  • starlark-go - the canonical Go interpreter. Native Go, no CGo. Includes REPL (cmd/starlark). Custom types via starlark.Value interface. Bytecode compilation. Thread-safe parallel execution.
  • Starlet - Go wrapper that simplifies script execution and data conversion. Includes builtin modules for HTTP, JSON, CSV, regex, etc.
  • Starlib - Starlark standard library extensions: regex, XLSX, ZIP, HTTP, etc.
  • Starstruct - bidirectional conversion between starlark.StringDict and Go structs. Uses struct tags. Directly useful for RPCN config struct marshaling.
  • Startype - two-way conversion between Starlark API types and Go types. Handles kwargs-to-struct mapping for builtin functions.
  • Starlight - wrapper around starlark-go that auto-converts Go types (structs, maps, slices, functions) to Starlark values via reflection.

Formatter and linter

  • Buildifier - the canonical Starlark formatter and linter. Written in Go. ~100 lint checks (many Bazel-specific, but formatting works on any Starlark). Supports --type=default for non-Bazel .star files. Auto-fix mode. JSON diagnostic output. Directly usable for RPCN with zero changes.

Language servers (LSP)

  • starlark-lsp - the only general-purpose Starlark LSP. Written in Go. Uses Tree-sitter for parsing. Supports completion, signature help, hover, diagnostics. Designed to accept type-stub files describing custom builtins. This is the one to fork for RPCN: feed it stubs for kafka(), s3(), pipeline(), etc. and get IDE support for RPCN builtins.
  • Starpls - full LSP for Starlark, written in Rust. Type checking, goto definition, diagnostics. 169 stars. Active development. Bazel/Buck2 focused but has general Starlark support.
  • Bazel Stack BSV LSP - commercial Starlark LSP tightly coupled to Bazel. Not reusable.

VS Code extensions

  • vscode-starlark - wraps a modified Python language server for jump-to-definition, linting, completion on .star files. Works because Starlark syntax is a Python subset.
  • Tilt VS Code extension - bundles starlark-lsp. Completion for Tilt builtins, load() resolution, runtime error underlining. Good reference for building a domain-specific Starlark extension.
  • Kurtosis VS Code extension - another domain-specific Starlark extension. Same pattern: custom builtins registered with the LSP.
  • Bazel VS Code extension - TextMate grammar for syntax highlighting. The grammar is reusable for any Starlark variant.

Debugger

  • Bazel Starlark debugger - DAP (Debug Adapter Protocol) based, works with VS Code. Tightly coupled to Bazel's execution model. Not directly reusable, but the DAP-over-protobuf approach could be replicated for RPCN's runtime.

MCP

No Starlark-specific MCP server exists. For RPCN, building one that exposes validate_pipeline, list_components, explain_component tools would be straightforward. The starlark-go interpreter is hermetic and safe to run in an MCP sandbox.

What this means for RPCN

Tool                  Effort     Approach
Formatter             Zero       Use buildifier --type=default as-is
Syntax highlighting   Near-zero  Reuse Bazel TextMate grammar
LSP                   Medium     Fork starlark-lsp, add RPCN builtin stubs
VS Code extension     Small      Thin wrapper around LSP (follow Tilt pattern)
Debugger              Medium     DAP adapter around starlark-go interpreter
Go struct conversion  Small      Use Starstruct/Startype for config structs
MCP server            Medium     Expose validate/list/explain as tools

Curated link list


What Bloblang does today (that Starlark must replace)

Bloblang serves four roles:

1. Message transformation (mapping)

root.body = this.content.uppercase()
root.timestamp = now()
root.word_count = this.content.split(" ").length()

2. Conditions (switch routing, filters)

this.type == "order"
this.age > 18 && this.country == "US"

3. String interpolation in config

${! json("id") }
${! meta("kafka_topic") }
${! count("msgs") }

4. Batch-level operations

root = batch_contents().sort_by(v -> v.timestamp)

Starlark equivalents for each Bloblang role

1. Message transformation

Bloblang:

root.body = this.content.uppercase()
root.timestamp = now()
root.word_count = this.content.split(" ").length()

Starlark (message is a dict-like msg value):

def process(msg):
    msg["body"] = msg["content"].upper()
    msg["timestamp"] = now()
    msg["word_count"] = len(msg["content"].split(" "))
    return msg
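
Because the dict form is valid Python as well as Starlark, the transform can be unit tested with no pipeline runtime at all. A minimal sketch, with now() stubbed as a hypothetical fixed value:

```python
def now():
    # Hypothetical stub for the runtime's now() builtin.
    return "2026-01-01T00:00:00Z"

def process(msg):
    msg["body"] = msg["content"].upper()
    msg["timestamp"] = now()
    msg["word_count"] = len(msg["content"].split(" "))
    return msg

out = process({"content": "hello world"})
print(out["body"], out["word_count"])  # HELLO WORLD 2
```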

Starlark (message has attribute access via custom Value type):

def process(msg):
    msg.body = msg.content.upper()
    msg.timestamp = now()
    msg.word_count = len(msg.content.split(" "))
    return msg

The second form requires implementing HasSetField on the message type. More ergonomic. Worth the Go-side effort.

2. Conditions

Bloblang:

this.type == "order"
this.age > 18 && this.country == "US"

Starlark (as lambda or named function):

lambda msg: msg.type == "order"
lambda msg: msg.age > 18 and msg.country == "US"

Or as standalone expressions in the config:

switch([
    case(lambda msg: msg.type == "order",   kafka(topic="orders")),
    case(lambda msg: msg.type == "click",   kafka(topic="clicks")),
    case(default=True,                      kafka(topic="other")),
])

3. String interpolation

Bloblang:

${! json("id") }
${! meta("kafka_topic") }

Starlark (path template as function):

output = s3(
    bucket="archive",
    path=lambda msg: "%s/%s.json" % (msg.meta("kafka_topic"), msg.id),
)

Or with a helper:

output = s3(
    bucket="archive",
    path=template("{meta.kafka_topic}/{id}.json"),
)

4. Batch operations

Bloblang:

root = batch_contents().sort_by(v -> v.timestamp)

Starlark:

def sort_batch(batch):
    return sorted(batch, key=lambda msg: msg.timestamp)
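
The same function runs under plain Python for testing; here SimpleNamespace stands in for the runtime's message Value type (an assumption, not the real type):

```python
from types import SimpleNamespace

def sort_batch(batch):
    return sorted(batch, key=lambda msg: msg.timestamp)

batch = [SimpleNamespace(id=i, timestamp=t)
         for i, t in [(1, 30), (2, 10), (3, 20)]]
print([m.id for m in sort_batch(batch)])  # [2, 3, 1]
```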

Revised examples (before and after)

Level 1: Simple linear pipeline

Current YAML + Bloblang:

input:
  kafka:
    addresses: [localhost:9092]
    topics: [events]
    consumer_group: grp
pipeline:
  processors:
    - mapping: |
        root.body = this.content.uppercase()
output:
  aws_s3:
    bucket: archive
    path: '${! json("id") }.json'

Starlark:

pipeline(
    input = kafka(topic="events", group="grp"),
    process = lambda msg: msg.set(body=msg.content.upper()),
    output = s3(bucket="archive", path=lambda msg: msg.id + ".json"),
)

Level 1b: Multi-line processor

Current YAML + Bloblang:

input:
  kafka:
    addresses: [localhost:9092]
    topics: [events]
    consumer_group: grp
pipeline:
  processors:
    - mapping: |
        root = this
        root.timestamp = now()
        root.body = this.content.uppercase()
        root.word_count = this.content.split(" ").length()
output:
  aws_s3:
    bucket: archive

Starlark:

def enrich(msg):
    msg.timestamp = now()
    msg.body = msg.content.upper()
    msg.word_count = len(msg.content.split(" "))
    return msg

pipeline(
    input = kafka(topic="events", group="grp"),
    process = enrich,
    output = s3(bucket="archive"),
)

Level 1c: Multiple processors in sequence

Current YAML + Bloblang:

input:
  kafka:
    addresses: [localhost:9092]
    topics: [events]
pipeline:
  processors:
    - mapping: |
        root = this.content.parse_json()
    - jmespath:
        query: "users[?active]"
    - mapping: |
        root.processed_at = now()
output:
  kafka:
    addresses: [localhost:9092]
    topic: processed

Starlark:

pipeline(
    input = kafka(topic="events"),
    processors = [
        lambda msg: msg.content.parse_json(),
        jmespath(query="users[?active]"),
        lambda msg: msg.set(processed_at=now()),
    ],
    output = kafka(topic="processed"),
)

Level 2: Component with many parameters

Current YAML + Bloblang:

input:
  kafka:
    addresses: [broker1:9092, broker2:9092]
    topics: [events]
    consumer_group: processors
    tls:
      enabled: true
      cert_file: /certs/client.crt
      key_file: /certs/client.key
pipeline:
  processors:
    - mapping: "root = this"
output:
  aws_s3:
    bucket: output
    path: '${! meta("kafka_topic") }/${! json("id") }.json'
    content_type: application/json
    max_in_flight: 64

Starlark:

pipeline(
    input = kafka(
        addresses=["broker1:9092", "broker2:9092"],
        topic="events",
        group="processors",
        tls=tls(cert="/certs/client.crt", key="/certs/client.key"),
    ),
    output = s3(
        bucket="output",
        path=lambda msg: "%s/%s.json" % (msg.meta("kafka_topic"), msg.id),
        content_type="application/json",
        max_in_flight=64,
    ),
)

Level 3a: Broadcast fan-out

Current YAML + Bloblang:

input:
  kafka:
    addresses: [localhost:9092]
    topics: [events]
pipeline:
  processors:
    - mapping: "root = this.content.parse_json()"
output:
  broker:
    pattern: fan_out
    outputs:
      - aws_s3:
          bucket: archive
      - kafka:
          addresses: [localhost:9092]
          topic: processed
      - http_client:
          url: http://webhook.example.com/notify
          verb: POST

Starlark:

pipeline(
    input = kafka(topic="events"),
    process = lambda msg: msg.content.parse_json(),
    output = fan_out([
        s3(bucket="archive"),
        kafka(topic="processed"),
        http(url="http://webhook.example.com/notify", verb="POST"),
    ]),
)

Level 3b: Conditional routing (switch)

Current YAML + Bloblang:

input:
  kafka:
    addresses: [localhost:9092]
    topics: [events]
pipeline:
  processors:
    - mapping: "root = this.content.parse_json()"
output:
  switch:
    cases:
      - check: this.type == "order"
        output:
          kafka:
            addresses: [localhost:9092]
            topic: orders
      - check: this.type == "click"
        output:
          kafka:
            addresses: [localhost:9092]
            topic: clicks
      - output:
          kafka:
            addresses: [localhost:9092]
            topic: other

Starlark:

pipeline(
    input = kafka(topic="events"),
    process = lambda msg: msg.content.parse_json(),
    output = switch([
        case(lambda msg: msg.type == "order", kafka(topic="orders")),
        case(lambda msg: msg.type == "click", kafka(topic="clicks")),
        case(default=True,                    kafka(topic="other")),
    ]),
)

Level 3c: Non-exclusive routing (continue)

Current YAML + Bloblang:

input:
  kafka:
    addresses: [localhost:9092]
    topics: [events]
output:
  switch:
    cases:
      - check: this.priority == "high"
        continue: true
        output:
          http_client:
            url: http://pagerduty.com/alert
            verb: POST
      - check: this.region == "eu"
        continue: true
        output:
          kafka:
            addresses: [localhost:9092]
            topic: eu-events
      - check: this.region == "us"
        continue: true
        output:
          kafka:
            addresses: [localhost:9092]
            topic: us-events
      - output:
          kafka:
            addresses: [localhost:9092]
            topic: all-events

Starlark:

pipeline(
    input = kafka(topic="events"),
    output = switch(continue_matching=True, cases=[
        case(lambda msg: msg.priority == "high", http(url="http://pagerduty.com/alert", verb="POST")),
        case(lambda msg: msg.region == "eu",     kafka(topic="eu-events")),
        case(lambda msg: msg.region == "us",     kafka(topic="us-events")),
        case(default=True,                       kafka(topic="all-events")),
    ]),
)

Level 3d: Round-robin

Current YAML + Bloblang:

input:
  kafka:
    addresses: [localhost:9092]
    topics: [events]
output:
  broker:
    pattern: round_robin
    outputs:
      - http_client:
          url: http://worker-1:8080/process
          verb: POST
      - http_client:
          url: http://worker-2:8080/process
          verb: POST
      - http_client:
          url: http://worker-3:8080/process
          verb: POST

Starlark:

pipeline(
    input = kafka(topic="events"),
    output = round_robin([
        http(url="http://worker-%d:8080/process" % i, verb="POST")
        for i in range(1, 4)
    ]),
)

Level 3e: Fallback chain (try)

Current YAML + Bloblang:

input:
  kafka:
    addresses: [localhost:9092]
    topics: [events]
output:
  broker:
    pattern: try
    outputs:
      - aws_s3:
          bucket: primary-bucket
      - aws_s3:
          bucket: fallback-bucket
          region: us-west-2
      - file:
          path: '/var/spool/dead-letters/${! count("msgs") }.json'

Starlark:

pipeline(
    input = kafka(topic="events"),
    output = try_outputs([
        s3(bucket="primary-bucket"),
        s3(bucket="fallback-bucket", region="us-west-2"),
        file(path=lambda msg: "/var/spool/dead-letters/%d.json" % count("msgs")),
    ]),
)

Level 4: Fan-in (merge inputs)

Current YAML + Bloblang:

input:
  broker:
    inputs:
      - kafka:
          addresses: [localhost:9092]
          topics: [orders]
          consumer_group: pipeline
      - kafka:
          addresses: [localhost:9092]
          topics: [returns]
          consumer_group: pipeline
pipeline:
  processors:
    - mapping: |
        root = this
        root.source = meta("kafka_topic")
output:
  sql_insert:
    driver: postgres
    dsn: postgres://localhost/warehouse
    table: events
    columns: [source, payload]
    args_mapping: 'root = [this.source, this.string()]'

Starlark:

def tag_source(msg):
    msg.source = msg.meta("kafka_topic")
    return msg

pipeline(
    input = merge([
        kafka(topic="orders", group="pipeline"),
        kafka(topic="returns", group="pipeline"),
    ]),
    process = tag_source,
    output = postgresql(dsn="postgres://localhost/warehouse", table="events"),
)

Level 5a: Simple batch

Current YAML + Bloblang:

input:
  kafka:
    addresses: [localhost:9092]
    topics: [events]
output:
  aws_s3:
    bucket: output
    batching:
      count: 100
      byte_size: 5000000
      period: 30s
      processors:
        - mapping: |
            root.items = batch_contents()
            root.count = batch_size()
            root.flushed_at = now()

Starlark:

def wrap_batch(batch):
    return {
        "items": batch,
        "count": len(batch),
        "flushed_at": now(),
    }

pipeline(
    input = kafka(topic="events"),
    processors = [
        batch(count=100, size="5MB", period="30s", process=wrap_batch),
    ],
    output = s3(bucket="output"),
)
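
wrap_batch is likewise testable as plain Python once now() is stubbed (the stub value below is a hypothetical placeholder):

```python
def now():
    # Hypothetical stub for the runtime builtin.
    return "2026-01-01T00:00:00Z"

def wrap_batch(batch):
    return {
        "items": batch,
        "count": len(batch),
        "flushed_at": now(),
    }

env = wrap_batch([{"id": 1}, {"id": 2}])
print(env["count"], env["items"][1]["id"])  # 2 2
```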

Level 5b: Batch with sort and dedup

Current YAML + Bloblang:

input:
  kafka:
    addresses: [localhost:9092]
    topics: [events]
output:
  aws_s3:
    bucket: output
    batching:
      count: 100
      processors:
        - mapping: |
            root = batch_contents().sort_by(v -> v.timestamp)
        - dedupe:
            key: '${! json("id") }'

Starlark:

def sort_and_dedup(batch):
    seen = {}
    result = []
    for msg in sorted(batch, key=lambda m: m.timestamp):
        if msg.id not in seen:
            seen[msg.id] = True
            result.append(msg)
    return result

pipeline(
    input = kafka(topic="events"),
    processors = [
        batch(count=100, process=sort_and_dedup),
    ],
    output = s3(bucket="output"),
)

Note: the dedup is now real code with real logic, not a magic string. Users can write any dedup strategy they want. This is strictly more powerful than the YAML version.
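
To make that concrete, the same function runs and is assertable under plain Python, with SimpleNamespace standing in for the message type:

```python
from types import SimpleNamespace

def sort_and_dedup(batch):
    seen = {}
    result = []
    for msg in sorted(batch, key=lambda m: m.timestamp):
        if msg.id not in seen:
            seen[msg.id] = True
            result.append(msg)
    return result

batch = [
    SimpleNamespace(id="a", timestamp=3),
    SimpleNamespace(id="b", timestamp=1),
    SimpleNamespace(id="a", timestamp=2),  # earlier duplicate of "a"
]
out = sort_and_dedup(batch)
print([(m.id, m.timestamp) for m in out])  # [('b', 1), ('a', 2)]
```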

Level 6b: Scatter-gather enrichment

Current YAML + Bloblang:

input:
  kafka:
    addresses: [localhost:9092]
    topics: [documents]
    consumer_group: enrichment
pipeline:
  processors:
    - workflow:
        meta_path: meta.workflow
        order: [[nlp, sentiment, entities]]
        branches:
          nlp:
            request_map: 'root = this.content'
            processors:
              - http:
                  url: http://nlp-service/analyze
                  verb: POST
            result_map: 'root.nlp = this'
          sentiment:
            request_map: 'root = this.content'
            processors:
              - http:
                  url: http://sentiment-service/score
                  verb: POST
            result_map: 'root.sentiment = this'
          entities:
            request_map: 'root = this.content'
            processors:
              - http:
                  url: http://ner-service/extract
                  verb: POST
            result_map: 'root.entities = this'
output:
  broker:
    pattern: fan_out
    outputs:
      - elasticsearch:
          urls: [http://localhost:9200]
          index: documents
      - kafka:
          addresses: [localhost:9092]
          topic: enriched-docs

Starlark:

def enrich_branch(name, url):
    return branch(
        name = name,
        request = lambda msg: msg.content,
        process = http(url=url, verb="POST"),
        result = lambda msg, result: msg.set(**{name: result}),
    )

pipeline(
    input = kafka(topic="documents", group="enrichment"),
    processors = [
        workflow(branches=[
            enrich_branch("nlp",       "http://nlp-service/analyze"),
            enrich_branch("sentiment", "http://sentiment-service/score"),
            enrich_branch("entities",  "http://ner-service/extract"),
        ]),
    ],
    output = fan_out([
        elasticsearch(index="documents"),
        kafka(topic="enriched-docs"),
    ]),
)
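
A pure-Python model of the branch pattern shows how the three callbacks compose. The dict-based make_branch and run_workflow below are hypothetical stand-ins for the real builtins, with plain functions in place of http(...):

```python
def make_branch(name, service):
    # Hypothetical model: extract the request, call the service, merge the result.
    return {
        "request": lambda msg: msg["content"],
        "process": service,  # stand-in for http(url=..., verb="POST")
        "result": lambda msg, r: dict(msg, **{name: r}),
    }

def run_workflow(msg, branches):
    for b in branches:
        r = b["process"](b["request"](msg))
        msg = b["result"](msg, r)
    return msg

branches = [
    make_branch("nlp", lambda text: {"tokens": text.split(" ")}),
    make_branch("sentiment", lambda text: {"score": 0.9}),
]
out = run_workflow({"content": "great product"}, branches)
print(sorted(out))  # ['content', 'nlp', 'sentiment']
```

Note that because name is a function parameter, each branch's result lambda captures its own name; building the same lambdas in a bare for loop would hit the usual late-binding capture pitfall.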

Level 6c: Ordered DAG (dependent branches)

Current YAML + Bloblang:

input:
  kafka:
    addresses: [localhost:9092]
    topics: [events]
pipeline:
  processors:
    - workflow:
        meta_path: meta.workflow
        order: [[geo, device], [weather]]
        branches:
          geo:
            request_map: 'root = this.ip'
            processors:
              - http:
                  url: http://geo-service/lookup
                  verb: POST
            result_map: 'root.geo = this'
          device:
            request_map: 'root = this.user_agent'
            processors:
              - http:
                  url: http://device-service/info
                  verb: POST
            result_map: 'root.device = this'
          weather:
            request_map: 'root = this.geo.location'
            processors:
              - http:
                  url: http://weather-api/current
                  verb: POST
            result_map: 'root.weather = this'
output:
  kafka:
    addresses: [localhost:9092]
    topic: enriched

Starlark:

pipeline(
    input = kafka(topic="events"),
    processors = [
        workflow(
            order=[["geo", "device"], ["weather"]],
            branches=[
                branch("geo",
                    request=lambda msg: msg.ip,
                    process=http(url="http://geo-service/lookup", verb="POST"),
                    result=lambda msg, r: msg.set(geo=r)),
                branch("device",
                    request=lambda msg: msg.user_agent,
                    process=http(url="http://device-service/info", verb="POST"),
                    result=lambda msg, r: msg.set(device=r)),
                branch("weather",
                    request=lambda msg: msg.geo.location,
                    process=http(url="http://weather-api/current", verb="POST"),
                    result=lambda msg, r: msg.set(weather=r)),
            ],
        ),
    ],
    output = kafka(topic="enriched"),
)

Level 7: Reusable resources (parameterized sub-pipelines)

Current YAML + Bloblang (two nearly identical files):

# orders.yaml
input:
  kafka:
    addresses: [localhost:9092]
    topics: [orders]
pipeline:
  processors:
    - mapping: |
        root = this.content.parse_json()
    - schema_registry_encode:
        url: http://schema-registry:8081
        subject: orders-v2
    - mapping: |
        root = this
        root.normalized_at = now()
output:
  sql_insert:
    driver: postgres
    dsn: postgres://localhost/db
    table: orders
    columns: [data]
    args_mapping: 'root = [this.string()]'

# payments.yaml (nearly identical, different topic/schema/table)

Starlark:

# common.star
def normalize(schema):
    """Parse JSON, validate schema, add timestamp."""
    return [
        lambda msg: msg.content.parse_json(),
        schema_registry(schema=schema, action="validate"),
        lambda msg: msg.set(normalized_at=now()),
    ]

# orders.star
load("common.star", "normalize")

pipeline(
    input = kafka(topic="orders"),
    processors = normalize(schema="orders-v2"),
    output = postgresql(table="orders"),
)

# payments.star
load("common.star", "normalize")

pipeline(
    input = kafka(topic="payments"),
    processors = normalize(schema="payments-v1"),
    output = postgresql(table="payments"),
)

Level 8: Pipeline-level configuration

Current YAML + Bloblang:

input:
  kafka:
    addresses: [localhost:9092]
    topics: [events]
    consumer_group: grp
pipeline:
  threads: 4
  processors:
    - mapping: "root = this.content.uppercase()"
output:
  aws_s3:
    bucket: output
metrics:
  prometheus:
    prefix: rpcn
tracer:
  open_telemetry_collector:
    grpc:
      address: jaeger:4317
logger:
  level: INFO
  format: json

Starlark:

config(
    threads=4,
    metrics=prometheus(prefix="rpcn"),
    tracing=otel(endpoint="http://jaeger:4317"),
    logger=logger(level="info", format="json"),
)

pipeline(
    input = kafka(topic="events", group="grp"),
    process = lambda msg: msg.content.upper(),
    output = s3(bucket="output"),
)

Level 9a: Per-step error handling

Current YAML + Bloblang:

input:
  kafka:
    addresses: [localhost:9092]
    topics: [events]
pipeline:
  processors:
    - try:
        - mapping: "root = this.content.parse_json()"
    - catch:
        - mapping: "root = deleted()"
    - try:
        - http:
            url: http://api/process
            verb: POST
            retries: 3
            backoff:
              initial_interval: 1s
              max_interval: 10s
output:
  kafka:
    addresses: [localhost:9092]
    topic: processed

Starlark:

pipeline(
    input = kafka(topic="events"),
    processors = [
        step(lambda msg: msg.content.parse_json(), on_error="drop"),
        step(http(url="http://api/process", verb="POST"),
             on_error=retry(max=3, backoff="1s..10s")),
    ],
    output = kafka(topic="processed"),
)

Level 9b: Error routing (dead letter per step)

Current YAML + Bloblang:

# 30+ lines with try/catch blocks, resource definitions, etc.

Starlark:

dead_letter = kafka(topic="dead-letters")

pipeline(
    input = kafka(topic="events"),
    processors = [
        step(lambda msg: msg.content.parse_json(),
             on_error=route_to(kafka(topic="parse-errors"))),
        step(http(url="http://api/process", verb="POST"),
             on_error=route_to(kafka(topic="api-errors"))),
    ],
    output = kafka(topic="processed"),
    on_error=route_to(dead_letter),
)

Dynamic generation (Starlark-only capability)

Current YAML + Bloblang: Not possible in a single config.

Starlark:

TABLES = ["orders", "payments", "users", "products", "inventory"]

def normalize(table):
    return [
        lambda msg: msg.content.parse_json(),
        schema_registry(schema=table + "-v2"),
        lambda msg: msg.set(normalized_at=now()),
    ]

[pipeline(
    input = kafka(topic=t),
    processors = normalize(t),
    output = postgresql(table=t),
) for t in TABLES]
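
With pipeline() stubbed to return its arguments (a hypothetical test double, not the real builtin), the comprehension can be checked to emit one config per table:

```python
TABLES = ["orders", "payments", "users", "products", "inventory"]

def pipeline(**kwargs):
    # Hypothetical stub: the real builtin registers a pipeline;
    # here we just return the config for inspection.
    return kwargs

configs = [pipeline(topic=t, table=t) for t in TABLES]
print(len(configs), configs[0]["topic"])  # 5 orders
```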

Revised pros (with Bloblang replacement)

1. ONE language. The biggest win. No Bloblang strings inside YAML. No context switching. One syntax highlighter, one linter, one debugger, one set of docs.

2. Real code for transformations. Dedup is a for loop, not a magic processor. Sort is sorted(), not sort_by() with a custom lambda syntax. Filter is if, not a Bloblang check string. Users bring existing Python knowledge to message processing.

3. Testable. Processing functions can be unit tested directly in Starlark. Call enrich(test_msg) and assert the result. Today Bloblang mappings are untestable without running the full pipeline.

4. IDE support for free. Every Python-aware IDE gives syntax highlighting, autocomplete, and error detection for Starlark files. Bloblang has nearly zero IDE support.

5. Functions compose. Starlark functions return values, so they compose naturally: normalize(schema) returns a list of processors. No special resource syntax needed.

6. Conditions are real expressions. lambda msg: msg.type == "order" is evaluated by the same runtime as everything else. No separate condition parser.

7. String interpolation is dead. ${! json("id") } becomes lambda msg: msg.id. Type-safe. Autocomplete-able. No special parsing.


Revised cons (with Bloblang replacement)

1. Performance on the hot path. This is the critical concern. Starlark is an interpreter. Every message invokes starlark.Call(). Bloblang was purpose-built for message transformation and likely compiles to an efficient internal representation.

Mitigations:

  • Pre-compile Starlark functions to bytecode (starlark-go supports this)
  • Implement hot-path builtins in Go (json parse, uppercase, split, etc.) so Starlark dispatches to Go, not interpreted code
  • The message Value type can be a thin wrapper over the Go struct, avoiding copying data into Starlark heap
  • Benchmark. The overhead may be acceptable if builtins do the heavy lifting.

2. Message data model friction. Starlark dicts are not JSON objects. Need a custom Message type implementing starlark.Value, HasAttrs, HasSetField, Iterable. This is significant Go-side work but only done once.

Key design decisions:

  • Does msg.foo copy or reference?
  • Nested access: msg.geo.location requires nested Value types
  • Type coercion: Starlark int vs JSON number vs Go int64

3. Lambda limitations in Starlark. Starlark lambdas are single expressions only. Multi-line transforms require def. This makes inline processors verbose:

# Can't do this:
process = lambda msg:
    msg.body = msg.content.upper()
    msg.timestamp = now()
    return msg

# Must do this:
def process(msg):
    msg.body = msg.content.upper()
    msg.timestamp = now()
    return msg

This is fine for named functions but awkward for inline one-offs.
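
One possible workaround is a small chain() helper (hypothetical, not a proposed builtin) that folds several single-expression lambdas into one processor:

```python
def chain(*steps):
    # Apply each single-expression transform in order.
    def run(msg):
        for step in steps:
            msg = step(msg)
        return msg
    return run

process = chain(
    lambda msg: dict(msg, body=msg["content"].upper()),
    lambda msg: dict(msg, word_count=len(msg["content"].split(" "))),
)
out = process({"content": "a b c"})
print(out["body"], out["word_count"])  # A B C 3
```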

4. No method chaining on messages. Bloblang's this.content.uppercase() chains naturally. In Starlark, string methods exist ("foo".upper()) but the message needs custom methods registered via Go. Getting msg.content.upper() to work requires msg.content to return a Starlark string, which it would if implemented properly.

Actually: this works fine. msg.content returns starlark.String, which has .upper(), .split(), etc. The method chaining concern is mostly unfounded for string operations.
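
A quick check in plain Python (Starlark's string type exposes the same core methods):

```python
content = "some text here"
# Chaining mirrors Bloblang's this.content.uppercase().split(" "):
print(content.upper().split(" "))  # ['SOME', 'TEXT', 'HERE']
```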

5. Error handling in lambdas. Starlark has no try/catch. Errors propagate as Go errors from starlark.Call(). The host must wrap every function invocation with error handling. Per-message error routing requires the Go runtime to catch errors and apply routing logic.

This is actually fine architecturally: error handling belongs in the runtime, not in user code. The user says "on_error: drop" and the Go runtime handles it. But users cannot write custom error recovery logic in Starlark.
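
A Python sketch of that runtime-side wrapping (all names here are hypothetical; the real implementation would live in Go around starlark.Call):

```python
def run_step(step, msgs, on_error="propagate", dead_letter=None):
    # Call the user function per message and apply the declared error policy.
    out = []
    for msg in msgs:
        try:
            out.append(step(msg))
        except Exception:
            if dead_letter is not None:
                dead_letter.append(msg)   # models on_error=route_to(...)
            elif on_error == "drop":
                continue                  # models on_error="drop"
            else:
                raise
    return out

dead = []
parsed = run_step(lambda m: {"n": int(m)}, ["1", "oops", "2"], dead_letter=dead)
print(parsed, dead)  # [{'n': 1}, {'n': 2}] ['oops']
```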

6. Debugging story. When a transform fails at message 1,000,042, how does the user debug it? Starlark gives stack traces with line numbers, which is better than Bloblang's error messages. But there is no step debugger for Starlark (there is a REPL though).

7. Migration cost. Every existing Bloblang mapping must be rewritten. This is a breaking change. A Bloblang-to-Starlark transpiler could help but won't be perfect.

8. Visual editor round-trip. Still hard. Starlark functions are opaque to a visual editor. The editor can render the topology (pipeline/fan_out/switch structure) but cannot meaningfully render or edit lambda bodies. Same problem as before, but at least there is only one language to deal with.


The critical question: performance

Starlark-go benchmarks suggest overhead of roughly 100-500ns per function call for simple operations, plus the cost of value conversion.

For a pipeline doing:

  • Parse JSON: dominated by JSON parsing time (Go builtin), not Starlark
  • String operations: Go string methods, wrapped in Starlark Value interface
  • Field access: Go struct field lookup via HasAttrs
  • Condition check: single comparison, ~100ns

If the Go builtins do the heavy lifting, Starlark overhead per message should be in the low microseconds. At 100K msgs/sec, ~1µs per message works out to 100ms of interpreter time per second, about a tenth of one core. Probably acceptable.

At 1M msgs/sec with complex transforms, it may not be acceptable. Benchmarking is required. The escape hatch is always: implement the hot processor in Go and expose it as a builtin.
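
The back-of-envelope arithmetic behind those numbers, assuming roughly 1µs of Starlark dispatch per message (the low-microsecond estimate above):

```python
per_msg_s = 1e-6  # assumed ~1us of Starlark dispatch per message
for rate in (100_000, 1_000_000):
    busy_ms = per_msg_s * rate * 1000
    print(f"{rate} msg/s -> {busy_ms:.0f} ms of interpreter time per second")
```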


Recommendation

Replacing Bloblang with Starlark is a strong move if:

  1. You benchmark hot-path performance and it is acceptable
  2. You invest in a good Message Value type with attribute access
  3. You provide rich Go builtins for common operations (json parse, string manipulation, time functions, etc.)

The single-language story is compelling. Bloblang is a niche DSL that users must learn from scratch. Starlark is Python they already know. The same function that transforms a message can be unit tested, shared via load(), and composed with standard language features.

The main risk is performance. Prototype the Message type, implement the core builtins, and benchmark against Bloblang on real workloads before committing.
