Last active
January 11, 2021 14:01
-
-
Save stephde/9248874aff85a416c0237c2b42da27df to your computer and use it in GitHub Desktop.
Delayed task execution in golang with AWS SQS
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| package main | |
| import ( | |
| "fmt" | |
| "github.com/aws/aws-sdk-go/aws" | |
| "github.com/aws/aws-sdk-go/aws/credentials" | |
| "github.com/aws/aws-sdk-go/aws/session" | |
| "github.com/aws/aws-sdk-go/service/sqs" | |
| "github.com/aws/aws-sdk-go/service/sqs/sqsiface" | |
| "github.com/google/uuid" | |
| ) | |
// Settings used by this example program.
const (
	// endpoint is the URL the SQS client is pointed at.
	endpoint = "http://localhost:9324"
	// queueName is the name of the queue whose URL is looked up at start-up.
	queueName = "default"
	// maxMessages is the MaxNumberOfMessages value of each receive request.
	maxMessages = 10
	// consumerWaitTime is the WaitTimeSeconds value of each receive request.
	consumerWaitTime = 5

	// Static credentials and region handed to the AWS SDK.
	// NOTE(review): the key and secret look like placeholders — confirm
	// before pointing this at anything other than a local endpoint.
	AWSAccessKey    = "access_key"
	AWSAccessSecret = "access_secret"
	AWSRegion       = "eu-central-1"
)
| type task struct { | |
| UUID uuid.UUID `json:"uuid"` | |
| State string `json:"state"` | |
| } | |
| type sqsClient struct { | |
| client sqsiface.SQSAPI | |
| config sqs.ReceiveMessageInput | |
| } | |
| func main() { | |
| awsConfig := &aws.Config{ | |
| Endpoint: aws.String(endpoint), | |
| Credentials: credentials.NewStaticCredentials(AWSAccessKey, AWSAccessSecret, "token"), | |
| Region: aws.String(AWSRegion), | |
| } | |
| client := newClient(awsConfig) | |
| items := []task{ | |
| {uuid.New(), "to_be_done"}, | |
| {uuid.New(), "do_be_done"}, | |
| } | |
| for i, item := range items { | |
| client.addTask(item, int64((i+1)*4)) | |
| } | |
| fmt.Println(fmt.Sprintf("Sent %d tasks to SQS", len(items))) | |
| // should print 1 task | |
| tasks := client.receiveMessages() | |
| fmt.Println("Received tasks:", tasks) | |
| // should print 1 task again | |
| tasks = client.receiveMessages() | |
| fmt.Println("Received tasks:", tasks) | |
| } | |
| func newClient(cfg *aws.Config) sqsClient { | |
| awsSession := session.Must(session.NewSession(cfg)) | |
| client := sqs.New(awsSession, cfg) | |
| queueURL := getQueueUrl(client, queueName) | |
| config := sqs.ReceiveMessageInput{ | |
| QueueUrl: aws.String(queueURL), | |
| MaxNumberOfMessages: aws.Int64(maxMessages), | |
| MessageAttributeNames: []*string{ | |
| aws.String("uuid"), aws.String("state"), | |
| }, | |
| AttributeNames: []*string{}, | |
| WaitTimeSeconds: aws.Int64(consumerWaitTime), | |
| } | |
| return sqsClient{ | |
| client: client, | |
| config: config, | |
| } | |
| } | |
| func getQueueUrl(client *sqs.SQS, name string) string { | |
| params := &sqs.GetQueueUrlInput{ | |
| QueueName: aws.String(name), // Required | |
| } | |
| response, err := client.GetQueueUrl(params) | |
| if err != nil { | |
| fmt.Println(err.Error()) | |
| return "" | |
| } | |
| return aws.StringValue(response.QueueUrl) | |
| } | |
| func (sqsc sqsClient) addTask(item task, delay int64) { | |
| content := map[string]*sqs.MessageAttributeValue{ | |
| "uuid": &sqs.MessageAttributeValue{ | |
| DataType: aws.String("String"), | |
| StringValue: aws.String(item.UUID.String()), | |
| }, | |
| "state": &sqs.MessageAttributeValue{ | |
| DataType: aws.String("String"), | |
| StringValue: aws.String(item.State), | |
| }, | |
| } | |
| msg := &sqs.SendMessageInput{ | |
| DelaySeconds: &delay, | |
| MessageAttributes: content, | |
| MessageBody: aws.String(fmt.Sprintf("A task to be handle in the future with ID %s", item.UUID.String())), | |
| QueueUrl: sqsc.config.QueueUrl, | |
| } | |
| _, err := sqsc.client.SendMessage(msg) | |
| if err != nil { | |
| fmt.Println(err) | |
| } | |
| } | |
| func (sqsc sqsClient) receiveMessages() []*sqs.Message { | |
| resp, err := sqsc.client.ReceiveMessage(&sqsc.config) | |
| if err != nil { | |
| fmt.Println(err) | |
| return []*sqs.Message{} | |
| } | |
| for _, m := range resp.Messages { | |
| sqsc.deleteMessage(m.ReceiptHandle) | |
| } | |
| return resp.Messages | |
| } | |
| func (sqsc sqsClient) deleteMessage(messageHandle *string) { | |
| params := &sqs.DeleteMessageInput{ | |
| QueueUrl: sqsc.config.QueueUrl, | |
| ReceiptHandle: messageHandle, | |
| } | |
| _, err := sqsc.client.DeleteMessage(params) | |
| if err != nil { | |
| fmt.Println(err) | |
| return | |
| } | |
| fmt.Println(fmt.Sprintf("deleted message from queue: %s", *messageHandle)) | |
| } |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment