Panduan belajar Reactive Programming dengan contoh-contoh nyata dari codebase CIAM Service.
- Konsep Dasar
- Mono vs Flux
- Operators yang Sering Dipakai
- Error Handling
- Combining Multiple Calls
- Performance Monitoring
- Best Practices
- Contoh dari Codebase
Blocking (Traditional):
// Thread MENUNGGU sampai selesai
String result = database.query("SELECT ..."); // ❌ Thread BLOCKED
processResult(result);Non-Blocking (Reactive):
// Thread TIDAK menunggu, lanjut kerja yang lain
Mono<String> result = database.query("SELECT ..."); // ✅ Thread BEBAS
result.subscribe(data -> processResult(data));✅ High Throughput - Bisa handle banyak request dengan thread sedikit ✅ Non-Blocking I/O - Thread tidak idle saat tunggu database/network ✅ Backpressure - Auto handle slow consumers ✅ Composable - Chain operations dengan mudah
Dipakai untuk operasi yang return single value atau empty:
// Return 1 device
Mono<Device> device = deviceService.findById("abc123");
// Return 1 string
Mono<String> deviceId = deviceIdUtil.decryptDeviceId(encrypted);
// Return 1 boolean
Mono<Boolean> isEnabled = cachePort.getEnableBurekolParam();Kapan pakai Mono?
- Get by ID
- Count
- Exists check
- Any operation yang return single result
Dipakai untuk operasi yang return multiple values:
// Return list of devices
Flux<Device> devices = deviceService.findAll();
// Return list of users
Flux<User> users = userRepository.findByStatus("ACTIVE");
// Stream of events
Flux<String> keys = redisTemplate.keys("user:*");Kapan pakai Flux?
- Find all / list
- Stream processing
- Multiple results
// Contoh dari MetadataExtractor.java
Mono<String> deviceIdMono = deviceIdUtil.decryptDeviceId(encryptedDeviceId);
// Transform String → Metadata
Mono<Metadata> metadata = deviceIdMono.map(decryptedDeviceId ->
Metadata.builder()
.deviceId(decryptedDeviceId)
.os("ANDROID")
.build()
);Gunakan .map() untuk:
- Transform tipe data (String → Integer)
- Extract field dari object
- Buat object baru dari existing data
// Contoh dari DeviceController.java
return metadataExtractor.extract(exchange)
.flatMap(metadata -> {
String deviceId = metadata.getDeviceId();
// Panggil service lain (return Mono)
return deviceUseCase.checkVersionBinding(deviceId);
});Perbedaan .map() vs .flatMap():
// ❌ SALAH - .map() return Mono<Mono<Device>>
.map(metadata -> deviceUseCase.checkVersionBinding(metadata.getDeviceId()))
// ✅ BENAR - .flatMap() return Mono<Device>
.flatMap(metadata -> deviceUseCase.checkVersionBinding(metadata.getDeviceId()))Gunakan .flatMap() untuk:
- Chain reactive calls
- Call service yang return Mono/Flux
- Nested async operations
// Contoh dari DeviceService.java
return userManagementClientPort.loadGetDeviceBindingByDeviceId(deviceId, true)
.filter(Device::hasCheckVersionBindingData) // Hanya lewat kalau true
.switchIfEmpty(Mono.error(new IllegalStateException("Data not found")));Gunakan .filter() untuk:
- Validasi kondisi
- Skip elements yang tidak memenuhi syarat
// Filter dengan async validation
return deviceMono
.filterWhen(device ->
permissionService.hasAccess(device.getUserId()) // Return Mono<Boolean>
);// Contoh dari DeviceService.java
return cachePort.loadGetEnableBurekolParam()
.defaultIfEmpty(Boolean.FALSE); // Kalau Redis kosong, return false// Coba dari cache dulu, kalau kosong ambil dari DB
return cachePort.getValue("key")
.switchIfEmpty(
databasePort.getValue("key") // Fallback ke DB
.doOnNext(value -> cachePort.setValue("key", value)) // Save ke cache
);// Contoh dari DeviceRedisAdapter.java
return getHashValue(SYS_PARAM_KEY, ENABLE_BUREKOL_FIELD)
.map(value -> "true".equalsIgnoreCase(value.trim()))
.onErrorReturn(Boolean.FALSE); // Kalau error, return false// Try primary service, fallback to backup
return primaryService.getData()
.onErrorResume(error -> {
log.warn("Primary failed, using backup");
return backupService.getData();
});// Contoh dari RedisCommonAdapter.java
return reactiveRedisTemplate.opsForHash()
.get(hashKey, field)
.doOnError(error ->
log.error("[{}] Error getting hash", databaseName, error)
);return deviceService.save(device)
.doOnNext(saved ->
log.info("Device saved: {}", saved.getId())
);return deviceService.findById(id)
.doOnSuccess(device -> {
if (device != null) {
log.info("Found: {}", device.getId());
} else {
log.info("Not found");
}
});return riskyOperation()
.doOnError(error ->
log.error("Operation failed", error)
);return longRunningTask()
.doOnSubscribe(s ->
log.info("Starting long task...")
)
.doOnNext(result ->
log.info("Task completed!")
);// Contoh dari DeviceController.java
return Mono.zip(
Mono.just(device),
deviceUseCase.getEnableBurekolParam(), // Call 1 - parallel
deviceUseCase.getEnableNtbParam(), // Call 2 - parallel
deviceUseCase.isBlockedByDebitCardPin(cif), // Call 3 - parallel
deviceUseCase.checkPlatformVersion(version) // Call 4 - parallel
).map(tuple -> {
Device dev = tuple.getT1();
Boolean burekol = tuple.getT2();
Boolean ntb = tuple.getT3();
Boolean blocked = tuple.getT4();
String platform = tuple.getT5();
return DeviceMapper.toResponse(dev, burekol, ntb, blocked, platform);
});Benefits Mono.zip():
- ✅ Eksekusi PARALLEL (tidak sequential)
- ✅ Tunggu semua selesai baru lanjut
- ✅ Kalau ada 1 error, semua fail
Sequential vs Parallel:
// ❌ SEQUENTIAL - Total: 300ms (100+100+100)
return call1() // 100ms
.flatMap(r1 -> call2()) // 100ms
.flatMap(r2 -> call3()); // 100ms
// ✅ PARALLEL - Total: 100ms (semua jalan bareng)
return Mono.zip(
call1(), // 100ms
call2(), // 100ms } Jalan bareng!
call3() // 100ms
);Flux<String> source1 = Flux.just("A", "B");
Flux<String> source2 = Flux.just("C", "D");
Flux<String> merged = Flux.merge(source1, source2);
// Output: A, B, C, D (atau C, D, A, B - tidak guaranteed order)Flux<String> first = Flux.just("1", "2");
Flux<String> second = Flux.just("3", "4");
Flux<String> concatenated = Flux.concat(first, second);
// Output: 1, 2, 3, 4 (guaranteed order)// Contoh dari ControllerLoggingFilter.java
return chain.filter(exchange)
.elapsed() // Returns Tuple2<Long, Void>
.doOnNext(tuple -> {
long durationMs = tuple.getT1(); // Duration in milliseconds
log.info("Request took: {}ms", durationMs);
})
.then(); // Convert back to Mono<Void>Common pattern:
return someOperation()
.elapsed()
.doOnNext(tuple -> {
long duration = tuple.getT1();
String result = tuple.getT2();
log.info("Operation took {}ms, result: {}", duration, result);
})
.map(tuple -> tuple.getT2()); // Extract original value// File: DeviceRedisAdapter.java
public Mono<Boolean> loadGetEnableBurekolParam() {
return getHashValue(SYS_PARAM_KEY, ENABLE_BUREKOL_FIELD)
.map(value -> "true".equalsIgnoreCase(value.trim()))
.onErrorReturn(Boolean.FALSE);
}Penjelasan:
- Get value dari Redis
- Transform String → Boolean dengan
.map() - Kalau error (Redis down), return
false
// File: DeviceController.java
return metadataExtractor.extract(exchange)
.flatMap(metadata -> {
String deviceId = metadata.getDeviceId();
// Call service (return Mono<Device>)
return deviceUseCase.checkVersionBinding(deviceId)
.flatMap(device -> {
String cif = device.getCif();
// Parallel calls
return Mono.zip(
Mono.just(device),
deviceUseCase.getEnableBurekolParam(),
deviceUseCase.isBlockedByDebitCardPin(cif)
);
})
.map(tuple -> DeviceMapper.toResponse(
metadata,
tuple.getT1(),
tuple.getT2(),
tuple.getT3()
));
});Flow:
- Extract metadata (async)
- Check version binding (async)
- Get 3 values in parallel (async)
- Map to response
// File: DeviceService.java
public Mono<Device> checkVersionBinding(String deviceId) {
return userManagementClientPort.loadGetDeviceBindingByDeviceId(deviceId, true)
.filter(Device::hasCheckVersionBindingData)
.switchIfEmpty(
Mono.error(new IllegalStateException("Device binding data not found"))
);
}Penjelasan:
- Load dari user management service
- Filter: hanya lewat kalau ada data
- Kalau empty → throw error
// File: RedisCommonAdapter.java
public Mono<Map<String, String>> getAllHashEntries(String hashKey) {
return reactiveRedisTemplate.opsForHash()
.entries(hashKey)
.collectMap(
entry -> entry.getKey().toString().trim(),
entry -> entry.getValue().toString().trim()
)
.doOnError(error ->
log.error("[{}] Error getting hash entries - key: {}",
databaseName, hashKey, error)
)
.onErrorReturn(Map.of()); // Return empty map on error
}Penjelasan:
- Get all hash entries dari Redis
- Collect ke Map
- Kalau error: log dulu, terus return empty map
- Application tetap jalan (graceful degradation)
// File: MetadataExtractor.java
Mono<String> deviceIdMono = StringUtils.hasText(encryptedDeviceId)
? deviceIdUtil.decryptDeviceId(encryptedDeviceId)
.onErrorReturn("")
: Mono.just("");Penjelasan:
- Kalau ada encrypted deviceId → decrypt (async)
- Kalau decrypt error → return empty string
- Kalau tidak ada → langsung return empty string
// ❌ JANGAN LAKUKAN INI!
Mono<String> mono = someService.getData();
String result = mono.block(); // BLOCKING! Thread tunggu!
// ✅ LAKUKAN INI
return someService.getData()
.map(result -> processResult(result));// ❌ SALAH
.map(user -> userService.getProfile(user.getId())) // Return Mono<Mono<Profile>>
// ✅ BENAR
.flatMap(user -> userService.getProfile(user.getId())) // Return Mono<Profile>// ✅ GOOD - Ada fallback
return primaryService.getData()
.onErrorResume(error -> {
log.warn("Primary failed, using cache");
return cacheService.getData();
})
.onErrorReturn(defaultValue); // Last resort// ❌ SEQUENTIAL (lambat)
return service1.call()
.flatMap(r1 -> service2.call())
.flatMap(r2 -> service3.call());
// ✅ PARALLEL (cepat)
return Mono.zip(
service1.call(),
service2.call(),
service3.call()
).map(tuple -> combine(tuple.getT1(), tuple.getT2(), tuple.getT3()));return someOperation()
.doOnSubscribe(s -> log.debug("Starting operation..."))
.doOnNext(result -> log.debug("Got result: {}", result))
.doOnError(error -> log.error("Failed", error))
.doOnSuccess(result -> log.debug("Completed!"));// ✅ Provide defaults
return repository.findById(id)
.switchIfEmpty(Mono.error(new NotFoundException()))
// atau
.defaultIfEmpty(defaultValue);// ❌ JANGAN SUBSCRIBE DI SERVICE!
public void saveUser(User user) {
userRepository.save(user)
.subscribe(); // ❌ Lost error handling!
}
// ✅ RETURN MONO
public Mono<User> saveUser(User user) {
return userRepository.save(user); // ✅ Let controller handle
}// ❌ CALLBACK HELL
service1.call().subscribe(r1 -> {
service2.call().subscribe(r2 -> {
service3.call().subscribe(r3 -> {
// Susah di-maintain!
});
});
});
// ✅ REACTIVE CHAIN
return service1.call()
.flatMap(r1 -> service2.call())
.flatMap(r2 -> service3.call());// ❌ BLOCKING - Thread waste!
String result = monoService.getData().block();
// ✅ NON-BLOCKING
return monoService.getData()
.map(this::processData);| Operator | Use Case | Example |
|---|---|---|
.map() |
Transform value | Mono<String> → Mono<Integer> |
.flatMap() |
Chain Mono/Flux | Mono<User> → get profile → Mono<Profile> |
.filter() |
Filter elements | Keep only active users |
.defaultIfEmpty() |
Provide default | Return false if empty |
.switchIfEmpty() |
Fallback Mono | Cache miss → load from DB |
.onErrorReturn() |
Default on error | Error → return default value |
.onErrorResume() |
Fallback on error | Primary fail → use backup |
.doOnNext() |
Side effect | Logging, metrics |
.doOnError() |
Log error | Track failures |
Mono.zip() |
Parallel calls | Execute multiple calls together |
.elapsed() |
Measure time | Performance monitoring |
- Official Docs: https://projectreactor.io/docs/core/release/reference/
- Reactor Core Javadoc: https://projectreactor.io/docs/core/release/api/
- Baeldung Tutorial: https://www.baeldung.com/reactor-core
- Spring WebFlux: https://docs.spring.io/spring-framework/reference/web/webflux.html
- ✅ Pahami
MonovsFlux - ✅ Latihan
.map()vs.flatMap() - ✅ Coba error handling dengan
.onErrorReturn()dan.onErrorResume() - ✅ Eksperimen dengan
Mono.zip()untuk parallel calls - ✅ Jangan pernah pakai
.block()di production!
Happy Reactive Programming! 🎉