Skip to content

Instantly share code, notes, and snippets.

@stephde
Last active April 11, 2024 16:12
Show Gist options
  • Select an option

  • Save stephde/7f4a01a845d890201eee6d02dd92a853 to your computer and use it in GitHub Desktop.

Select an option

Save stephde/7f4a01a845d890201eee6d02dd92a853 to your computer and use it in GitHub Desktop.
Delayed task queue with redis in golang
package main
import (
"encoding/json"
"fmt"
"time"
"github.com/go-redis/redis"
"github.com/google/uuid"
)
const delayedList = "delayed"
type redisClient struct {
client *redis.Client
}
type task struct {
UUID uuid.UUID `json:"uuid"`
State string `json:"state"`
}
func main() {
client := client()
items := []task{
{uuid.New(), "to_be_done"},
{uuid.New(), "do_be_done"},
}
for _, item := range items {
client.addDelayed(item, time.Second*5)
}
// should print 0 since none is ready
tasks := client.getReadyTasks()
fmt.Println("Received tasks:", tasks)
time.Sleep(time.Second * 6)
// should print 2 tasks
tasks = client.getReadyTasks()
fmt.Println("Received tasks:", tasks)
}
func client() redisClient {
rdb := redis.NewClient(&redis.Options{
Addr: "localhost:6379",
Password: "", // no password set
DB: 0, // use default DB
})
return redisClient{
client: rdb,
}
}
func (c redisClient) addDelayed(value task, delay time.Duration) {
jsonValue, err := json.Marshal(value)
if err != nil {
fmt.Println(err)
panic("JSON!!!")
}
taskReadyInSeconds := time.Now().Add(delay).Unix()
member := redis.Z{
Score: float64(taskReadyInSeconds),
Member: jsonValue,
}
_, err = c.client.ZAdd(delayedList, member).Result()
if err != nil {
fmt.Println(err)
}
}
func (c redisClient) getReadyTasks() []task {
maxTime := time.Now().Unix()
opt := redis.ZRangeBy{
Min: fmt.Sprintf("%d", 0),
Max: fmt.Sprintf("%d", maxTime),
}
cmd := c.client.ZRevRangeByScore(delayedList, opt)
resultSet, err := cmd.Result()
if err != nil {
fmt.Println(err)
panic("redis_error")
}
tasks := make([]task, len(resultSet))
for i, t := range resultSet {
err := json.Unmarshal([]byte(t), &tasks[i])
if err != nil {
fmt.Println(err)
panic("JSON!!!")
}
}
c.removeTasks(tasks)
return tasks
}
func (c redisClient) removeTasks(tasks []task) {
fmt.Println("ZRem called with: ", len(tasks), "tasks")
if len(tasks) == 0 {
return
}
jsonTasks := make([]string, len(tasks))
for i, t := range tasks {
jsonValue, err := json.Marshal(t)
if err != nil {
fmt.Println(err)
panic("JSON!!!")
}
jsonTasks[i] = string(jsonValue)
}
_, err := c.client.ZRem(delayedList, jsonTasks).Result()
if err != nil {
fmt.Println(err)
panic("redis_error")
}
}
func (c redisClient) del() {
c.client.Del(delayedList)
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment