Created
January 28, 2021 18:03
-
-
Save PRASANTHRAJENDRAN/9e9a7208ea78c3df4e332374f077ab9f to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| import org.junit.jupiter.api.Assertions; | |
| import org.junit.jupiter.api.Test; | |
| import software.amazon.awssdk.auth.credentials.AwsBasicCredentials; | |
| import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider; | |
| import software.amazon.awssdk.regions.Region; | |
| import software.amazon.awssdk.services.sns.SnsClient; | |
| import software.amazon.awssdk.services.sns.model.CreateTopicRequest; | |
| import software.amazon.awssdk.services.sns.model.CreateTopicResponse; | |
| import software.amazon.awssdk.services.sns.model.PublishRequest; | |
| import software.amazon.awssdk.services.sns.model.PublishResponse; | |
| import software.amazon.awssdk.services.sns.model.SubscribeRequest; | |
| import software.amazon.awssdk.services.sns.model.SubscribeResponse; | |
| import software.amazon.awssdk.services.sqs.SqsClient; | |
| import software.amazon.awssdk.services.sqs.model.CreateQueueRequest; | |
| import software.amazon.awssdk.services.sqs.model.CreateQueueResponse; | |
| import software.amazon.awssdk.services.sqs.model.GetQueueAttributesRequest; | |
| import software.amazon.awssdk.services.sqs.model.GetQueueAttributesResponse; | |
| import software.amazon.awssdk.services.sqs.model.QueueAttributeName; | |
| import java.net.URI; | |
| import java.util.HashMap; | |
| import java.util.Map; | |
| import java.util.UUID; | |
| class FifoTopicsITest { | |
| @Test | |
| void test() { | |
| final String topicName = UUID.randomUUID().toString().substring(15); | |
| //creating sns client | |
| SnsClient amazonSNS = SnsClient.builder().credentialsProvider(StaticCredentialsProvider.create(AwsBasicCredentials.create( | |
| "<accessKey>", "<secretKey>"))).region(Region.of("us-west-1")) | |
| .endpointOverride(URI.create("https://sns.us-west-1.amazonaws.com")).build(); | |
| //creating sqs client | |
| SqsClient amazonSQS = SqsClient.builder().credentialsProvider(StaticCredentialsProvider.create(AwsBasicCredentials.create( | |
| "<accessKey>", "<secretKey>"))).region(Region.of("us-west-1")) | |
| .endpointOverride(URI.create("https://sqs.us-west-1.amazonaws.com")).build(); | |
| //creating SNS topic | |
| Map<String, String> createSnsTopicAttribute = new HashMap<>(); | |
| createSnsTopicAttribute.put("FifoTopic", "true"); | |
| createSnsTopicAttribute.put("ContentBasedDeduplication", "false"); | |
| CreateTopicRequest createTopicRequest = CreateTopicRequest.builder().name(topicName + ".fifo").attributes(createSnsTopicAttribute).build(); | |
| CreateTopicResponse createTopicResponse = amazonSNS.createTopic(createTopicRequest); | |
| String topicArn = createTopicResponse.topicArn(); | |
| //creating dead-letter sqs queue | |
| Map<QueueAttributeName, String> createSqsQueueAttribute = new HashMap<>(); | |
| createSqsQueueAttribute.put(QueueAttributeName.FIFO_QUEUE, "true"); | |
| createSqsQueueAttribute.put(QueueAttributeName.CONTENT_BASED_DEDUPLICATION, "false"); | |
| CreateQueueRequest createDLQQueueRequest = CreateQueueRequest.builder(). | |
| attributes(createSqsQueueAttribute). | |
| queueName(topicName + "_DLQ_" + ".fifo"). | |
| build(); | |
| CreateQueueResponse createDeadLetterQueueResponse = amazonSQS.createQueue(createDLQQueueRequest); | |
| //getting ARN value of dead-letter queue | |
| GetQueueAttributesResponse getQueueAttributesResponse = amazonSQS.getQueueAttributes( | |
| GetQueueAttributesRequest.builder().queueUrl(createDeadLetterQueueResponse.queueUrl()) | |
| .attributeNames(QueueAttributeName.QUEUE_ARN).build()); | |
| String deleteQueueArn = getQueueAttributesResponse.attributes().get(QueueAttributeName.QUEUE_ARN); | |
| //creating sqs queue | |
| String reDrivePolicy = "{\"maxReceiveCount\":\"5\", \"deadLetterTargetArn\":\"" | |
| + deleteQueueArn + "\"}"; | |
| createSqsQueueAttribute.put(QueueAttributeName.REDRIVE_POLICY, reDrivePolicy); | |
| CreateQueueRequest createQueueRequest = CreateQueueRequest.builder().queueName(topicName + ".fifo") | |
| .attributes(createSqsQueueAttribute).build(); | |
| CreateQueueResponse createQueueResponse = amazonSQS.createQueue(createQueueRequest); | |
| String queueUrl = createQueueResponse.queueUrl(); | |
| //getting ARN value of queue | |
| getQueueAttributesResponse = amazonSQS.getQueueAttributes( | |
| GetQueueAttributesRequest.builder().queueUrl(queueUrl) | |
| .attributeNames(QueueAttributeName.QUEUE_ARN).build()); | |
| String queueArn = getQueueAttributesResponse.attributes().get(QueueAttributeName.QUEUE_ARN); | |
| //Subscribe FIFO queue to FIFO Topic | |
| SubscribeRequest subscribeRequest = SubscribeRequest.builder().protocol("sqs").topicArn(topicArn).endpoint(queueArn).build(); | |
| SubscribeResponse subscribeResponse = amazonSNS.subscribe(subscribeRequest); | |
| Assertions.assertNotNull(subscribeResponse.subscriptionArn()); | |
| //Publishing 4 sample message to FIFO SNS Topic | |
| for (int i = 0; i < 5; i++) { | |
| PublishRequest publishRequest = PublishRequest.builder() | |
| .topicArn(topicArn) | |
| .message("Test Message" + i) | |
| .messageGroupId(topicName) | |
| .messageDeduplicationId(UUID.randomUUID().toString()).build(); | |
| PublishResponse publishResponse = amazonSNS.publish(publishRequest); | |
| Assertions.assertNotNull(publishResponse.messageId()); | |
| } | |
| //Getting ApproximateNumberOfMessages no of messages from the FIFO Queue | |
| getQueueAttributesResponse = amazonSQS.getQueueAttributes( | |
| GetQueueAttributesRequest.builder().queueUrl(queueUrl) | |
| .attributeNames(QueueAttributeName.ALL).build()); | |
| String approximateNumberOfMessages = getQueueAttributesResponse.attributes() | |
| .get(QueueAttributeName.APPROXIMATE_NUMBER_OF_MESSAGES); | |
| //My expectation here is SNS FIFO topic should have fanout the 4 published message to SQS FIFO Queue | |
| Assertions.assertEquals(4, Integer.valueOf(approximateNumberOfMessages)); | |
| } | |
| } |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment