Skip to content

Instantly share code, notes, and snippets.

Show Gist options
  • Select an option

  • Save gregfichtenholtz-illumio/81fb537e24f7187e9de37686bb8eca7d to your computer and use it in GitHub Desktop.

Select an option

Save gregfichtenholtz-illumio/81fb537e24f7187e9de37686bb8eca7d to your computer and use it in GitHub Desktop.
package com.foo.bar;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.TestInputTopic;
import org.apache.kafka.streams.TestOutputTopic;
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.kstream.*;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import java.time.Duration;
import java.time.Instant;
import java.util.Map;
import java.util.Properties;
import static org.junit.jupiter.api.Assertions.*;
/**
 * Tests a topology that chains two windowed {@code count()} aggregations, each followed by
 * {@code suppress(untilWindowCloses(...))}.
 *
 * <ul>
 *   <li>Stage 1: 1-minute tumbling windows with a 30-second grace period.</li>
 *   <li>Stage 2: 5-minute tumbling windows with a 2-minute grace period, fed by the final
 *       results of stage 1 (re-keyed back to the plain record key).</li>
 * </ul>
 *
 * <p>Two properties of Kafka Streams drive the shape of the test:
 * <ul>
 *   <li>A record is assigned to a window by its <em>event timestamp</em>. A "late" record is one
 *       whose timestamp lies inside a window that has already ended, not one whose timestamp is
 *       after the window end.</li>
 *   <li>{@code suppress()} advances on the timestamps of the records it receives. Stage 2 only
 *       receives stage-1 final results, so its stream time trails the input: a stage-2 window
 *       closes only after a stage-1 result with a sufficiently large timestamp has been
 *       emitted.</li>
 * </ul>
 */
public class ChainedEmitStrategyTopologyTest {
    public static final String INPUT_TOPIC = "input-topic";
    public static final String FINAL_OUTPUT_TOPIC = "final-output-topic";

    private static final String FIRST_STORE = "first-counts-store";
    private static final String SECOND_STORE = "second-counts-store";

    private static final Duration FIRST_WINDOW_SIZE = Duration.ofMinutes(1);
    private static final Duration FIRST_GRACE = Duration.ofSeconds(30);
    private static final Duration SECOND_WINDOW_SIZE = Duration.ofMinutes(5);
    private static final Duration SECOND_GRACE = Duration.ofMinutes(2);

    private TopologyTestDriver testDriver;
    private TestInputTopic<String, String> inputTopic;
    private TestOutputTopic<String, Long> finalOutputTopic;

    /**
     * Builds the chained topology under test.
     *
     * @return a builder holding: input topic, per-key count in 1-minute windows (final results
     *     only), per-key count of those results in 5-minute windows (final results only),
     *     output topic
     */
    public static StreamsBuilder buildTopology() {
        StreamsBuilder builder = new StreamsBuilder();
        KStream<String, String> stream =
                builder.stream(INPUT_TOPIC, Consumed.with(Serdes.String(), Serdes.String()));

        // First aggregation: 1-minute tumbling window with a 30-second grace period.
        KTable<Windowed<String>, Long> firstWindowedCounts = stream
                .groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
                .windowedBy(TimeWindows.ofSizeAndGrace(FIRST_WINDOW_SIZE, FIRST_GRACE))
                .count(Materialized.as(FIRST_STORE))
                .suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()));

        // Drop the window from the key so the results can be grouped by the plain key again.
        KStream<String, Long> intermediateStream = firstWindowedCounts
                .toStream((windowedKey, count) -> windowedKey.key());

        // Second aggregation: 5-minute tumbling window with a 2-minute grace period.
        // count() counts stage-1 results per key; it does not sum their values.
        KTable<Windowed<String>, Long> secondWindowedCounts = intermediateStream
                .groupByKey(Grouped.with(Serdes.String(), Serdes.Long()))
                .windowedBy(TimeWindows.ofSizeAndGrace(SECOND_WINDOW_SIZE, SECOND_GRACE))
                .count(Materialized.as(SECOND_STORE))
                .suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()));

        secondWindowedCounts.toStream((windowedKey, count) -> windowedKey.key())
                .to(FINAL_OUTPUT_TOPIC, Produced.with(Serdes.String(), Serdes.Long()));
        return builder;
    }

    /** Creates a fresh driver plus the input and output test topics before every test. */
    @BeforeEach
    void setUp() {
        Properties config = new Properties();
        config.put(StreamsConfig.APPLICATION_ID_CONFIG, "test-chained-grace");
        config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234");
        config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
        config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
        testDriver = new TopologyTestDriver(buildTopology().build(), config);
        inputTopic = testDriver.createInputTopic(
                INPUT_TOPIC,
                Serdes.String().serializer(),
                Serdes.String().serializer()
        );
        finalOutputTopic = testDriver.createOutputTopic(
                FINAL_OUTPUT_TOPIC,
                Serdes.String().deserializer(),
                Serdes.Long().deserializer()
        );
    }

    /** Closes the driver; the null check covers a failure inside {@link #setUp()}. */
    @AfterEach
    void tearDown() {
        if (testDriver != null) {
            testDriver.close();
        }
    }

    /**
     * Walks one 5-minute stage-2 window through its life cycle and checks that output appears
     * only once that window has closed from stage 2's point of view.
     */
    @Test
    void testChainedWindowedAggregationsWithDifferentGracePeriods() {
        Instant startTime = Instant.parse("2025-10-13T10:00:00Z");
        Instant firstWindowEndTime = startTime.plus(FIRST_WINDOW_SIZE); // 10:01:00
        Instant firstWindowCloseTime = firstWindowEndTime.plus(FIRST_GRACE); // 10:01:30
        Instant secondWindowCloseTime = startTime.plus(SECOND_WINDOW_SIZE).plus(SECOND_GRACE); // 10:07:00

        // ---- Step 1: on-time records for the first 1-minute window [10:00, 10:01) ----
        inputTopic.pipeInput("A", "value1", startTime);
        inputTopic.pipeInput("B", "value2", startTime.plusSeconds(15));
        inputTopic.pipeInput("A", "value3", startTime.plusSeconds(30));
        assertTrue(finalOutputTopic.isEmpty(), "No window has closed yet");

        // ---- Step 2: a genuinely late record for the first window ----
        // "C" moves stage-1 stream time to 10:01:15: past the window end (10:01:00) but still
        // inside the grace period (close time 10:01:30). "C" itself lands in [10:01, 10:02).
        inputTopic.pipeInput("C", "value4", startTime.plusSeconds(75));
        // "B" now arrives with event time 10:00:45, i.e. inside the already-ended first window.
        inputTopic.pipeInput("B", "value5", startTime.plusSeconds(45));
        assertTrue(finalOutputTopic.isEmpty(), "First window is still within its grace period");
        // The late record must have been accepted into the first window.
        assertEquals(2L,
                testDriver.<String, Long>getWindowStore(FIRST_STORE).fetch("A", startTime.toEpochMilli()),
                "First-window count for A");
        assertEquals(2L,
                testDriver.<String, Long>getWindowStore(FIRST_STORE).fetch("B", startTime.toEpochMilli()),
                "First-window count for B should include the late record");

        // ---- Step 3: reach the close time of the first window (10:01:00 + 30s) ----
        // Stage 1 now emits its final results A=2 and B=2; stage 2 buffers them in [10:00, 10:05).
        inputTopic.pipeInput("C", "value6", firstWindowCloseTime);
        assertTrue(finalOutputTopic.isEmpty(), "Second window is still open and buffering");

        // ---- Step 4: input time reaches 10:07:00, but stage 2 does not see that yet ----
        // "D" closes stage-1 window [10:01, 10:02), whose result for "C" is stamped 10:01:30.
        // "D"'s own window [10:07, 10:08) is still open, so stage 2's stream time is only 10:01:30.
        inputTopic.pipeInput("D", "value7", secondWindowCloseTime);
        assertTrue(finalOutputTopic.isEmpty(),
                "Stage 2 has only seen results up to 10:01:30, so its window cannot close");

        // ---- Step 5: close "D"'s stage-1 window so a 10:07:00 result reaches stage 2 ----
        // Stage-1 window [10:07, 10:08) closes at 10:08:30. Its result (stamped 10:07:00) moves
        // stage-2 stream time to 10:07:00 = 10:05:00 + 2min, which closes [10:00, 10:05).
        Instant lastFirstStageCloseTime = secondWindowCloseTime.plus(FIRST_WINDOW_SIZE).plus(FIRST_GRACE); // 10:08:30
        inputTopic.pipeInput("E", "value8", lastFirstStageCloseTime);

        // Stage 2 counts stage-1 results per key. A, B and C each produced exactly one final
        // result inside [10:00, 10:05); B's two updates were collapsed into one by suppress().
        // "D" belongs to [10:05, 10:10), which is still open, and "E" has not left stage 1.
        assertEquals(3L, finalOutputTopic.getQueueSize(), "Final output should contain one result per key");
        Map<String, Long> finalCounts = finalOutputTopic.readKeyValuesToMap();
        assertEquals(3, finalCounts.size(), "Exactly the keys A, B and C are expected");
        assertEquals(1L, finalCounts.get("A"), "A produced one first-stage result");
        assertEquals(1L, finalCounts.get("B"), "B produced one first-stage result despite the late update");
        assertEquals(1L, finalCounts.get("C"), "C produced one first-stage result");
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment