Last active
September 21, 2025 11:30
-
-
Save fResult/bf98b1fc574c7bf452d2a8c48f6b95ba to your computer and use it in GitHub Desktop.
Git diff (source/main…HEAD) capturing my personal code notes for the Medium post “Personal Notes: Documenting PR Changes with AI (no Agent mode version).”
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| diff --git a/README.md b/README.md | |
| index ced9bd0..d30d6ec 100644 | |
| --- a/README.md | |
| +++ b/README.md | |
| @@ -53,6 +53,9 @@ Each module is implemented in both [Java](./java) and [Kotlin](./kotlin) to comp | |
| - Implemented parallel Kotlin versions of Java examples | |
| - Updated to Spring Boot 3.5.x while the book uses an older Spring Boot version 2.5.0 | |
| - Adopted a monorepo approach with [Gradle Multi-project Builds][gradle-multiproject] and [Gradle Composite Builds][gradle-composite-builds] to manage both Java and Kotlin implementations in a single repository | |
| + - Learned that Gradle Kotlin DSL supports both type-safe and string-based dependency declarations, and that the string-based form was required | |
| in [`08-rsocket/build.gradle.kts`](./kotlin/08-rsocket/build.gradle.kts) when adding dependencies inside `afterEvaluate` | |
| + - Learned that some plugins (e.g., `spring-boot`, `dependency-management`) must be applied with `apply(false)` in the root and then enabled in subprojects, since they cannot be declared directly like `kotlin("jvm")`. | |
| - Implemented database profile switching between R2DBC, MongoDB, and other providers | |
| - Implemented reactive global error handling in [`ExceptionProblemResponseMapper#map`](https://github.com/fResult/Learn-Spring-Webflux-3.0/blob/72805b595fe7e3b692d7ccce6d78d2611b40abd3/kotlin/07-http/webflux/src/main/kotlin/com/fResult/common/ExceptionProblemResponseMapper.kt#L13-L21) | |
| (utilized in [`ErrorHandlingRouteConfiguration`](https://github.com/fResult/Learn-Spring-Webflux-3.0/blob/72805b5/kotlin/07-http/webflux/src/main/kotlin/com/fResult/http/filters/ErrorHandlingRouteConfiguration.kt#L18) class). | |
| diff --git a/kotlin/08-rsocket/build.gradle.kts b/kotlin/08-rsocket/build.gradle.kts | |
| index 0ef8444..4215181 100644 | |
| --- a/kotlin/08-rsocket/build.gradle.kts | |
| +++ b/kotlin/08-rsocket/build.gradle.kts | |
| @@ -34,6 +34,7 @@ subprojects { | |
| afterEvaluate { | |
| dependencies { | |
| "implementation"(libs.spring.boot.starter.rsocket) | |
| + "implementation"("com.fasterxml.jackson.module:jackson-module-kotlin") | |
| "implementation"("io.projectreactor.kotlin:reactor-kotlin-extensions") | |
| "implementation"("org.jetbrains.kotlin:kotlin-reflect") | |
| "implementation"("org.jetbrains.kotlinx:kotlinx-coroutines-reactor") | |
| diff --git a/kotlin/08-rsocket/common/src/main/kotlin/com/fResult/rsocket/EncodingUtils.kt b/kotlin/08-rsocket/common/src/main/kotlin/com/fResult/rsocket/EncodingUtils.kt | |
| index b0eb05b..42984eb 100644 | |
| --- a/kotlin/08-rsocket/common/src/main/kotlin/com/fResult/rsocket/EncodingUtils.kt | |
| +++ b/kotlin/08-rsocket/common/src/main/kotlin/com/fResult/rsocket/EncodingUtils.kt | |
| @@ -11,8 +11,8 @@ class EncodingUtils(private val objectMapper: ObjectMapper) { | |
| private val logger = LogManager.getLogger() | |
| } | |
| - private val objectReader = objectMapper.readerFor(Any::class.java) | |
| private val typeReference = typeRef<Map<String, Any>>() | |
| + private val objectReader = objectMapper.readerFor(typeReference) | |
| fun <T : Any> decode(json: String, klass: KClass<T>): T = | |
| runCatching { objectMapper.readValue(json, klass.java) } | |
| @@ -35,6 +35,7 @@ class EncodingUtils(private val objectMapper: ObjectMapper) { | |
| logger.error("Failed to encode object of type ${obj::class.simpleName}", ex) | |
| throw EncodingException("Unable to encode ${obj::class.simpleName}", ex) | |
| } | |
| + | |
| else -> throw ex | |
| } | |
| } | |
| diff --git a/kotlin/08-rsocket/common/src/main/kotlin/com/fResult/rsocket/FResultAutoConfiguration.kt b/kotlin/08-rsocket/common/src/main/kotlin/com/fResult/rsocket/FResultAutoConfiguration.kt | |
| index 605dac5..406f700 100644 | |
| --- a/kotlin/08-rsocket/common/src/main/kotlin/com/fResult/rsocket/FResultAutoConfiguration.kt | |
| +++ b/kotlin/08-rsocket/common/src/main/kotlin/com/fResult/rsocket/FResultAutoConfiguration.kt | |
| @@ -10,7 +10,4 @@ import org.springframework.context.annotation.Configuration | |
| class FResultAutoConfiguration { | |
| @Bean | |
| fun encodingUtils(objectMapper: ObjectMapper) = EncodingUtils(objectMapper) | |
| - | |
| - @Bean | |
| - fun objectMapper() = ObjectMapper() | |
| } | |
| diff --git a/kotlin/08-rsocket/common/src/main/kotlin/com/fResult/rsocket/dsl/retry/RetryConfig.kt b/kotlin/08-rsocket/common/src/main/kotlin/com/fResult/rsocket/dsl/retry/RetryConfig.kt | |
| new file mode 100644 | |
| index 0000000..3158d7a | |
| --- /dev/null | |
| +++ b/kotlin/08-rsocket/common/src/main/kotlin/com/fResult/rsocket/dsl/retry/RetryConfig.kt | |
| @@ -0,0 +1,9 @@ | |
| +package com.fResult.rsocket.dsl.retry | |
| + | |
| +import java.time.Duration | |
| + | |
| +data class RetryConfig( | |
| + val maxAttempts: Long, | |
| + val firstBackOff: Duration, | |
| + val maxBackoff: Duration, | |
| +) | |
| diff --git a/kotlin/08-rsocket/common/src/main/kotlin/com/fResult/rsocket/dsl/retry/RetryConfigBuilder.kt b/kotlin/08-rsocket/common/src/main/kotlin/com/fResult/rsocket/dsl/retry/RetryConfigBuilder.kt | |
| new file mode 100644 | |
| index 0000000..c331d26 | |
| --- /dev/null | |
| +++ b/kotlin/08-rsocket/common/src/main/kotlin/com/fResult/rsocket/dsl/retry/RetryConfigBuilder.kt | |
| @@ -0,0 +1,17 @@ | |
| +package com.fResult.rsocket.dsl.retry | |
| + | |
| +import kotlin.time.Duration | |
| +import kotlin.time.Duration.Companion.seconds | |
| +import kotlin.time.toJavaDuration | |
| + | |
| +class RetryConfigBuilder { | |
| + var maxAttempts: Int = 5 | |
| + var firstBackOff: Duration = 1.seconds | |
| + var maxBackoff: Duration = 32.seconds | |
| + | |
| + fun build() = RetryConfig( | |
| + maxAttempts = maxAttempts.toLong(), | |
| + firstBackOff = firstBackOff.toJavaDuration(), | |
| + maxBackoff = maxBackoff.toJavaDuration(), | |
| + ) | |
| +} | |
| \ No newline at end of file | |
| diff --git a/kotlin/08-rsocket/raw-rsocket/build.gradle.kts b/kotlin/08-rsocket/raw-rsocket/build.gradle.kts | |
| index 92eeeb9..0542552 100644 | |
| --- a/kotlin/08-rsocket/raw-rsocket/build.gradle.kts | |
| +++ b/kotlin/08-rsocket/raw-rsocket/build.gradle.kts | |
| @@ -2,6 +2,9 @@ import org.springframework.boot.gradle.tasks.run.BootRun | |
| dependencies { | |
| implementation(project(":common")) | |
| + | |
| + // MacOS ARM (Apple Silicon) DNS resolver | |
| + runtimeOnly("io.netty:netty-resolver-dns-native-macos:4.1.89.Final:osx-aarch_64") | |
| } | |
| /* ================================ * | |
| @@ -54,3 +57,20 @@ tasks.register<BootRun>("bootChannelClient") { | |
| mainClass = "com.fResult.rsocket.channel.client.ChannelApplicationKt" | |
| classpath = sourceSets["main"].runtimeClasspath | |
| } | |
| + | |
| +/* ==================================== * | |
| + * ========= Bidirectional ============ * | |
| + * ==================================== */ | |
| +tasks.register<BootRun>("bootBidirectionalService") { | |
| + group = "application" | |
| + description = "Run the RSocket Bidirectional Server" | |
| + mainClass = "com.fResult.rsocket.bidirectional.service.BidirectionalApplicationKt" | |
| + classpath = sourceSets["main"].runtimeClasspath | |
| +} | |
| + | |
| +tasks.register<BootRun>("bootBidirectionalClient") { | |
| + group = "application" | |
| + description = "Run the RSocket Bidirectional Client" | |
| + mainClass = "com.fResult.rsocket.bidirectional.client.BidirectionalApplicationKt" | |
| + classpath = sourceSets["main"].runtimeClasspath | |
| +} | |
| diff --git a/kotlin/08-rsocket/raw-rsocket/src/main/kotlin/com/fResult/rsocket/bidirectional/ClientHealthState.kt b/kotlin/08-rsocket/raw-rsocket/src/main/kotlin/com/fResult/rsocket/bidirectional/ClientHealthState.kt | |
| new file mode 100644 | |
| index 0000000..116773d | |
| --- /dev/null | |
| +++ b/kotlin/08-rsocket/raw-rsocket/src/main/kotlin/com/fResult/rsocket/bidirectional/ClientHealthState.kt | |
| @@ -0,0 +1,8 @@ | |
| +package com.fResult.rsocket.bidirectional | |
| + | |
| +class ClientHealthState(val state: String) { | |
| + companion object { | |
| + const val STARTED = "started" | |
| + const val STOPPED = "stopped" | |
| + } | |
| +} | |
| diff --git a/kotlin/08-rsocket/raw-rsocket/src/main/kotlin/com/fResult/rsocket/bidirectional/GreetingRequest.kt b/kotlin/08-rsocket/raw-rsocket/src/main/kotlin/com/fResult/rsocket/bidirectional/GreetingRequest.kt | |
| new file mode 100644 | |
| index 0000000..8262168 | |
| --- /dev/null | |
| +++ b/kotlin/08-rsocket/raw-rsocket/src/main/kotlin/com/fResult/rsocket/bidirectional/GreetingRequest.kt | |
| @@ -0,0 +1,3 @@ | |
| +package com.fResult.rsocket.bidirectional | |
| + | |
| +data class GreetingRequest(val name: String) | |
| diff --git a/kotlin/08-rsocket/raw-rsocket/src/main/kotlin/com/fResult/rsocket/bidirectional/GreetingResponse.kt b/kotlin/08-rsocket/raw-rsocket/src/main/kotlin/com/fResult/rsocket/bidirectional/GreetingResponse.kt | |
| new file mode 100644 | |
| index 0000000..9443dc9 | |
| --- /dev/null | |
| +++ b/kotlin/08-rsocket/raw-rsocket/src/main/kotlin/com/fResult/rsocket/bidirectional/GreetingResponse.kt | |
| @@ -0,0 +1,3 @@ | |
| +package com.fResult.rsocket.bidirectional | |
| + | |
| +data class GreetingResponse(val message: String) | |
| diff --git a/kotlin/08-rsocket/raw-rsocket/src/main/kotlin/com/fResult/rsocket/bidirectional/client/BidirectionalApplication.kt b/kotlin/08-rsocket/raw-rsocket/src/main/kotlin/com/fResult/rsocket/bidirectional/client/BidirectionalApplication.kt | |
| new file mode 100644 | |
| index 0000000..ccefbe7 | |
| --- /dev/null | |
| +++ b/kotlin/08-rsocket/raw-rsocket/src/main/kotlin/com/fResult/rsocket/bidirectional/client/BidirectionalApplication.kt | |
| @@ -0,0 +1,13 @@ | |
| +package com.fResult.rsocket.bidirectional.client | |
| + | |
| +import org.springframework.boot.autoconfigure.SpringBootApplication | |
| +import org.springframework.boot.runApplication | |
| + | |
| +@SpringBootApplication | |
| +class BidirectionalApplication | |
| + | |
| +@Throws(InterruptedException::class) | |
| +fun main(args: Array<String>) { | |
| + runApplication<BidirectionalApplication>(*args) | |
| + Thread.currentThread().join() | |
| +} | |
| diff --git a/kotlin/08-rsocket/raw-rsocket/src/main/kotlin/com/fResult/rsocket/bidirectional/client/BidirectionalClient.kt b/kotlin/08-rsocket/raw-rsocket/src/main/kotlin/com/fResult/rsocket/bidirectional/client/BidirectionalClient.kt | |
| new file mode 100644 | |
| index 0000000..0b66f82 | |
| --- /dev/null | |
| +++ b/kotlin/08-rsocket/raw-rsocket/src/main/kotlin/com/fResult/rsocket/bidirectional/client/BidirectionalClient.kt | |
| @@ -0,0 +1,79 @@ | |
| +package com.fResult.rsocket.bidirectional.client | |
| + | |
| +import com.fResult.rsocket.EncodingUtils | |
| +import com.fResult.rsocket.bidirectional.ClientHealthState | |
| +import com.fResult.rsocket.bidirectional.GreetingRequest | |
| +import com.fResult.rsocket.bidirectional.GreetingResponse | |
| +import io.rsocket.ConnectionSetupPayload | |
| +import io.rsocket.Payload | |
| +import io.rsocket.RSocket | |
| +import io.rsocket.core.RSocketConnector | |
| +import io.rsocket.transport.netty.client.TcpClientTransport | |
| +import io.rsocket.util.DefaultPayload | |
| +import org.apache.logging.log4j.LogManager | |
| +import org.apache.logging.log4j.Logger | |
| +import reactor.core.publisher.Flux | |
| +import reactor.core.publisher.Mono | |
| +import java.time.Instant | |
| +import java.util.stream.Stream | |
| +import kotlin.time.Duration.Companion.seconds | |
| +import kotlin.time.toJavaDuration | |
| + | |
| +class BidirectionalClient( | |
| + private val encodingUtils: EncodingUtils, | |
| + private val uid: String, | |
| + private val serviceHostname: String, | |
| + private val servicePort: Int, | |
| +) { | |
| + companion object { | |
| + val log: Logger = LogManager.getLogger(BidirectionalClient::class.java) | |
| + } | |
| + | |
| + fun getGreetings(): Flux<GreetingResponse?> { | |
| + val greetingRequestPayload = encodingUtils.encode(GreetingRequest("Client #$uid")) | |
| + | |
| + return RSocketConnector.create() | |
| + .acceptor(::acceptor) | |
| + .connect(TcpClientTransport.create(serviceHostname, servicePort)) | |
| + .flatMapMany(streamGreetingResponses(greetingRequestPayload)) | |
| + } | |
| + | |
| + private fun streamGreetingResponses(requestPayload: String): (RSocket) -> Flux<GreetingResponse> = | |
| + { socket -> | |
| + socket.requestStream(DefaultPayload.create(requestPayload)) | |
| + .doOnNext(::logReceivedResponse) | |
| + .map(::toGreetingResponse) | |
| + } | |
| + | |
| + private fun acceptor(setup: ConnectionSetupPayload, serverRSocket: RSocket): Mono<RSocket> = | |
| + Mono.just(createRequestStreamHandler()) | |
| + | |
| + private fun createRequestStreamHandler(): RSocket = | |
| + object : RSocket { | |
| + override fun requestStream(payload: Payload): Flux<Payload> { | |
| + val start = Instant.now().toEpochMilli() | |
| + val delayMillis = (0..30_000).random().toLong() | |
| + | |
| + val stateFlux = Flux.fromStream(Stream.generate(nextClientHealthState(start, delayMillis))) | |
| + .delayElements(5.seconds.toJavaDuration()) | |
| + | |
| + return stateFlux | |
| + .map(encodingUtils::encode) | |
| + .map(DefaultPayload::create) | |
| + } | |
| + } | |
| + | |
| + fun nextClientHealthState(start: Long, delayMillis: Long): () -> ClientHealthState = { | |
| + val now = Instant.now().toEpochMilli() | |
| + val stop = ((start + delayMillis) < now) && Math.random() > .8 | |
| + | |
| + ClientHealthState(if (stop) "STOPPED" else "STARTED") | |
| + } | |
| + | |
| + fun logReceivedResponse(payload: Payload) { | |
| + log.info("Received response data: {}", payload.dataUtf8) | |
| + } | |
| + | |
| + fun toGreetingResponse(payload: Payload): GreetingResponse = | |
| + encodingUtils.decode(payload.dataUtf8, GreetingResponse::class) | |
| +} | |
| diff --git a/kotlin/08-rsocket/raw-rsocket/src/main/kotlin/com/fResult/rsocket/bidirectional/client/BidirectionalClientLauncher.kt b/kotlin/08-rsocket/raw-rsocket/src/main/kotlin/com/fResult/rsocket/bidirectional/client/BidirectionalClientLauncher.kt | |
| new file mode 100644 | |
| index 0000000..09fb3ef | |
| --- /dev/null | |
| +++ b/kotlin/08-rsocket/raw-rsocket/src/main/kotlin/com/fResult/rsocket/bidirectional/client/BidirectionalClientLauncher.kt | |
| @@ -0,0 +1,78 @@ | |
| +package com.fResult.rsocket.bidirectional.client | |
| + | |
| +import com.fResult.rsocket.EncodingUtils | |
| +import com.fResult.rsocket.FResultProperties | |
| +import com.fResult.rsocket.bidirectional.GreetingResponse | |
| +import com.fResult.rsocket.dsl.retry.RetryConfigBuilder | |
| +import org.apache.logging.log4j.LogManager | |
| +import org.apache.logging.log4j.Logger | |
| +import org.springframework.boot.context.event.ApplicationReadyEvent | |
| +import org.springframework.context.event.EventListener | |
| +import org.springframework.stereotype.Component | |
| +import reactor.core.publisher.Flux | |
| +import reactor.util.retry.Retry | |
| +import reactor.util.retry.RetryBackoffSpec | |
| +import java.nio.channels.ClosedChannelException | |
| +import java.util.stream.IntStream | |
| +import kotlin.time.Duration.Companion.seconds | |
| +import kotlin.time.toJavaDuration | |
| + | |
| +@Component | |
| +class BidirectionalClientLauncher( | |
| + private val props: FResultProperties, | |
| + private val encodingUtils: EncodingUtils, | |
| +) { | |
| + companion object { | |
| + val log: Logger = LogManager.getLogger(BidirectionalClientLauncher::class.java) | |
| + } | |
| + | |
| + @EventListener(ApplicationReadyEvent::class) | |
| + fun onApplicationReady() { | |
| + val maxClients = (5..10).random() | |
| + val hostname = props.rsocket.hostname | |
| + val port = props.rsocket.port | |
| + log.info("Launching {} clients connecting to {}:{}", maxClients, hostname, port) | |
| + | |
| + Flux.fromStream(IntStream.range(0, maxClients).boxed()) | |
| + .map(buildBidirectionalClient(encodingUtils, hostname, port)) | |
| + .flatMap(::toDelayClient) | |
| + .flatMap(BidirectionalClient::getGreetings) | |
| + .retryWhen(retryBackoffOnClosedChannel { maxAttempts = 5 }) | |
| + .subscribe( | |
| + ::onGreetingReceived, | |
| + ::onGreetingError, | |
| + ::onGreetingComplete, | |
| + ) | |
| + } | |
| + | |
| + private fun buildBidirectionalClient( | |
| + encodingUtils: EncodingUtils, | |
| + host: String, | |
| + port: Int | |
| + ): (Int) -> BidirectionalClient = { id -> | |
| + BidirectionalClient(encodingUtils, id.toString(), host, port) | |
| + } | |
| + | |
| + private fun toDelayClient(client: BidirectionalClient): Flux<BidirectionalClient> = | |
| + Flux.just(client).delayElements((1..30).random().seconds.toJavaDuration()) | |
| + | |
| + private fun retryBackoffOnClosedChannel(init: RetryConfigBuilder.() -> Unit): RetryBackoffSpec { | |
| + val cfg = RetryConfigBuilder().apply(init).build() | |
| + | |
| + return Retry.backoff(cfg.maxAttempts, cfg.firstBackOff) | |
| + .maxBackoff(cfg.maxBackoff) | |
| + .filter { it is ClosedChannelException } | |
| + .jitter(0.2) | |
| + .onRetryExhaustedThrow { _, _ -> RuntimeException("Retries exhausted") } | |
| + } | |
| + | |
| + private fun onGreetingReceived(greeting: GreetingResponse?): Unit { | |
| + greeting?.apply { log.info(message) } | |
| + ?: log.warn("Received null GreetingResponse") | |
| + } | |
| + | |
| + private fun onGreetingError(ex: Throwable): Unit = | |
| + log.error("Client stream failed with error: ${ex.message}", ex) | |
| + | |
| + private fun onGreetingComplete(): Unit = log.info("Client greeting streams completed") | |
| +} | |
| diff --git a/kotlin/08-rsocket/raw-rsocket/src/main/kotlin/com/fResult/rsocket/bidirectional/service/BidirectionalApplication.kt b/kotlin/08-rsocket/raw-rsocket/src/main/kotlin/com/fResult/rsocket/bidirectional/service/BidirectionalApplication.kt | |
| new file mode 100644 | |
| index 0000000..71971d8 | |
| --- /dev/null | |
| +++ b/kotlin/08-rsocket/raw-rsocket/src/main/kotlin/com/fResult/rsocket/bidirectional/service/BidirectionalApplication.kt | |
| @@ -0,0 +1,13 @@ | |
| +package com.fResult.rsocket.bidirectional.service | |
| + | |
| +import org.springframework.boot.autoconfigure.SpringBootApplication | |
| +import org.springframework.boot.runApplication | |
| + | |
| +@SpringBootApplication | |
| +class BidirectionalApplication | |
| + | |
| +@Throws(InterruptedException::class) | |
| +fun main(args: Array<String>) { | |
| + runApplication<BidirectionalApplication>(*args) | |
| + Thread.currentThread().join() | |
| +} | |
| diff --git a/kotlin/08-rsocket/raw-rsocket/src/main/kotlin/com/fResult/rsocket/bidirectional/service/BidirectionalService.kt b/kotlin/08-rsocket/raw-rsocket/src/main/kotlin/com/fResult/rsocket/bidirectional/service/BidirectionalService.kt | |
| new file mode 100644 | |
| index 0000000..1b281f5 | |
| --- /dev/null | |
| +++ b/kotlin/08-rsocket/raw-rsocket/src/main/kotlin/com/fResult/rsocket/bidirectional/service/BidirectionalService.kt | |
| @@ -0,0 +1,84 @@ | |
| +package com.fResult.rsocket.bidirectional.service | |
| + | |
| +import com.fResult.rsocket.EncodingUtils | |
| +import com.fResult.rsocket.FResultProperties | |
| +import com.fResult.rsocket.bidirectional.ClientHealthState | |
| +import com.fResult.rsocket.bidirectional.GreetingRequest | |
| +import com.fResult.rsocket.bidirectional.GreetingResponse | |
| +import io.rsocket.ConnectionSetupPayload | |
| +import io.rsocket.Payload | |
| +import io.rsocket.RSocket | |
| +import io.rsocket.core.RSocketServer | |
| +import io.rsocket.transport.netty.server.CloseableChannel | |
| +import io.rsocket.transport.netty.server.TcpServerTransport | |
| +import io.rsocket.util.DefaultPayload | |
| +import org.apache.logging.log4j.LogManager | |
| +import org.apache.logging.log4j.Logger | |
| +import org.springframework.boot.context.event.ApplicationReadyEvent | |
| +import org.springframework.context.event.EventListener | |
| +import org.springframework.stereotype.Component | |
| +import reactor.core.publisher.Flux | |
| +import reactor.core.publisher.Mono | |
| +import java.time.Instant | |
| +import java.util.stream.Stream | |
| +import kotlin.reflect.KClass | |
| +import kotlin.time.Duration.Companion.seconds | |
| +import kotlin.time.toJavaDuration | |
| + | |
| +@Component | |
| +class BidirectionalService( | |
| + private val props: FResultProperties, | |
| + private val encodingUtils: EncodingUtils, | |
| +) { | |
| + companion object { | |
| + val log: Logger = LogManager.getLogger(BidirectionalService::class.java) | |
| + } | |
| + | |
| + @EventListener(ApplicationReadyEvent::class) | |
| + fun onApplicationReady() { | |
| + val serverTransport = TcpServerTransport.create(props.rsocket.hostname, props.rsocket.port) | |
| + val socketServer = RSocketServer.create(::socketAcceptor) | |
| + | |
| + socketServer.bind(serverTransport).doOnNext(::logStartup).block() | |
| + } | |
| + | |
| + private fun socketAcceptor(setup: ConnectionSetupPayload, sendingSocket: RSocket) = | |
| + Mono.just(createRequestStreamHandler(sendingSocket)) | |
| + | |
| + private fun createRequestStreamHandler(clientRSocket: RSocket): RSocket { | |
| + return object : RSocket { | |
| + override fun requestStream(payload: Payload): Flux<Payload> = | |
| + streamUntilClientStop(clientRSocket, payload) | |
| + } | |
| + } | |
| + | |
| + private fun streamUntilClientStop(clientRSocket: RSocket, payload: Payload): Flux<Payload> { | |
| + val onClientStopped = clientRSocket.requestStream(DefaultPayload.create(ByteArray(0))) | |
| + .map(decodePayloadAs(ClientHealthState::class)) | |
| + .filter(::isClientHealthStateStopped) | |
| + | |
| + val greetingRequest = payload.let(decodePayloadAs(GreetingRequest::class)) | |
| + | |
| + return Flux.fromStream(Stream.generate(greetingResponderFrom(greetingRequest))) | |
| + .delayElements(randomDelayUpTo10Seconds()) | |
| + .takeUntilOther(onClientStopped) | |
| + .map(encodingUtils::encode) | |
| + .map(DefaultPayload::create) | |
| + .doFinally { signalType -> log.info("Finished greeting to ${greetingRequest.name}.") } | |
| + } | |
| + | |
| + private fun <T : Any> decodePayloadAs(klass: KClass<T>): (Payload) -> T = | |
| + { payload -> encodingUtils.decode(payload.dataUtf8, klass) } | |
| + | |
| + private fun isClientHealthStateStopped(chs: ClientHealthState) = | |
| + ClientHealthState.STOPPED.equals(chs.state, ignoreCase = true) | |
| + | |
| + private fun randomDelayUpTo10Seconds() = (3..10).random().seconds.toJavaDuration() | |
| + | |
| + private fun greetingResponderFrom(request: GreetingRequest): () -> GreetingResponse = { | |
| + GreetingResponse("Hello, ${request.name} @ ${Instant.now()}") | |
| + } | |
| + | |
| + private fun logStartup(channel: CloseableChannel): Unit = | |
| + log.info("Server started on the address: {}", channel.address()) | |
| +} | |
| diff --git a/kotlin/08-rsocket/raw-rsocket/src/main/kotlin/com/fResult/rsocket/fireAndForget/client/FireAndForgetClient.kt b/kotlin/08-rsocket/raw-rsocket/src/main/kotlin/com/fResult/rsocket/fireAndForget/client/FireAndForgetClient.kt | |
| index dcba36e..e23ac9c 100644 | |
| --- a/kotlin/08-rsocket/raw-rsocket/src/main/kotlin/com/fResult/rsocket/fireAndForget/client/FireAndForgetClient.kt | |
| +++ b/kotlin/08-rsocket/raw-rsocket/src/main/kotlin/com/fResult/rsocket/fireAndForget/client/FireAndForgetClient.kt | |
| @@ -1,7 +1,6 @@ | |
| package com.fResult.rsocket.fireAndForget.client | |
| import com.fResult.rsocket.FResultProperties | |
| -import io.rsocket.Payload | |
| import io.rsocket.core.RSocketClient | |
| import io.rsocket.core.RSocketConnector | |
| import io.rsocket.transport.netty.client.TcpClientTransport |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment