Last September, my coworker Iván Gutiérrez and I gave a talk to our coworkers about how to implement event sourcing with Kafka, and for that talk I developed a demo with the goal of strengthening the theoretical concepts. In this demo, I developed a Kafka Streams application that reads tweets containing the word "Java" from Twitter, groups them by username, and selects each user's most-liked tweet. The pipeline ends by sending the collected information to PostgreSQL.
As we received positive feedback and learned a lot of things along the way, I want to share this demo so that it is available to anyone who wants to take a look.
The demo is an implementation of the CQRS pattern based on Kafka and Kafka Streams. As we can see in the main image, Kafka is able to decouple read (Query) and write (Command) operations, which helps us develop event sourcing applications faster.
The Stack
The whole stack has been implemented with Docker because of its simplicity when integrating several tools and the isolation level it provides.
The stack is composed of:
Kafka: the main actor. You need to set the ZooKeeper IP. You can see all of its features in the slides provided above.
Kafka Connector: one of the four main Kafka core APIs. It is in charge of reading records from a given topic and inserting them into PostgreSQL.
PostgreSQL: the SQL database.
Producer
This is the writer app. It is the piece of our infrastructure in charge of reading tweets containing the word "Java" from Twitter and sending them to Kafka.
The following code has two sections: Twitter Stream and Kafka Producer.
Twitter Stream: Creates a data stream of tweets. You can add a FilterQuery if you want to filter the stream before consumption. You need credentials to access the Twitter API.
Kafka Producer: Sends records to Kafka. In our demo, it sends records without a key to the 'tweets' topic.
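Below is a minimal sketch of what that producer could look like, assuming twitter4j for the Twitter stream and the plain Kafka producer client; the class name, broker address, and JSON payload layout are illustrative, not the demo's exact code.

```java
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import twitter4j.FilterQuery;
import twitter4j.Status;
import twitter4j.StatusAdapter;
import twitter4j.TwitterStream;
import twitter4j.TwitterStreamFactory;

import java.util.Map;
import java.util.Properties;

public class TweetsProducer {

    public static void main(String[] args) {
        // Plain Kafka producer that sends String values to the 'tweets' topic
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // assumption: local broker
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        KafkaProducer<String, String> producer = new KafkaProducer<>(props);
        ObjectMapper mapper = new ObjectMapper();

        // Twitter stream; twitter4j reads the API credentials from twitter4j.properties
        TwitterStream twitterStream = new TwitterStreamFactory().getInstance();
        twitterStream.addListener(new StatusAdapter() {
            @Override
            public void onStatus(Status status) {
                try {
                    // Serialize only the fields the pipeline needs and send them without a key
                    String json = mapper.writeValueAsString(Map.of(
                            "username", status.getUser().getScreenName(),
                            "text", status.getText(),
                            "likes", status.getFavoriteCount()));
                    producer.send(new ProducerRecord<>("tweets", json));
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        });

        // FilterQuery so only tweets containing the word "Java" are received
        twitterStream.filter(new FilterQuery().track("Java"));
    }
}
```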
This app is a Spring Boot application.
Kafka Stream
The main piece of our infrastructure is in charge of reading the tweets from the 'tweets' topic, grouping them by username, counting tweets, extracting the most-liked tweet, and sending the result to the 'influencers' topic.
Let’s focus on the two most important methods of the next block of code:
stream method: The Kafka Streams Java API follows the same nomenclature as the Java 8 Stream API. The first operation performed in the pipeline is to select the key; since every key change triggers a re-partition of the topic, we should change the key as little as possible. Then we have to work out which tweet has accumulated the most likes, and as this is a stateful operation, we need to perform an aggregation. The aggregation operation is detailed in the next item. Finally, we need to send the records to the output topic, called 'influencers'. For this task we need to map the Influencer class to the InfluencerJsonSchema class and then use the to method. The InfluencerJsonSchema class is explained in the Kafka Connector section. The peek method is used for debugging purposes.
aggregateInfoToInfluencer method: This is a stateful operation. It receives three arguments: the username, the raw tweet from the topic, and the previously stored Influencer. It adds one to the tweet counter and compares the likes with the most-liked tweet seen so far. It returns a new instance of the Influencer class in order to maintain immutability.
The @EnableKafkaStreams annotation and the kStreamsConfigs method are responsible for integrating the Kafka Streams API with the Spring Framework.
Further information about this integration is provided here.
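As a reference, here is a hedged sketch of the topology and Spring configuration described above. It assumes Spring for Apache Kafka's JsonSerde and a simple Tweet class exposing getUsername(), getText(), and getLikes(); the class name, application id, and broker address are illustrative, not the demo's exact code.

```java
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Produced;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafkaStreams;
import org.springframework.kafka.annotation.KafkaStreamsDefaultConfiguration;
import org.springframework.kafka.config.KafkaStreamsConfiguration;
import org.springframework.kafka.support.serializer.JsonSerde;

@Configuration
@EnableKafkaStreams
public class InfluencersStream {

    // Integrates the Kafka Streams API with Spring (see @EnableKafkaStreams above)
    @Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
    public KafkaStreamsConfiguration kStreamsConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "influencers-app");   // illustrative id
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // assumption: local broker
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        return new KafkaStreamsConfiguration(props);
    }

    @Bean
    public KStream<String, Tweet> stream(StreamsBuilder builder) {
        JsonSerde<Tweet> tweetSerde = new JsonSerde<>(Tweet.class);
        JsonSerde<Influencer> influencerSerde = new JsonSerde<>(Influencer.class);

        KStream<String, Tweet> tweets =
                builder.stream("tweets", Consumed.with(Serdes.String(), tweetSerde));

        tweets
            // Select the key once: every key change triggers a re-partition of the topic
            .selectKey((key, tweet) -> tweet.getUsername())
            // Stateful aggregation: count tweets per user and keep the most-liked one
            .groupByKey(Grouped.with(Serdes.String(), tweetSerde))
            .aggregate(Influencer::empty,
                       this::aggregateInfoToInfluencer,
                       Materialized.with(Serdes.String(), influencerSerde))
            .toStream()
            // Wrap the result in the schema-carrying representation expected by the JDBC sink
            .mapValues(InfluencerJsonSchema::new)
            .peek((username, value) -> System.out.println("Sending influencer " + username)) // debugging only
            .to("influencers", Produced.with(Serdes.String(), new JsonSerde<>(InfluencerJsonSchema.class)));

        return tweets;
    }

    private Influencer aggregateInfoToInfluencer(String username, Tweet tweet, Influencer previous) {
        // Add one to the tweet counter and keep the content of whichever tweet has more likes;
        // a new instance is returned to preserve immutability
        if (tweet.getLikes() >= previous.getLikes()) {
            return new Influencer(previous.getTweets() + 1, username, tweet.getText(), tweet.getLikes());
        }
        return new Influencer(previous.getTweets() + 1, username, previous.getContent(), previous.getLikes());
    }
}
```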
The Influencer class is mentioned in the above block of code; to make the reading easier, it is shown next.
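A plausible shape for that class is sketched here, assuming Jackson for the fromJson helper mentioned below; the demo's exact fields may differ.

```java
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.databind.ObjectMapper;

public class Influencer {

    private final long tweets;
    private final String username;
    private final String content;
    private final long likes;

    @JsonCreator
    public Influencer(@JsonProperty("tweets") long tweets,
                      @JsonProperty("username") String username,
                      @JsonProperty("content") String content,
                      @JsonProperty("likes") long likes) {
        this.tweets = tweets;
        this.username = username;
        this.content = content;
        this.likes = likes;
    }

    // Initial value used by the aggregation
    public static Influencer empty() {
        return new Influencer(0, "", "", 0);
    }

    // Helper used when deserializing the JSON stored by the Kafka Streams state store
    public static Influencer fromJson(String json) {
        try {
            return new ObjectMapper().readValue(json, Influencer.class);
        } catch (Exception e) {
            throw new IllegalArgumentException(e);
        }
    }

    public long getTweets() { return tweets; }

    public String getUsername() { return username; }

    public String getContent() { return content; }

    public long getLikes() { return likes; }
}
```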
The fromJson method is mandatory because of the serialization process used by Kafka Streams. See Kafka Streams Serdes if you want to know more about this topic.
This app is a Spring Boot application.
Kafka Connector
Once we have fed our 'influencers' topic, we have to persist the data to PostgreSQL. For this task, Kafka provides a powerful API called Kafka Connect. Confluent, the company founded by Apache Kafka's developers, has built several connectors for many third-party tools. For JDBC, two connectors exist: source and sink. A source connector reads data through a JDBC driver and sends it to Kafka; a sink connector reads data from Kafka and sends it to a database through a JDBC driver.
We are going to use a JDBC sink connector, and this connector needs the schema information in order to map topic records into SQL records. In our demo, the schema is provided inside the topic record. For that reason, we have to map from the Influencer class to the InfluencerJsonSchema class in our data pipeline.
In the following code you can see how the schema is sent. If you want to see what the result looks like in JSON format, take a look at the provided gist.
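Since the gist is not reproduced here, the following sketch shows one way InfluencerJsonSchema could be written, assuming Jackson serializes it into the 'schema' plus 'payload' envelope understood by Kafka Connect's JsonConverter; the field list mirrors the Influencer sketch above and is illustrative.

```java
import java.util.List;
import java.util.Map;

// Wraps an Influencer into the {"schema": ..., "payload": ...} envelope that the
// JDBC sink connector needs in order to map topic records to SQL columns.
public class InfluencerJsonSchema {

    private final Map<String, Object> schema;
    private final Influencer payload;

    public InfluencerJsonSchema(Influencer influencer) {
        this.schema = Map.of(
                "type", "struct",
                "name", "influencer",
                "optional", false,
                "fields", List.of(
                        Map.of("field", "username", "type", "string", "optional", false),
                        Map.of("field", "content", "type", "string", "optional", true),
                        Map.of("field", "tweets", "type", "int64", "optional", false),
                        Map.of("field", "likes", "type", "int64", "optional", false)));
        this.payload = influencer;
    }

    public Map<String, Object> getSchema() { return schema; }

    public Influencer getPayload() { return payload; }
}
```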
Then, we need to configure our Kafka connector. The source topic, destination table, primary key, and connection URL must be provided.
Special mention goes to the 'insert.mode' field. We use 'upsert' mode because the primary key is the username, so a record is inserted or updated depending on whether the user has been persisted before.
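A plausible jdbc-sink.json is shown below, using standard JDBC sink connector properties; the connector name, connection URL, and credentials are assumptions, not the demo's exact values.

```json
{
  "name": "jdbc-sink",
  "config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
    "tasks.max": "1",
    "topics": "influencers",
    "connection.url": "jdbc:postgresql://postgres:5432/influencers",
    "connection.user": "postgres",
    "connection.password": "postgres",
    "auto.create": "true",
    "insert.mode": "upsert",
    "pk.mode": "record_value",
    "pk.fields": "username"
  }
}
```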
The above JSON configuration has been stored in a file so that we can keep track of it.
Once we have defined the connector, we have to register it in our Kafka Connect container, which can be done with a simple curl command.
foo@bar:~$ curl -i -X POST -H "Accept: application/json" -H "Content-Type: application/json" http://localhost:8083/connectors/ -d @connect-plugins/jdbc-sink.json
Reader App
We have developed a simple Spring Boot application to read the records inserted in PostgreSQL. This app is very simple, and its code is skipped in this post since it does not add much.
A screenshot of the UI is attached so you can see the results.
If you want to see the code, it is available on GitHub.
3. How to run
If you want to run the demo, you have to execute the following commands:
foo@bar:~$ docker-compose up
foo@bar:~$ curl -i -X POST -H "Accept: application/json" -H "Content-Type: application/json" http://localhost:8083/connectors/ -d @connect-plugins/jdbc-sink.json
foo@bar:~$ mvn clean spring-boot:run -pl producer
foo@bar:~$ mvn clean spring-boot:run -pl consumer
foo@bar:~$ mvn clean spring-boot:run -pl reader
4. Conclusion
This demo shows us a great example of a CQRS implementation and how easy it is to implement this pattern with Kafka.
In my humble opinion, Kafka Streams is the most powerful API of Kafka, since it provides a simple API with awesome features that abstract you from all the implementation details needed to consume records from Kafka and allow you to focus on developing robust pipelines for managing large data flows.
Besides, the Spring Framework provides an extra layer of abstraction that allows us to integrate Kafka with Spring Boot applications.
The full source code for this article is available over on GitHub.