After setting up the environment, it is now time to simulate the beer ratings flowing in. As explained, I will start off several generators simultaneously. To generate some (intended) data skew away from the average, several generators will have the same structural event definition, however they will be different in the combination of users, beers and ratings upper and lower bounds. Of course this is also based on my personal preference – who said my demonstration scenario should be fair?
First Gulps
Okay, I have started the generators to “randomly” generate 400 beer ratings to my Kafka topic “beer_ratings”. After sinking 400 beers, let me start up the command line interface first, to do some elementary checks. Using KSQL, the topic can be inspected using the print command:
“`print ‘beer_ratings from beginning limit 10;“`
In this simple command, there are some hard-learned lessons I discovered:
- Unquoted names are converted to UPPER case and the names are case-sensitive – so you need to quote lower case names!
- You only get to see the topic’s messages generated after the time you started the command line tool – unless you include ‘from beginning’ in your command!
- Commands will block after all data has been read until new data arrives – but you can override this by limiting the number of rows output.

Streaming
To issue actual KSQL commands against the data, we first need to define a stream based on the topic; as we have loaded the data using AVRO schema and the schema is stored in the Schema Registry, this turns out to be quite easy (but let me use UPPER CASE names instead now ..):
create stream BEERRATINGS with (kafka_topic='beer_ratings', value_format='AVRO');
KSQL will simply reply with ‘Stream created.’ and return to the prompt. If you try to query the stream directly, no data will be shown and the session will block:
Like reading the topic, the stream query will – by default – only show the newly arriving data and silently ignore the already existing events. Fortunately, CTRL-C will bring you back to the prompt and by issuing the following command, you can reset the offset which is being kept for your client to the earliest point in time possible automatically:
set 'auto.offset.reset'='earliest';
Repeating the (K)SQL command, preferrably with a LIMIT clause to restrict the number of rows returned, now shows all events already present in the topic:

Exploring the Stream
One of the slight annoyances I had with KSQL is that it does not show the column names when querying. For small events, this is not really an issue, but for more complex payloads or for streams you have created yourself, or a longer time ago, this may be a problem as you’ll need the column names for your queries.
Fortunately, KSQL’s build in DESCRIBE command will show you exactly what is present in the stream:
Simple Queries
The command language used in Confluent’s KSQL has a lot in common with regular SQL.
Projection and filtering
Projection and filtering are easily achieved using KSQL, e.g. listing all beers that mnuman has consumed with their rating:
SELECT beer_id
, rating
FROM beerratings
WHERE user_id = 'mnuman';

Simple Aggregation
Keeping the beer counts and their average score – unfortunately, KSQL does not have an “average” function built-in, so we need to roll our own:
SELECT beer_id
, count(*)
, sum(rating)/count(*) as average_rating
FROM beerratings
GROUP BY beer_id
;

The interesting thing is that also this query is blocking; as the event stream has been halted, no aggregates need to be recomputed … but as soon as we generate some additional events, the newly computed results start flowing in (our out):


CSAS
Another useful feature is being able to define a stream based on another stream, allowing a subset of filtered and/or enriched data to be created for specific consumers. Just like in SQL, where you can perform a “create-table-as-select” command, KSQL offers CSAS: create stream as select!
Let’s suppose we want to filter out beer ratings pertaining to our SynBeers, we can easily achieve this using:
CREATE STREAM SYNBEERSTREAM AS
SELECT *
FROM beerratings
where (beer_id = 'PLATINUMBLOND' OR
beer_id = 'WEIZENDOPPELBOCK' OR
beer_id = 'OAKEDQUAD'
)
;
And of course, this stream can also be queried using the same commands. So if our company directors, Johan and Hugo, find the refrigerator out of SynBier, they can simply issue a command to find out who has the most checkins and is the likely one to blame. As KSQL does not have an ORDER BY clause (it’s streaming data, right?!), we can achieve our goal by using some simple UNIX tools if we put the query into a file (last_beer.ksql):
set 'auto.offset.reset'='earliest';
SELECT user_id,beer_id,timestamptostring(max(beer_time)*1000, 'yyyy-MM-dd.HH:mm')
FROM synbeerstream
group by user_id , beer_id
;
Good old UNIX data mangling, sending the query through KSQL, forcing it to timeout after 5 seconds and filtering the output for just the data rows, sorting them on the third field in reverse order:
cat last_beer.ksql \
| ksql --query-timeout 5000 http://localhost:8088 2>&1 \
| grep -v ksql | grep '|' | sort -t'|' -k 3 -r

Mea Maxima Culpa
Oops, I must have pulled an all-nighter as I was both responsible for drinking (or at least checking in) the last Oaked Quadrupel at 7 o’clock as well as the last Platinum Blond at 3 in the morning …
Joining
Just as SQL is no fun with just a single table, the same applies to KSQL. So let’s join tables together!
As you may recall, in the second installment we have loaded some data into a topic ‘beers’, now it is time to convert the data in this specific topic to … a table. The difference between a stream and a table is that a table will hold only a single row per key (viz. the latest one to arrive), whereas the stream is the unbounded data.
Defining a objects can also be done quite handsomely in the Confluent Control Center:


Introducing the table opens up the data to all kinds of interesting manipulations, i.e. who has consumed the most (in terms of alcohol)?
SELECT br.user_id
, ROUND(SUM(b.abv * b.volume))/100.0 as alcohol_volume
, ROUND(SUM(b.volume)) as beer_volume
, COUNT(*) as number_of_beers
FROM beerratings br
INNER JOIN BEERS b on br.beer_id = b.id
GROUP BY br.user_id
;
Leading in all categories and guilty as charged …

Conclusion
I hope that I have been able to convince you that processing streaming data is a piece of cake using Kafka and the Confluent Platform. Especially when using KSQL the manipulation of data becomes almost as easy as writing SQL …