@bdougie
Created March 3, 2026 13:20
PR #131 Review: feat: publish root-keyed DAG events to Kafka

Overall

Solid design. Clean Publisher interface, good best-effort semantics (publish failures never block storage), and thorough test layers (unit, integration, E2E). Two items worth addressing before merge.

Issues

| Severity | Issue | Location |
|----------|-------|----------|
| Critical | E2E reader pinned to Partition: 0; will hang against multi-partition topics. Fix: use a consumer group or assert single-partition at setup. | proxy_kafka_e2e_test.go:117 |
| Important | skafka.Transport not closed on Publisher.Close(); connection-pool leak for long-running proxies. Fix: track transport ownership and close it explicitly. | kafka/kafka.go:66 |
| Minor | Redundant topic validation in newPublisherWithWriter vs NewPublisher; dead code for all public callers. | kafka/kafka.go:76 |
| Minor | deriveRootHash does a full ancestry walk per turn; fine at current scale, but worth a comment documenting the ordering contract. | worker/pool.go:211 |

Details

1. E2E Kafka reader pinned to partition 0 (Critical)

The Kafka publisher uses the &skafka.Hash{} balancer, which routes each message by hashing its key (the root hash). The E2E test consumer hardcodes Partition: 0. The Dagger harness creates the topic with --partitions 1, so CI passes, but against a pre-existing multi-partition topic the test will hang waiting for messages that were routed to a different partition.

// Current (fragile)
reader := kafkago.NewReader(kafkago.ReaderConfig{
    Brokers:   brokers,
    Topic:     topic,
    Partition: 0,
})

// Suggested: use a consumer group to read all partitions
reader := kafkago.NewReader(kafkago.ReaderConfig{
    Brokers:  brokers,
    GroupID:  fmt.Sprintf("tapes-e2e-%d", time.Now().UnixNano()),
    Topic:    topic,
})
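The issue table also offers asserting single-partition at setup as an alternative fix. A sketch of that check using kafka-go's Conn.ReadPartitions; brokers, topic, and t (*testing.T) are assumed to exist as in the surrounding test:

```go
// Alternative fix (sketch): keep the Partition: 0 reader, but fail fast at
// setup if the topic is not actually single-partition.
conn, err := kafkago.Dial("tcp", brokers[0])
if err != nil {
    t.Fatalf("dial kafka: %v", err)
}
defer conn.Close()

parts, err := conn.ReadPartitions(topic)
if err != nil {
    t.Fatalf("read partitions: %v", err)
}
if len(parts) != 1 {
    t.Fatalf("expected single-partition topic %q, got %d partitions", topic, len(parts))
}
```

This keeps the test deterministic without a consumer group, at the cost of rejecting multi-partition topics rather than handling them.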

2. Transport not closed (Important)

kafka-go's Writer.Close() does not close the Transport. When ClientID is set, the publisher creates a custom skafka.Transport with its own connection pool, and that pool leaks when the publisher is closed. Note that skafka.Transport exposes CloseIdleConnections() rather than implementing io.Closer directly, so the tracked transport needs a small adapter.

// Suggested: track and close the transport
type Publisher struct {
    writer         writer
    transport      io.Closer // non-nil when we own the transport
    publishTimeout time.Duration
}

func (p *Publisher) Close() error {
    err := p.writer.Close()
    if p.transport != nil {
        if cerr := p.transport.Close(); cerr != nil && err == nil {
            err = cerr
        }
    }
    return err
}

3. Redundant validation (Minor)

NewPublisher validates c.Topic before calling newPublisherWithWriter, which checks it again. The inner check is dead code for all public callers. Either remove it from the inner function or consolidate all validation there.

4. Ancestry walk per turn (Minor)

deriveRootHash calls Driver.Ancestry(ctx, head) and takes ancestry[len(ancestry)-1] — correct per the contract ([head, ..., root]). This is O(n) in conversation length per turn. Acceptable at current scale but worth a comment explaining the ordering.

What's good

  • Publisher interface is minimal and clean (Publish + Close)
  • NopPublisher default means zero config change for existing users
  • Best-effort publish: storage is the authority, publish errors are logged but never block persistence
  • Content-addressed dedup guard: only newly inserted nodes are published
  • pubOwned lifecycle management correctly handles cleanup on proxy.New failure
  • Root hash as partition key gives stable per-conversation ordering in Kafka
  • E2E test with Dagger + Confluent container is well-structured
  • Good test coverage across all layers

Local testing guide

# 1. Kafka broker
docker run -d --name kafka -p 9092:9092 \
  -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://localhost:9092 \
  confluentinc/confluent-local:latest

# 2. Create topic
docker exec kafka kafka-topics --create \
  --topic tapes-events --partitions 1 \
  --bootstrap-server localhost:9092

# 3. Run proxy with Kafka publisher
make build
./build/tapes serve proxy \
  --publisher kafka \
  --kafka-brokers localhost:9092 \
  --kafka-topic tapes-events \
  --port 8080

# 4. Send traffic
curl http://localhost:8080/v1/chat/completions \
  -H "Content-Type: application/json" \
  -H "Authorization: Bearer $OPENAI_API_KEY" \
  -d '{"model":"gpt-4o-mini","messages":[{"role":"user","content":"hello"}]}'

# 5. Verify events
docker exec kafka kafka-console-consumer \
  --topic tapes-events --from-beginning \
  --bootstrap-server localhost:9092

Or via Dagger: make test-kafka-e2e
