By ergemp

Load Testing a Kafka Cluster

You have installed and configured a brand new Kafka cluster, and now it is time to test it to learn its limits. The questions are which tools are available and how to generate some random data. This post has the following parts:


1- Generating some random test data.

2- Create a topic to test on.

3- Create a Kafka Streams job to continuously print out the statistics of the topic.

4- Load the data with kafka-producer-perf-test.sh tool.


1- Generating some random test data


There are many ways to generate synthetic test data. Even kafka-producer-perf-test.sh can generate random payloads on its own when it is given --record-size instead of --payload-file.


Alternatively, you can generate random test data with a simple shell loop:


ergemp@Ergems-MacBook-Pro kafka-241 % for i in {1..10}
for> do
for> str=`base64 /dev/urandom | head -c 10`
for> echo $str
for> done
MU1ahM48ph
+RIKplmmoj
0GyrBIyjyW
iWkJdNduDz
Ji4qGVJKxs
GA/ztVx+yb
10KCehS/cT
OmQjH8YF79
9jqvHZm2Zm
XHdFbcjA1V
ergemp@Ergems-MacBook-Pro kafka-241 %

But in this post I am going to use Mockaroo (https://www.mockaroo.com/), a test data generation tool I have been using for years. Although a load test is synthetic by nature, I prefer the data to at least resemble real-world data. So I generated a clickstream data set of 10,000 (ten thousand) lines. A sample of this data follows.


{"sid":"b2e4fe46-d1aa-4852-ab11-4c33c7707892","pid":"4e2f2ba3-619e-4f49-be74-885c88bad904","userId":null,"ts":"1505266352","event":"orderSummary","productId":"000042","ip_address":"171.196.0.210","productList":[{"productIds":"000025"}]}
{"sid":"99b2dd84-9954-4468-b75b-f5ebe3be8b21","pid":"ee7bb9d2-0814-49c8-942d-3c94013d2dac","userId":null,"ts":"1501666846","event":"productView","productId":"000028","ip_address":"172.174.181.75","productList":[{"productIds":"000030"},{"productIds":"000022"},{"productIds":"000040"}]}
{"sid":"1052c8a4-bd9f-4f10-b678-6724539e1225","pid":"8887b224-24ea-471d-a4fa-12496eb13a16","userId":"eollander4fe","ts":"1517907707","event":"newSession","productId":"000044","ip_address":"127.71.222.114","productList":[{"productIds":"000039"},{"productIds":"000015"},{"productIds":"000017"},{"productIds":"000036"},{"productIds":"000036"}]}
{"sid":"b2e4fe46-d1aa-4852-ab11-4c33c7707892","pid":"4e2f2ba3-619e-4f49-be74-885c88bad904","userId":null,"ts":"1505266352","event":"orderSummary","productId":"000042","ip_address":"171.196.0.210","productList":[{"productIds":"000025"}]}
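Records of this shape could also be produced without Mockaroo. The following is only a minimal sketch, not the tool used in this post: the class name ClickStreamGenerator, the value ranges, the fixed seed, and the always-null userId are assumptions made for illustration. Only the field names and event names are taken from the sample data and the Streams output shown in this post.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;
import java.util.Random;
import java.util.UUID;

// Hypothetical stand-in for the Mockaroo export: one JSON object per line.
class ClickStreamGenerator {
    // Event names as they appear in this post's sample data and Streams output.
    static final String[] EVENTS = {"newSession", "productView", "orderSummary", "boutiqueView"};

    // Zero-padded six-digit id such as "000042"; the 1..50 range is an assumption.
    static String productId(Random rnd) {
        return String.format(Locale.ROOT, "%06d", rnd.nextInt(50) + 1);
    }

    // UUID-shaped string built from the seeded Random so that runs are repeatable.
    static String uuid(Random rnd) {
        return new UUID(rnd.nextLong(), rnd.nextLong()).toString();
    }

    // Builds a single JSON line with the same fields as the sample records.
    static String record(Random rnd) {
        StringBuilder list = new StringBuilder();
        int items = rnd.nextInt(5) + 1; // 1 to 5 products per record (assumed)
        for (int i = 0; i < items; i++) {
            if (i > 0) list.append(',');
            list.append("{\"productIds\":\"").append(productId(rnd)).append("\"}");
        }
        String ip = (rnd.nextInt(223) + 1) + "." + rnd.nextInt(256) + "."
                + rnd.nextInt(256) + "." + rnd.nextInt(256);
        long ts = 1_500_000_000L + rnd.nextInt(20_000_000); // epoch seconds, arbitrary range
        return "{\"sid\":\"" + uuid(rnd) + "\",\"pid\":\"" + uuid(rnd)
                + "\",\"userId\":null,\"ts\":\"" + ts
                + "\",\"event\":\"" + EVENTS[rnd.nextInt(EVENTS.length)]
                + "\",\"productId\":\"" + productId(rnd)
                + "\",\"ip_address\":\"" + ip
                + "\",\"productList\":[" + list + "]}";
    }

    // Generates `count` lines; the same seed always yields the same lines.
    static List<String> generate(int count, long seed) {
        Random rnd = new Random(seed);
        List<String> lines = new ArrayList<>(count);
        for (int i = 0; i < count; i++) lines.add(record(rnd));
        return lines;
    }

    public static void main(String[] args) throws IOException {
        String out = args.length > 0 ? args[0] : "clickStream.json";
        Files.write(Paths.get(out), generate(10_000, 42L), StandardCharsets.UTF_8);
        System.out.println("Wrote 10000 records to " + out);
    }
}
```

Writing one record per line matters because the payload file is read line by line, so each line becomes one Kafka message.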

Here is the definition of my random data (shown as a screenshot of the Mockaroo schema in the original post).



2- Create a topic to test on


It is important to create the topic beforehand, so that its configuration resembles the real-world parameters. Otherwise the results of the performance test will not be representative. Here I am using my local laptop, so I set both partitions and replication-factor to 1.


ergemp@Ergems-MacBook-Pro kafka-241 % bin/kafka-topics.sh --zookeeper localhost:2181/kafka --create --topic perf-test --replication-factor 1 --partitions 1
Created topic perf-test.

ergemp@Ergems-MacBook-Pro kafka-241 % bin/kafka-topics.sh --zookeeper localhost:2181/kafka --list
__consumer_offsets
connect-configs
connect-local-file
connect-offsets
connect-status
first-topic
firstTopic
jdbc-local-postgres-test01-categories
perf-test
ergemp@Ergems-MacBook-Pro kafka-241 %


3- Create a Kafka Streams job to continuously print out the statistics of the topic


Since kafka-producer-perf-test.sh prints its own metrics, this step is optional. But as my test data is meaningful, I can produce some meaningful output in real time while the data is being loaded. Here is a simple Kafka Streams job that counts the events of the sample clickstream data, per event type, in 10-second windows.


package toolkit.kafka.streams;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.*;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.TimeWindows;

import java.io.IOException;
import java.time.Duration;
import java.util.Properties;

public class AnalyticsStreamJob {
    public static void main(String[] args) {
        System.out.println("Main: creating properties with the supplied parameters");
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "AnalyticsStreamJob-01");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        System.out.println("Main: properties created");
        System.out.println("------------------------");
        System.out.println(props);
        System.out.println("------------------------");

        System.out.println("Main: creating stream topology");
        final StreamsBuilder builder = new StreamsBuilder();

        // 10-second tumbling windows (advance == size) with a 10-second
        // grace period for records that arrive late.
        Duration windowSize = Duration.ofSeconds(10);
        Duration gracePeriod = Duration.ofSeconds(10);

        KStream<String, String> source = builder.stream("perf-test");
        ObjectMapper objectMapper = new ObjectMapper();

        source
            // Re-key every record by its "event" field; the value is not needed for counting.
            .map((mkey, mvalue) -> {
                try {
                    JsonNode eventNode = objectMapper.readTree(mvalue).get("event");

                    // A missing or non-text "event" is counted under the key "null".
                    if (eventNode != null && eventNode.isTextual()) {
                        return new KeyValue<String, String>(eventNode.textValue(), "");
                    }
                    return new KeyValue<String, String>("null", "");
                } catch (IOException e) {
                    // JsonProcessingException is a subclass of IOException,
                    // so malformed JSON also ends up here.
                    e.printStackTrace();
                    return new KeyValue<String, String>("error", "");
                }
            })
            .groupByKey()
            .windowedBy(TimeWindows.of(windowSize).advanceBy(windowSize).grace(gracePeriod))
            .count()
            .toStream()
            .foreach((key, value) -> System.out.println(" key : " + key + " value : " + value));

        final Topology topology = builder.build();
        System.out.println("Main: topology created");
        System.out.println("---------------------");
        System.out.println(topology.describe());
        System.out.println("---------------------");

        System.out.println("Main: starting stream...");
        final KafkaStreams streams = new KafkaStreams(topology, props);
        // Close the streams instance cleanly when the JVM is stopped (for example with Ctrl+C).
        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
        streams.start();
    }
}
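To make the windowing step easier to picture, here is a plain-Java approximation of what a 10-second tumbling window count does. It is a sketch only: the class TumblingWindowSketch and its methods are hypothetical and use no Kafka classes, and the real job additionally deals with grace periods, state stores and repartitioning. The idea it illustrates is that each record's timestamp is rounded down to a multiple of the window size, and the counter for that event type and window is incremented.

```java
import java.util.Map;
import java.util.TreeMap;

// Hypothetical illustration of tumbling-window counting without Kafka.
class TumblingWindowSketch {
    // Start of the window a timestamp falls into (for non-negative timestamps).
    static long windowStart(long timestampMs, long windowSizeMs) {
        return timestampMs - (timestampMs % windowSizeMs);
    }

    // Counts events per (event type, window); keys mimic the "[key@start/end]" layout.
    static Map<String, Long> countPerWindow(String[] events, long[] timestampsMs, long windowSizeMs) {
        Map<String, Long> counts = new TreeMap<>();
        for (int i = 0; i < events.length; i++) {
            long start = windowStart(timestampsMs[i], windowSizeMs);
            String key = "[" + events[i] + "@" + start + "/" + (start + windowSizeMs) + "]";
            counts.merge(key, 1L, Long::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        String[] events = {"newSession", "newSession", "productView"};
        long[] timestamps = {1614593651000L, 1614593659999L, 1614593660000L};
        countPerWindow(events, timestamps, 10_000L)
                .forEach((key, value) -> System.out.println(" key : " + key + " value : " + value));
    }
}
```

In this example the first two records fall into the same window, while the third one, exactly on the boundary, opens the next window.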

4- Load the data with kafka-producer-perf-test.sh tool


Now I am going to load the test data. I have produced 10,000 rows of sample clickstream data as JSON and am going to load 10 rows every second. I will also run the streaming job and a kafka-console-consumer.sh to monitor the data flowing into the perf-test topic.

ergemp@Ergems-MacBook-Pro kafka-241 % bin/kafka-producer-perf-test.sh \
--payload-file mockdata/clickStream.json \
--num-records 10000 \
--throughput 10 \
--topic perf-test \
--producer-props acks=all bootstrap.servers=localhost:9092 \
--print-metrics
Reading payloads from: /usr/local/kafka-241/mockdata/clickStream.json
Number of messages read: 10000
501 records sent, 100.2 records/sec (0.03 MB/sec), 4.8 ms avg latency, 262.0 ms max latency.
501 records sent, 100.0 records/sec (0.03 MB/sec), 2.3 ms avg latency, 5.0 ms max latency.
501 records sent, 100.1 records/sec (0.03 MB/sec), 1.9 ms avg latency, 4.0 ms max latency.
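The numbers involved in such a run are easy to estimate up front. The sketch below is a back-of-the-envelope calculation, not part of the perf test tool: the class PerfTestMath is hypothetical, the 300-byte average record size is an assumption loosely based on the sample records, and MB is taken to mean 1024 * 1024 bytes.

```java
// Hypothetical helper for estimating run time and data volume of a throttled load.
class PerfTestMath {
    // Seconds needed to send numRecords at a fixed rate of recordsPerSec.
    static double expectedSeconds(long numRecords, double recordsPerSec) {
        return numRecords / recordsPerSec;
    }

    // Data volume in MB/sec for a given rate and average record size in bytes.
    static double mbPerSec(double recordsPerSec, double avgRecordBytes) {
        return recordsPerSec * avgRecordBytes / (1024.0 * 1024.0);
    }

    public static void main(String[] args) {
        double seconds = expectedSeconds(10_000, 10);
        System.out.printf("Expected run time: %.0f s (about %.1f min)%n", seconds, seconds / 60);
        System.out.printf("Estimated volume at 10 records/sec: %.4f MB/sec%n", mbPerSec(10, 300));
    }
}
```

With 10,000 records at 10 records per second the load takes about 1,000 seconds, a little under 17 minutes. The same formula at 100 records per second and about 300 bytes per record gives roughly 0.03 MB/sec, which is in line with the figure in the sample output lines.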

And the output of the Kafka Streams job is as follows:


 key : [newSession@1614593650000/1614593660000] value : 7
 key : [productView@1614593650000/1614593660000] value : 8
 key : [orderSummary@1614593650000/1614593660000] value : 9
 key : [boutiqueView@1614593650000/1614593660000] value : 6
 key : [orderSummary@1614593660000/1614593670000] value : 33
 key : [productView@1614593660000/1614593670000] value : 24
 key : [newSession@1614593660000/1614593670000] value : 19
 key : [boutiqueView@1614593660000/1614593670000] value : 24
 key : [newSession@1614593670000/1614593680000] value : 23
 key : [boutiqueView@1614593670000/1614593680000] value : 25
 key : [productView@1614593670000/1614593680000] value : 29
 key : [orderSummary@1614593670000/1614593680000] value : 23
 key : [orderSummary@1614593680000/1614593690000] value : 28
 key : [boutiqueView@1614593680000/1614593690000] value : 33
 key : [productView@1614593680000/1614593690000] value : 22
 key : [newSession@1614593680000/1614593690000] value : 17
 key : [orderSummary@1614593690000/1614593700000] value : 3
 key : [newSession@1614593690000/1614593700000] value : 5
 key : [boutiqueView@1614593690000/1614593700000] value : 2
 key : [productView@1614593690000/1614593700000] value : 12
 key : [newSession@1614593960000/1614593970000] value : 11
 key : [boutiqueView@1614593960000/1614593970000] value : 12
 key : [orderSummary@1614593960000/1614593970000] value : 21
 key : [productView@1614593960000/1614593970000] value : 15
 key : [newSession@1614593970000/1614593980000] value : 25
 key : [productView@1614593970000/1614593980000] value : 23
 key : [orderSummary@1614593970000/1614593980000] value : 27
 key : [boutiqueView@1614593970000/1614593980000] value : 25
 key : [productView@1614593980000/1614593990000] value : 12
 key : [boutiqueView@1614593980000/1614593990000] value : 12
 key : [orderSummary@1614593980000/1614593990000] value : 10
 key : [newSession@1614593980000/1614593990000] value : 7
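Each key in this output has the form [event@windowStart/windowEnd], where both boundaries are epoch milliseconds. To read them as timestamps, a small helper like the following could be used. It is only a sketch for interpreting the printed text: the class WindowKeyParser is hypothetical and works on the printed string, not on Kafka's windowed key objects.

```java
import java.time.Instant;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper that turns a printed window key into readable UTC timestamps.
class WindowKeyParser {
    // Matches "[event@startMillis/endMillis]".
    private static final Pattern KEY = Pattern.compile("\\[(.+)@(\\d+)/(\\d+)\\]");

    static String describe(String windowedKey) {
        Matcher m = KEY.matcher(windowedKey.trim());
        if (!m.matches()) {
            throw new IllegalArgumentException("Unexpected key format: " + windowedKey);
        }
        Instant start = Instant.ofEpochMilli(Long.parseLong(m.group(2)));
        Instant end = Instant.ofEpochMilli(Long.parseLong(m.group(3)));
        return m.group(1) + " from " + start + " to " + end;
    }

    public static void main(String[] args) {
        // Prints: newSession from 2021-03-01T10:14:10Z to 2021-03-01T10:14:20Z
        System.out.println(describe("[newSession@1614593650000/1614593660000]"));
    }
}
```

The difference between the two boundaries is always 10,000 ms, which is the 10-second window size used by the job.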

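As this is my personal laptop, the throughput value of the perf test is 10, which means 10 records per second. The Streams output agrees with this setting: each complete 10-second window adds up to 100 events across the four event types. The producer output shown above reports about 100 records/sec, so it appears to come from a run with a higher throughput value. For larger installations the throughput value should be increased, and for even better results the perf test should be run from a separate machine.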





