Created
October 14, 2025 00:42
-
-
Save gregfichtenholtz-illumio/81fb537e24f7187e9de37686bb8eca7d to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| package com.foo.bar; | |
| import org.apache.kafka.common.serialization.Serdes; | |
| import org.apache.kafka.streams.StreamsBuilder; | |
| import org.apache.kafka.streams.kstream.*; | |
| import org.apache.kafka.streams.StreamsConfig; | |
| import org.apache.kafka.streams.TestInputTopic; | |
| import org.apache.kafka.streams.TestOutputTopic; | |
| import org.apache.kafka.streams.TopologyTestDriver; | |
| import org.junit.jupiter.api.AfterEach; | |
| import org.junit.jupiter.api.BeforeEach; | |
| import org.junit.jupiter.api.Test; | |
| import java.time.Duration; | |
| import java.time.Instant; | |
| import java.util.Properties; | |
| import static org.junit.jupiter.api.Assertions.*; | |
| public class ChainedEmitStrategyTopologyTest { | |
| public static final String INPUT_TOPIC = "input-topic"; | |
| public static final String FINAL_OUTPUT_TOPIC = "final-output-topic"; | |
| private TopologyTestDriver testDriver; | |
| private TestInputTopic<String, String> inputTopic; | |
| private TestOutputTopic<String, Long> finalOutputTopic; | |
| public static StreamsBuilder buildTopology() { | |
| StreamsBuilder builder = new StreamsBuilder(); | |
| KStream<String, String> stream = builder.stream(INPUT_TOPIC, Consumed.with(Serdes.String(), Serdes.String())); | |
| // First aggregation: 1-minute tumbling window with a 30-second grace period | |
| KTable<Windowed<String>, Long> firstWindowedCounts = stream | |
| .groupByKey(Grouped.with(Serdes.String(), Serdes.String())) | |
| .windowedBy(TimeWindows.ofSizeAndGrace(Duration.ofMinutes(1), Duration.ofSeconds(30))) | |
| .count(Materialized.as("first-counts-store")) | |
| .suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded())); | |
| // Convert the windowed KTable back to a KStream for the next operation | |
| KStream<String, Long> intermediateStream = firstWindowedCounts | |
| .toStream((windowedKey, count) -> windowedKey.key()); | |
| // Second aggregation: 5-minute tumbling window with a 2-minute grace period | |
| KTable<Windowed<String>, Long> secondWindowedCounts = intermediateStream | |
| .groupByKey(Grouped.with(Serdes.String(), Serdes.Long())) | |
| .windowedBy(TimeWindows.ofSizeAndGrace(Duration.ofMinutes(5), Duration.ofMinutes(2))) | |
| .count(Materialized.as("second-counts-store")) | |
| .suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded())); | |
| secondWindowedCounts.toStream((windowedKey, count) -> windowedKey.key()) | |
| .to(FINAL_OUTPUT_TOPIC, Produced.with(Serdes.String(), Serdes.Long())); | |
| return builder; | |
| } | |
| @BeforeEach | |
| void setUp() { | |
| Properties config = new Properties(); | |
| config.put(StreamsConfig.APPLICATION_ID_CONFIG, "test-chained-grace"); | |
| config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234"); | |
| config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName()); | |
| config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName()); | |
| testDriver = new TopologyTestDriver(buildTopology().build(), config); | |
| inputTopic = testDriver.createInputTopic( | |
| INPUT_TOPIC, | |
| Serdes.String().serializer(), | |
| Serdes.String().serializer() | |
| ); | |
| finalOutputTopic = testDriver.createOutputTopic( | |
| FINAL_OUTPUT_TOPIC, | |
| Serdes.String().deserializer(), | |
| Serdes.Long().deserializer() | |
| ); | |
| } | |
| @AfterEach | |
| void tearDown() { | |
| if (testDriver != null) { | |
| testDriver.close(); | |
| } | |
| } | |
| @Test | |
| void testChainedWindowedAggregationsWithDifferentGracePeriods() { | |
| Instant startTime = Instant.parse("2025-10-13T10:00:00Z"); | |
| Instant firstWindowEndTime = startTime.plus(Duration.ofMinutes(1)); // 10:01:00 | |
| Instant firstWindowCloseTime = firstWindowEndTime.plus(Duration.ofSeconds(30)); // 10:01:30 | |
| Instant secondWindowCloseTime = startTime.plus(Duration.ofMinutes(5)).plus(Duration.ofMinutes(2)); // 10:07:00 | |
| // ---- Step 1: Input records for the first window (10:00 - 10:01) ---- | |
| inputTopic.pipeInput("A", "value1", startTime); | |
| inputTopic.pipeInput("B", "value2", startTime.plusSeconds(15)); | |
| inputTopic.pipeInput("A", "value3", startTime.plusSeconds(30)); | |
| // No output yet, as no window is closed | |
| assertTrue(finalOutputTopic.isEmpty()); | |
| // ---- Step 2: Simulate a late record for the first window ---- | |
| // This record for "B" arrives after the window end time (10:01) but within the grace period (10:01:30). | |
| inputTopic.pipeInput("B", "value4", startTime.plusSeconds(75)); // Timestamp 10:01:15 | |
| // Still no output from the first suppress(), as the grace period is not over | |
| assertTrue(finalOutputTopic.isEmpty()); | |
| // ---- Step 3: Advance time to close the *first* window (10:01 + 30s) ---- | |
| // This input with a timestamp > 10:01:30 will advance stream time past the first window's grace period. | |
| inputTopic.pipeInput("C", "value5", firstWindowCloseTime); | |
| // Still no final output because the second window is still open and buffering | |
| assertTrue(finalOutputTopic.isEmpty()); | |
| // ---- Step 4: Advance time to close the *second* window (10:05 + 2min) ---- | |
| // This input with a timestamp > 10:07:00 will advance stream time past the second window's grace period. | |
| inputTopic.pipeInput("D", "value6", secondWindowCloseTime); | |
| // Now, verify the final output. | |
| // The first window for "A" closed with a count of 2. | |
| // The first window for "B" closed with a count of 2 (original + late record). | |
| // The second window aggregates these two results, so the final count should be 2. | |
| assertEquals(1, finalOutputTopic.getQueueSize(), "Final output should contain one result"); | |
| assertEquals(2L, finalOutputTopic.readValue(), "Final count should be 2, aggregating the results from the first stage"); | |
| } | |
| } |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment