@akimabs
Created November 1, 2025 16:56

🚀 Reactive Programming Guide (Project Reactor)

A learning guide to Reactive Programming, with real examples from the CIAM Service codebase.


📚 Table of Contents

  1. Basic Concepts
  2. Mono vs Flux
  3. Commonly Used Operators
  4. Error Handling
  5. Combining Multiple Calls
  6. Performance Monitoring
  7. Best Practices
  8. Examples from the Codebase

🎯 Basic Concepts

What Is Reactive Programming?

Blocking (Traditional):

// The thread WAITS until the query finishes
String result = database.query("SELECT ..."); // ❌ Thread BLOCKED
processResult(result);

Non-Blocking (Reactive):

// The thread does NOT wait; it is free to do other work
Mono<String> result = database.query("SELECT ..."); // ✅ Thread FREE
result.subscribe(data -> processResult(data));
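
One consequence of the non-blocking model worth seeing in isolation: building a Mono does not run anything, and the work starts only on subscription. The sketch below illustrates this under the assumption that `query()` is a hypothetical stand-in for a reactive database call; it is not taken from the CIAM codebase.

```java
import reactor.core.publisher.Mono;
import java.util.concurrent.atomic.AtomicInteger;

public class LazyMonoDemo {
    // Counts how many times the simulated query actually runs.
    static final AtomicInteger queryCount = new AtomicInteger();

    // Hypothetical stand-in for a reactive database call.
    static Mono<String> query() {
        return Mono.fromCallable(() -> {
            queryCount.incrementAndGet();
            return "row-1";
        });
    }

    public static void main(String[] args) {
        Mono<String> result = query();           // only builds the pipeline
        System.out.println(queryCount.get());    // 0: nothing has run yet
        result.subscribe(System.out::println);   // runs the callable, prints row-1
        System.out.println(queryCount.get());    // 1
    }
}
```

In a WebFlux application the framework subscribes to the returned Mono, so application code rarely calls `subscribe()` itself.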

Why Use Reactive?

  • ✅ High Throughput - Handles many requests with few threads
  • ✅ Non-Blocking I/O - Threads do not sit idle while waiting on the database or network
  • ✅ Backpressure - Lets slow consumers signal how much data they can handle
  • ✅ Composable - Operations chain together easily


📦 Mono vs Flux

Mono<T> - 0 or 1 element

Used for operations that return a single value or nothing:

// Return 1 device
Mono<Device> device = deviceService.findById("abc123");

// Return 1 string
Mono<String> deviceId = deviceIdUtil.decryptDeviceId(encrypted);

// Return 1 boolean
Mono<Boolean> isEnabled = cachePort.getEnableBurekolParam();

When to use Mono?

  • Get by ID
  • Count
  • Exists check
  • Any operation that returns a single result

Flux<T> - 0 to N elements

Used for operations that return multiple values:

// Return list of devices
Flux<Device> devices = deviceService.findAll();

// Return list of users
Flux<User> users = userRepository.findByStatus("ACTIVE");

// Stream of matching Redis keys
Flux<String> keys = redisTemplate.keys("user:*");

When to use Flux?

  • Find all / list
  • Stream processing
  • Multiple results
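
The two types convert into each other, which is where items such as "Count" in the Mono list come from. The following is a minimal sketch; `findAll()` is a hypothetical in-memory source standing in for a repository call.

```java
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import java.util.List;

public class MonoFluxDemo {
    // Hypothetical source of several device ids.
    static Flux<String> findAll() {
        return Flux.just("dev-1", "dev-2", "dev-3");
    }

    // Flux -> Mono: gather every element into one List.
    static Mono<List<String>> asList() {
        return findAll().collectList();
    }

    // Flux -> Mono: keep only the first element (empty if the Flux is empty).
    static Mono<String> first() {
        return findAll().next();
    }

    // Flux -> Mono: count the elements.
    static Mono<Long> count() {
        return findAll().count();
    }

    // Mono -> Flux: a Flux with at most one element.
    static Flux<String> fromMono() {
        return Mono.just("dev-1").flux();
    }
}
```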

🔧 Commonly Used Operators

1. Transformation

.map() - Transform value

// Example from MetadataExtractor.java
Mono<String> deviceIdMono = deviceIdUtil.decryptDeviceId(encryptedDeviceId);

// Transform String → Metadata
Mono<Metadata> metadata = deviceIdMono.map(decryptedDeviceId ->
    Metadata.builder()
        .deviceId(decryptedDeviceId)
        .os("ANDROID")
        .build()
);

Use .map() to:

  • Transform the data type (String → Integer)
  • Extract a field from an object
  • Build a new object from existing data

.flatMap() - Transform a value into another Mono/Flux

// Example from DeviceController.java
return metadataExtractor.extract(exchange)
    .flatMap(metadata -> {
        String deviceId = metadata.getDeviceId();

        // Call another service (returns a Mono)
        return deviceUseCase.checkVersionBinding(deviceId);
    });

Difference between .map() and .flatMap():

// ❌ WRONG - .map() returns Mono<Mono<Device>>
.map(metadata -> deviceUseCase.checkVersionBinding(metadata.getDeviceId()))

// ✅ CORRECT - .flatMap() returns Mono<Device>
.flatMap(metadata -> deviceUseCase.checkVersionBinding(metadata.getDeviceId()))

Use .flatMap() to:

  • Chain reactive calls
  • Call a service that returns a Mono/Flux
  • Nested async operations
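
The type difference can be checked with a small self-contained sketch. `findOwner` is a hypothetical async lookup invented for illustration; only the operator behaviour is the point.

```java
import reactor.core.publisher.Mono;

public class MapVsFlatMapDemo {
    // Hypothetical async lookup that returns a Mono.
    static Mono<String> findOwner(String deviceId) {
        return Mono.just("owner-of-" + deviceId);
    }

    // map: synchronous transformation, String -> Integer.
    static Mono<Integer> length(Mono<String> id) {
        return id.map(String::length);
    }

    // flatMap: the function returns a Mono, and the result is flattened.
    static Mono<String> owner(Mono<String> id) {
        return id.flatMap(MapVsFlatMapDemo::findOwner);
    }

    // map with a Mono-returning function: compiles, but yields a nested Mono
    // whose inner Mono is never subscribed unless the caller does so.
    static Mono<Mono<String>> nested(Mono<String> id) {
        return id.map(MapVsFlatMapDemo::findOwner);
    }
}
```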

2. Filtering

.filter() - Filter elements

// Example from DeviceService.java
return userManagementClientPort.loadGetDeviceBindingByDeviceId(deviceId, true)
    .filter(Device::hasCheckVersionBindingData)  // Passes through only when true
    .switchIfEmpty(Mono.error(new IllegalStateException("Data not found")));

Use .filter() to:

  • Validate a condition
  • Skip elements that do not meet the criteria

.filterWhen() - Filter with an async check

// Filter with an async validation
return deviceMono
    .filterWhen(device ->
        permissionService.hasAccess(device.getUserId())  // Return Mono<Boolean>
    );
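
A runnable sketch of the same idea follows. The `hasAccess` check and the `ALLOWED` set are hypothetical stand-ins for a real permission service; when the check emits false, the Mono completes empty.

```java
import reactor.core.publisher.Mono;
import java.util.Set;

public class FilterWhenDemo {
    // Hypothetical allow-list standing in for a permission service.
    static final Set<String> ALLOWED = Set.of("user-1");

    // Async predicate: emits true when the user is allowed.
    static Mono<Boolean> hasAccess(String userId) {
        return Mono.just(ALLOWED.contains(userId));
    }

    // Emits the user id when access is granted, otherwise completes empty.
    static Mono<String> visibleTo(String userId) {
        return Mono.just(userId).filterWhen(FilterWhenDemo::hasAccess);
    }
}
```

Because a rejected element simply disappears, a `.switchIfEmpty(...)` after `.filterWhen(...)` is a common way to turn the empty case into an explicit error.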

3. Default Values

.defaultIfEmpty() - Supply a default when empty

// Example from DeviceService.java
return cachePort.loadGetEnableBurekolParam()
    .defaultIfEmpty(Boolean.FALSE);  // If Redis has no value, return false

.switchIfEmpty() - Switch to another Mono when empty

// Try the cache first; on a miss, load from the DB
return cachePort.getValue("key")
    .switchIfEmpty(
        databasePort.getValue("key")  // Fallback to the DB
            // Assumes setValue returns a Mono: flatMap subscribes to it,
            // whereas doOnNext would create the Mono and never run it
            .flatMap(value -> cachePort.setValue("key", value).thenReturn(value))
    );
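
One detail of `.switchIfEmpty()` is easy to miss: its argument is an ordinary Java expression, so it is evaluated when the chain is assembled, even on a cache hit. If the fallback method does work as soon as it is called, wrapping it in `Mono.defer` postpones that work until the source is actually empty. The sketch below assumes a hypothetical `loadFromDb()` that is eager on purpose, to make the difference visible.

```java
import reactor.core.publisher.Mono;
import java.util.concurrent.atomic.AtomicInteger;

public class SwitchIfEmptyDemo {
    // Counts invocations of the hypothetical DB loader.
    static final AtomicInteger dbCalls = new AtomicInteger();

    // Hypothetical loader that does its work when the method is called,
    // not when the returned Mono is subscribed.
    static Mono<String> loadFromDb() {
        dbCalls.incrementAndGet();
        return Mono.just("from-db");
    }

    // loadFromDb() is invoked while building the chain, even on a cache hit.
    static Mono<String> eager(Mono<String> cache) {
        return cache.switchIfEmpty(loadFromDb());
    }

    // loadFromDb() is invoked only if the cache Mono completes empty.
    static Mono<String> lazy(Mono<String> cache) {
        return cache.switchIfEmpty(Mono.defer(SwitchIfEmptyDemo::loadFromDb));
    }
}
```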

4. Error Handling

.onErrorReturn() - Return default value on error

// Example from DeviceRedisAdapter.java
return getHashValue(SYS_PARAM_KEY, ENABLE_BUREKOL_FIELD)
    .map(value -> "true".equalsIgnoreCase(value.trim()))
    .onErrorReturn(Boolean.FALSE);  // On error, return false

.onErrorResume() - Switch to another Mono on error

// Try primary service, fallback to backup
return primaryService.getData()
    .onErrorResume(error -> {
        log.warn("Primary failed, using backup");
        return backupService.getData();
    });
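
The two fallback operators can be compared side by side in a self-contained sketch. `primary()` and `backup()` are hypothetical services; the last method also shows the overload of `onErrorResume` that only handles one exception type and lets everything else propagate.

```java
import reactor.core.publisher.Mono;
import java.util.concurrent.TimeoutException;

public class ErrorFallbackDemo {
    // Hypothetical primary service that always fails.
    static Mono<String> primary() {
        return Mono.error(new IllegalStateException("primary down"));
    }

    // Hypothetical backup service.
    static Mono<String> backup() {
        return Mono.just("backup-data");
    }

    // Replace any error with a fixed value.
    static Mono<String> withDefault() {
        return primary().onErrorReturn("default");
    }

    // Replace any error with another Mono.
    static Mono<String> withBackup() {
        return primary().onErrorResume(error -> backup());
    }

    // Fall back only on TimeoutException; other errors still propagate.
    static Mono<String> onlyTimeouts() {
        return primary().onErrorResume(TimeoutException.class, error -> backup());
    }
}
```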

.doOnError() - Log the error but still propagate it

// Example from RedisCommonAdapter.java
return reactiveRedisTemplate.opsForHash()
    .get(hashKey, field)
    .doOnError(error ->
        log.error("[{}] Error getting hash", databaseName, error)
    );

5. Side Effects (Logging, Debugging)

.doOnNext() - Runs when a value is emitted

return deviceService.save(device)
    .doOnNext(saved ->
        log.info("Device saved: {}", saved.getId())
    );

.doOnSuccess() - Runs on successful completion (including empty, where the value is null)

return deviceService.findById(id)
    .doOnSuccess(device -> {
        if (device != null) {
            log.info("Found: {}", device.getId());
        } else {
            log.info("Not found");
        }
    });

.doOnError() - Runs when an error occurs

return riskyOperation()
    .doOnError(error ->
        log.error("Operation failed", error)
    );

.doOnSubscribe() - Runs when the subscription starts

return longRunningTask()
    .doOnSubscribe(s ->
        log.info("Starting long task...")
    )
    .doOnNext(result ->
        log.info("Task completed!")
    );
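
To see which side-effect hooks fire for a value, an empty Mono, and an error, the sketch below records each callback into a list. The `trace` helper is hypothetical and uses `block()` only because it is a standalone demo.

```java
import reactor.core.publisher.Mono;
import java.util.ArrayList;
import java.util.List;

public class SideEffectDemo {
    // Subscribes to the source and records which hooks fired, in order.
    static List<String> trace(Mono<String> source) {
        List<String> events = new ArrayList<>();
        source
            .doOnSubscribe(s -> events.add("subscribe"))
            .doOnNext(v -> events.add("next:" + v))
            .doOnSuccess(v -> events.add("success:" + v))
            .doOnError(e -> events.add("error:" + e.getMessage()))
            .onErrorResume(e -> Mono.empty())  // swallow the error so block() returns
            .block();
        return events;
    }

    public static void main(String[] args) {
        System.out.println(trace(Mono.just("x")));
        System.out.println(trace(Mono.empty()));
        System.out.println(trace(Mono.error(new RuntimeException("boom"))));
    }
}
```

Note that `doOnNext` is skipped for an empty Mono while `doOnSuccess` still runs with a null value, and neither runs on error.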

6. Combining Multiple Calls

Mono.zip() - Combine multiple Monos (concurrent execution)

// Example from DeviceController.java
return Mono.zip(
    Mono.just(device),
    deviceUseCase.getEnableBurekolParam(),     // Call 1 - parallel
    deviceUseCase.getEnableNtbParam(),         // Call 2 - parallel
    deviceUseCase.isBlockedByDebitCardPin(cif), // Call 3 - parallel
    deviceUseCase.checkPlatformVersion(version)  // Call 4 - parallel
).map(tuple -> {
    Device dev = tuple.getT1();
    Boolean burekol = tuple.getT2();
    Boolean ntb = tuple.getT3();
    Boolean blocked = tuple.getT4();
    String platform = tuple.getT5();

    return DeviceMapper.toResponse(dev, burekol, ntb, blocked, platform);
});

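Benefits of Mono.zip():

  • ✅ All sources are subscribed together, so non-blocking calls run CONCURRENTLY (not sequentially)
  • ✅ Waits for all of them to finish before continuing
  • ✅ If any one of them errors, the whole zip fails
  • ⚠️ If any one of them completes empty, the whole zip completes empty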

Sequential vs Parallel:

// ❌ SEQUENTIAL - Total: 300ms (100+100+100)
return call1()  // 100ms
    .flatMap(r1 -> call2())  // 100ms
    .flatMap(r2 -> call3());  // 100ms

// ✅ CONCURRENT - Total: ~100ms (all calls run together)
return Mono.zip(
    call1(),  // 100ms
    call2(),  // 100ms  } Run together!
    call3()   // 100ms
);
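
The timing difference can be reproduced with simulated calls. In this sketch `call(name)` is a hypothetical remote call built on `Mono.delay`, so it waits about 100 ms without blocking a thread. The last method shows a zip in which one source completes empty: the combined Mono then completes empty too.

```java
import reactor.core.publisher.Mono;
import java.time.Duration;

public class ZipDemo {
    // Hypothetical remote call: emits its name after roughly 100 ms.
    static Mono<String> call(String name) {
        return Mono.delay(Duration.ofMillis(100)).thenReturn(name);
    }

    // Each call starts only after the previous one has finished.
    static Mono<String> sequential() {
        return call("a").flatMap(a ->
            call("b").flatMap(b ->
                call("c").map(c -> a + b + c)));
    }

    // All three calls are subscribed at once and wait concurrently.
    static Mono<String> concurrent() {
        return Mono.zip(call("a"), call("b"), call("c"))
            .map(t -> t.getT1() + t.getT2() + t.getT3());
    }

    // One empty source makes the whole zip complete empty.
    static Mono<String> withEmptySource() {
        return Mono.zip(call("a"), Mono.<String>empty())
            .map(t -> t.getT1() + t.getT2());
    }

    public static void main(String[] args) {
        // block() is used here only because this is a standalone demo.
        long start = System.nanoTime();
        sequential().block();
        System.out.println("sequential ms: " + (System.nanoTime() - start) / 1_000_000);

        start = System.nanoTime();
        concurrent().block();
        System.out.println("concurrent ms: " + (System.nanoTime() - start) / 1_000_000);
    }
}
```

If a source may legitimately be empty, giving it a `.defaultIfEmpty(...)` before zipping keeps the combined result from silently disappearing.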

Flux.merge() - Combine Flux streams

Flux<String> source1 = Flux.just("A", "B");
Flux<String> source2 = Flux.just("C", "D");

Flux<String> merged = Flux.merge(source1, source2);
// Output here: A, B, C, D, because Flux.just emits synchronously.
// With asynchronous sources, elements interleave in arrival order (no ordering guarantee).

Flux.concat() - Combine Flux sequentially

Flux<String> first = Flux.just("1", "2");
Flux<String> second = Flux.just("3", "4");

Flux<String> concatenated = Flux.concat(first, second);
// Output: 1, 2, 3, 4 (guaranteed order)
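
The ordering difference only shows up with asynchronous sources. The sketch below uses `delayElements` to simulate a slow and a fast source (both hypothetical); `block()` appears only to collect the results in a standalone demo.

```java
import reactor.core.publisher.Flux;
import java.time.Duration;
import java.util.List;

public class MergeConcatDemo {
    // Hypothetical slow source: one element every 100 ms.
    static Flux<String> slow() {
        return Flux.just("A", "B").delayElements(Duration.ofMillis(100));
    }

    // Hypothetical fast source: one element every 20 ms.
    static Flux<String> fast() {
        return Flux.just("C", "D").delayElements(Duration.ofMillis(20));
    }

    // merge subscribes to both at once; elements arrive as they are emitted,
    // so the fast source typically comes out first.
    static List<String> merged() {
        return Flux.merge(slow(), fast()).collectList().block();
    }

    // concat subscribes to fast() only after slow() completes: always A, B, C, D.
    static List<String> concatenated() {
        return Flux.concat(slow(), fast()).collectList().block();
    }
}
```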

⏱️ Performance Monitoring

.elapsed() - Measure execution time

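// Adapted from ControllerLoggingFilter.java
// chain.filter(exchange) returns Mono<Void>, which never emits an element,
// so .elapsed().doOnNext(...) would never fire. Time the request manually instead.
return Mono.defer(() -> {
    long startNanos = System.nanoTime();
    return chain.filter(exchange)
        .doFinally(signal -> {
            long durationMs = (System.nanoTime() - startNanos) / 1_000_000;
            log.info("Request took: {}ms", durationMs);
        });
});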

Common pattern:

return someOperation()
    .elapsed()
    .doOnNext(tuple -> {
        long duration = tuple.getT1();
        String result = tuple.getT2();
        log.info("Operation took {}ms, result: {}", duration, result);
    })
    .map(tuple -> tuple.getT2());  // Extract original value

🎨 Examples from the Codebase

Example 1: Simple Get with Default

// File: DeviceRedisAdapter.java
public Mono<Boolean> loadGetEnableBurekolParam() {
    return getHashValue(SYS_PARAM_KEY, ENABLE_BUREKOL_FIELD)
        .map(value -> "true".equalsIgnoreCase(value.trim()))
        .onErrorReturn(Boolean.FALSE);
}

Explanation:

  1. Get the value from Redis
  2. Transform String → Boolean with .map()
  3. On error (for example, Redis is down), return false

Example 2: Chain Multiple Calls

// File: DeviceController.java
return metadataExtractor.extract(exchange)
    .flatMap(metadata -> {
        String deviceId = metadata.getDeviceId();

        // Call service (return Mono<Device>)
        return deviceUseCase.checkVersionBinding(deviceId)
            .flatMap(device -> {
                String cif = device.getCif();

                // Parallel calls
                return Mono.zip(
                    Mono.just(device),
                    deviceUseCase.getEnableBurekolParam(),
                    deviceUseCase.isBlockedByDebitCardPin(cif)
                );
            })
            .map(tuple -> DeviceMapper.toResponse(
                metadata,
                tuple.getT1(),
                tuple.getT2(),
                tuple.getT3()
            ));
    });

Flow:

  1. Extract metadata (async)
  2. Check version binding (async)
  3. Zip the device with 2 lookups that run concurrently (async)
  4. Map to response

Example 3: Filter with Fallback

// File: DeviceService.java
public Mono<Device> checkVersionBinding(String deviceId) {
    return userManagementClientPort.loadGetDeviceBindingByDeviceId(deviceId, true)
        .filter(Device::hasCheckVersionBindingData)
        .switchIfEmpty(
            Mono.error(new IllegalStateException("Device binding data not found"))
        );
}

Explanation:

  1. Load from the user management service
  2. Filter: the device passes through only if it has binding data
  3. If empty → signal an error

Example 4: Error Handling with Logging

// File: RedisCommonAdapter.java
public Mono<Map<String, String>> getAllHashEntries(String hashKey) {
    return reactiveRedisTemplate.opsForHash()
        .entries(hashKey)
        .collectMap(
            entry -> entry.getKey().toString().trim(),
            entry -> entry.getValue().toString().trim()
        )
        .doOnError(error ->
            log.error("[{}] Error getting hash entries - key: {}",
                databaseName, hashKey, error)
        )
        .onErrorReturn(Map.of());  // Return empty map on error
}

Explanation:

  1. Get all hash entries from Redis
  2. Collect them into a Map
  3. On error: log first, then return an empty map
  4. The application keeps running (graceful degradation)

Example 5: Conditional Mono Creation

// File: MetadataExtractor.java
Mono<String> deviceIdMono = StringUtils.hasText(encryptedDeviceId)
    ? deviceIdUtil.decryptDeviceId(encryptedDeviceId)
        .onErrorReturn("")
    : Mono.just("");

Explanation:

  • If an encrypted deviceId is present → decrypt it (async)
  • If decryption fails → return an empty string
  • If it is absent → return an empty string right away

✅ Best Practices

1. Don't Block!

// ❌ DON'T DO THIS!
Mono<String> mono = someService.getData();
String result = mono.block();  // BLOCKING! The thread waits!

// ✅ DO THIS
return someService.getData()
    .map(result -> processResult(result));
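
When a blocking call cannot be avoided (for example a legacy client with no reactive API), one common approach is to wrap it in `Mono.fromCallable` and move it onto the `boundedElastic` scheduler, so it does not occupy an event-loop thread. This is a minimal sketch; `legacyLookup` is a hypothetical blocking method, not part of the codebase.

```java
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;

public class BlockingBridgeDemo {
    // Hypothetical legacy call that blocks the calling thread for ~50 ms.
    static String legacyLookup(String id) {
        try {
            Thread.sleep(50);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return "legacy-" + id;
    }

    // Defers the blocking call until subscription and runs it on a
    // scheduler intended for blocking work.
    static Mono<String> lookup(String id) {
        return Mono.fromCallable(() -> legacyLookup(id))
            .subscribeOn(Schedulers.boundedElastic());
    }
}
```

The caller still receives a Mono and composes it like any other reactive call, for example `lookup("42").map(String::toUpperCase)`.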

2. Use .flatMap() for Reactive Chains

// ❌ WRONG
.map(user -> userService.getProfile(user.getId()))  // Returns Mono<Mono<Profile>>

// ✅ CORRECT
.flatMap(user -> userService.getProfile(user.getId()))  // Return Mono<Profile>

3. Handle Errors Gracefully

// ✅ GOOD - Has a fallback
return primaryService.getData()
    .onErrorResume(error -> {
        log.warn("Primary failed, using cache");
        return cacheService.getData();
    })
    .onErrorReturn(defaultValue);  // Last resort

4. Concurrent Execution with Mono.zip()

// ❌ SEQUENTIAL (slow)
return service1.call()
    .flatMap(r1 -> service2.call())
    .flatMap(r2 -> service3.call());

// ✅ CONCURRENT (fast)
return Mono.zip(
    service1.call(),
    service2.call(),
    service3.call()
).map(tuple -> combine(tuple.getT1(), tuple.getT2(), tuple.getT3()));

5. Logging for Debugging

return someOperation()
    .doOnSubscribe(s -> log.debug("Starting operation..."))
    .doOnNext(result -> log.debug("Got result: {}", result))
    .doOnError(error -> log.error("Failed", error))
    .doOnSuccess(result -> log.debug("Completed!"));

6. Empty Check

// ✅ Option A: treat empty as an error
return repository.findById(id)
    .switchIfEmpty(Mono.error(new NotFoundException()));

// ✅ Option B: provide a default value
return repository.findById(id)
    .defaultIfEmpty(defaultValue);

🔍 Common Pitfalls

1. Subscribing in the Service Layer

// ❌ DON'T SUBSCRIBE IN THE SERVICE!
public void saveUser(User user) {
    userRepository.save(user)
        .subscribe();  // ❌ Lost error handling!
}

// ✅ RETURN MONO
public Mono<User> saveUser(User user) {
    return userRepository.save(user);  // ✅ Let controller handle
}

2. Nested Subscribe

// ❌ CALLBACK HELL
service1.call().subscribe(r1 -> {
    service2.call().subscribe(r2 -> {
        service3.call().subscribe(r3 -> {
            // Hard to maintain!
        });
    });
});

// ✅ REACTIVE CHAIN
return service1.call()
    .flatMap(r1 -> service2.call())
    .flatMap(r2 -> service3.call());

3. Avoid .block() in Production

// ❌ BLOCKING - Thread waste!
String result = monoService.getData().block();

// ✅ NON-BLOCKING
return monoService.getData()
    .map(this::processData);

📚 Cheat Sheet

| Operator | Use Case | Example |
|---|---|---|
| .map() | Transform value | Mono<String> → Mono<Integer> |
| .flatMap() | Chain Mono/Flux | Mono<User> → get profile → Mono<Profile> |
| .filter() | Filter elements | Keep only active users |
| .defaultIfEmpty() | Provide default | Return false if empty |
| .switchIfEmpty() | Fallback Mono | Cache miss → load from DB |
| .onErrorReturn() | Default on error | Error → return default value |
| .onErrorResume() | Fallback on error | Primary fails → use backup |
| .doOnNext() | Side effect | Logging, metrics |
| .doOnError() | Log error | Track failures |
| Mono.zip() | Concurrent calls | Execute multiple calls together |
| .elapsed() | Measure time | Performance monitoring |

🎓 Learning Resources

  1. Official Docs: https://projectreactor.io/docs/core/release/reference/
  2. Reactor Core Javadoc: https://projectreactor.io/docs/core/release/api/
  3. Baeldung Tutorial: https://www.baeldung.com/reactor-core
  4. Spring WebFlux: https://docs.spring.io/spring-framework/reference/web/webflux.html

🚀 Next Steps

  1. ✅ Understand Mono vs Flux
  2. ✅ Practice .map() vs .flatMap()
  3. ✅ Try error handling with .onErrorReturn() and .onErrorResume()
  4. ✅ Experiment with Mono.zip() for concurrent calls
  5. ✅ Never use .block() in production!

Happy Reactive Programming! 🎉
