Skip to content

Instantly share code, notes, and snippets.

@PRASANTHRAJENDRAN
Created January 28, 2021 18:03
Show Gist options
  • Select an option

  • Save PRASANTHRAJENDRAN/9e9a7208ea78c3df4e332374f077ab9f to your computer and use it in GitHub Desktop.

Select an option

Save PRASANTHRAJENDRAN/9e9a7208ea78c3df4e332374f077ab9f to your computer and use it in GitHub Desktop.
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.sns.SnsClient;
import software.amazon.awssdk.services.sns.model.CreateTopicRequest;
import software.amazon.awssdk.services.sns.model.CreateTopicResponse;
import software.amazon.awssdk.services.sns.model.PublishRequest;
import software.amazon.awssdk.services.sns.model.PublishResponse;
import software.amazon.awssdk.services.sns.model.SubscribeRequest;
import software.amazon.awssdk.services.sns.model.SubscribeResponse;
import software.amazon.awssdk.services.sqs.SqsClient;
import software.amazon.awssdk.services.sqs.model.CreateQueueRequest;
import software.amazon.awssdk.services.sqs.model.CreateQueueResponse;
import software.amazon.awssdk.services.sqs.model.GetQueueAttributesRequest;
import software.amazon.awssdk.services.sqs.model.GetQueueAttributesResponse;
import software.amazon.awssdk.services.sqs.model.QueueAttributeName;
import java.net.URI;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
class FifoTopicsITest {
@Test
void test() {
final String topicName = UUID.randomUUID().toString().substring(15);
//creating sns client
SnsClient amazonSNS = SnsClient.builder().credentialsProvider(StaticCredentialsProvider.create(AwsBasicCredentials.create(
"<accessKey>", "<secretKey>"))).region(Region.of("us-west-1"))
.endpointOverride(URI.create("https://sns.us-west-1.amazonaws.com")).build();
//creating sqs client
SqsClient amazonSQS = SqsClient.builder().credentialsProvider(StaticCredentialsProvider.create(AwsBasicCredentials.create(
"<accessKey>", "<secretKey>"))).region(Region.of("us-west-1"))
.endpointOverride(URI.create("https://sqs.us-west-1.amazonaws.com")).build();
//creating SNS topic
Map<String, String> createSnsTopicAttribute = new HashMap<>();
createSnsTopicAttribute.put("FifoTopic", "true");
createSnsTopicAttribute.put("ContentBasedDeduplication", "false");
CreateTopicRequest createTopicRequest = CreateTopicRequest.builder().name(topicName + ".fifo").attributes(createSnsTopicAttribute).build();
CreateTopicResponse createTopicResponse = amazonSNS.createTopic(createTopicRequest);
String topicArn = createTopicResponse.topicArn();
//creating dead-letter sqs queue
Map<QueueAttributeName, String> createSqsQueueAttribute = new HashMap<>();
createSqsQueueAttribute.put(QueueAttributeName.FIFO_QUEUE, "true");
createSqsQueueAttribute.put(QueueAttributeName.CONTENT_BASED_DEDUPLICATION, "false");
CreateQueueRequest createDLQQueueRequest = CreateQueueRequest.builder().
attributes(createSqsQueueAttribute).
queueName(topicName + "_DLQ_" + ".fifo").
build();
CreateQueueResponse createDeadLetterQueueResponse = amazonSQS.createQueue(createDLQQueueRequest);
//getting ARN value of dead-letter queue
GetQueueAttributesResponse getQueueAttributesResponse = amazonSQS.getQueueAttributes(
GetQueueAttributesRequest.builder().queueUrl(createDeadLetterQueueResponse.queueUrl())
.attributeNames(QueueAttributeName.QUEUE_ARN).build());
String deleteQueueArn = getQueueAttributesResponse.attributes().get(QueueAttributeName.QUEUE_ARN);
//creating sqs queue
String reDrivePolicy = "{\"maxReceiveCount\":\"5\", \"deadLetterTargetArn\":\""
+ deleteQueueArn + "\"}";
createSqsQueueAttribute.put(QueueAttributeName.REDRIVE_POLICY, reDrivePolicy);
CreateQueueRequest createQueueRequest = CreateQueueRequest.builder().queueName(topicName + ".fifo")
.attributes(createSqsQueueAttribute).build();
CreateQueueResponse createQueueResponse = amazonSQS.createQueue(createQueueRequest);
String queueUrl = createQueueResponse.queueUrl();
//getting ARN value of queue
getQueueAttributesResponse = amazonSQS.getQueueAttributes(
GetQueueAttributesRequest.builder().queueUrl(queueUrl)
.attributeNames(QueueAttributeName.QUEUE_ARN).build());
String queueArn = getQueueAttributesResponse.attributes().get(QueueAttributeName.QUEUE_ARN);
//Subscribe FIFO queue to FIFO Topic
SubscribeRequest subscribeRequest = SubscribeRequest.builder().protocol("sqs").topicArn(topicArn).endpoint(queueArn).build();
SubscribeResponse subscribeResponse = amazonSNS.subscribe(subscribeRequest);
Assertions.assertNotNull(subscribeResponse.subscriptionArn());
//Publishing 4 sample message to FIFO SNS Topic
for (int i = 0; i < 5; i++) {
PublishRequest publishRequest = PublishRequest.builder()
.topicArn(topicArn)
.message("Test Message" + i)
.messageGroupId(topicName)
.messageDeduplicationId(UUID.randomUUID().toString()).build();
PublishResponse publishResponse = amazonSNS.publish(publishRequest);
Assertions.assertNotNull(publishResponse.messageId());
}
//Getting ApproximateNumberOfMessages no of messages from the FIFO Queue
getQueueAttributesResponse = amazonSQS.getQueueAttributes(
GetQueueAttributesRequest.builder().queueUrl(queueUrl)
.attributeNames(QueueAttributeName.ALL).build());
String approximateNumberOfMessages = getQueueAttributesResponse.attributes()
.get(QueueAttributeName.APPROXIMATE_NUMBER_OF_MESSAGES);
//My expectation here is SNS FIFO topic should have fanout the 4 published message to SQS FIFO Queue
Assertions.assertEquals(4, Integer.valueOf(approximateNumberOfMessages));
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment