-
-
Save pnlinh07/03bca80d06116a61fa9140e214f4cfd5 to your computer and use it in GitHub Desktop.
An example proof of how we can do exponential backoff retries with SQS
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| const _ = require("lodash"); | |
| const moment = require("moment"); | |
| // Load the AWS SDK for Node.js | |
| const AWS = require("aws-sdk"); | |
// Set the region to us-west-2
AWS.config.update({ region: "us-west-2" });
// Create the SQS service object
const sqs = new AWS.SQS({ apiVersion: "2012-11-05" });
// NOTE(review): REGION / ACCOUNT-ID / queue name look like placeholders —
// substitute the real queue URL before running.
const queueURL = "https://sqs.REGION.amazonaws.com/ACCOUNT-ID/Ố-MÀI-GÓT";
// Request passed to sqs.receiveMessage.
const params = {
  // SQS only returns the system attributes that are listed here.
  // ApproximateReceiveCount is what the retry logic reads to know which
  // attempt this delivery is; without it every delivery looks like attempt 1
  // and the backoff never grows.
  AttributeNames: ["SentTimestamp", "ApproximateReceiveCount"],
  MaxNumberOfMessages: 1,
  MessageAttributeNames: ["All"],
  QueueUrl: queueURL,
};
// Backoff settings consumed by _createTimout and the receive handler.
const retryOptions = {
  // Base delay; the value ends up in VisibilityTimeout, which SQS reads as seconds.
  minTimeout: 30,
  // Upper bound for the computed delay.
  maxTimeout: 3600,
  // Multiplier applied once per receive attempt.
  factor: 2,
  // When true the delay is scaled by a random factor in [1, 2).
  randomize: true,
  // Retries allowed after the first delivery; the handler processes a message
  // while its receive count is <= maxAttempts + 1 and deletes it afterwards.
  maxAttempts: 5,
};
// Receive one batch from the queue and handle every message in it.
//
// For each message:
//   * receive count beyond maxAttempts + 1 -> retries are exhausted, delete it;
//   * otherwise push its visibility timeout out to the backoff delay, process
//     it, and delete it only when processing succeeds. A failed message is
//     left alone so SQS redelivers it once the backoff delay expires.
sqs.receiveMessage(params, function (err, data) {
  if (err) {
    console.log("Receive Error", err);
    return;
  }

  // +1 because the first delivery is not a retry.
  const maxAttempts = _.get(retryOptions, "maxAttempts", 5) + 1;

  const handleMessage = async (message) => {
    // Falls back to 1 when the attribute is absent from the message.
    const attempt = _.parseInt(
      _.get(message, "Attributes.ApproximateReceiveCount", 1)
    );

    if (attempt > maxAttempts) {
      // Nothing to back off for: the message is being dropped.
      await _removeMessage(message);
      return;
    }

    // Apply the backoff delay BEFORE processing starts, and wait for it, so
    // the visibility change can never race the delete that follows a
    // successful run.
    try {
      const timeout = _createTimout(attempt, retryOptions);
      await _changeMessageVisibility(message, timeout);
    } catch (e) {
      // Best effort: without it the queue's own visibility timeout applies.
      console.log("Change Visibility Error", message.MessageId, e);
    }

    try {
      await _processMessage(message);
    } catch (e) {
      // Deliberately keep the message: it becomes visible again after the
      // visibility timeout and is retried then.
      console.log("Process Error", message.MessageId, e);
      return;
    }

    // remove message from queue if message was processed
    await _removeMessage(message);
  };

  _.forEach(_.get(data, "Messages", []), (message) => {
    // Only a failed delete can reach this handler; log it instead of leaving
    // an unhandled rejection.
    handleMessage(message).catch((e) => {
      console.log("Delete Error", message.MessageId, e);
    });
  });
});
/**
 * Business-logic hook for a single received message.
 *
 * As written it only logs the message id together with the current time.
 * The returned promise is what the caller keys off:
 *   - fulfilled -> the caller deletes the message from the queue;
 *   - rejected  -> the caller leaves the message in the queue, so it is
 *                  delivered again after the visibility timeout.
 *
 * @param {object} message - Message object; its `MessageId` is logged.
 * @returns {Promise<void>}
 */
async function _processMessage(message) {
  const messageId = message.MessageId;
  const receivedAt = moment().format();
  console.log("message-------", messageId, receivedAt);
  // Real work goes here. To signal a failure simply throw, e.g.
  //   throw new Error('our partner server busy')
  // and the message stays in the queue for another attempt.
  // Returning normally marks the message as processed.
}
/**
 * Promise wrapper around `sqs.changeMessageVisibility` for one message.
 *
 * @param {object} message - Received message; its `ReceiptHandle` identifies it.
 * @param {number} timeout - Value sent as `VisibilityTimeout`.
 * @returns {Promise<object>} Fulfils with the SDK response data, rejects with
 *   the SDK error.
 */
function _changeMessageVisibility(message, timeout) {
  const { ReceiptHandle } = message;
  const request = {
    QueueUrl: queueURL,
    ReceiptHandle,
    VisibilityTimeout: timeout,
  };

  return new Promise((resolve, reject) => {
    sqs.changeMessageVisibility(request, (error, result) => {
      if (error) {
        reject(error);
      } else {
        resolve(result);
      }
    });
  });
}
/**
 * Delete one message from the queue via `sqs.deleteMessage`.
 *
 * @param {object} message - Received message; its `ReceiptHandle` identifies it.
 * @returns {Promise<void>} Fulfils once SQS confirms the delete, rejects with
 *   the SDK error otherwise.
 */
function _removeMessage(message) {
  return new Promise((resolve, reject) => {
    sqs.deleteMessage(
      {
        // Use the module-level queue URL. This is a plain function, so `this`
        // is not bound to anything carrying a `url` property.
        QueueUrl: queueURL,
        ReceiptHandle: message.ReceiptHandle,
      },
      (err) => {
        if (err) {
          return reject(err);
        }
        return resolve();
      }
    );
  });
}
/**
 * Compute the delay for a given attempt using exponential backoff:
 *
 *   min(trunc(random * minTimeout * factor ^ attempt), maxTimeout)
 *
 * The result is in the same unit as `minTimeout` / `maxTimeout`. Note that
 * attempt 1 already yields `minTimeout * factor`.
 *
 * @param {number} attempt - Attempt number used as the exponent.
 * @param {object} [retryOptions]
 * @param {number} [retryOptions.minTimeout=30] - Base delay.
 * @param {number} [retryOptions.maxTimeout=43200] - Upper bound of the result.
 * @param {number} [retryOptions.factor=2] - Growth factor per attempt.
 * @param {boolean} [retryOptions.randomize=false] - Scale the delay by a
 *   random factor in [1, 2) when true.
 * @returns {number} Whole-number delay, never above `maxTimeout`.
 */
function _createTimout(attempt, retryOptions) {
  const {
    minTimeout = 30,
    maxTimeout = 12 * 60 * 60,
    factor = 2,
    randomize = false,
  } = retryOptions || {};

  const jitter = randomize ? Math.random() + 1 : 1;

  // Truncate numerically. Parsing the number through its string form breaks
  // for very large delays: "3.5e+22" parses as 3, turning the longest
  // backoff into the shortest one.
  const delay = Math.trunc(jitter * minTimeout * Math.pow(factor, attempt));

  // An uncomputable delay (NaN) falls back to the cap.
  if (Number.isNaN(delay)) {
    return maxTimeout;
  }
  return Math.min(delay, maxTimeout);
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment