Solid design. Clean Publisher interface, good best-effort semantics (publish failures never block storage), and thorough test layers (unit, integration, E2E). Two items worth addressing before merge.
| Severity | Issue | Location |
|---|---|---|
| Critical | E2E reader pinned to `Partition: 0` — will hang against multi-partition topics. Fix: use a consumer group or assert single-partition at setup. | `proxy_kafka_e2e_test.go:117` |
| Important | `skafka.Transport` not closed on `Publisher.Close()` — connection pool leak for long-running proxies. Fix: track transport ownership and close it explicitly. | `kafka/kafka.go:66` |
| Minor | Redundant topic validation in `newPublisherWithWriter` vs `NewPublisher` — dead code for all public callers. | `kafka/kafka.go:76` |
| Minor | `deriveRootHash` does a full ancestry walk per turn — fine at current scale, worth a comment documenting the ordering contract. | `worker/pool.go:211` |
The Kafka publisher uses the `&skafka.Hash{}` balancer (hashing on root hash as message key). The E2E test consumer hardcodes `Partition: 0`. The Dagger harness creates the topic with `--partitions 1`, so CI passes, but against a pre-existing multi-partition topic the test will hang waiting for messages routed to a different partition.
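To make the failure mode concrete, here is a minimal sketch of the modulo routing a hash balancer performs. The FNV-1a choice mirrors my recollection of kafka-go's default and should be treated as an assumption, not a spec; `partitionFor` is an illustrative name, not code from this PR.

```go
package main

import (
	"fmt"
	"hash/fnv"
)

// partitionFor sketches hash-balancer routing: hash the message key and
// take it modulo the partition count. kafka-go's Hash balancer works this
// way (FNV-1a by default, as an assumption worth verifying in its docs).
func partitionFor(key []byte, numPartitions int) int {
	h := fnv.New32a()
	h.Write(key)
	return int(h.Sum32() % uint32(numPartitions))
}

func main() {
	key := []byte("example-root-hash")
	// With one partition every key lands on partition 0, which is why the
	// pinned reader passes in CI.
	fmt.Println(partitionFor(key, 1))
	// With six partitions the same key may route anywhere in [0, 6), so a
	// reader pinned to partition 0 can wait forever.
	fmt.Println(partitionFor(key, 6))
}
```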
```go
// Current (fragile)
reader := kafkago.NewReader(kafkago.ReaderConfig{
	Brokers:   brokers,
	Topic:     topic,
	Partition: 0,
})

// Suggested: use a consumer group to read all partitions
reader := kafkago.NewReader(kafkago.ReaderConfig{
	Brokers: brokers,
	GroupID: fmt.Sprintf("tapes-e2e-%d", time.Now().UnixNano()),
	Topic:   topic,
})
```

kafka-go's `Writer.Close()` does not close the `Transport`. When `ClientID` is set, a custom `skafka.Transport` is created with its own connection pool, which leaks when the publisher is closed.
```go
// Suggested: track and close the transport
type Publisher struct {
	writer         writer
	transport      io.Closer // non-nil when we own the transport
	publishTimeout time.Duration
}

func (p *Publisher) Close() error {
	err := p.writer.Close()
	if p.transport != nil {
		if cerr := p.transport.Close(); cerr != nil && err == nil {
			err = cerr
		}
	}
	return err
}
```

`NewPublisher` validates `c.Topic` before calling `newPublisherWithWriter`, which checks it again. The inner check is dead code for all public callers. Either remove it from the inner function or consolidate all validation there.
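One shape the consolidation could take is a single `validate` method that every constructor path calls. This is a hypothetical sketch; `Config`, `validate`, and the error message are illustrative names, not the PR's actual code.

```go
package main

import (
	"errors"
	"fmt"
)

// Config is an illustrative stand-in for the publisher's config struct.
type Config struct {
	Topic string
}

var errEmptyTopic = errors.New("kafka: topic must not be empty")

// validate is the single source of truth for config checks. If both the
// public constructor and the writer-injecting inner constructor call it,
// neither check is dead code and they cannot drift apart.
func (c Config) validate() error {
	if c.Topic == "" {
		return errEmptyTopic
	}
	return nil
}

func main() {
	fmt.Println(Config{}.validate())               // error: empty topic
	fmt.Println(Config{Topic: "tapes"}.validate()) // nil
}
```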
`deriveRootHash` calls `Driver.Ancestry(ctx, head)` and takes `ancestry[len(ancestry)-1]` — correct per the contract (`[head, ..., root]`). This is O(n) in conversation length per turn. Acceptable at current scale, but worth a comment explaining the ordering.
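A minimal sketch of the extraction plus the comment being asked for. The `[head, ..., root]` ordering is restated from the review; `rootFromAncestry` is a hypothetical helper name, not the PR's code.

```go
package main

import "fmt"

// rootFromAncestry extracts the conversation root from an ancestry slice.
// Ordering contract (the comment deriveRootHash should carry):
// Driver.Ancestry returns [head, ..., root], so the root hash is always
// the final element. An empty slice means the head has no ancestry.
func rootFromAncestry(ancestry []string) (string, bool) {
	if len(ancestry) == 0 {
		return "", false
	}
	return ancestry[len(ancestry)-1], true
}

func main() {
	root, ok := rootFromAncestry([]string{"head", "parent", "root"})
	fmt.Println(root, ok) // root true
}
```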
- `Publisher` interface is minimal and clean (`Publish` + `Close`)
- `NopPublisher` default means zero config change for existing users
- Best-effort publish: storage is the authority, publish errors are logged but never block persistence
- Content-addressed dedup guard: only newly inserted nodes are published
- `pubOwned` lifecycle management correctly handles cleanup on `proxy.New` failure
- Root hash as partition key gives stable per-conversation ordering in Kafka
- E2E test with Dagger + Confluent container is well-structured
- Good test coverage across all layers
```bash
# 1. Kafka broker
docker run -d --name kafka -p 9092:9092 \
  -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://localhost:9092 \
  confluentinc/confluent-local:latest

# 2. Create topic
docker exec kafka kafka-topics --create \
  --topic tapes-events --partitions 1 \
  --bootstrap-server localhost:9092

# 3. Run proxy with Kafka publisher
make build
./build/tapes serve proxy \
  --publisher kafka \
  --kafka-brokers localhost:9092 \
  --kafka-topic tapes-events \
  --port 8080

# 4. Send traffic
curl http://localhost:8080/v1/chat/completions \
  -H "Content-Type: application/json" \
  -H "Authorization: Bearer $OPENAI_API_KEY" \
  -d '{"model":"gpt-4o-mini","messages":[{"role":"user","content":"hello"}]}'

# 5. Verify events
docker exec kafka kafka-console-consumer \
  --topic tapes-events --from-beginning \
  --bootstrap-server localhost:9092
```

Or via Dagger: `make test-kafka-e2e`