# Get the details of the song with ID 9
$ http://localhost:7070/kafka-music/song/9
{
"artist":"N.W.A",
"album":"Straight Outta Compton",
"name":"Gangsta Gangsta"
| // Get the window store named "queryStoreName" | |
| ReadOnlyWindowStore<String, Long> windowStore = | |
| streams.store("queryStoreName", QueryableStoreTypes.windowStore()); | |
| // Fetch values for the key "europe" for all of the windows available in this application instance. | |
| // To get *all* available windows we fetch windows from the beginning of time until now. | |
| long timeFrom = 0; // beginning of time = oldest available | |
| long timeTo = System.currentTimeMillis(); // now (in processing-time) | |
| WindowStoreIterator<Long> iterator = windowStore.fetch("europe", timeFrom, timeTo); | |
| while (iterator.hasNext()) { |
| KTable<Windowed<String>, Long> aggregated = inputStream | |
| .groupByKey() | |
| .reduce((aggValue, newValue) -> aggValue + newValue, | |
| TimeWindows.of(TimeUnit.MINUTES.toMillis(2)) | |
| .until(TimeUnit.DAYS.toMillis(1) /* keep for one day */), | |
| "queryStoreName"); |
| PCollection<KV<String, Integer>> scores = input | |
| .apply(Window.into(FixedWindows.of(Duration.standardMinutes(2))) | |
| .triggering( | |
| AtWatermark() | |
| .withEarlyFirings(AtPeriod(Duration.standardMinutes(1))) | |
| .withLateFirings(AtCount(1))) | |
| .accumulatingAndRetractingFiredPanes()) | |
| .apply(Sum.integersPerKey()) |
# Get the details of the song with ID 9
$ http://localhost:7070/kafka-music/song/9
{
"artist":"N.W.A",
"album":"Straight Outta Compton",
"name":"Gangsta Gangsta"
# Get the top-5 chart for the genre "punk"
$ http://localhost:7070/kafka-music/charts/genre/punk
[
{
"artist":"Wheres The Pope?",
"album":"PSI",
# Get all the top-5 charts across all instances
$ http://localhost:7070/kafka-music/charts/top-five
[
{
"artist":"Hilltop Hoods",
"album":"The Calling",
# List running app instances that currently manage (parts of) state store "top-five-songs"
$ http://localhost:7070/kafka-music/instances/top-five-songs
[
{
"host": "localhost",
"port": 7070,
# List all running instances of this application
$ http://localhost:7070/kafka-music/instances
[
{
"host": "localhost",
"port": 7070,