Skip to content

Instantly share code, notes, and snippets.

@juan-medina
Last active June 7, 2020 18:34
Show Gist options
  • Select an option

  • Save juan-medina/f3253f5e88f6969c9fec0af17a951b91 to your computer and use it in GitHub Desktop.

Select an option

Save juan-medina/f3253f5e88f6969c9fec0af17a951b91 to your computer and use it in GitHub Desktop.
kafka pets
./bin/kafka-topics.sh --list --bootstrap-server localhost:9092
./bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic pet_commands
./bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic pet_commands
package org.learning.by.example.petstore.petcommands.service
import org.learning.by.example.petstore.petcommands.model.Pet
import reactor.core.publisher.Mono
interface PetCommands {
fun sendPetCreate(monoPet: Mono<Pet>): Mono<String>
}
package org.learning.by.example.petstore.petcommands.service
import org.apache.kafka.clients.producer.ProducerConfig
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.serialization.StringSerializer
import org.learning.by.example.petstore.petcommands.model.Pet
import org.springframework.stereotype.Service
import reactor.core.publisher.Flux
import reactor.core.publisher.Mono
import reactor.kafka.sender.KafkaSender
import reactor.kafka.sender.SenderOptions
import reactor.kafka.sender.SenderRecord
import java.util.*
@Service
class PetCommandsImpl : PetCommands {
companion object {
private const val PRODUCER_ID = "pet_commands_producer"
private const val SERVER_CONFIG = "localhost:9092"
private const val TOPIC = "pet_commands"
private const val ALL_ACK = "all"
}
private final val producer: KafkaSender<String, String>
init {
val props: MutableMap<String, Any> = HashMap()
props[ProducerConfig.BOOTSTRAP_SERVERS_CONFIG] = SERVER_CONFIG
props[ProducerConfig.CLIENT_ID_CONFIG] = PRODUCER_ID
props[ProducerConfig.ACKS_CONFIG] = ALL_ACK
props[ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG] = StringSerializer::class.java
props[ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG] = StringSerializer::class.java
producer = KafkaSender.create(SenderOptions.create(props))
}
override fun sendPetCreate(monoPet: Mono<Pet>): Mono<String> {
return monoPet.flatMap {
val id = UUID.randomUUID().toString()
producer.send(Flux.just(SenderRecord.create(ProducerRecord(TOPIC, id, id), id)))
.single().map { id }
}
}
}
package org.learning.by.example.petstore.petcommands.handlers
import org.learning.by.example.petstore.petcommands.model.ErrorResponse
import org.learning.by.example.petstore.petcommands.model.Pet
import org.learning.by.example.petstore.petcommands.model.Result
import org.learning.by.example.petstore.petcommands.service.PetCommands
import org.learning.by.example.petstore.reactor.dtovalidator.DTOValidator
import org.learning.by.example.petstore.reactor.dtovalidator.InvalidDtoException
import org.springframework.http.HttpStatus
import org.springframework.http.MediaType
import org.springframework.stereotype.Service
import org.springframework.web.reactive.function.server.ServerRequest
import org.springframework.web.reactive.function.server.ServerResponse
import org.springframework.web.reactive.function.server.body
import org.springframework.web.reactive.function.server.bodyToMono
import reactor.core.publisher.Mono
import reactor.kotlin.core.publisher.toMono
import java.net.URI
@Service
class PetHandler(
val dto: DTOValidator,
val petCommands: PetCommands
) {
companion object {
const val INVALID_RESOURCE = "Invalid Resource"
const val SERVER_ERROR = "Server Error"
}
private fun toResponse(id: String) = ServerResponse.created(URI.create("/pet/${id}"))
.contentType(MediaType.APPLICATION_JSON)
.body(Result(id).toMono())
private fun toError(throwable: Throwable) = if (throwable is InvalidDtoException) {
ServerResponse.status(HttpStatus.BAD_REQUEST)
.contentType(MediaType.APPLICATION_JSON)
.body(ErrorResponse(INVALID_RESOURCE, throwable.message!!).toMono())
} else {
ServerResponse.status(HttpStatus.INTERNAL_SERVER_ERROR)
.contentType(MediaType.APPLICATION_JSON)
.body(ErrorResponse(SERVER_ERROR, throwable.localizedMessage!!).toMono())
}
private fun validate(monoPet: Mono<Pet>) = dto.validate(monoPet)
fun postPet(serverRequest: ServerRequest) = serverRequest.bodyToMono<Pet>()
.transform(this::validate)
.transform(petCommands::sendPetCreate)
.flatMap(this::toResponse)
.onErrorResume(this::toError)
}
package org.learning.by.example.petstore.petcommands.handlers
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.common.serialization.StringDeserializer
import org.assertj.core.api.Assertions.assertThat
import org.junit.jupiter.api.DynamicTest
import org.junit.jupiter.api.Test
import org.junit.jupiter.api.TestFactory
import org.junit.jupiter.api.extension.ExtendWith
import org.learning.by.example.petstore.petcommands.handlers.PetHandler.Companion.INVALID_RESOURCE
import org.learning.by.example.petstore.petcommands.model.ErrorResponse
import org.learning.by.example.petstore.petcommands.model.Result
import org.learning.by.example.petstore.petcommands.testing.verify
import org.springframework.beans.factory.annotation.Autowired
import org.springframework.boot.test.context.SpringBootTest
import org.springframework.http.HttpStatus
import org.springframework.http.MediaType
import org.springframework.mock.http.server.reactive.MockServerHttpRequest
import org.springframework.mock.web.server.MockServerWebExchange
import org.springframework.test.context.junit.jupiter.SpringExtension
import org.springframework.web.reactive.function.server.HandlerStrategies
import org.springframework.web.reactive.function.server.ServerRequest
import reactor.kafka.receiver.KafkaReceiver
import reactor.kafka.receiver.ReceiverOptions
import reactor.test.StepVerifier
@ExtendWith(SpringExtension::class)
@SpringBootTest
class PetHandlerTest(@Autowired private val petHandler: PetHandler) {
companion object {
private const val PET_URL = "/pet"
private const val VALID_UUID = "[0-9a-f]{8}-[0-9a-f]{4}-[1-5][0-9a-f]{3}-[89ab][0-9a-f]{3}-[0-9a-f]{12}"
const val VALID_PET_URL = "$PET_URL/$VALID_UUID"
const val VALID_PET = """
{
"name": "fluffy",
"category": "dog",
"tags" : [
"soft",
"beauty",
"good-boy"
]
}
"""
private const val CLIENT_ID = "pet_commands_consumer"
private const val GROUP_ID = "pet_commands_consumers"
private const val OFFSET_EARLIEST = "earliest"
private const val SERVER_CONFIG = "localhost:9092"
private const val TOPIC = "pet_commands"
}
data class TestCase(val name: String, val parameters: Parameters, val expect: Expect) {
data class Parameters(val body: String)
data class Expect(val error: String)
}
@TestFactory
fun `We should get bad request when adding a pet with bad input`() = listOf(
TestCase(
name = "we should get a bad request when trying to add a pet with empty name",
parameters = TestCase.Parameters(
body = """
{
"name": "",
"category": "dog"
}
"""
),
expect = TestCase.Expect(
error = "Invalid name, size must be between 3 and 20."
)
),
TestCase(
name = "we should get a bad request when trying to add a pet with a long name",
parameters = TestCase.Parameters(
body = """
{
"name": "supersupersuperfluffy",
"category": "dog"
}
"""
),
expect = TestCase.Expect(
error = "Invalid name, size must be between 3 and 20."
)
),
TestCase(
name = "we should get a bad request when trying to add a pet with a bad name",
parameters = TestCase.Parameters(
body = """
{
"name": "super fluffy",
"category": "dog"
}
"""
),
expect = TestCase.Expect(
error = "Invalid name, should be alphanumeric."
)
),
TestCase(
name = "we should get a bad request when trying to add a pet with no name",
parameters = TestCase.Parameters(
body = """
{
"category": "dog"
}
"""
),
expect = TestCase.Expect(
error = "Invalid name, must not be null."
)
),
TestCase(
name = "we should get a bad request when trying to add a pet with empty category",
parameters = TestCase.Parameters(
body = """
{
"name": "fluffy1",
"category": ""
}
"""
),
expect = TestCase.Expect(
error = "Invalid category, size must be between 3 and 15."
)
),
TestCase(
name = "we should get a bad request when trying to add a pet with a long category",
parameters = TestCase.Parameters(
body = """
{
"name": "fluffy2",
"category": "megamegamegamegadog"
}
"""
),
expect = TestCase.Expect(
error = "Invalid category, size must be between 3 and 15."
)
),
TestCase(
name = "we should get a bad request when trying to add a pet with a bad category",
parameters = TestCase.Parameters(
body = """
{
"name": "fluffy",
"category": "dog1"
}
"""
),
expect = TestCase.Expect(
error = "Invalid category, should be only alphabetic characters."
)
),
TestCase(
name = "we should get a bad request when trying to add a pet with no category",
parameters = TestCase.Parameters(
body = """
{
"name": "fluffy"
}
"""
),
expect = TestCase.Expect(
error = "Invalid category, must not be null."
)
),
TestCase(
name = "we should get a bad request when trying to add a pet with a invalid tags",
parameters = TestCase.Parameters(
body = """
{
"name": "fluffy",
"category" : "dog",
"tags" : [ "do" ]
}
"""
),
expect = TestCase.Expect(
error = "Invalid tags, each should be between 3 and 15 alphabetic characters or hyphen."
)
),
TestCase(
name = "we should get a bad request when trying to add a pet with a valid & invalid tags",
parameters = TestCase.Parameters(
body = """
{
"name": "fluffy",
"category" : "dog",
"tags" : [ "beauty", "do" ]
}
"""
),
expect = TestCase.Expect(
error = "Invalid tags, each should be between 3 and 15 alphabetic characters or hyphen."
)
),
TestCase(
name = "we should get a bad request when trying to add a pet with a empty tag",
parameters = TestCase.Parameters(
body = """
{
"name": "fluffy",
"category" : "dog",
"tags" : [ "" ]
}
"""
),
expect = TestCase.Expect(
error = "Invalid tags, each should be between 3 and 15 alphabetic characters or hyphen."
)
)
).map {
DynamicTest.dynamicTest(it.name) {
val httpRequest = MockServerHttpRequest
.post("/pet")
.contentType(MediaType.APPLICATION_JSON)
.body(it.parameters.body)
val webExchange = MockServerWebExchange.from(httpRequest)
val request = ServerRequest.create(webExchange, HandlerStrategies.withDefaults().messageReaders())
petHandler.postPet(request).verify { response, result: ErrorResponse ->
assertThat(response.statusCode()).isEqualTo(HttpStatus.BAD_REQUEST)
assertThat(response.headers().contentType).isEqualTo(MediaType.APPLICATION_JSON)
assertThat(response.headers().location).isNull()
assertThat(result.message).isEqualTo(INVALID_RESOURCE)
assertThat(result.description).isEqualTo(it.expect.error)
}
}
}
@Test
fun `we should get the result and headers when adding a pet`() {
// we may need to change this if this issue isn't fix :
// https://github.com/spring-projects/spring-framework/issues/25087
val httpRequest = MockServerHttpRequest
.post("/pet")
.contentType(MediaType.APPLICATION_JSON)
.body(VALID_PET)
val webExchange = MockServerWebExchange.from(httpRequest)
val request = ServerRequest.create(webExchange, HandlerStrategies.withDefaults().messageReaders())
petHandler.postPet(request).verify { response, result: Result ->
assertThat(response.statusCode()).isEqualTo(HttpStatus.CREATED)
assertThat(response.headers().location.toString()).matches(VALID_PET_URL)
assertThat(response.headers().contentType).isEqualTo(MediaType.APPLICATION_JSON)
assertThat(result.id).matches(VALID_UUID)
}
}
@Test
fun `we should get a pet create command sent when posting a pet`() {
// we may need to change this if this issue isn't fix :
// https://github.com/spring-projects/spring-framework/issues/25087
val httpRequest = MockServerHttpRequest
.post("/pet")
.contentType(MediaType.APPLICATION_JSON)
.body(VALID_PET)
val webExchange = MockServerWebExchange.from(httpRequest)
val request = ServerRequest.create(webExchange, HandlerStrategies.withDefaults().messageReaders())
var id = ""
petHandler.postPet(request).verify { response, result: Result ->
assertThat(response.statusCode()).isEqualTo(HttpStatus.CREATED)
assertThat(response.headers().location.toString()).matches(VALID_PET_URL)
assertThat(response.headers().contentType).isEqualTo(MediaType.APPLICATION_JSON)
id = result.id
assertThat(result.id).matches(VALID_UUID)
StepVerifier.create(getStrings())
.expectSubscription()
.thenRequest(Long.MAX_VALUE)
.expectNext(id)
.expectNextCount(0L)
.thenCancel()
.verify()
}
}
fun getStrings() = getKafkaReceiver().receive().map {
val receiverOffset = it.receiverOffset()
receiverOffset.acknowledge()
it.value()
}
private fun getKafkaReceiver(): KafkaReceiver<String, String> {
val props: MutableMap<String, Any> = HashMap()
props[ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG] = SERVER_CONFIG
props[ConsumerConfig.CLIENT_ID_CONFIG] = CLIENT_ID
props[ConsumerConfig.GROUP_ID_CONFIG] = GROUP_ID
props[ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG] = StringDeserializer::class.java
props[ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG] = StringDeserializer::class.java
props[ConsumerConfig.AUTO_OFFSET_RESET_CONFIG] = OFFSET_EARLIEST
return KafkaReceiver.create(ReceiverOptions.create<String, String>(props).subscription(setOf(TOPIC)))
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment