
Kafka Streaming – Setting Up

Introduction

In this blog, I am going to zoom in on KSQL and the opportunities it offers for manipulating streaming data in Kafka using nothing but SQL-like statements. One of the neat things about the Confluent Kafka platform is that it provides additional utilities on top of the core Kafka tools. One of these utilities is ksql-datagen, which allows users to generate random data based on a simple schema definition in Apache Avro.

First things first

I have run the examples in a simple VirtualBox environment with 16 GB of memory and 4 CPU cores, running CentOS and Confluent Platform 5.3. Using the confluent command line tool, I fired up the Confluent Platform to run a single ZooKeeper instance, a single Kafka broker, the Schema Registry and the KSQL server by running:

confluent local start ksql-server

After starting the cluster, I also made sure to start the Confluent Control Center (which also starts Kafka Connect).
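
In Confluent Platform 5.3 that again boils down to a single command; Control Center pulls in the Kafka Connect worker as one of its dependencies:

$ confluent local start control-center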

Using the Confluent Control Center (CCC), new topics are easily created; in this case, I created a new simple topic beer_ratings with only a single partition:

Creating a topic in a simple way
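
For those without Control Center at hand, a minimal sketch of the equivalent command line call (assuming a local broker on the default port):

$ kafka-topics --create --bootstrap-server localhost:9092 \
    --topic beer_ratings --partitions 1 --replication-factor 1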

Defining a schema

Before we can test the data generation using the ksql-datagen, we first need to think about what elements need to be present in our beer_rating events:

  • beer_rating_id – unique identifier, used as the key of the message
  • beer_time – timestamp at which the ‘check in’ was made
  • user_id – identification of the beer aficionado
  • beer_id – beer identification
  • rating – a simple score between 1 and 10 points

Using some examples and the Avro documentation, I arrived at the following schema to generate some data:

{
  "namespace": "streams",
  "name": "beer_ratings_v1",
  "type": "record",
  "fields": [
    {
      "name": "beer_time",
      "type": {
        "type": "long",
        "arg.properties": {
          "iteration": {
            "start": 1540000000,
            "step": 611
          }
        }
      }
    },
    {
      "name": "beer_rating_id",
      "type": {
        "type": "long",
        "arg.properties": {
          "range": {
            "min": 1
          }
        }
      }
    },
    {
      "name": "user_id",
      "type": {
        "type": "string",
        "arg.properties": {
          "options": [
            "haantje",
            "dadario",
            "allesklarherrkommisar",
            "sjonnie_j",
            "roosjee_pee_es_vee"
          ]
        }
      }
    },
    {
      "name": "beer_id",
      "type": {
        "type": "string",
        "arg.properties": {
          "options": [
            "MOOSE",
            "HOP",
            "IJWIT",
            "WITTEROOK",
            "HOUTGERIJPTEDOPPELBOCK",
            "PLATINUMBLOND",
            "MOOIENEL",
            "PALEALECITRA",
            "UP",
            "F*CKCARAVAN",
            "WEIZENDOPPELBOCK",
            "OAKEDQUAD",
            "BAHVARIA"
          ]
        }
      }
    },
    {
      "name": "rating",
      "type": {
        "type": "int",
        "arg.properties": {
          "range": {
            "min": 1,
            "max": 10
          }
        }
      }
    }
  ]
}

I am generating a timestamp starting at 1,540,000,000 seconds after the UNIX epoch, which is on October 20th, 2018, and increasing it in steps of 611 seconds. The user_id will be picked from the supplied list, as will the beer being rated. The rating will be somewhere between 1 and 10, and the beer_rating_id will be a random long of at least 1, since its range only specifies a minimum.
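
A quick sanity check on that starting timestamp (assuming GNU date is available) should confirm the date:

$ date -u -d @1540000000
Sat Oct 20 01:46:40 UTC 2018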

Skewing the distribution

Now, I could be running a single instance of the generator … but that would ultimately produce an even distribution: roughly the same number of ratings per beer, roughly the same number of ratings per user, and, by the Law of Large Numbers, an average score for each beer that converges towards 5.5. This is illustrated in the diagram below:

By Pred - Own work, CC0, https://commons.wikimedia.org/w/index.php?curid=58536069

Clearly, that is no fun!

So I decided to skew the distribution by running a couple of generators simultaneously, using a number of variations of the Avro schema described in the previous section. The thing to keep in mind is that the generators need to use non-overlapping ranges of beer_rating_ids, as I would like to be able to uniquely identify each event.
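
For example, one of the generator variants could pin its beer_rating_id field to a separate block of ids by also giving the range a max; a sketch (not one of the actual variants I used):

{
  "name": "beer_rating_id",
  "type": {
    "type": "long",
    "arg.properties": {
      "range": {
        "min": 1000000000,
        "max": 2000000000
      }
    }
  }
}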

Running ksql-datagen

The Confluent-supplied ksql-datagen is dead easy to use:

$ ksql-datagen
Schema file not provided
usage: DataGen 
[help] 
[bootstrap-server=<kafka bootstrap server(s)> (defaults to localhost:9092)] 
[quickstart=<quickstart preset> (case-insensitive; one of 'orders', 'users', or 'pageviews')] 
schema=<avro schema file> 
[schemaRegistryUrl=<url for Confluent Schema Registry> (defaults to http://localhost:8081)] 
format=<message format> (case-insensitive; one of 'avro', 'json', or 'delimited') 
topic=<kafka topic name> 
key=<name of key column> 
[iterations=<number of rows> (defaults to 1,000,000)] 
[maxInterval=<Max time in ms between rows> (defaults to 500)] 
[propertiesFile=<file specifying Kafka client properties>]

So, to publish 10 messages in Avro format to our beer_ratings topic, using the beer_rating_id column as the key, we can accept the defaults where applicable and simply issue:

$ ksql-datagen schema=./my-beer-rating.avsc topic=beer_ratings format=avro key=beer_rating_id iterations=10
[ ... kafka configuration options removed ...]
5644674801274443777 --> ([ 1540000000 | 5644674801274443777 | 'maniak' | 'PLATINUMBLOND' | 96 ]) ts:1569093882123
7765804266723001345 --> ([ 1540000611 | 7765804266723001345 | 'roosjee_pee_es_vee' | 'PALEALECITRA' | 64 ]) ts:1569093882611
8913643721826930689 --> ([ 1540001222 | 8913643721826930689 | 'haantje' | 'BAHVARIA' | 84 ]) ts:1569093882760
4394629606897394689 --> ([ 1540001833 | 4394629606897394689 | 'dadario' | 'PALEALECITRA' | 33 ]) ts:1569093882881
3523897602695294977 --> ([ 1540002444 | 3523897602695294977 | 'allesklarherrkommisar' | 'OAKEDQUAD' | 15 ]) ts:1569093882915
4948118222439247873 --> ([ 1540003055 | 4948118222439247873 | 'sjonnie_j' | 'IJWIT' | 37 ]) ts:1569093883255
6970167734292771841 --> ([ 1540003666 | 6970167734292771841 | 'allesklarherrkommisar' | 'WITTEROOK' | 37 ]) ts:1569093883471
4555820884575924225 --> ([ 1540004277 | 4555820884575924225 | 'sjonnie_j' | 'HOUTGERIJPTEDOPPELBOCK' | 42 ]) ts:1569093883835
5112038266640616449 --> ([ 1540004888 | 5112038266640616449 | 'dadario' | 'WITTEROOK' | 21 ]) ts:1569093883885
6138800835128937473 --> ([ 1540005499 | 6138800835128937473 | 'dadario' | 'MOOSE' | 72 ]) ts:1569093884244

As the rows are generated, they’re echoed to the terminal as:
key --> ([ timestamp | beer_rating_id | user_id | beer_id | rating ])
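
To actually skew the distribution as described in the previous section, several of these generators can simply be run in parallel, each pointing at its own schema variant (the variant file names below are hypothetical):

$ ksql-datagen schema=./beer-ratings-variant-1.avsc topic=beer_ratings \
    format=avro key=beer_rating_id maxInterval=1000 &
$ ksql-datagen schema=./beer-ratings-variant-2.avsc topic=beer_ratings \
    format=avro key=beer_rating_id maxInterval=1000 &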

Static Reference Data

Just having data stream into the system can be interesting if the data itself is sufficiently self-explanatory; as you can see from the data above, it does not really tell you anything about the beer lovers, nor any details about the beers themselves.

So perhaps this is a good moment to provide some details on the different beers that are being rated, and what better way to do this than by sending messages to a Kafka topic?

Cooking up some data

I have cooked up some reference data for the beers in a text file, specifying the message key and the value separated by a # sign:

MOOSE#{ "ID" : "MOOSE", "NAME" : "Moose On The Loose","BREWER" : "UILTJE", "BEERTYPE" : "NEIPA","ABV": 6.0,"VOLUME" : 0.44}
HOP#{ "ID" : "HOP", "NAME" : "Hop Zij Met Ons","BREWER" : "JOPEN", "BEERTYPE" : "IPA","ABV": 6.0,"VOLUME" : 0.30}
IJWIT#{ "ID" : "IJWIT", "NAME" : "IJwit","BREWER" : "IJ", "BEERTYPE" : "WITBIER","ABV": 6.5,"VOLUME" : 0.33}
WITTEROOK#{ "ID" : "WITTEROOK", "NAME" : "Witte Rook","BREWER" : "JOPEN", "BEERTYPE" : "SMOKED","ABV": 7.0,"VOLUME" : 0.25}
HOUTGERIJPTEDOPPELBOCK#{ "ID" : "HOUTGERIJPTEDOPPELBOCK", "NAME" : "Houtgerijpte Rook Doppelbock","BREWER" : "DUITSLAURET", "BEERTYPE" : "SMOKED","ABV": 7.5,"VOLUME" : 0.30}
PLATINUMBLOND#{ "ID" : "PLATINUMBLOND", "NAME" : "Platinum Blond","BREWER" : "SYNBIERBREWLAB", "BEERTYPE" : "BLONDEALE","ABV": 6.0,"VOLUME" : 0.33}
MOOIENEL#{ "ID" : "MOOIENEL", "NAME" : "Mooie Nel IPA","BREWER" : "JOPEN", "BEERTYPE" : "IPA","ABV": 7.0,"VOLUME" : 0.33}
PALEALECITRA#{ "ID" : "PALEALECITRA", "NAME" : "Pale Ale Citra","BREWER" : "BROUWERIJKEES", "BEERTYPE" : "PALEALE","ABV": 4.6,"VOLUME" : 0.4}
UP#{ "ID" : "UP", "NAME" : "Urtyp","BREWER" : "BRAND", "BEERTYPE" : "PILSNER","ABV": 5.5,"VOLUME" : 0.3}
F*CKCARAVAN#{ "ID" : "F*CKCARAVAN", "NAME" : "F*ck The Caravan Is On Fire!","BREWER" : "UILTJE", "BEERTYPE" : "PALEALE","ABV": 5.5,"VOLUME" : 0.44}
WEIZENDOPPELBOCK#{ "ID" : "WEIZENDOPPELBOCK", "NAME" : "Weizen Doppelbock","BREWER" : "SYNBIERBREWLAB", "BEERTYPE" : "WEIZENDOPPELBOCK","ABV": 7.3,"VOLUME" : 0.3}
OAKEDQUAD#{ "ID" : "OAKEDQUAD", "NAME" : "Oaked Quadrupel","BREWER" : "SYNBIERBREWLAB", "BEERTYPE" : "QUADRUPEL","ABV": 9.0,"VOLUME" : 0.75}
BAHVARIA#{ "ID" : "BAHVARIA", "NAME" : "Bah!Varia","BREWER" : "SWINKELS", "BEERTYPE" : "BOCHT","ABV": 5.0,"VOLUME" : 0.20}

Beers have properties for ID, name, brewer, beer type or style, ABV (alcohol percentage by volume) and the volume in litres.

Using this concocted format, the data can easily be loaded into a topic beers (created beforehand) by piping the file contents through the supplied kafka-console-producer:

cat ./beers.json | \
  kafka-console-producer --property "parse.key=true" \
    --property "key.separator=#" --broker-list :9092 --topic beers

Using the Confluent Control Center, we can easily verify that the beer events are indeed loaded into the beers topic:
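
Without Control Center, the same check can be done from the command line with the plain console consumer (assuming the default local broker):

$ kafka-console-consumer --bootstrap-server localhost:9092 --topic beers \
    --from-beginning --property print.key=true --property key.separator=#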

Time for a beer

As it is Saturday night, almost 10 o’clock, it is time for a beer now … We have set up our generators (more or less) and loaded the reference data into Kafka as well, so I promise the next installment will really be about querying the data using KSQL!

Does this give away my preferences?

Milco Numan
