Skip to content

Instantly share code, notes, and snippets.

@suhodolskiy
Last active September 1, 2025 05:45
Show Gist options
  • Select an option

  • Save suhodolskiy/1fd884243d0e091f3fd07ec393d5904c to your computer and use it in GitHub Desktop.

Select an option

Save suhodolskiy/1fd884243d0e091f3fd07ec393d5904c to your computer and use it in GitHub Desktop.
package cache
import (
"bytes"
"context"
"crypto/sha256"
"encoding/gob"
"fmt"
"sync"
"time"
)
const DefaultTtl = time.Hour * 24 * 7
type Loader[T any] struct {
registry map[string][]chan T
store Store
mu sync.Mutex
config Config
}
type Config struct {
Key string
Store Store
Tags []string
TTL time.Duration
}
func NewLoader[T any](config Config) *Loader[T] {
if config.TTL == 0 {
config.TTL = DefaultTtl
}
loader := Loader[T]{
registry: make(map[string][]chan T),
mu: sync.Mutex{},
store: config.Store,
config: config,
}
return &loader
}
func (c *Loader[T]) key(key any) string {
if value, ok := key.(string); ok {
return fmt.Sprintf("%s_%s", c.config.Key, value)
}
var b bytes.Buffer
if err := gob.NewEncoder(&b).Encode(key); err != nil {
panic(err)
}
hash := sha256.Sum256(b.Bytes())
return fmt.Sprintf("%s_%x", c.config.Key, hash)
}
func (c *Loader[T]) Get(ctx context.Context, key any, data func() (T, error), tags ...string) (T, error) {
if !IsCacheEnabledCtx(ctx) {
return data()
}
k := c.key(key)
// Return the value if found in the store.
var value T
if err := c.store.Get(ctx, k, &value); err == nil {
return value, nil
}
// Lock the mutex to access the registry.
c.mu.Lock()
if _, ok := c.registry[k]; ok {
ch := make(chan T, 1)
c.registry[k] = append(c.registry[k], ch)
c.mu.Unlock()
// Wait for the value to be set and return it.
return <-ch, nil
} else {
// Initialize a new channel list if the key is not present.
c.registry[k] = []chan T{}
c.mu.Unlock()
}
go func() {
c.mu.Lock()
defer c.mu.Unlock()
// Remove the key from the registry when done.
delete(c.registry, k)
}()
value, err := data()
if err != nil {
return value, err
}
c.mu.Lock()
channels, exists := c.registry[k]
c.mu.Unlock()
if exists {
for _, ch := range channels {
// Send the value to waiting channels.
select {
case ch <- value:
// Close the channel after sending the value.
close(ch)
default:
}
}
}
go func() {
ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
defer cancel()
stags := c.config.Tags
if tags != nil {
for _, tag := range tags {
stags = append(stags, fmt.Sprintf("%s:%s", c.config.Key, tag))
}
}
if err := c.store.Set(ctx, k, value, stags, c.config.TTL); err != nil {
fmt.Println("store.Set:", err)
// TODO: Log error
}
}()
return value, nil
}
func (c *Loader[T]) Invalidate(ctx context.Context, tags ...string) error {
var itags []string
if tags != nil {
for _, tag := range tags {
itags = append(itags, fmt.Sprintf("%s:%s", c.config.Key, tag))
}
} else {
itags = c.config.Tags
}
return c.store.Invalidate(ctx, itags...)
}
package cache
import (
"context"
"time"
)
type Store interface {
Get(ctx context.Context, key string, value any) error
Set(ctx context.Context, key string, value any, tags []string, ttl time.Duration) error
Invalidate(ctx context.Context, tags ...string) error
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment