Last active
April 11, 2024 16:12
-
-
Save stephde/7f4a01a845d890201eee6d02dd92a853 to your computer and use it in GitHub Desktop.
Delayed task queue with redis in golang
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| package main | |
| import ( | |
| "encoding/json" | |
| "fmt" | |
| "time" | |
| "github.com/go-redis/redis" | |
| "github.com/google/uuid" | |
| ) | |
// delayedList is the Redis key of the sorted set that holds the delayed tasks.
const delayedList = "delayed"
| type redisClient struct { | |
| client *redis.Client | |
| } | |
| type task struct { | |
| UUID uuid.UUID `json:"uuid"` | |
| State string `json:"state"` | |
| } | |
| func main() { | |
| client := client() | |
| items := []task{ | |
| {uuid.New(), "to_be_done"}, | |
| {uuid.New(), "do_be_done"}, | |
| } | |
| for _, item := range items { | |
| client.addDelayed(item, time.Second*5) | |
| } | |
| // should print 0 since none is ready | |
| tasks := client.getReadyTasks() | |
| fmt.Println("Received tasks:", tasks) | |
| time.Sleep(time.Second * 6) | |
| // should print 2 tasks | |
| tasks = client.getReadyTasks() | |
| fmt.Println("Received tasks:", tasks) | |
| } | |
| func client() redisClient { | |
| rdb := redis.NewClient(&redis.Options{ | |
| Addr: "localhost:6379", | |
| Password: "", // no password set | |
| DB: 0, // use default DB | |
| }) | |
| return redisClient{ | |
| client: rdb, | |
| } | |
| } | |
| func (c redisClient) addDelayed(value task, delay time.Duration) { | |
| jsonValue, err := json.Marshal(value) | |
| if err != nil { | |
| fmt.Println(err) | |
| panic("JSON!!!") | |
| } | |
| taskReadyInSeconds := time.Now().Add(delay).Unix() | |
| member := redis.Z{ | |
| Score: float64(taskReadyInSeconds), | |
| Member: jsonValue, | |
| } | |
| _, err = c.client.ZAdd(delayedList, member).Result() | |
| if err != nil { | |
| fmt.Println(err) | |
| } | |
| } | |
| func (c redisClient) getReadyTasks() []task { | |
| maxTime := time.Now().Unix() | |
| opt := redis.ZRangeBy{ | |
| Min: fmt.Sprintf("%d", 0), | |
| Max: fmt.Sprintf("%d", maxTime), | |
| } | |
| cmd := c.client.ZRevRangeByScore(delayedList, opt) | |
| resultSet, err := cmd.Result() | |
| if err != nil { | |
| fmt.Println(err) | |
| panic("redis_error") | |
| } | |
| tasks := make([]task, len(resultSet)) | |
| for i, t := range resultSet { | |
| err := json.Unmarshal([]byte(t), &tasks[i]) | |
| if err != nil { | |
| fmt.Println(err) | |
| panic("JSON!!!") | |
| } | |
| } | |
| c.removeTasks(tasks) | |
| return tasks | |
| } | |
| func (c redisClient) removeTasks(tasks []task) { | |
| fmt.Println("ZRem called with: ", len(tasks), "tasks") | |
| if len(tasks) == 0 { | |
| return | |
| } | |
| jsonTasks := make([]string, len(tasks)) | |
| for i, t := range tasks { | |
| jsonValue, err := json.Marshal(t) | |
| if err != nil { | |
| fmt.Println(err) | |
| panic("JSON!!!") | |
| } | |
| jsonTasks[i] = string(jsonValue) | |
| } | |
| _, err := c.client.ZRem(delayedList, jsonTasks).Result() | |
| if err != nil { | |
| fmt.Println(err) | |
| panic("redis_error") | |
| } | |
| } | |
| func (c redisClient) del() { | |
| c.client.Del(delayedList) | |
| } |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment