PolarSPARC
Exploring Kafka Streams :: Part 5
Bhaskar S | 12/05/2021
Overview
In Part 4 of this series, we explored how to serialize and deserialize custom data values in Kafka Streams.
In this part of the series, we will explore grouping of data events using the concept of Windowing in Kafka Streams.
Kafka Streams Concepts
Windowing
The concept of Windowing allows one to group together data events (records) that have the same key and fall within a given window boundary, for stateful aggregation operations such as count(), aggregate(), reduce(), etc. One point to keep in mind - the windowing techniques are based on the timestamps of the data events.
For certain analytics use-cases, such as fraud detection or inventory management, it makes sense to look at the events within a window of time in order to make real-time decisions.
Kafka Streams supports the following windowing types:
Tumbling Time Window :: they model a series of fixed length, discrete time windows that do NOT overlap. In other words, the next time window starts at the end of the current time window interval (referred to as the window size). An example would be - the number of costumes sold in a time period of 1 hour the day before Halloween
The following illustration depicts a tumbling window (with the colored geometric icons as events):
Hopping Time Window :: they model a series of fixed length time windows that overlap, with a new window starting at every hop interval (referred to as the advance or the hop size). In other words, the next window starts one hop interval after the start of the current window, where the hop size is less than the window size. This implies that multiple overlapping time windows are open at any point in time, and an event can belong to more than one window. An example would be - the number of headphones sold over the last 1 hour, computed every 30 mins, on Black Friday
The following illustration depicts a hopping window (with the colored geometric icons as events):
Sliding Time Window :: they model a series of fixed length time windows that slide continuously over time and capture the events that occur within that time window interval (time duration), plus a grace period. The grace period allows one to allocate extra time to include related events (in the aggregation) that arrive late or out of order. In other words, a window is evaluated only when new event(s) arrive in the event stream, with the latest event timestamp marking the end time and the start time being the end time minus the fixed length time duration. All events that fall in the interval between the start and end time are used in the aggregation. An example would be - alert when there are 5 or more searches for a specific auto model at a dealership site within a 1 hour time window.
The following illustration depicts a sliding window (with the colored geometric icons as events):
Session Window :: they model the grouping of events into sessions for each key. A session represents a period of activity followed by a gap of inactivity. A session window begins when the first event occurs. If another event occurs within the specified inactivity period from the last event, then the window is extended to include the new event. Otherwise, if no events occur within the inactivity period, then the window is closed at the end of the inactivity period. An example would be - the number of clicks by a customer on the different deals offered by an online retailer with a 5 minute inactivity period.
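To make the window boundaries described above more concrete, the following is a minimal plain-Java sketch of the arithmetic involved. It is NOT the Kafka Streams implementation; the class WindowSketch and its methods windowStartsFor and sessionize are hypothetical names used only for illustration, and the sketch assumes windows of the form [start, start + size) aligned to the epoch, with timestamps in milliseconds:

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative only: plain-Java arithmetic, not the Kafka Streams internals.
final class WindowSketch {

    // Returns the start time of every window [start, start + size) that contains ts.
    // advance == size models a tumbling window; advance < size models hopping windows.
    static List<Long> windowStartsFor(long ts, long size, long advance) {
        List<Long> starts = new ArrayList<>();
        // Earliest window (aligned to a multiple of 'advance') that can still cover ts
        long start = (Math.max(0L, ts - size + advance) / advance) * advance;
        for (; start <= ts; start += advance) {
            starts.add(start);
        }
        return starts;
    }

    // Groups sorted timestamps into sessions (assumption: a pause longer than
    // 'gap' between two consecutive events closes the current session).
    static List<List<Long>> sessionize(List<Long> sortedTs, long gap) {
        List<List<Long>> sessions = new ArrayList<>();
        List<Long> current = new ArrayList<>();
        for (long ts : sortedTs) {
            if (!current.isEmpty() && ts - current.get(current.size() - 1) > gap) {
                sessions.add(current);
                current = new ArrayList<>();
            }
            current.add(ts);
        }
        if (!current.isEmpty()) {
            sessions.add(current);
        }
        return sessions;
    }

    public static void main(String[] args) {
        // Tumbling (size = advance = 5s): the event at 7s lands in exactly one window
        System.out.println(windowStartsFor(7_000L, 5_000L, 5_000L)); // [5000]
        // Hopping (size = 5s, advance = 2s): the same event lands in two windows
        System.out.println(windowStartsFor(7_000L, 5_000L, 2_000L)); // [4000, 6000]
        // Session (gap = 5s): the 9s pause splits the events into two sessions
        System.out.println(sessionize(List.of(0L, 1_000L, 10_000L), 5_000L)); // [[0, 1000], [10000]]
    }
}
```

Notice that with a hop smaller than the window size the same event is counted in more than one window, which is the overlap described for the hopping window.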
Hands-on with Kafka Streams
Fourth Application
In the Fourth module, we will simulate a data event stream, where each event contains a fictitious product key (as a string) and the quantity sold (as an integer) as the value. We will demonstrate a STATEFUL Kafka Streams application that will consume the stream of data events from the specified Kafka topic and aggregate the total quantity of each product sold using three of the windowing techniques - tumbling, hopping, and sliding.
To set up the Java directory structure for the Fourth application, execute the following commands:
$ cd $HOME/java/KafkaStreams
$ mkdir -p $HOME/java/KafkaStreams/Fourth
$ mkdir -p Fourth/src/main/java Fourth/src/main/resources Fourth/target
$ mkdir -p Fourth/src/main/java/com/polarsparc/kstreams
$ cd $HOME/java/KafkaStreams/Fourth
The following is the listing for the Maven project file pom.xml that will be used:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <parent>
        <artifactId>KafkaStreams</artifactId>
        <groupId>com.polarsparc.kstreams</groupId>
        <version>1.0</version>
    </parent>

    <artifactId>Fourth</artifactId>
    <version>1.0</version>

    <dependencies>
        <dependency>
            <artifactId>Common</artifactId>
            <groupId>com.polarsparc.kstreams</groupId>
            <version>1.0</version>
        </dependency>
    </dependencies>
</project>
We need to modify the <modules> section in the parent pom.xml to include the Fourth module as shown below:
<modules>
    <module>Common</module>
    <module>First</module>
    <module>Second</module>
    <module>Third</module>
    <module>Fourth</module>
</modules>
We will need a Kafka producer that will generate the stream of data events. The following is the Java Kafka publisher:
/*
 * Name: Stream Data Event Generator
 * Author: Bhaskar S
 * Date: 12/04/2021
 * Blog: https://www.polarsparc.com
 */

package com.polarsparc.kstreams;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Arrays;
import java.util.List;
import java.util.Properties;
import java.util.Random;

public class StreamEventGenerator {
    private final static int MAX_EVENTS = 10;
    private final static int MAX_GAP = 3000; // 3 seconds = 3000 ms

    private static final Logger log = LoggerFactory.getLogger(StreamEventGenerator.class.getName());

    private final static Random random = new Random(1001);

    private final static List<String> keysList = Arrays.asList("A", "M", "S", "B", "N", "T");
    private final static List<Integer> valuesList = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9);

    private static KafkaProducer<String, Integer> createEventProducer() {
        Properties config = new Properties();

        config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:20001");
        config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class.getName());

        return new KafkaProducer<>(config);
    }

    private static void generateDataEvent(boolean flag, String topic, Producer<String, Integer> producer) {
        log.info(String.format(" ---> Topic: %s", topic));

        int cnt = random.nextInt(MAX_EVENTS);
        int gap = random.nextInt(MAX_GAP);

        log.info(String.format(" ---> Events Count: %d", cnt));

        for (int i = 1; i <= cnt; i++) {
            int ki = random.nextInt(keysList.size());
            int vi = random.nextInt(valuesList.size());

            String key = keysList.get(ki);
            Integer value = valuesList.get(vi);

            log.info(String.format(" ---> [%d] Key: %s, Value: %d", i, key, value));

            if (!flag) {
                try {
                    producer.send(new ProducerRecord<>(topic, key, value)).get();
                }
                catch (Exception ex) {
                    log.error(ex.getMessage());
                }
            }
        }

        try {
            Thread.sleep(gap);
        }
        catch (Exception ignore) {
        }

        log.info(String.format(" ---> Sleep Gap: %d", gap));
    }

    public static void main(String[] args) {
        if (args.length < 1) {
            System.out.printf("Usage: java %s <topic-name> [--dry-run]\n", StreamEventGenerator.class.getName());
            System.exit(1);
        }

        boolean dryRun = args.length == 2 && args[1].equalsIgnoreCase("--dry-run");

        Producer<String, Integer> producer = null;
        if (!dryRun) {
            producer = createEventProducer();
        }

        for (int i = 1; i <= 5; i++) {
            log.info(String.format("---------> Iteration: %d", i));

            generateDataEvent(dryRun, args[0], producer);
        }

        if (!dryRun) {
            producer.close();
        }
    }
}
The Kafka producer works as follows - in each iteration, it publishes a set of random events and then sleeps for a random time (less than 3 seconds) before continuing with the next iteration.
Also, we have fixed the random number generator seed to generate a predictable set of events.
In addition, there is a --dry-run option to display the data events without publishing them to Kafka.
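The effect of fixing the seed can be checked in isolation. The following is a small sketch (the class SeededRandomSketch and its method draw are hypothetical names, not part of the application) showing that two java.util.Random instances created with the same seed produce the same sequence of values, which is why the generator emits the same events on every run:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Random;

// Illustrative only: demonstrates the repeatability of a seeded generator.
final class SeededRandomSketch {

    // Draws 'n' values in the range [0, bound) from a generator created with 'seed'.
    static List<Integer> draw(long seed, int n, int bound) {
        Random random = new Random(seed);
        List<Integer> values = new ArrayList<>();
        for (int i = 0; i < n; i++) {
            values.add(random.nextInt(bound));
        }
        return values;
    }

    public static void main(String[] args) {
        // Same seed (1001, as in the event generator), same sequence on every run
        System.out.println(draw(1001L, 5, 10).equals(draw(1001L, 5, 10))); // prints true
    }
}
```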
To compile and test the code from Listing.1 in the --dry-run mode, open a terminal window and execute the following commands:
$ cd $HOME/java/KafkaStreams/Fourth
$ mvn clean compile
$ mvn exec:java -Dexec.mainClass=com.polarsparc.kstreams.StreamEventGenerator -Dexec.args="test --dry-run"
The following should be the typical output:
[com.polarsparc.kstreams.StreamEventGenerator.main()] INFO com.polarsparc.kstreams.StreamEventGenerator - ---------> Iteration: 1
[com.polarsparc.kstreams.StreamEventGenerator.main()] INFO com.polarsparc.kstreams.StreamEventGenerator - ---> Topic: test
[com.polarsparc.kstreams.StreamEventGenerator.main()] INFO com.polarsparc.kstreams.StreamEventGenerator - ---> Events Count: 3
[com.polarsparc.kstreams.StreamEventGenerator.main()] INFO com.polarsparc.kstreams.StreamEventGenerator - ---> [1] Key: A, Value: 3
[com.polarsparc.kstreams.StreamEventGenerator.main()] INFO com.polarsparc.kstreams.StreamEventGenerator - ---> [2] Key: B, Value: 7
[com.polarsparc.kstreams.StreamEventGenerator.main()] INFO com.polarsparc.kstreams.StreamEventGenerator - ---> [3] Key: A, Value: 5
[com.polarsparc.kstreams.StreamEventGenerator.main()] INFO com.polarsparc.kstreams.StreamEventGenerator - ---> Sleep Gap: 2223
[com.polarsparc.kstreams.StreamEventGenerator.main()] INFO com.polarsparc.kstreams.StreamEventGenerator - ---------> Iteration: 2
[com.polarsparc.kstreams.StreamEventGenerator.main()] INFO com.polarsparc.kstreams.StreamEventGenerator - ---> Topic: test
[com.polarsparc.kstreams.StreamEventGenerator.main()] INFO com.polarsparc.kstreams.StreamEventGenerator - ---> Events Count: 4
[com.polarsparc.kstreams.StreamEventGenerator.main()] INFO com.polarsparc.kstreams.StreamEventGenerator - ---> [1] Key: T, Value: 8
[com.polarsparc.kstreams.StreamEventGenerator.main()] INFO com.polarsparc.kstreams.StreamEventGenerator - ---> [2] Key: B, Value: 9
[com.polarsparc.kstreams.StreamEventGenerator.main()] INFO com.polarsparc.kstreams.StreamEventGenerator - ---> [3] Key: M, Value: 3
[com.polarsparc.kstreams.StreamEventGenerator.main()] INFO com.polarsparc.kstreams.StreamEventGenerator - ---> [4] Key: M, Value: 4
[com.polarsparc.kstreams.StreamEventGenerator.main()] INFO com.polarsparc.kstreams.StreamEventGenerator - ---> Sleep Gap: 343
[com.polarsparc.kstreams.StreamEventGenerator.main()] INFO com.polarsparc.kstreams.StreamEventGenerator - ---------> Iteration: 3
[com.polarsparc.kstreams.StreamEventGenerator.main()] INFO com.polarsparc.kstreams.StreamEventGenerator - ---> Topic: test
[com.polarsparc.kstreams.StreamEventGenerator.main()] INFO com.polarsparc.kstreams.StreamEventGenerator - ---> Events Count: 9
[com.polarsparc.kstreams.StreamEventGenerator.main()] INFO com.polarsparc.kstreams.StreamEventGenerator - ---> [1] Key: M, Value: 3
[com.polarsparc.kstreams.StreamEventGenerator.main()] INFO com.polarsparc.kstreams.StreamEventGenerator - ---> [2] Key: A, Value: 5
[com.polarsparc.kstreams.StreamEventGenerator.main()] INFO com.polarsparc.kstreams.StreamEventGenerator - ---> [3] Key: T, Value: 3
[com.polarsparc.kstreams.StreamEventGenerator.main()] INFO com.polarsparc.kstreams.StreamEventGenerator - ---> [4] Key: M, Value: 8
[com.polarsparc.kstreams.StreamEventGenerator.main()] INFO com.polarsparc.kstreams.StreamEventGenerator - ---> [5] Key: A, Value: 4
[com.polarsparc.kstreams.StreamEventGenerator.main()] INFO com.polarsparc.kstreams.StreamEventGenerator - ---> [6] Key: S, Value: 8
[com.polarsparc.kstreams.StreamEventGenerator.main()] INFO com.polarsparc.kstreams.StreamEventGenerator - ---> [7] Key: T, Value: 2
[com.polarsparc.kstreams.StreamEventGenerator.main()] INFO com.polarsparc.kstreams.StreamEventGenerator - ---> [8] Key: T, Value: 2
[com.polarsparc.kstreams.StreamEventGenerator.main()] INFO com.polarsparc.kstreams.StreamEventGenerator - ---> [9] Key: S, Value: 3
[com.polarsparc.kstreams.StreamEventGenerator.main()] INFO com.polarsparc.kstreams.StreamEventGenerator - ---> Sleep Gap: 2110
[com.polarsparc.kstreams.StreamEventGenerator.main()] INFO com.polarsparc.kstreams.StreamEventGenerator - ---------> Iteration: 4
[com.polarsparc.kstreams.StreamEventGenerator.main()] INFO com.polarsparc.kstreams.StreamEventGenerator - ---> Topic: test
[com.polarsparc.kstreams.StreamEventGenerator.main()] INFO com.polarsparc.kstreams.StreamEventGenerator - ---> Events Count: 4
[com.polarsparc.kstreams.StreamEventGenerator.main()] INFO com.polarsparc.kstreams.StreamEventGenerator - ---> [1] Key: S, Value: 3
[com.polarsparc.kstreams.StreamEventGenerator.main()] INFO com.polarsparc.kstreams.StreamEventGenerator - ---> [2] Key: T, Value: 8
[com.polarsparc.kstreams.StreamEventGenerator.main()] INFO com.polarsparc.kstreams.StreamEventGenerator - ---> [3] Key: T, Value: 1
[com.polarsparc.kstreams.StreamEventGenerator.main()] INFO com.polarsparc.kstreams.StreamEventGenerator - ---> [4] Key: A, Value: 6
[com.polarsparc.kstreams.StreamEventGenerator.main()] INFO com.polarsparc.kstreams.StreamEventGenerator - ---> Sleep Gap: 402
[com.polarsparc.kstreams.StreamEventGenerator.main()] INFO com.polarsparc.kstreams.StreamEventGenerator - ---------> Iteration: 5
[com.polarsparc.kstreams.StreamEventGenerator.main()] INFO com.polarsparc.kstreams.StreamEventGenerator - ---> Topic: test
[com.polarsparc.kstreams.StreamEventGenerator.main()] INFO com.polarsparc.kstreams.StreamEventGenerator - ---> Events Count: 5
[com.polarsparc.kstreams.StreamEventGenerator.main()] INFO com.polarsparc.kstreams.StreamEventGenerator - ---> [1] Key: A, Value: 8
[com.polarsparc.kstreams.StreamEventGenerator.main()] INFO com.polarsparc.kstreams.StreamEventGenerator - ---> [2] Key: T, Value: 8
[com.polarsparc.kstreams.StreamEventGenerator.main()] INFO com.polarsparc.kstreams.StreamEventGenerator - ---> [3] Key: B, Value: 7
[com.polarsparc.kstreams.StreamEventGenerator.main()] INFO com.polarsparc.kstreams.StreamEventGenerator - ---> [4] Key: T, Value: 4
[com.polarsparc.kstreams.StreamEventGenerator.main()] INFO com.polarsparc.kstreams.StreamEventGenerator - ---> [5] Key: B, Value: 8
[com.polarsparc.kstreams.StreamEventGenerator.main()] INFO com.polarsparc.kstreams.StreamEventGenerator - ---> Sleep Gap: 2585
The following is the Java based STATEFUL Kafka Streams application that consumes and processes events from the specific Kafka topic based on the input argument (that controls the type of windowing technique):
/*
 * Name: Sum values using various Windowing options
 * Author: Bhaskar S
 * Date: 12/04/2021
 * Blog: https://www.polarsparc.com
 */

package com.polarsparc.kstreams;

import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.time.Duration;

public class WindowedSum {
    private static void usage() {
        System.out.printf("Usage: java %s <TM | HP | SL>\n", WindowedSum.class.getName());
        System.exit(1);
    }

    public static void main(String[] args) {
        if (args.length != 1) {
            usage();
        }

        String topicName = switch (args[0]) {
            case "TM" -> "tumbling-events";
            case "HP" -> "hopping-events";
            case "SL" -> "sliding-events";
            default -> null;
        };

        if (topicName == null) {
            usage();
        }

        Logger log = LoggerFactory.getLogger(WindowedSum.class.getName());

        log.info(String.format("---> Event type: %s", topicName));

        StreamsConfig config = new StreamsConfig(KafkaConsumerConfig.kafkaConfigurationTwo(topicName, 1));

        Serde<String> stringSerde = Serdes.String();
        Serde<Integer> integerSerde = Serdes.Integer();

        // Window duration
        Duration windowSz = Duration.ofSeconds(5);

        StreamsBuilder builder = new StreamsBuilder();

        KStream<String, Integer> stream = builder.stream(topicName, Consumed.with(stringSerde, integerSerde));

        KGroupedStream<String, Integer> groupedStream = stream
            .peek((key, value) -> log.info(String.format("---> [%d] >> Key: %s, Value: %d",
                System.currentTimeMillis(), key, value)))
            .groupByKey();

        // NOTE: TimeWindows is immutable - advanceBy() returns a NEW instance,
        //       so its result must be used (not discarded)
        TimeWindowedKStream<String, Integer> windowedStream = switch (args[0]) {
            default -> { // TM is the default
                // *IMPORTANT* for tumbling window size = advance
                TimeWindows tumblingWindow = TimeWindows.ofSizeWithNoGrace(windowSz)
                    .advanceBy(windowSz);
                yield groupedStream.windowedBy(tumblingWindow);
            }
            case "HP" -> {
                Duration advanceSz = Duration.ofSeconds(2);
                TimeWindows hoppingWindow = TimeWindows.ofSizeWithNoGrace(windowSz)
                    .advanceBy(advanceSz);
                yield groupedStream.windowedBy(hoppingWindow);
            }
            case "SL" -> {
                Duration graceSz = Duration.ofMillis(500); // Grace period
                SlidingWindows slidingWindow = SlidingWindows.ofTimeDifferenceAndGrace(windowSz, graceSz);
                yield groupedStream.windowedBy(slidingWindow);
            }
        };

        windowedStream.reduce(Integer::sum)
            .toStream()
            .peek((winKey, sum) -> log.info(String.format("---> [%d] >> Window: %s, Key: %s, Sum: %d",
                System.currentTimeMillis(), winKey.window().toString(), winKey.key(), sum)));

        Topology topology = builder.build();

        log.info(String.format("---> %s", topology.describe().toString()));

        KafkaStreams streams = new KafkaStreams(topology, config);
        streams.start();

        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
    }
}
The code from Listing.2 above needs some explanation:
The interface org.apache.kafka.streams.kstream.TimeWindowedKStream<K, V> is an abstraction of a windowed data event stream of key-value pairs. It is basically an intermediate representation of a KStream that can be used for performing windowed aggregation operations such as count(), aggregate(), reduce(), etc.
The class org.apache.kafka.streams.kstream.TimeWindows encapsulates a timed window that is of a fixed duration (size) and shifts forward at a fixed interval (advance). When advance < size, we get a hopping window, and when advance = size, we get a tumbling window.
The static method TimeWindows.ofSizeWithNoGrace(SIZE) returns a fixed length window of SIZE.
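The method advanceBy(SIZE) on an instance of TimeWindows returns a new TimeWindows instance with the advance (or hop) set to SIZE. Since TimeWindows is immutable, the original instance is left unchanged and the returned instance is the one that must be used.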
The class org.apache.kafka.streams.kstream.SlidingWindows encapsulates a sliding window that is of a fixed duration (size) and is adjusted based on the incoming data event's timestamp. It will only include data events that lie in the time duration plus the grace period.
The static method SlidingWindows.ofTimeDifferenceAndGrace(SIZE, GRACE) returns a fixed length sliding window of SIZE with a grace period of GRACE.
The DSL method reduce(AGGREGATOR) allows one to combine the incoming data events that are grouped together by a key (and window) into a single value using the specified AGGREGATOR function.
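To get a feel for what reduce(Integer::sum) does on a windowed stream, the following is a minimal plain-Java sketch that keeps a running sum per key and per tumbling window in a map. It only mimics the behavior and does not use the Kafka Streams API; the class WindowedSumSketch and its method accept are hypothetical names, the map stands in for the state store, and the timestamps are made up for illustration (the keys and values are taken from the first iteration of the dry run):

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Illustrative only: mimics a per-key, per-window running sum with a plain map.
final class WindowedSumSketch {

    private final long sizeMs;

    // State keyed by "windowStart|key" (stands in for the windowed state store)
    private final Map<String, Integer> sums = new LinkedHashMap<>();

    WindowedSumSketch(long sizeMs) {
        this.sizeMs = sizeMs;
    }

    // Adds one event to its tumbling window and returns the updated sum.
    int accept(long timestampMs, String key, int value) {
        long windowStart = timestampMs - (timestampMs % sizeMs);
        return sums.merge(windowStart + "|" + key, value, Integer::sum);
    }

    public static void main(String[] args) {
        WindowedSumSketch sketch = new WindowedSumSketch(5_000L);

        System.out.println(sketch.accept(1_000L, "A", 3)); // 3
        System.out.println(sketch.accept(1_010L, "B", 7)); // 7
        System.out.println(sketch.accept(1_020L, "A", 5)); // 8 (same key, same window)
        System.out.println(sketch.accept(6_000L, "A", 4)); // 4 (same key, next window)
    }
}
```

The last call shows that the sum for a key starts afresh once an event falls into the next window.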
Now, we need to create the Kafka topics tumbling-events, hopping-events, and sliding-events corresponding to the three windowing techniques we want to demonstrate.
To create the Kafka topic tumbling-events with a single partition using docker, execute the following command:
docker run --rm --net=host confluentinc/cp-kafka:7.0.0 kafka-topics --create --topic tumbling-events --partitions 1 --replication-factor 1 --if-not-exists --bootstrap-server localhost:20001
The following should be the typical output:
Created topic tumbling-events.
To create the Kafka topic hopping-events with a single partition using docker, execute the following command:
docker run --rm --net=host confluentinc/cp-kafka:7.0.0 kafka-topics --create --topic hopping-events --partitions 1 --replication-factor 1 --if-not-exists --bootstrap-server localhost:20001
The following should be the typical output:
Created topic hopping-events.
Finally, to create the Kafka topic sliding-events with a single partition using docker, execute the following command:
docker run --rm --net=host confluentinc/cp-kafka:7.0.0 kafka-topics --create --topic sliding-events --partitions 1 --replication-factor 1 --if-not-exists --bootstrap-server localhost:20001
The following should be the typical output:
Created topic sliding-events.
Now, it is time to compile the code from Listing.1 and Listing.2. To do that, open a terminal window and execute the following commands:
$ cd $HOME/java/KafkaStreams/Fourth
$ mvn clean compile
Now, it is time to test the code from Listing.2 using the tumbling window technique. To do that, open a terminal window and execute the following commands:
$ cd $HOME/java/KafkaStreams/Fourth
$ mvn exec:java -Dexec.mainClass=com.polarsparc.kstreams.WindowedSum -Dexec.args="TM"
The following should be the typical output:
[com.polarsparc.kstreams.WindowedSum.main()] INFO com.polarsparc.kstreams.WindowedSum - ---> Event type: tumbling-events
... SNIP ...
[com.polarsparc.kstreams.WindowedSum.main()] INFO com.polarsparc.kstreams.WindowedSum - ---> Topologies:
   Sub-topology: 0
    Source: KSTREAM-SOURCE-0000000000 (topics: [tumbling-events])
      --> KSTREAM-PEEK-0000000001
    Processor: KSTREAM-PEEK-0000000001 (stores: [])
      --> KSTREAM-REDUCE-0000000003
      <-- KSTREAM-SOURCE-0000000000
    Processor: KSTREAM-REDUCE-0000000003 (stores: [KSTREAM-REDUCE-STATE-STORE-0000000002])
      --> KTABLE-TOSTREAM-0000000004
      <-- KSTREAM-PEEK-0000000001
    Processor: KTABLE-TOSTREAM-0000000004 (stores: [])
      --> KSTREAM-PEEK-0000000005
      <-- KSTREAM-REDUCE-0000000003
    Processor: KSTREAM-PEEK-0000000005 (stores: [])
      --> none
      <-- KTABLE-TOSTREAM-0000000004
... SNIP ...
[tumbling-events-f6732d9d-25cd-4ed7-8077-7ef4e42b433e-StreamThread-1] INFO org.apache.kafka.streams.KafkaStreams - stream-client [tumbling-events-f6732d9d-25cd-4ed7-8077-7ef4e42b433e] State transition from REBALANCING to RUNNING
[tumbling-events-f6732d9d-25cd-4ed7-8077-7ef4e42b433e-StreamThread-1] INFO org.apache.kafka.clients.consumer.KafkaConsumer - [Consumer clientId=tumbling-events-f6732d9d-25cd-4ed7-8077-7ef4e42b433e-StreamThread-1-consumer, groupId=tumbling-events] Requesting the log end offset for tumbling-events-0 in order to compute lag
Now, it is time to run the producer code from Listing.1 to publish to the Kafka topic tumbling-events. To do that, open a terminal window and execute the following commands:
$ cd $HOME/java/KafkaStreams/Fourth
$ mvn exec:java -Dexec.mainClass=com.polarsparc.kstreams.StreamEventGenerator -Dexec.args="tumbling-events"
The following should be the typical output:
... SNIP ...
[com.polarsparc.kstreams.StreamEventGenerator.main()] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka version: 3.0.0
[com.polarsparc.kstreams.StreamEventGenerator.main()] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka commitId: 8cb0a5e9d3441962
[com.polarsparc.kstreams.StreamEventGenerator.main()] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka startTimeMs: 1638723341187
[com.polarsparc.kstreams.StreamEventGenerator.main()] INFO com.polarsparc.kstreams.StreamEventGenerator - ---------> Iteration: 1
[com.polarsparc.kstreams.StreamEventGenerator.main()] INFO com.polarsparc.kstreams.StreamEventGenerator - ---> Topic: tumbling-events
[com.polarsparc.kstreams.StreamEventGenerator.main()] INFO com.polarsparc.kstreams.StreamEventGenerator - ---> Events Count: 3
[com.polarsparc.kstreams.StreamEventGenerator.main()] INFO com.polarsparc.kstreams.StreamEventGenerator - ---> [1] Key: A, Value: 3
[kafka-producer-network-thread | producer-1] INFO org.apache.kafka.clients.Metadata - [Producer clientId=producer-1] Cluster ID: yS_GW5KbTBqUJVfzsjJn0A
[com.polarsparc.kstreams.StreamEventGenerator.main()] INFO com.polarsparc.kstreams.StreamEventGenerator - ---> [2] Key: B, Value: 7
[com.polarsparc.kstreams.StreamEventGenerator.main()] INFO com.polarsparc.kstreams.StreamEventGenerator - ---> [3] Key: A, Value: 5
[com.polarsparc.kstreams.StreamEventGenerator.main()] INFO com.polarsparc.kstreams.StreamEventGenerator - ---> Sleep Gap: 2223
[com.polarsparc.kstreams.StreamEventGenerator.main()] INFO com.polarsparc.kstreams.StreamEventGenerator - ---------> Iteration: 2
[com.polarsparc.kstreams.StreamEventGenerator.main()] INFO com.polarsparc.kstreams.StreamEventGenerator - ---> Topic: tumbling-events
[com.polarsparc.kstreams.StreamEventGenerator.main()] INFO com.polarsparc.kstreams.StreamEventGenerator - ---> Events Count: 4
[com.polarsparc.kstreams.StreamEventGenerator.main()] INFO com.polarsparc.kstreams.StreamEventGenerator - ---> [1] Key: T, Value: 8
[com.polarsparc.kstreams.StreamEventGenerator.main()] INFO com.polarsparc.kstreams.StreamEventGenerator - ---> [2] Key: B, Value: 9
[com.polarsparc.kstreams.StreamEventGenerator.main()] INFO com.polarsparc.kstreams.StreamEventGenerator - ---> [3] Key: M, Value: 3
[com.polarsparc.kstreams.StreamEventGenerator.main()] INFO com.polarsparc.kstreams.StreamEventGenerator - ---> [4] Key: M, Value: 4
[com.polarsparc.kstreams.StreamEventGenerator.main()] INFO com.polarsparc.kstreams.StreamEventGenerator - ---> Sleep Gap: 343
[com.polarsparc.kstreams.StreamEventGenerator.main()] INFO com.polarsparc.kstreams.StreamEventGenerator - ---------> Iteration: 3
[com.polarsparc.kstreams.StreamEventGenerator.main()] INFO com.polarsparc.kstreams.StreamEventGenerator - ---> Topic: tumbling-events
[com.polarsparc.kstreams.StreamEventGenerator.main()] INFO com.polarsparc.kstreams.StreamEventGenerator - ---> Events Count: 9
[com.polarsparc.kstreams.StreamEventGenerator.main()] INFO com.polarsparc.kstreams.StreamEventGenerator - ---> [1] Key: M, Value: 3
[com.polarsparc.kstreams.StreamEventGenerator.main()] INFO com.polarsparc.kstreams.StreamEventGenerator - ---> [2] Key: A, Value: 5
[com.polarsparc.kstreams.StreamEventGenerator.main()] INFO com.polarsparc.kstreams.StreamEventGenerator - ---> [3] Key: T, Value: 3
[com.polarsparc.kstreams.StreamEventGenerator.main()] INFO com.polarsparc.kstreams.StreamEventGenerator - ---> [4] Key: M, Value: 8
[com.polarsparc.kstreams.StreamEventGenerator.main()] INFO com.polarsparc.kstreams.StreamEventGenerator - ---> [5] Key: A, Value: 4
[com.polarsparc.kstreams.StreamEventGenerator.main()] INFO com.polarsparc.kstreams.StreamEventGenerator - ---> [6] Key: S, Value: 8
[com.polarsparc.kstreams.StreamEventGenerator.main()] INFO com.polarsparc.kstreams.StreamEventGenerator - ---> [7] Key: T, Value: 2
[com.polarsparc.kstreams.StreamEventGenerator.main()] INFO com.polarsparc.kstreams.StreamEventGenerator - ---> [8] Key: T, Value: 2
[com.polarsparc.kstreams.StreamEventGenerator.main()] INFO com.polarsparc.kstreams.StreamEventGenerator - ---> [9] Key: S, Value: 3
[com.polarsparc.kstreams.StreamEventGenerator.main()] INFO com.polarsparc.kstreams.StreamEventGenerator - ---> Sleep Gap: 2110
[com.polarsparc.kstreams.StreamEventGenerator.main()] INFO com.polarsparc.kstreams.StreamEventGenerator - ---------> Iteration: 4
[com.polarsparc.kstreams.StreamEventGenerator.main()] INFO com.polarsparc.kstreams.StreamEventGenerator - ---> Topic: tumbling-events
[com.polarsparc.kstreams.StreamEventGenerator.main()] INFO com.polarsparc.kstreams.StreamEventGenerator - ---> Events Count: 4
[com.polarsparc.kstreams.StreamEventGenerator.main()] INFO com.polarsparc.kstreams.StreamEventGenerator - ---> [1] Key: S, Value: 3
[com.polarsparc.kstreams.StreamEventGenerator.main()] INFO com.polarsparc.kstreams.StreamEventGenerator - ---> [2] Key: T, Value: 8
[com.polarsparc.kstreams.StreamEventGenerator.main()] INFO com.polarsparc.kstreams.StreamEventGenerator - ---> [3] Key: T, Value: 1
[com.polarsparc.kstreams.StreamEventGenerator.main()] INFO com.polarsparc.kstreams.StreamEventGenerator - ---> [4] Key: A, Value: 6
[com.polarsparc.kstreams.StreamEventGenerator.main()] INFO com.polarsparc.kstreams.StreamEventGenerator - ---> Sleep Gap: 402
[com.polarsparc.kstreams.StreamEventGenerator.main()] INFO com.polarsparc.kstreams.StreamEventGenerator - ---------> Iteration: 5
[com.polarsparc.kstreams.StreamEventGenerator.main()] INFO com.polarsparc.kstreams.StreamEventGenerator - ---> Topic: tumbling-events
[com.polarsparc.kstreams.StreamEventGenerator.main()] INFO com.polarsparc.kstreams.StreamEventGenerator - ---> Events Count: 5
[com.polarsparc.kstreams.StreamEventGenerator.main()] INFO com.polarsparc.kstreams.StreamEventGenerator - ---> [1] Key: A, Value: 8
[com.polarsparc.kstreams.StreamEventGenerator.main()] INFO com.polarsparc.kstreams.StreamEventGenerator - ---> [2] Key: T, Value: 8
[com.polarsparc.kstreams.StreamEventGenerator.main()] INFO com.polarsparc.kstreams.StreamEventGenerator - ---> [3] Key: B, Value: 7
[com.polarsparc.kstreams.StreamEventGenerator.main()] INFO com.polarsparc.kstreams.StreamEventGenerator - ---> [4] Key: T, Value: 4
[com.polarsparc.kstreams.StreamEventGenerator.main()] INFO com.polarsparc.kstreams.StreamEventGenerator - ---> [5] Key: B, Value: 8
[com.polarsparc.kstreams.StreamEventGenerator.main()] INFO com.polarsparc.kstreams.StreamEventGenerator - ---> Sleep Gap: 2585
[com.polarsparc.kstreams.StreamEventGenerator.main()] INFO org.apache.kafka.clients.producer.KafkaProducer - [Producer clientId=producer-1] Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms.
[com.polarsparc.kstreams.StreamEventGenerator.main()] INFO org.apache.kafka.common.metrics.Metrics - Metrics scheduler closed
[com.polarsparc.kstreams.StreamEventGenerator.main()] INFO org.apache.kafka.common.metrics.Metrics - Closing reporter org.apache.kafka.common.metrics.JmxReporter
[com.polarsparc.kstreams.StreamEventGenerator.main()] INFO org.apache.kafka.common.metrics.Metrics - Metrics reporters closed
[com.polarsparc.kstreams.StreamEventGenerator.main()] INFO org.apache.kafka.common.utils.AppInfoParser - App info kafka.producer for producer-1 unregistered
The following would be the additional output in the terminal running the application from Listing.2 above:
[tumbling-events-f6732d9d-25cd-4ed7-8077-7ef4e42b433e-StreamThread-1] INFO com.polarsparc.kstreams.WindowedSum - ---> [1638723341535] >> Key: A, Value: 3
[tumbling-events-f6732d9d-25cd-4ed7-8077-7ef4e42b433e-StreamThread-1] INFO com.polarsparc.kstreams.WindowedSum - ---> [1638723341541] >> Key: B, Value: 7
[tumbling-events-f6732d9d-25cd-4ed7-8077-7ef4e42b433e-StreamThread-1] INFO com.polarsparc.kstreams.WindowedSum - ---> [1638723341542] >> Key: A, Value: 5
[tumbling-events-f6732d9d-25cd-4ed7-8077-7ef4e42b433e-StreamThread-1] INFO org.apache.kafka.streams.state.internals.RocksDBTimestampedStore - Opening store KSTREAM-REDUCE-STATE-STORE-0000000002.1638723300000 in regular mode
[tumbling-events-f6732d9d-25cd-4ed7-8077-7ef4e42b433e-StreamThread-1] INFO com.polarsparc.kstreams.WindowedSum - ---> [1638723341996] >> Window: Window{startMs=1638723340000, endMs=1638723345000}, Key: B, Sum: 7
[tumbling-events-f6732d9d-25cd-4ed7-8077-7ef4e42b433e-StreamThread-1] INFO com.polarsparc.kstreams.WindowedSum - ---> [1638723341997] >> Window: Window{startMs=1638723340000, endMs=1638723345000}, Key: A, Sum: 8
[tumbling-events-f6732d9d-25cd-4ed7-8077-7ef4e42b433e-StreamThread-1] INFO com.polarsparc.kstreams.WindowedSum - ---> [1638723343762] >> Key: T, Value: 8
[tumbling-events-f6732d9d-25cd-4ed7-8077-7ef4e42b433e-StreamThread-1] INFO com.polarsparc.kstreams.WindowedSum - ---> [1638723343766] >> Key: B, Value: 9
[tumbling-events-f6732d9d-25cd-4ed7-8077-7ef4e42b433e-StreamThread-1] INFO com.polarsparc.kstreams.WindowedSum - ---> [1638723343771] >> Key: M, Value: 3
[tumbling-events-f6732d9d-25cd-4ed7-8077-7ef4e42b433e-StreamThread-1] INFO com.polarsparc.kstreams.WindowedSum - ---> [1638723343775] >> Key: M, Value: 4
[tumbling-events-f6732d9d-25cd-4ed7-8077-7ef4e42b433e-StreamThread-1] INFO com.polarsparc.kstreams.WindowedSum - ---> [1638723344077] >> Window: Window{startMs=1638723340000, endMs=1638723345000}, Key: T, Sum: 8
[tumbling-events-f6732d9d-25cd-4ed7-8077-7ef4e42b433e-StreamThread-1] INFO com.polarsparc.kstreams.WindowedSum - ---> [1638723344078] >> Window: Window{startMs=1638723340000, endMs=1638723345000}, Key: B, Sum: 16
[tumbling-events-f6732d9d-25cd-4ed7-8077-7ef4e42b433e-StreamThread-1] INFO com.polarsparc.kstreams.WindowedSum - ---> [1638723344079] >> Window: Window{startMs=1638723340000, endMs=1638723345000}, Key: M, Sum: 7
[tumbling-events-f6732d9d-25cd-4ed7-8077-7ef4e42b433e-StreamThread-1] INFO com.polarsparc.kstreams.WindowedSum - ---> [1638723344127] >> Key: M, Value: 3
[tumbling-events-f6732d9d-25cd-4ed7-8077-7ef4e42b433e-StreamThread-1] INFO com.polarsparc.kstreams.WindowedSum - ---> [1638723344131] >> Key: A, Value: 5
[tumbling-events-f6732d9d-25cd-4ed7-8077-7ef4e42b433e-StreamThread-1] INFO com.polarsparc.kstreams.WindowedSum - ---> [1638723344133] >> Key: T, Value: 3
[tumbling-events-f6732d9d-25cd-4ed7-8077-7ef4e42b433e-StreamThread-1] INFO com.polarsparc.kstreams.WindowedSum - ---> [1638723344136] >> Key: M, Value: 8
[tumbling-events-f6732d9d-25cd-4ed7-8077-7ef4e42b433e-StreamThread-1] INFO com.polarsparc.kstreams.WindowedSum - ---> [1638723344138] >> Key: A, Value: 4
[tumbling-events-f6732d9d-25cd-4ed7-8077-7ef4e42b433e-StreamThread-1] INFO com.polarsparc.kstreams.WindowedSum - ---> [1638723344141] >> Key: S, Value: 8
[tumbling-events-f6732d9d-25cd-4ed7-8077-7ef4e42b433e-StreamThread-1] INFO com.polarsparc.kstreams.WindowedSum - ---> [1638723344145] >> Key: T, Value: 2
[tumbling-events-f6732d9d-25cd-4ed7-8077-7ef4e42b433e-StreamThread-1] INFO com.polarsparc.kstreams.WindowedSum - ---> [1638723344147] >> Key: T, Value: 2
[tumbling-events-f6732d9d-25cd-4ed7-8077-7ef4e42b433e-StreamThread-1] INFO com.polarsparc.kstreams.WindowedSum - ---> [1638723344151] >> Key: S, Value: 3
[tumbling-events-f6732d9d-25cd-4ed7-8077-7ef4e42b433e-StreamThread-1] INFO com.polarsparc.kstreams.WindowedSum - ---> [1638723345155] >> Window: Window{startMs=1638723340000, endMs=1638723345000}, Key: M, Sum: 18
[tumbling-events-f6732d9d-25cd-4ed7-8077-7ef4e42b433e-StreamThread-1] INFO com.polarsparc.kstreams.WindowedSum - ---> [1638723345156] >> Window: Window{startMs=1638723340000, endMs=1638723345000}, Key: A, Sum: 17
[tumbling-events-f6732d9d-25cd-4ed7-8077-7ef4e42b433e-StreamThread-1] INFO com.polarsparc.kstreams.WindowedSum - ---> [1638723345157] >> Window: Window{startMs=1638723340000, endMs=1638723345000}, Key: T, Sum: 15
[tumbling-events-f6732d9d-25cd-4ed7-8077-7ef4e42b433e-StreamThread-1] INFO com.polarsparc.kstreams.WindowedSum - ---> [1638723345158] >> Window: Window{startMs=1638723340000, endMs=1638723345000}, Key: S, Sum: 11
[tumbling-events-f6732d9d-25cd-4ed7-8077-7ef4e42b433e-StreamThread-1] INFO com.polarsparc.kstreams.WindowedSum - ---> [1638723346268] >> Key: S, Value: 3
[tumbling-events-f6732d9d-25cd-4ed7-8077-7ef4e42b433e-StreamThread-1] INFO com.polarsparc.kstreams.WindowedSum - ---> [1638723346271] >> Key: T, Value: 8
[tumbling-events-f6732d9d-25cd-4ed7-8077-7ef4e42b433e-StreamThread-1] INFO com.polarsparc.kstreams.WindowedSum - ---> [1638723346275] >> Key: T, Value: 1
[tumbling-events-f6732d9d-25cd-4ed7-8077-7ef4e42b433e-StreamThread-1] INFO com.polarsparc.kstreams.WindowedSum - ---> [1638723346277] >> Key: A, Value: 6
[tumbling-events-f6732d9d-25cd-4ed7-8077-7ef4e42b433e-StreamThread-1] INFO com.polarsparc.kstreams.WindowedSum - ---> [1638723346686] >> Key: A, Value: 8
[tumbling-events-f6732d9d-25cd-4ed7-8077-7ef4e42b433e-StreamThread-1] INFO com.polarsparc.kstreams.WindowedSum - ---> [1638723346690] >> Key: T, Value: 8
[tumbling-events-f6732d9d-25cd-4ed7-8077-7ef4e42b433e-StreamThread-1] INFO com.polarsparc.kstreams.WindowedSum - ---> [1638723346694] >> Key: B, Value: 7
[tumbling-events-f6732d9d-25cd-4ed7-8077-7ef4e42b433e-StreamThread-1] INFO com.polarsparc.kstreams.WindowedSum - ---> [1638723346697] >> Key: T, Value: 4
[tumbling-events-f6732d9d-25cd-4ed7-8077-7ef4e42b433e-StreamThread-1] INFO com.polarsparc.kstreams.WindowedSum - ---> [1638723346699] >> Key: B, Value: 8
[tumbling-events-f6732d9d-25cd-4ed7-8077-7ef4e42b433e-StreamThread-1] INFO com.polarsparc.kstreams.WindowedSum - ---> [1638723347202] >> Window: Window{startMs=1638723345000, endMs=1638723350000}, Key: S, Sum: 3
[tumbling-events-f6732d9d-25cd-4ed7-8077-7ef4e42b433e-StreamThread-1] INFO com.polarsparc.kstreams.WindowedSum - ---> [1638723347203] >> Window: Window{startMs=1638723345000, endMs=1638723350000}, Key: A, Sum: 14
[tumbling-events-f6732d9d-25cd-4ed7-8077-7ef4e42b433e-StreamThread-1] INFO com.polarsparc.kstreams.WindowedSum - ---> [1638723347204] >> Window: Window{startMs=1638723345000, endMs=1638723350000}, Key: T, Sum: 21
[tumbling-events-f6732d9d-25cd-4ed7-8077-7ef4e42b433e-StreamThread-1] INFO com.polarsparc.kstreams.WindowedSum - ---> [1638723347205] >> Window: Window{startMs=1638723345000, endMs=1638723350000}, Key: B, Sum: 15
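Analyzing the lines from Output.7 above, we can see the tumbling window aggregation working as expected. Every window spans 5000 ms (for example, startMs=1638723340000 to endMs=1638723345000) and no two windows overlap. Within the first window, the sum for key A grows from 8 (3 + 5) to 17 once the values 5 and 4 arrive. The events logged after 1638723345000 land in the next window (startMs=1638723345000, endMs=1638723350000), where the sums start over (for example, Key: S, Sum: 3 and Key: A, Sum: 14 = 6 + 8).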
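The window boundaries in the output can be reproduced with simple arithmetic. The following is a minimal sketch in plain Java (no Kafka dependency), not the code from Listing.2. The class name TumblingWindowSketch is hypothetical, the 5000 ms window size is read off the log output (endMs minus startMs), and the sample timestamps are the log timestamps from the output, used here as stand-ins for the actual event timestamps:

```java
// Hypothetical helper, not part of Listing.2: maps a timestamp to the
// tumbling window that contains it, assuming windows aligned to the epoch.
class TumblingWindowSketch {
    // Assumption: window size taken from the log output (endMs - startMs)
    static final long WINDOW_SIZE_MS = 5_000L;

    // Start of the window containing the timestamp (inclusive)
    static long windowStart(long timestampMs) {
        return timestampMs - (timestampMs % WINDOW_SIZE_MS);
    }

    // End of the window containing the timestamp (exclusive)
    static long windowEnd(long timestampMs) {
        return windowStart(timestampMs) + WINDOW_SIZE_MS;
    }

    public static void main(String[] args) {
        // Log timestamps from the output, used as stand-ins for event timestamps
        long[] samples = {1638723341535L, 1638723346268L};
        for (long ts : samples) {
            System.out.printf("%d -> [%d, %d)%n", ts, windowStart(ts), windowEnd(ts));
        }
    }
}
```

The first sample falls in the window starting at 1638723340000 and the second in the window starting at 1638723345000, which matches the two windows seen in the log.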
Moving on, it is time to test the code from Listing.2 using the hopping window technique. To do that, open a terminal window and execute the following commands:
$ cd $HOME/java/KafkaStreams/Fourth
$ mvn exec:java -Dexec.mainClass=com.polarsparc.kstreams.WindowedSum -Dexec.args="HP"
We do not show the output here, as it is similar to Output.5 above, except that it refers to hopping-events.
Next, it is time to run the producer code from Listing.1 to publish to the Kafka topic hopping-events. To do that, open a terminal window and execute the following commands:
$ cd $HOME/java/KafkaStreams/Fourth
$ mvn exec:java -Dexec.mainClass=com.polarsparc.kstreams.StreamEventGenerator -Dexec.args="hopping-events"
We do not show the output here, as it is similar to Output.6 above.
The following would be the additional output in the terminal running the application from Listing.2 above (in the hopping window mode):
[hopping-events-76ba7798-dae2-4e89-8eff-5b1d47a9d4b9-StreamThread-1] INFO com.polarsparc.kstreams.WindowedSum - ---> [1638723624318] >> Key: A, Value: 3
[hopping-events-76ba7798-dae2-4e89-8eff-5b1d47a9d4b9-StreamThread-1] INFO com.polarsparc.kstreams.WindowedSum - ---> [1638723624323] >> Key: B, Value: 7
[hopping-events-76ba7798-dae2-4e89-8eff-5b1d47a9d4b9-StreamThread-1] INFO com.polarsparc.kstreams.WindowedSum - ---> [1638723624324] >> Key: A, Value: 5
[hopping-events-76ba7798-dae2-4e89-8eff-5b1d47a9d4b9-StreamThread-1] INFO org.apache.kafka.streams.state.internals.RocksDBTimestampedStore - Opening store KSTREAM-REDUCE-STATE-STORE-0000000002.1638723600000 in regular mode
[hopping-events-76ba7798-dae2-4e89-8eff-5b1d47a9d4b9-StreamThread-1] INFO com.polarsparc.kstreams.WindowedSum - ---> [1638723624978] >> Window: Window{startMs=1638723620000, endMs=1638723625000}, Key: B, Sum: 7
[hopping-events-76ba7798-dae2-4e89-8eff-5b1d47a9d4b9-StreamThread-1] INFO com.polarsparc.kstreams.WindowedSum - ---> [1638723624980] >> Window: Window{startMs=1638723620000, endMs=1638723625000}, Key: A, Sum: 8
[hopping-events-76ba7798-dae2-4e89-8eff-5b1d47a9d4b9-StreamThread-1] INFO com.polarsparc.kstreams.WindowedSum - ---> [1638723626545] >> Key: T, Value: 8
[hopping-events-76ba7798-dae2-4e89-8eff-5b1d47a9d4b9-StreamThread-1] INFO com.polarsparc.kstreams.WindowedSum - ---> [1638723626549] >> Key: B, Value: 9
[hopping-events-76ba7798-dae2-4e89-8eff-5b1d47a9d4b9-StreamThread-1] INFO com.polarsparc.kstreams.WindowedSum - ---> [1638723626552] >> Key: M, Value: 3
[hopping-events-76ba7798-dae2-4e89-8eff-5b1d47a9d4b9-StreamThread-1] INFO com.polarsparc.kstreams.WindowedSum - ---> [1638723626555] >> Key: M, Value: 4
[hopping-events-76ba7798-dae2-4e89-8eff-5b1d47a9d4b9-StreamThread-1] INFO com.polarsparc.kstreams.WindowedSum - ---> [1638723626904] >> Key: M, Value: 3
[hopping-events-76ba7798-dae2-4e89-8eff-5b1d47a9d4b9-StreamThread-1] INFO com.polarsparc.kstreams.WindowedSum - ---> [1638723626907] >> Key: A, Value: 5
[hopping-events-76ba7798-dae2-4e89-8eff-5b1d47a9d4b9-StreamThread-1] INFO com.polarsparc.kstreams.WindowedSum - ---> [1638723626911] >> Key: T, Value: 3
[hopping-events-76ba7798-dae2-4e89-8eff-5b1d47a9d4b9-StreamThread-1] INFO com.polarsparc.kstreams.WindowedSum - ---> [1638723626914] >> Key: M, Value: 8
[hopping-events-76ba7798-dae2-4e89-8eff-5b1d47a9d4b9-StreamThread-1] INFO com.polarsparc.kstreams.WindowedSum - ---> [1638723626916] >> Key: A, Value: 4
[hopping-events-76ba7798-dae2-4e89-8eff-5b1d47a9d4b9-StreamThread-1] INFO com.polarsparc.kstreams.WindowedSum - ---> [1638723626919] >> Key: S, Value: 8
[hopping-events-76ba7798-dae2-4e89-8eff-5b1d47a9d4b9-StreamThread-1] INFO com.polarsparc.kstreams.WindowedSum - ---> [1638723626922] >> Key: T, Value: 2
[hopping-events-76ba7798-dae2-4e89-8eff-5b1d47a9d4b9-StreamThread-1] INFO com.polarsparc.kstreams.WindowedSum - ---> [1638723626924] >> Key: T, Value: 2
[hopping-events-76ba7798-dae2-4e89-8eff-5b1d47a9d4b9-StreamThread-1] INFO com.polarsparc.kstreams.WindowedSum - ---> [1638723626927] >> Key: S, Value: 3
[hopping-events-76ba7798-dae2-4e89-8eff-5b1d47a9d4b9-StreamThread-1] INFO com.polarsparc.kstreams.WindowedSum - ---> [1638723627030] >> Window: Window{startMs=1638723625000, endMs=1638723630000}, Key: B, Sum: 9
[hopping-events-76ba7798-dae2-4e89-8eff-5b1d47a9d4b9-StreamThread-1] INFO com.polarsparc.kstreams.WindowedSum - ---> [1638723627031] >> Window: Window{startMs=1638723625000, endMs=1638723630000}, Key: M, Sum: 18
[hopping-events-76ba7798-dae2-4e89-8eff-5b1d47a9d4b9-StreamThread-1] INFO com.polarsparc.kstreams.WindowedSum - ---> [1638723627032] >> Window: Window{startMs=1638723625000, endMs=1638723630000}, Key: A, Sum: 9
[hopping-events-76ba7798-dae2-4e89-8eff-5b1d47a9d4b9-StreamThread-1] INFO com.polarsparc.kstreams.WindowedSum - ---> [1638723627033] >> Window: Window{startMs=1638723625000, endMs=1638723630000}, Key: T, Sum: 15
[hopping-events-76ba7798-dae2-4e89-8eff-5b1d47a9d4b9-StreamThread-1] INFO com.polarsparc.kstreams.WindowedSum - ---> [1638723627034] >> Window: Window{startMs=1638723625000, endMs=1638723630000}, Key: S, Sum: 11
[hopping-events-76ba7798-dae2-4e89-8eff-5b1d47a9d4b9-StreamThread-1] INFO com.polarsparc.kstreams.WindowedSum - ---> [1638723629043] >> Key: S, Value: 3
[hopping-events-76ba7798-dae2-4e89-8eff-5b1d47a9d4b9-StreamThread-1] INFO com.polarsparc.kstreams.WindowedSum - ---> [1638723629046] >> Key: T, Value: 8
[hopping-events-76ba7798-dae2-4e89-8eff-5b1d47a9d4b9-StreamThread-1] INFO com.polarsparc.kstreams.WindowedSum - ---> [1638723629048] >> Window: Window{startMs=1638723625000, endMs=1638723630000}, Key: S, Sum: 14
[hopping-events-76ba7798-dae2-4e89-8eff-5b1d47a9d4b9-StreamThread-1] INFO com.polarsparc.kstreams.WindowedSum - ---> [1638723629049] >> Window: Window{startMs=1638723625000, endMs=1638723630000}, Key: T, Sum: 23
[hopping-events-76ba7798-dae2-4e89-8eff-5b1d47a9d4b9-StreamThread-1] INFO com.polarsparc.kstreams.WindowedSum - ---> [1638723629054] >> Key: T, Value: 1
[hopping-events-76ba7798-dae2-4e89-8eff-5b1d47a9d4b9-StreamThread-1] INFO com.polarsparc.kstreams.WindowedSum - ---> [1638723629056] >> Key: A, Value: 6
[hopping-events-76ba7798-dae2-4e89-8eff-5b1d47a9d4b9-StreamThread-1] INFO com.polarsparc.kstreams.WindowedSum - ---> [1638723629460] >> Key: A, Value: 8
[hopping-events-76ba7798-dae2-4e89-8eff-5b1d47a9d4b9-StreamThread-1] INFO com.polarsparc.kstreams.WindowedSum - ---> [1638723629464] >> Key: T, Value: 8
[hopping-events-76ba7798-dae2-4e89-8eff-5b1d47a9d4b9-StreamThread-1] INFO com.polarsparc.kstreams.WindowedSum - ---> [1638723629467] >> Key: B, Value: 7
[hopping-events-76ba7798-dae2-4e89-8eff-5b1d47a9d4b9-StreamThread-1] INFO com.polarsparc.kstreams.WindowedSum - ---> [1638723629470] >> Key: T, Value: 4
[hopping-events-76ba7798-dae2-4e89-8eff-5b1d47a9d4b9-StreamThread-1] INFO com.polarsparc.kstreams.WindowedSum - ---> [1638723629473] >> Key: B, Value: 8
[hopping-events-76ba7798-dae2-4e89-8eff-5b1d47a9d4b9-StreamThread-1] INFO com.polarsparc.kstreams.WindowedSum - ---> [1638723630075] >> Window: Window{startMs=1638723625000, endMs=1638723630000}, Key: A, Sum: 23
[hopping-events-76ba7798-dae2-4e89-8eff-5b1d47a9d4b9-StreamThread-1] INFO com.polarsparc.kstreams.WindowedSum - ---> [1638723630076] >> Window: Window{startMs=1638723625000, endMs=1638723630000}, Key: T, Sum: 36
[hopping-events-76ba7798-dae2-4e89-8eff-5b1d47a9d4b9-StreamThread-1] INFO com.polarsparc.kstreams.WindowedSum - ---> [1638723630077] >> Window: Window{startMs=1638723625000, endMs=1638723630000}, Key: B, Sum: 24
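Analyzing the lines from Output.8 above, we can see the hopping window aggregation working as expected. In the window with startMs=1638723625000 and endMs=1638723630000, for example, the sum for key T advances from 15 (8 + 3 + 2 + 2) to 23 and then to 36 as further events for that key arrive.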
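To make the relationship between window size and hop size concrete, the following minimal sketch in plain Java (no Kafka dependency) lists the hopping windows that contain a given timestamp. It is an illustration only and not the code from Listing.2: the class name HoppingWindowSketch is hypothetical, and the 10-second size with a 5-second advance used in the first call is an assumed configuration chosen to show overlap, not a value taken from the listing or the log:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not part of Listing.2: finds every hopping window
// [start, start + sizeMs) that contains a timestamp, with window starts
// aligned to multiples of advanceMs.
class HoppingWindowSketch {
    static List<Long> windowStartsFor(long timestampMs, long sizeMs, long advanceMs) {
        List<Long> starts = new ArrayList<>();
        // Earliest aligned window start whose window still covers the timestamp
        long start = (Math.max(0L, timestampMs - sizeMs + advanceMs) / advanceMs) * advanceMs;
        while (start <= timestampMs) {
            starts.add(start);
            start += advanceMs;
        }
        return starts;
    }

    public static void main(String[] args) {
        long ts = 1638723626545L; // a log timestamp from the output, used as a stand-in
        // Assumed 10 s window with a 5 s hop: the timestamp lands in two windows
        System.out.println(windowStartsFor(ts, 10_000L, 5_000L));
        // Hop equal to the size: a single window, the same as a tumbling window
        System.out.println(windowStartsFor(ts, 5_000L, 5_000L));
    }
}
```

With the assumed 10-second size, the timestamp belongs to the windows starting at 1638723620000 and 1638723625000. When the advance equals the size, only the window starting at 1638723625000 remains.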
Finally, it is time to test the code from Listing.2 using the sliding window technique. To do that, open a terminal window and execute the following commands:
$ cd $HOME/java/KafkaStreams/Fourth
$ mvn exec:java -Dexec.mainClass=com.polarsparc.kstreams.WindowedSum -Dexec.args="SL"
We do not show the output here, as it is similar to Output.5 above, except that it refers to sliding-events.
Next, it is time to run the producer code from Listing.1 to publish to the Kafka topic sliding-events. To do that, open a terminal window and execute the following commands:
$ cd $HOME/java/KafkaStreams/Fourth
$ mvn exec:java -Dexec.mainClass=com.polarsparc.kstreams.StreamEventGenerator -Dexec.args="sliding-events"
We do not show the output here, as it is similar to Output.6 above.
The following would be the additional output in the terminal running the application from Listing.2 above (in the sliding window mode):
[sliding-events-da26f9fa-fa9f-4229-8537-62285001e497-StreamThread-1] INFO com.polarsparc.kstreams.WindowedSum - ---> [1638723890568] >> Key: A, Value: 3
[sliding-events-da26f9fa-fa9f-4229-8537-62285001e497-StreamThread-1] INFO com.polarsparc.kstreams.WindowedSum - ---> [1638723890576] >> Key: B, Value: 7
[sliding-events-da26f9fa-fa9f-4229-8537-62285001e497-StreamThread-1] INFO com.polarsparc.kstreams.WindowedSum - ---> [1638723890577] >> Key: A, Value: 5
[sliding-events-da26f9fa-fa9f-4229-8537-62285001e497-StreamThread-1] INFO org.apache.kafka.streams.state.internals.RocksDBTimestampedStore - Opening store KSTREAM-REDUCE-STATE-STORE-0000000002.1638723840000 in regular mode
[sliding-events-da26f9fa-fa9f-4229-8537-62285001e497-StreamThread-1] INFO com.polarsparc.kstreams.WindowedSum - ---> [1638723891033] >> Window: Window{startMs=1638723885524, endMs=1638723890524}, Key: A, Sum: 3
[sliding-events-da26f9fa-fa9f-4229-8537-62285001e497-StreamThread-1] INFO com.polarsparc.kstreams.WindowedSum - ---> [1638723891034] >> Window: Window{startMs=1638723885558, endMs=1638723890558}, Key: B, Sum: 7
[sliding-events-da26f9fa-fa9f-4229-8537-62285001e497-StreamThread-1] INFO com.polarsparc.kstreams.WindowedSum - ---> [1638723891035] >> Window: Window{startMs=1638723890525, endMs=1638723895525}, Key: A, Sum: 5
[sliding-events-da26f9fa-fa9f-4229-8537-62285001e497-StreamThread-1] INFO com.polarsparc.kstreams.WindowedSum - ---> [1638723891035] >> Window: Window{startMs=1638723885562, endMs=1638723890562}, Key: A, Sum: 8
[sliding-events-da26f9fa-fa9f-4229-8537-62285001e497-StreamThread-1] INFO com.polarsparc.kstreams.WindowedSum - ---> [1638723892797] >> Key: T, Value: 8
[sliding-events-da26f9fa-fa9f-4229-8537-62285001e497-StreamThread-1] INFO com.polarsparc.kstreams.WindowedSum - ---> [1638723892801] >> Key: B, Value: 9
[sliding-events-da26f9fa-fa9f-4229-8537-62285001e497-StreamThread-1] INFO com.polarsparc.kstreams.WindowedSum - ---> [1638723892804] >> Key: M, Value: 3
[sliding-events-da26f9fa-fa9f-4229-8537-62285001e497-StreamThread-1] INFO com.polarsparc.kstreams.WindowedSum - ---> [1638723892808] >> Key: M, Value: 4
[sliding-events-da26f9fa-fa9f-4229-8537-62285001e497-StreamThread-1] INFO com.polarsparc.kstreams.WindowedSum - ---> [1638723893111] >> Window: Window{startMs=1638723887792, endMs=1638723892792}, Key: T, Sum: 8
[sliding-events-da26f9fa-fa9f-4229-8537-62285001e497-StreamThread-1] INFO com.polarsparc.kstreams.WindowedSum - ---> [1638723893112] >> Window: Window{startMs=1638723890559, endMs=1638723895559}, Key: B, Sum: 9
[sliding-events-da26f9fa-fa9f-4229-8537-62285001e497-StreamThread-1] INFO com.polarsparc.kstreams.WindowedSum - ---> [1638723893113] >> Window: Window{startMs=1638723887796, endMs=1638723892796}, Key: B, Sum: 16
[sliding-events-da26f9fa-fa9f-4229-8537-62285001e497-StreamThread-1] INFO com.polarsparc.kstreams.WindowedSum - ---> [1638723893114] >> Window: Window{startMs=1638723887800, endMs=1638723892800}, Key: M, Sum: 3
[sliding-events-da26f9fa-fa9f-4229-8537-62285001e497-StreamThread-1] INFO com.polarsparc.kstreams.WindowedSum - ---> [1638723893116] >> Window: Window{startMs=1638723892801, endMs=1638723897801}, Key: M, Sum: 4
[sliding-events-da26f9fa-fa9f-4229-8537-62285001e497-StreamThread-1] INFO com.polarsparc.kstreams.WindowedSum - ---> [1638723893116] >> Window: Window{startMs=1638723887804, endMs=1638723892804}, Key: M, Sum: 7
[sliding-events-da26f9fa-fa9f-4229-8537-62285001e497-StreamThread-1] INFO com.polarsparc.kstreams.WindowedSum - ---> [1638723893159] >> Key: M, Value: 3
[sliding-events-da26f9fa-fa9f-4229-8537-62285001e497-StreamThread-1] INFO com.polarsparc.kstreams.WindowedSum - ---> [1638723893162] >> Key: A, Value: 5
[sliding-events-da26f9fa-fa9f-4229-8537-62285001e497-StreamThread-1] INFO com.polarsparc.kstreams.WindowedSum - ---> [1638723893166] >> Key: T, Value: 3
[sliding-events-da26f9fa-fa9f-4229-8537-62285001e497-StreamThread-1] INFO com.polarsparc.kstreams.WindowedSum - ---> [1638723893169] >> Key: M, Value: 8
[sliding-events-da26f9fa-fa9f-4229-8537-62285001e497-StreamThread-1] INFO com.polarsparc.kstreams.WindowedSum - ---> [1638723893171] >> Key: A, Value: 4
[sliding-events-da26f9fa-fa9f-4229-8537-62285001e497-StreamThread-1] INFO com.polarsparc.kstreams.WindowedSum - ---> [1638723893175] >> Key: S, Value: 8
[sliding-events-da26f9fa-fa9f-4229-8537-62285001e497-StreamThread-1] INFO com.polarsparc.kstreams.WindowedSum - ---> [1638723893178] >> Key: T, Value: 2
[sliding-events-da26f9fa-fa9f-4229-8537-62285001e497-StreamThread-1] INFO com.polarsparc.kstreams.WindowedSum - ---> [1638723893181] >> Key: T, Value: 2
[sliding-events-da26f9fa-fa9f-4229-8537-62285001e497-StreamThread-1] INFO com.polarsparc.kstreams.WindowedSum - ---> [1638723893184] >> Key: S, Value: 3
[sliding-events-da26f9fa-fa9f-4229-8537-62285001e497-StreamThread-1] INFO com.polarsparc.kstreams.WindowedSum - ---> [1638723894188] >> Window: Window{startMs=1638723888155, endMs=1638723893155}, Key: M, Sum: 10
[sliding-events-da26f9fa-fa9f-4229-8537-62285001e497-StreamThread-1] INFO com.polarsparc.kstreams.WindowedSum - ---> [1638723894189] >> Window: Window{startMs=1638723888158, endMs=1638723893158}, Key: A, Sum: 13
[sliding-events-da26f9fa-fa9f-4229-8537-62285001e497-StreamThread-1] INFO com.polarsparc.kstreams.WindowedSum - ---> [1638723894190] >> Window: Window{startMs=1638723888162, endMs=1638723893162}, Key: T, Sum: 11
[sliding-events-da26f9fa-fa9f-4229-8537-62285001e497-StreamThread-1] INFO com.polarsparc.kstreams.WindowedSum - ---> [1638723894191] >> Window: Window{startMs=1638723892805, endMs=1638723897805}, Key: M, Sum: 11
[sliding-events-da26f9fa-fa9f-4229-8537-62285001e497-StreamThread-1] INFO com.polarsparc.kstreams.WindowedSum - ---> [1638723894192] >> Window: Window{startMs=1638723892801, endMs=1638723897801}, Key: M, Sum: 15
[sliding-events-da26f9fa-fa9f-4229-8537-62285001e497-StreamThread-1] INFO com.polarsparc.kstreams.WindowedSum - ---> [1638723894193] >> Window: Window{startMs=1638723893156, endMs=1638723898156}, Key: M, Sum: 8
[sliding-events-da26f9fa-fa9f-4229-8537-62285001e497-StreamThread-1] INFO com.polarsparc.kstreams.WindowedSum - ---> [1638723894194] >> Window: Window{startMs=1638723888165, endMs=1638723893165}, Key: M, Sum: 18
[sliding-events-da26f9fa-fa9f-4229-8537-62285001e497-StreamThread-1] INFO com.polarsparc.kstreams.WindowedSum - ---> [1638723894195] >> Window: Window{startMs=1638723890563, endMs=1638723895563}, Key: A, Sum: 9
[sliding-events-da26f9fa-fa9f-4229-8537-62285001e497-StreamThread-1] INFO com.polarsparc.kstreams.WindowedSum - ---> [1638723894196] >> Window: Window{startMs=1638723890525, endMs=1638723895525}, Key: A, Sum: 14
[sliding-events-da26f9fa-fa9f-4229-8537-62285001e497-StreamThread-1] INFO com.polarsparc.kstreams.WindowedSum - ---> [1638723894197] >> Window: Window{startMs=1638723893159, endMs=1638723898159}, Key: A, Sum: 4
[sliding-events-da26f9fa-fa9f-4229-8537-62285001e497-StreamThread-1] INFO com.polarsparc.kstreams.WindowedSum - ---> [1638723894198] >> Window: Window{startMs=1638723888168, endMs=1638723893168}, Key: A, Sum: 17
[sliding-events-da26f9fa-fa9f-4229-8537-62285001e497-StreamThread-1] INFO com.polarsparc.kstreams.WindowedSum - ---> [1638723894199] >> Window: Window{startMs=1638723888171, endMs=1638723893171}, Key: S, Sum: 8
[sliding-events-da26f9fa-fa9f-4229-8537-62285001e497-StreamThread-1] INFO com.polarsparc.kstreams.WindowedSum - ---> [1638723894199] >> Window: Window{startMs=1638723888174, endMs=1638723893174}, Key: T, Sum: 13
[sliding-events-da26f9fa-fa9f-4229-8537-62285001e497-StreamThread-1] INFO com.polarsparc.kstreams.WindowedSum - ---> [1638723894200] >> Window: Window{startMs=1638723893163, endMs=1638723898163}, Key: T, Sum: 4
[sliding-events-da26f9fa-fa9f-4229-8537-62285001e497-StreamThread-1] INFO com.polarsparc.kstreams.WindowedSum - ---> [1638723894201] >> Window: Window{startMs=1638723892793, endMs=1638723897793}, Key: T, Sum: 7
[sliding-events-da26f9fa-fa9f-4229-8537-62285001e497-StreamThread-1] INFO com.polarsparc.kstreams.WindowedSum - ---> [1638723894201] >> Window: Window{startMs=1638723893175, endMs=1638723898175}, Key: T, Sum: 2
[sliding-events-da26f9fa-fa9f-4229-8537-62285001e497-StreamThread-1] INFO com.polarsparc.kstreams.WindowedSum - ---> [1638723894202] >> Window: Window{startMs=1638723888178, endMs=1638723893178}, Key: T, Sum: 15
[sliding-events-da26f9fa-fa9f-4229-8537-62285001e497-StreamThread-1] INFO com.polarsparc.kstreams.WindowedSum - ---> [1638723894202] >> Window: Window{startMs=1638723893172, endMs=1638723898172}, Key: S, Sum: 3
[sliding-events-da26f9fa-fa9f-4229-8537-62285001e497-StreamThread-1] INFO com.polarsparc.kstreams.WindowedSum - ---> [1638723894203] >> Window: Window{startMs=1638723888181, endMs=1638723893181}, Key: S, Sum: 11
[sliding-events-da26f9fa-fa9f-4229-8537-62285001e497-StreamThread-1] INFO com.polarsparc.kstreams.WindowedSum - ---> [1638723895301] >> Key: S, Value: 3
[sliding-events-da26f9fa-fa9f-4229-8537-62285001e497-StreamThread-1] INFO com.polarsparc.kstreams.WindowedSum - ---> [1638723895304] >> Key: T, Value: 8
[sliding-events-da26f9fa-fa9f-4229-8537-62285001e497-StreamThread-1] INFO com.polarsparc.kstreams.WindowedSum - ---> [1638723895308] >> Key: T, Value: 1
[sliding-events-da26f9fa-fa9f-4229-8537-62285001e497-StreamThread-1] INFO com.polarsparc.kstreams.WindowedSum - ---> [1638723895312] >> Key: A, Value: 6
[sliding-events-da26f9fa-fa9f-4229-8537-62285001e497-StreamThread-1] INFO com.polarsparc.kstreams.WindowedSum - ---> [1638723895720] >> Key: A, Value: 8
[sliding-events-da26f9fa-fa9f-4229-8537-62285001e497-StreamThread-1] INFO com.polarsparc.kstreams.WindowedSum - ---> [1638723895723] >> Key: T, Value: 8
[sliding-events-da26f9fa-fa9f-4229-8537-62285001e497-StreamThread-1] INFO com.polarsparc.kstreams.WindowedSum - ---> [1638723895726] >> Key: B, Value: 7
[sliding-events-da26f9fa-fa9f-4229-8537-62285001e497-StreamThread-1] INFO com.polarsparc.kstreams.WindowedSum - ---> [1638723895729] >> Key: T, Value: 4
[sliding-events-da26f9fa-fa9f-4229-8537-62285001e497-StreamThread-1] INFO com.polarsparc.kstreams.WindowedSum - ---> [1638723895731] >> Key: B, Value: 8
[sliding-events-da26f9fa-fa9f-4229-8537-62285001e497-StreamThread-1] INFO com.polarsparc.kstreams.WindowedSum - ---> [1638723896234] >> Window: Window{startMs=1638723893172, endMs=1638723898172}, Key: S, Sum: 6
[sliding-events-da26f9fa-fa9f-4229-8537-62285001e497-StreamThread-1] INFO com.polarsparc.kstreams.WindowedSum - ---> [1638723896235] >> Window: Window{startMs=1638723893182, endMs=1638723898182}, Key: S, Sum: 3
[sliding-events-da26f9fa-fa9f-4229-8537-62285001e497-StreamThread-1] INFO com.polarsparc.kstreams.WindowedSum - ---> [1638723896236] >> Window: Window{startMs=1638723890296, endMs=1638723895296}, Key: S, Sum: 14
[sliding-events-da26f9fa-fa9f-4229-8537-62285001e497-StreamThread-1] INFO com.polarsparc.kstreams.WindowedSum - ---> [1638723896237] >> Window: Window{startMs=1638723890301, endMs=1638723895301}, Key: T, Sum: 23
[sliding-events-da26f9fa-fa9f-4229-8537-62285001e497-StreamThread-1] INFO com.polarsparc.kstreams.WindowedSum - ---> [1638723896238] >> Window: Window{startMs=1638723890304, endMs=1638723895304}, Key: T, Sum: 24
[sliding-events-da26f9fa-fa9f-4229-8537-62285001e497-StreamThread-1] INFO com.polarsparc.kstreams.WindowedSum - ---> [1638723896239] >> Window: Window{startMs=1638723890563, endMs=1638723895563}, Key: A, Sum: 15
[sliding-events-da26f9fa-fa9f-4229-8537-62285001e497-StreamThread-1] INFO com.polarsparc.kstreams.WindowedSum - ---> [1638723896240] >> Window: Window{startMs=1638723890525, endMs=1638723895525}, Key: A, Sum: 20
[sliding-events-da26f9fa-fa9f-4229-8537-62285001e497-StreamThread-1] INFO com.polarsparc.kstreams.WindowedSum - ---> [1638723896240] >> Window: Window{startMs=1638723890308, endMs=1638723895308}, Key: A, Sum: 23
[sliding-events-da26f9fa-fa9f-4229-8537-62285001e497-StreamThread-1] INFO com.polarsparc.kstreams.WindowedSum - ---> [1638723896241] >> Window: Window{startMs=1638723893169, endMs=1638723898169}, Key: A, Sum: 14
[sliding-events-da26f9fa-fa9f-4229-8537-62285001e497-StreamThread-1] INFO com.polarsparc.kstreams.WindowedSum - ---> [1638723896242] >> Window: Window{startMs=1638723893159, endMs=1638723898159}, Key: A, Sum: 18
[sliding-events-da26f9fa-fa9f-4229-8537-62285001e497-StreamThread-1] INFO com.polarsparc.kstreams.WindowedSum - ---> [1638723896242] >> Window: Window{startMs=1638723895309, endMs=1638723900309}, Key: A, Sum: 8
[sliding-events-da26f9fa-fa9f-4229-8537-62285001e497-StreamThread-1] INFO com.polarsparc.kstreams.WindowedSum - ---> [1638723896243] >> Window: Window{startMs=1638723890716, endMs=1638723895716}, Key: A, Sum: 23
[sliding-events-da26f9fa-fa9f-4229-8537-62285001e497-StreamThread-1] INFO com.polarsparc.kstreams.WindowedSum - ---> [1638723896243] >> Window: Window{startMs=1638723890719, endMs=1638723895719}, Key: T, Sum: 32
[sliding-events-da26f9fa-fa9f-4229-8537-62285001e497-StreamThread-1] INFO com.polarsparc.kstreams.WindowedSum - ---> [1638723896244] >> Window: Window{startMs=1638723890722, endMs=1638723895722}, Key: B, Sum: 16
[sliding-events-da26f9fa-fa9f-4229-8537-62285001e497-StreamThread-1] INFO com.polarsparc.kstreams.WindowedSum - ---> [1638723896244] >> Window: Window{startMs=1638723895305, endMs=1638723900305}, Key: T, Sum: 12
[sliding-events-da26f9fa-fa9f-4229-8537-62285001e497-StreamThread-1] INFO com.polarsparc.kstreams.WindowedSum - ---> [1638723896245] >> Window: Window{startMs=1638723895302, endMs=1638723900302}, Key: T, Sum: 13
[sliding-events-da26f9fa-fa9f-4229-8537-62285001e497-StreamThread-1] INFO com.polarsparc.kstreams.WindowedSum - ---> [1638723896245] >> Window: Window{startMs=1638723893179, endMs=1638723898179}, Key: T, Sum: 21
[sliding-events-da26f9fa-fa9f-4229-8537-62285001e497-StreamThread-1] INFO com.polarsparc.kstreams.WindowedSum - ---> [1638723896246] >> Window: Window{startMs=1638723893175, endMs=1638723898175}, Key: T, Sum: 23
[sliding-events-da26f9fa-fa9f-4229-8537-62285001e497-StreamThread-1] INFO com.polarsparc.kstreams.WindowedSum - ---> [1638723896246] >> Window: Window{startMs=1638723893163, endMs=1638723898163}, Key: T, Sum: 25
[sliding-events-da26f9fa-fa9f-4229-8537-62285001e497-StreamThread-1] INFO com.polarsparc.kstreams.WindowedSum - ---> [1638723896247] >> Window: Window{startMs=1638723892793, endMs=1638723897793}, Key: T, Sum: 28
[sliding-events-da26f9fa-fa9f-4229-8537-62285001e497-StreamThread-1] INFO com.polarsparc.kstreams.WindowedSum - ---> [1638723896247] >> Window: Window{startMs=1638723895720, endMs=1638723900720}, Key: T, Sum: 4
[sliding-events-da26f9fa-fa9f-4229-8537-62285001e497-StreamThread-1] INFO com.polarsparc.kstreams.WindowedSum - ---> [1638723896248] >> Window: Window{startMs=1638723890725, endMs=1638723895725}, Key: T, Sum: 36
[sliding-events-da26f9fa-fa9f-4229-8537-62285001e497-StreamThread-1] INFO com.polarsparc.kstreams.WindowedSum - ---> [1638723896248] >> Window: Window{startMs=1638723892797, endMs=1638723897797}, Key: B, Sum: 15
[sliding-events-da26f9fa-fa9f-4229-8537-62285001e497-StreamThread-1] INFO com.polarsparc.kstreams.WindowedSum - ---> [1638723896249] >> Window: Window{startMs=1638723895723, endMs=1638723900723}, Key: B, Sum: 8
[sliding-events-da26f9fa-fa9f-4229-8537-62285001e497-StreamThread-1] INFO com.polarsparc.kstreams.WindowedSum - ---> [1638723896249] >> Window: Window{startMs=1638723890728, endMs=1638723895728}, Key: B, Sum: 24
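Analyzing the lines from Output.9 above, we can see the sliding window aggregation working as expected. Unlike the tumbling and hopping cases, the window boundaries are not aligned to fixed 5-second marks; they follow the events. For key A, for example, the window ending at 1638723890524 holds only the first event (Sum: 3), the window ending at 1638723890562 holds the first two events (Sum: 8 = 3 + 5), and the window starting at 1638723890525, one millisecond after the first window ends, holds only the second event (Sum: 5).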
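The pattern of windows for key A in the log can be reproduced with a small brute-force sketch in plain Java (no Kafka dependency). It is an illustration of the behavior visible in the output, not the Kafka Streams implementation and not the code from Listing.2. The class name SlidingWindowSketch is hypothetical, the 5000 ms window length is read off the log (endMs minus startMs), and the two event timestamps are inferred from the window end times in the log rather than taken from the producer:

```java
import java.util.Map;
import java.util.TreeMap;

// Hypothetical helper, not part of Listing.2: for one key, derives the
// sliding windows suggested by the log. Each event at time t yields a
// window [t - SIZE_MS, t] and a window [t + 1, t + 1 + SIZE_MS]; both
// bounds are inclusive and empty windows are skipped.
class SlidingWindowSketch {
    // Assumption: window length taken from the log output (endMs - startMs)
    static final long SIZE_MS = 5_000L;

    // events[i] = {timestampMs, value}; returns window start -> sum of values
    static Map<Long, Long> windowSums(long[][] events) {
        Map<Long, Long> sums = new TreeMap<>();
        for (long[] event : events) {
            long ts = event[0];
            long[] candidateStarts = {ts - SIZE_MS, ts + 1};
            for (long start : candidateStarts) {
                long end = start + SIZE_MS;
                long sum = 0L;
                boolean hasEvents = false;
                for (long[] other : events) {
                    if (other[0] >= start && other[0] <= end) {
                        sum += other[1];
                        hasEvents = true;
                    }
                }
                if (hasEvents) {
                    sums.put(start, sum);
                }
            }
        }
        return sums;
    }

    public static void main(String[] args) {
        // Key A: value 3 at ...890524 and value 5 at ...890562 (timestamps inferred from the log)
        long[][] keyA = {{1638723890524L, 3L}, {1638723890562L, 5L}};
        windowSums(keyA).forEach((start, sum) ->
            System.out.printf("[%d, %d] -> %d%n", start, start + SIZE_MS, sum));
    }
}
```

For these two events the sketch yields three non-empty windows, starting at 1638723885524 (sum 3), 1638723885562 (sum 8), and 1638723890525 (sum 5). These are the same three windows reported for key A at the start of the log.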
References
Introduction to Stream Analytics Windowing Functions
Exploring Kafka Streams :: Part 4
Exploring Kafka Streams :: Part 3
Exploring Kafka Streams :: Part 2