Skip to content

Instantly share code, notes, and snippets.

@stephde
Last active January 11, 2021 14:01
Show Gist options
  • Select an option

  • Save stephde/9248874aff85a416c0237c2b42da27df to your computer and use it in GitHub Desktop.

Select an option

Save stephde/9248874aff85a416c0237c2b42da27df to your computer and use it in GitHub Desktop.
Delayed task execution in golang with AWS SQS
package main
import (
"fmt"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/credentials"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/sqs"
"github.com/aws/aws-sdk-go/service/sqs/sqsiface"
"github.com/google/uuid"
)
// Settings shared by the whole example program.
const (
	// endpoint overrides the SQS service endpoint in the AWS config; it
	// points at a server on localhost rather than at AWS itself.
	endpoint = "http://localhost:9324"
	// queueName is the queue whose URL is looked up when the client is built.
	queueName = "default"
	// maxMessages is sent as MaxNumberOfMessages on every receive call.
	maxMessages = 10
	// consumerWaitTime is sent as WaitTimeSeconds on every receive call.
	consumerWaitTime = 5

	// AWSAccessKey and AWSAccessSecret are the static credentials handed to
	// the SDK; AWSRegion is the region configured on the session.
	AWSAccessKey    = "access_key"
	AWSAccessSecret = "access_secret"
	AWSRegion       = "eu-central-1"
)
// task is the unit of work this example enqueues. Both fields are sent to
// the queue as string message attributes named "uuid" and "state".
// NOTE(review): the json tags are declared but nothing in this file
// marshals a task to JSON — confirm whether they are still needed.
type task struct {
// UUID identifies the task; it is also embedded in the message body.
UUID uuid.UUID `json:"uuid"`
// State is a free-form status label supplied by the caller.
State string `json:"state"`
}
// sqsClient bundles an SQS API handle with the receive request that is
// reused for every poll of the queue.
type sqsClient struct {
// client is the SQS API used for sending, receiving and deleting messages.
client sqsiface.SQSAPI
// config is the ReceiveMessage input passed on each receive call; its
// QueueUrl is also used as the target for send and delete calls.
config sqs.ReceiveMessageInput
}
// main demonstrates delayed delivery: it enqueues two tasks with increasing
// delays and then polls the queue twice, printing whatever each poll returns.
func main() {
	awsConfig := &aws.Config{
		Endpoint:    aws.String(endpoint),
		Credentials: credentials.NewStaticCredentials(AWSAccessKey, AWSAccessSecret, "token"),
		Region:      aws.String(AWSRegion),
	}
	client := newClient(awsConfig)

	// Keyed fields keep these literals correct even if task gains or
	// reorders fields later.
	items := []task{
		{UUID: uuid.New(), State: "to_be_done"},
		{UUID: uuid.New(), State: "do_be_done"},
	}
	for i, item := range items {
		// Stagger the delays: 4 seconds for the first task, 8 for the second.
		client.addTask(item, int64((i+1)*4))
	}
	// addTask only prints send errors, so this line reports the number of
	// attempted sends rather than confirmed ones.
	fmt.Printf("Sent %d tasks to SQS\n", len(items))

	// should print 1 task: each poll waits up to consumerWaitTime seconds,
	// which is expected to cover the first task's delay but not the second's.
	tasks := client.receiveMessages()
	fmt.Println("Received tasks:", tasks)

	// should print 1 task again: the second task becomes available during
	// this poll.
	tasks = client.receiveMessages()
	fmt.Println("Received tasks:", tasks)
}
// newClient creates an SQS service handle from cfg, resolves the URL of the
// queue named by queueName, and returns an sqsClient whose receive request
// is pre-populated for that queue.
//
// session.Must panics if the session cannot be created. If the queue URL
// lookup fails, getQueueUrl returns "" and the client is still returned
// with an empty queue URL.
func newClient(cfg *aws.Config) sqsClient {
	svc := sqs.New(session.Must(session.NewSession(cfg)), cfg)

	// The same request value is reused for every receive call.
	receiveInput := sqs.ReceiveMessageInput{
		QueueUrl:            aws.String(getQueueUrl(svc, queueName)),
		MaxNumberOfMessages: aws.Int64(maxMessages),
		WaitTimeSeconds:     aws.Int64(consumerWaitTime),
		AttributeNames:      []*string{},
		// Request the two message attributes that addTask attaches.
		MessageAttributeNames: []*string{
			aws.String("uuid"),
			aws.String("state"),
		},
	}

	return sqsClient{client: svc, config: receiveInput}
}
// getQueueUrl asks SQS for the URL of the queue called name.
// On failure the error is printed to stdout and an empty string is returned.
func getQueueUrl(client *sqs.SQS, name string) string {
	out, err := client.GetQueueUrl(&sqs.GetQueueUrlInput{
		QueueName: aws.String(name), // Required
	})
	if err != nil {
		fmt.Println(err.Error())
		return ""
	}

	// StringValue yields "" when the response carries no URL pointer.
	return aws.StringValue(out.QueueUrl)
}
// addTask sends item to the client's queue with the given delivery delay in
// seconds. The task's UUID and state travel as string message attributes,
// and the body is a human-readable sentence containing the UUID.
// Send errors are printed to stdout and otherwise ignored.
func (sqsc sqsClient) addTask(item task, delay int64) {
	id := item.UUID.String()

	// stringAttr wraps v as a message attribute of data type "String".
	stringAttr := func(v string) *sqs.MessageAttributeValue {
		return &sqs.MessageAttributeValue{
			DataType:    aws.String("String"),
			StringValue: aws.String(v),
		}
	}

	input := &sqs.SendMessageInput{
		QueueUrl:     sqsc.config.QueueUrl,
		DelaySeconds: aws.Int64(delay),
		MessageBody:  aws.String(fmt.Sprintf("A task to be handle in the future with ID %s", id)),
		MessageAttributes: map[string]*sqs.MessageAttributeValue{
			"uuid":  stringAttr(id),
			"state": stringAttr(item.State),
		},
	}

	if _, err := sqsc.client.SendMessage(input); err != nil {
		fmt.Println(err)
	}
}
// receiveMessages performs one receive call using the client's stored
// request, deletes every message that came back, and returns those messages.
// If the receive call fails, the error is printed to stdout and an empty
// (non-nil) slice is returned.
func (sqsc sqsClient) receiveMessages() []*sqs.Message {
	out, err := sqsc.client.ReceiveMessage(&sqsc.config)
	if err != nil {
		fmt.Println(err)
		return []*sqs.Message{}
	}

	received := out.Messages
	// Remove each message from the queue before handing it to the caller.
	for i := range received {
		sqsc.deleteMessage(received[i].ReceiptHandle)
	}
	return received
}
// deleteMessage removes the message identified by messageHandle (its receipt
// handle) from the client's queue. A failed delete is printed to stdout;
// a successful one is confirmed with a line naming the handle.
func (sqsc sqsClient) deleteMessage(messageHandle *string) {
	params := &sqs.DeleteMessageInput{
		QueueUrl:      sqsc.config.QueueUrl,
		ReceiptHandle: messageHandle,
	}
	if _, err := sqsc.client.DeleteMessage(params); err != nil {
		fmt.Println(err)
		return
	}
	// aws.StringValue maps a nil handle to "" instead of panicking on a
	// nil-pointer dereference.
	fmt.Printf("deleted message from queue: %s\n", aws.StringValue(messageHandle))
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment