Skip to content

Instantly share code, notes, and snippets.

@mmorton
Last active August 28, 2025 23:53
Show Gist options
  • Select an option

  • Save mmorton/29e28a5e44e7128dbf504ed8a73569c1 to your computer and use it in GitHub Desktop.

Select an option

Save mmorton/29e28a5e44e7128dbf504ed8a73569c1 to your computer and use it in GitHub Desktop.
Pubsub
package pubsub
import (
"context"
"sync"
)
// UnsubscribeFn is a no-argument, no-result callback. Subscribe hands one
// back to the caller so the subscription can be torn down.
type UnsubscribeFn func()

// DelegateFn is a handler that accepts a context and a Message and reports
// an error. Delegate produces values of this type.
type DelegateFn[Message any] func(context.Context, Message) error
// Channel holds the set of subscribers that messages of type Message are
// delivered to. It contains a mutex and therefore must not be copied; use it
// through a pointer.
type Channel[Message any] struct {
	// subsM guards subs.
	subsM sync.RWMutex

	// subs maps each subscriber's delivery channel to a done channel; once
	// the done channel is closed the subscriber is treated as gone.
	subs map[chan Message]<-chan struct{}
}
// NewChannel allocates a Channel for the given Message type with an empty,
// ready-to-use subscriber map and returns a pointer to it.
func NewChannel[Message any]() *Channel[Message] {
	c := new(Channel[Message])
	c.subs = make(map[chan Message]<-chan struct{})
	return c
}
// Send delivers msg to every registered subscriber and prunes subscribers
// whose done channel has been closed.
//
// Delivery to a subscriber blocks while that subscriber's buffer is full,
// until either the subscriber receives or its done channel is closed. The
// read lock is held for the whole delivery loop, so one slow subscriber
// delays everyone else.
//
// Send currently always returns nil; the error result exists so that Send
// can back a DelegateFn.
func (e *Channel[Message]) Send(msg Message) error {
	// Left nil on purpose: the common case has nothing to prune, so no
	// allocation is needed.
	var stale []chan Message

	e.subsM.RLock()
	// Ranging over a nil map is a no-op, so an uninitialised Channel needs no
	// special case.
	for ch, done := range e.subs {
		// Poll done on its own first. A select with several ready cases picks
		// one at random, so without this a cancelled subscriber with free
		// buffer space would still receive messages and linger in the map.
		select {
		case <-done:
			stale = append(stale, ch)
			continue
		default:
		}

		select {
		case <-done:
			stale = append(stale, ch)
		case ch <- msg:
		}
	}
	e.subsM.RUnlock()

	if len(stale) == 0 {
		return nil
	}

	// Pruning needs the write lock. delete is a no-op for keys that were
	// removed in the meantime, so racing with another pruner is harmless.
	e.subsM.Lock()
	for _, ch := range stale {
		delete(e.subs, ch)
	}
	e.subsM.Unlock()
	return nil
}
// Subscribe registers a new subscriber and returns its receive channel
// (buffered, capacity 256) together with a function that removes the
// subscription and closes the channel. The returned error is always nil.
//
// The subscriber is registered under the Done channel of a context derived
// from ctx, so it is considered gone when ctx is cancelled or when the
// unsubscribe function is called, whichever happens first. Cancelling ctx
// alone does not close the returned channel; only unsubscribe does.
//
// The unsubscribe function is safe to call more than once.
func (e *Channel[Message]) Subscribe(ctx context.Context) (<-chan Message, UnsubscribeFn, error) {
	// A derived context gives unsubscribe its own way to close the done
	// channel that is stored in the map.
	subCtx, cancel := context.WithCancel(ctx)
	ch := make(chan Message, 256)

	e.subsM.Lock()
	if e.subs == nil {
		// Lazily allocate so the zero Channel is usable.
		e.subs = make(map[chan Message]<-chan struct{})
	}
	e.subs[ch] = subCtx.Done()
	e.subsM.Unlock()

	closed := false // guarded by e.subsM
	unsubscribe := func() {
		// Close the done channel before asking for the write lock. A sender
		// blocked on this subscriber's full buffer holds the read lock and is
		// only released by the done channel; taking the lock first would
		// deadlock.
		cancel()

		e.subsM.Lock()
		defer e.subsM.Unlock()
		if closed {
			// Closing an already closed channel panics.
			return
		}
		closed = true
		delete(e.subs, ch)
		close(ch)
	}
	return ch, unsubscribe, nil
}
// DelegateMapFn transforms a message: it receives a context and a Message and
// returns the Message to use in its place. Delegate applies these in order
// before sending.
type DelegateMapFn[Message any] func(context.Context, Message) Message
// Delegate builds a DelegateFn bound to this Channel. When invoked, the
// returned function passes the message through each of maps in the order
// given, feeding every result into the next, and then hands the final value
// to Send, returning whatever Send returns. With no maps the message is sent
// as is.
func (e *Channel[Message]) Delegate(maps ...DelegateMapFn[Message]) DelegateFn[Message] {
	forward := func(ctx context.Context, msg Message) error {
		out := msg
		for i := 0; i < len(maps); i++ {
			out = maps[i](ctx, out)
		}
		return e.Send(out)
	}
	return forward
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment