Last active
June 7, 2020 18:34
-
-
Save juan-medina/f3253f5e88f6969c9fec0af17a951b91 to your computer and use it in GitHub Desktop.
kafka pets
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| ./bin/kafka-topics.sh --list --bootstrap-server localhost:9092 | |
| ./bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic pet_commands | |
| ./bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic pet_commands |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| package org.learning.by.example.petstore.petcommands.service | |
| import org.learning.by.example.petstore.petcommands.model.Pet | |
| import reactor.core.publisher.Mono | |
| interface PetCommands { | |
| fun sendPetCreate(monoPet: Mono<Pet>): Mono<String> | |
| } |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| package org.learning.by.example.petstore.petcommands.service | |
| import org.apache.kafka.clients.producer.ProducerConfig | |
| import org.apache.kafka.clients.producer.ProducerRecord | |
| import org.apache.kafka.common.serialization.StringSerializer | |
| import org.learning.by.example.petstore.petcommands.model.Pet | |
| import org.springframework.stereotype.Service | |
| import reactor.core.publisher.Flux | |
| import reactor.core.publisher.Mono | |
| import reactor.kafka.sender.KafkaSender | |
| import reactor.kafka.sender.SenderOptions | |
| import reactor.kafka.sender.SenderRecord | |
| import java.util.* | |
| @Service | |
| class PetCommandsImpl : PetCommands { | |
| companion object { | |
| private const val PRODUCER_ID = "pet_commands_producer" | |
| private const val SERVER_CONFIG = "localhost:9092" | |
| private const val TOPIC = "pet_commands" | |
| private const val ALL_ACK = "all" | |
| } | |
| private final val producer: KafkaSender<String, String> | |
| init { | |
| val props: MutableMap<String, Any> = HashMap() | |
| props[ProducerConfig.BOOTSTRAP_SERVERS_CONFIG] = SERVER_CONFIG | |
| props[ProducerConfig.CLIENT_ID_CONFIG] = PRODUCER_ID | |
| props[ProducerConfig.ACKS_CONFIG] = ALL_ACK | |
| props[ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG] = StringSerializer::class.java | |
| props[ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG] = StringSerializer::class.java | |
| producer = KafkaSender.create(SenderOptions.create(props)) | |
| } | |
| override fun sendPetCreate(monoPet: Mono<Pet>): Mono<String> { | |
| return monoPet.flatMap { | |
| val id = UUID.randomUUID().toString() | |
| producer.send(Flux.just(SenderRecord.create(ProducerRecord(TOPIC, id, id), id))) | |
| .single().map { id } | |
| } | |
| } | |
| } |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| package org.learning.by.example.petstore.petcommands.handlers | |
| import org.learning.by.example.petstore.petcommands.model.ErrorResponse | |
| import org.learning.by.example.petstore.petcommands.model.Pet | |
| import org.learning.by.example.petstore.petcommands.model.Result | |
| import org.learning.by.example.petstore.petcommands.service.PetCommands | |
| import org.learning.by.example.petstore.reactor.dtovalidator.DTOValidator | |
| import org.learning.by.example.petstore.reactor.dtovalidator.InvalidDtoException | |
| import org.springframework.http.HttpStatus | |
| import org.springframework.http.MediaType | |
| import org.springframework.stereotype.Service | |
| import org.springframework.web.reactive.function.server.ServerRequest | |
| import org.springframework.web.reactive.function.server.ServerResponse | |
| import org.springframework.web.reactive.function.server.body | |
| import org.springframework.web.reactive.function.server.bodyToMono | |
| import reactor.core.publisher.Mono | |
| import reactor.kotlin.core.publisher.toMono | |
| import java.net.URI | |
| @Service | |
| class PetHandler( | |
| val dto: DTOValidator, | |
| val petCommands: PetCommands | |
| ) { | |
| companion object { | |
| const val INVALID_RESOURCE = "Invalid Resource" | |
| const val SERVER_ERROR = "Server Error" | |
| } | |
| private fun toResponse(id: String) = ServerResponse.created(URI.create("/pet/${id}")) | |
| .contentType(MediaType.APPLICATION_JSON) | |
| .body(Result(id).toMono()) | |
| private fun toError(throwable: Throwable) = if (throwable is InvalidDtoException) { | |
| ServerResponse.status(HttpStatus.BAD_REQUEST) | |
| .contentType(MediaType.APPLICATION_JSON) | |
| .body(ErrorResponse(INVALID_RESOURCE, throwable.message!!).toMono()) | |
| } else { | |
| ServerResponse.status(HttpStatus.INTERNAL_SERVER_ERROR) | |
| .contentType(MediaType.APPLICATION_JSON) | |
| .body(ErrorResponse(SERVER_ERROR, throwable.localizedMessage!!).toMono()) | |
| } | |
| private fun validate(monoPet: Mono<Pet>) = dto.validate(monoPet) | |
| fun postPet(serverRequest: ServerRequest) = serverRequest.bodyToMono<Pet>() | |
| .transform(this::validate) | |
| .transform(petCommands::sendPetCreate) | |
| .flatMap(this::toResponse) | |
| .onErrorResume(this::toError) | |
| } |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| package org.learning.by.example.petstore.petcommands.handlers | |
| import org.apache.kafka.clients.consumer.ConsumerConfig | |
| import org.apache.kafka.common.serialization.StringDeserializer | |
| import org.assertj.core.api.Assertions.assertThat | |
| import org.junit.jupiter.api.DynamicTest | |
| import org.junit.jupiter.api.Test | |
| import org.junit.jupiter.api.TestFactory | |
| import org.junit.jupiter.api.extension.ExtendWith | |
| import org.learning.by.example.petstore.petcommands.handlers.PetHandler.Companion.INVALID_RESOURCE | |
| import org.learning.by.example.petstore.petcommands.model.ErrorResponse | |
| import org.learning.by.example.petstore.petcommands.model.Result | |
| import org.learning.by.example.petstore.petcommands.testing.verify | |
| import org.springframework.beans.factory.annotation.Autowired | |
| import org.springframework.boot.test.context.SpringBootTest | |
| import org.springframework.http.HttpStatus | |
| import org.springframework.http.MediaType | |
| import org.springframework.mock.http.server.reactive.MockServerHttpRequest | |
| import org.springframework.mock.web.server.MockServerWebExchange | |
| import org.springframework.test.context.junit.jupiter.SpringExtension | |
| import org.springframework.web.reactive.function.server.HandlerStrategies | |
| import org.springframework.web.reactive.function.server.ServerRequest | |
| import reactor.kafka.receiver.KafkaReceiver | |
| import reactor.kafka.receiver.ReceiverOptions | |
| import reactor.test.StepVerifier | |
| @ExtendWith(SpringExtension::class) | |
| @SpringBootTest | |
| class PetHandlerTest(@Autowired private val petHandler: PetHandler) { | |
| companion object { | |
| private const val PET_URL = "/pet" | |
| private const val VALID_UUID = "[0-9a-f]{8}-[0-9a-f]{4}-[1-5][0-9a-f]{3}-[89ab][0-9a-f]{3}-[0-9a-f]{12}" | |
| const val VALID_PET_URL = "$PET_URL/$VALID_UUID" | |
| const val VALID_PET = """ | |
| { | |
| "name": "fluffy", | |
| "category": "dog", | |
| "tags" : [ | |
| "soft", | |
| "beauty", | |
| "good-boy" | |
| ] | |
| } | |
| """ | |
| private const val CLIENT_ID = "pet_commands_consumer" | |
| private const val GROUP_ID = "pet_commands_consumers" | |
| private const val OFFSET_EARLIEST = "earliest" | |
| private const val SERVER_CONFIG = "localhost:9092" | |
| private const val TOPIC = "pet_commands" | |
| } | |
| data class TestCase(val name: String, val parameters: Parameters, val expect: Expect) { | |
| data class Parameters(val body: String) | |
| data class Expect(val error: String) | |
| } | |
| @TestFactory | |
| fun `We should get bad request when adding a pet with bad input`() = listOf( | |
| TestCase( | |
| name = "we should get a bad request when trying to add a pet with empty name", | |
| parameters = TestCase.Parameters( | |
| body = """ | |
| { | |
| "name": "", | |
| "category": "dog" | |
| } | |
| """ | |
| ), | |
| expect = TestCase.Expect( | |
| error = "Invalid name, size must be between 3 and 20." | |
| ) | |
| ), | |
| TestCase( | |
| name = "we should get a bad request when trying to add a pet with a long name", | |
| parameters = TestCase.Parameters( | |
| body = """ | |
| { | |
| "name": "supersupersuperfluffy", | |
| "category": "dog" | |
| } | |
| """ | |
| ), | |
| expect = TestCase.Expect( | |
| error = "Invalid name, size must be between 3 and 20." | |
| ) | |
| ), | |
| TestCase( | |
| name = "we should get a bad request when trying to add a pet with a bad name", | |
| parameters = TestCase.Parameters( | |
| body = """ | |
| { | |
| "name": "super fluffy", | |
| "category": "dog" | |
| } | |
| """ | |
| ), | |
| expect = TestCase.Expect( | |
| error = "Invalid name, should be alphanumeric." | |
| ) | |
| ), | |
| TestCase( | |
| name = "we should get a bad request when trying to add a pet with no name", | |
| parameters = TestCase.Parameters( | |
| body = """ | |
| { | |
| "category": "dog" | |
| } | |
| """ | |
| ), | |
| expect = TestCase.Expect( | |
| error = "Invalid name, must not be null." | |
| ) | |
| ), | |
| TestCase( | |
| name = "we should get a bad request when trying to add a pet with empty category", | |
| parameters = TestCase.Parameters( | |
| body = """ | |
| { | |
| "name": "fluffy1", | |
| "category": "" | |
| } | |
| """ | |
| ), | |
| expect = TestCase.Expect( | |
| error = "Invalid category, size must be between 3 and 15." | |
| ) | |
| ), | |
| TestCase( | |
| name = "we should get a bad request when trying to add a pet with a long category", | |
| parameters = TestCase.Parameters( | |
| body = """ | |
| { | |
| "name": "fluffy2", | |
| "category": "megamegamegamegadog" | |
| } | |
| """ | |
| ), | |
| expect = TestCase.Expect( | |
| error = "Invalid category, size must be between 3 and 15." | |
| ) | |
| ), | |
| TestCase( | |
| name = "we should get a bad request when trying to add a pet with a bad category", | |
| parameters = TestCase.Parameters( | |
| body = """ | |
| { | |
| "name": "fluffy", | |
| "category": "dog1" | |
| } | |
| """ | |
| ), | |
| expect = TestCase.Expect( | |
| error = "Invalid category, should be only alphabetic characters." | |
| ) | |
| ), | |
| TestCase( | |
| name = "we should get a bad request when trying to add a pet with no category", | |
| parameters = TestCase.Parameters( | |
| body = """ | |
| { | |
| "name": "fluffy" | |
| } | |
| """ | |
| ), | |
| expect = TestCase.Expect( | |
| error = "Invalid category, must not be null." | |
| ) | |
| ), | |
| TestCase( | |
| name = "we should get a bad request when trying to add a pet with a invalid tags", | |
| parameters = TestCase.Parameters( | |
| body = """ | |
| { | |
| "name": "fluffy", | |
| "category" : "dog", | |
| "tags" : [ "do" ] | |
| } | |
| """ | |
| ), | |
| expect = TestCase.Expect( | |
| error = "Invalid tags, each should be between 3 and 15 alphabetic characters or hyphen." | |
| ) | |
| ), | |
| TestCase( | |
| name = "we should get a bad request when trying to add a pet with a valid & invalid tags", | |
| parameters = TestCase.Parameters( | |
| body = """ | |
| { | |
| "name": "fluffy", | |
| "category" : "dog", | |
| "tags" : [ "beauty", "do" ] | |
| } | |
| """ | |
| ), | |
| expect = TestCase.Expect( | |
| error = "Invalid tags, each should be between 3 and 15 alphabetic characters or hyphen." | |
| ) | |
| ), | |
| TestCase( | |
| name = "we should get a bad request when trying to add a pet with a empty tag", | |
| parameters = TestCase.Parameters( | |
| body = """ | |
| { | |
| "name": "fluffy", | |
| "category" : "dog", | |
| "tags" : [ "" ] | |
| } | |
| """ | |
| ), | |
| expect = TestCase.Expect( | |
| error = "Invalid tags, each should be between 3 and 15 alphabetic characters or hyphen." | |
| ) | |
| ) | |
| ).map { | |
| DynamicTest.dynamicTest(it.name) { | |
| val httpRequest = MockServerHttpRequest | |
| .post("/pet") | |
| .contentType(MediaType.APPLICATION_JSON) | |
| .body(it.parameters.body) | |
| val webExchange = MockServerWebExchange.from(httpRequest) | |
| val request = ServerRequest.create(webExchange, HandlerStrategies.withDefaults().messageReaders()) | |
| petHandler.postPet(request).verify { response, result: ErrorResponse -> | |
| assertThat(response.statusCode()).isEqualTo(HttpStatus.BAD_REQUEST) | |
| assertThat(response.headers().contentType).isEqualTo(MediaType.APPLICATION_JSON) | |
| assertThat(response.headers().location).isNull() | |
| assertThat(result.message).isEqualTo(INVALID_RESOURCE) | |
| assertThat(result.description).isEqualTo(it.expect.error) | |
| } | |
| } | |
| } | |
| @Test | |
| fun `we should get the result and headers when adding a pet`() { | |
| // we may need to change this if this issue isn't fix : | |
| // https://github.com/spring-projects/spring-framework/issues/25087 | |
| val httpRequest = MockServerHttpRequest | |
| .post("/pet") | |
| .contentType(MediaType.APPLICATION_JSON) | |
| .body(VALID_PET) | |
| val webExchange = MockServerWebExchange.from(httpRequest) | |
| val request = ServerRequest.create(webExchange, HandlerStrategies.withDefaults().messageReaders()) | |
| petHandler.postPet(request).verify { response, result: Result -> | |
| assertThat(response.statusCode()).isEqualTo(HttpStatus.CREATED) | |
| assertThat(response.headers().location.toString()).matches(VALID_PET_URL) | |
| assertThat(response.headers().contentType).isEqualTo(MediaType.APPLICATION_JSON) | |
| assertThat(result.id).matches(VALID_UUID) | |
| } | |
| } | |
| @Test | |
| fun `we should get a pet create command sent when posting a pet`() { | |
| // we may need to change this if this issue isn't fix : | |
| // https://github.com/spring-projects/spring-framework/issues/25087 | |
| val httpRequest = MockServerHttpRequest | |
| .post("/pet") | |
| .contentType(MediaType.APPLICATION_JSON) | |
| .body(VALID_PET) | |
| val webExchange = MockServerWebExchange.from(httpRequest) | |
| val request = ServerRequest.create(webExchange, HandlerStrategies.withDefaults().messageReaders()) | |
| var id = "" | |
| petHandler.postPet(request).verify { response, result: Result -> | |
| assertThat(response.statusCode()).isEqualTo(HttpStatus.CREATED) | |
| assertThat(response.headers().location.toString()).matches(VALID_PET_URL) | |
| assertThat(response.headers().contentType).isEqualTo(MediaType.APPLICATION_JSON) | |
| id = result.id | |
| assertThat(result.id).matches(VALID_UUID) | |
| StepVerifier.create(getStrings()) | |
| .expectSubscription() | |
| .thenRequest(Long.MAX_VALUE) | |
| .expectNext(id) | |
| .expectNextCount(0L) | |
| .thenCancel() | |
| .verify() | |
| } | |
| } | |
| fun getStrings() = getKafkaReceiver().receive().map { | |
| val receiverOffset = it.receiverOffset() | |
| receiverOffset.acknowledge() | |
| it.value() | |
| } | |
| private fun getKafkaReceiver(): KafkaReceiver<String, String> { | |
| val props: MutableMap<String, Any> = HashMap() | |
| props[ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG] = SERVER_CONFIG | |
| props[ConsumerConfig.CLIENT_ID_CONFIG] = CLIENT_ID | |
| props[ConsumerConfig.GROUP_ID_CONFIG] = GROUP_ID | |
| props[ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG] = StringDeserializer::class.java | |
| props[ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG] = StringDeserializer::class.java | |
| props[ConsumerConfig.AUTO_OFFSET_RESET_CONFIG] = OFFSET_EARLIEST | |
| return KafkaReceiver.create(ReceiverOptions.create<String, String>(props).subscription(setOf(TOPIC))) | |
| } | |
| } |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment