Skip to content

Instantly share code, notes, and snippets.

@pbedat
Created April 14, 2025 19:03
Show Gist options
  • Select an option

  • Save pbedat/8ddeb5974534461e58c91284a75476c4 to your computer and use it in GitHub Desktop.

Select an option

Save pbedat/8ddeb5974534461e58c91284a75476c4 to your computer and use it in GitHub Desktop.
watermill-sql benchmark
package main
import (
"context"
std_sql "database/sql"
"fmt"
"log"
"sync"
"sync/atomic"
"time"
"github.com/ThreeDotsLabs/watermill"
"github.com/ThreeDotsLabs/watermill/message"
"github.com/dkotik/watermillsqlite/wmsqlitemodernc"
_ "modernc.org/sqlite"
)
func main() {
logger := watermill.NewStdLogger(false, false)
db, err := std_sql.Open("sqlite", "file:test.db?_journal=wal&_sync=normal&_busy_timeout=1000")
//db, err := std_sql.Open("sqlite", ":memory:")
if err != nil {
panic(err)
}
db.SetMaxOpenConns(1)
sub1, err := wmsqlitemodernc.NewSubscriber(db, wmsqlitemodernc.SubscriberOptions{
ConsumerGroup: "consumer_group_1",
InitializeSchema: true,
Logger: logger,
PollInterval: time.Millisecond,
})
if err != nil {
panic(err)
}
sub2, err := wmsqlitemodernc.NewSubscriber(db, wmsqlitemodernc.SubscriberOptions{
ConsumerGroup: "consumer_group_2",
InitializeSchema: true,
Logger: logger,
PollInterval: time.Millisecond,
})
if err != nil {
panic(err)
}
var pub message.Publisher
pub, err = wmsqlitemodernc.NewPublisher(db, wmsqlitemodernc.PublisherOptions{
InitializeSchema: true,
Logger: logger,
})
if err != nil {
panic(err)
}
sampleSize := 1_000
go func() {
for i := range sampleSize {
go func() {
err := pub.Publish(
fmt.Sprint("topic_", i%2),
message.NewMessage(fmt.Sprint(i), message.Payload(fmt.Sprint("payload_", i))))
log.Print("inserted ", i)
if err != nil {
panic(err)
}
}()
}
}()
ctx := context.Background()
var wg sync.WaitGroup
wg.Add(sampleSize * 2)
subscribers := []message.Subscriber{sub1, sub2}
var i atomic.Int32
now := time.Now()
for s, sub := range subscribers {
go func() {
msgs1, err := sub.Subscribe(ctx, "topic_0")
if err != nil {
panic(err)
}
msgs2, err := sub.Subscribe(ctx, "topic_1")
if err != nil {
panic(err)
}
go func() {
for msg := range msgs1 {
log.Print("sub", s, " topic_0 ", i.Add(1))
msg.Ack()
wg.Done()
}
}()
go func() {
for msg := range msgs2 {
log.Print("sub", s, " topic_1 ", i.Add(1))
msg.Ack()
wg.Done()
}
}()
}()
}
wg.Wait()
log.Print(time.Since(now))
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment