Live data streaming project using Kafka — part 3

Subham Kumar Sahoo
6 min read · Oct 8, 2022

Kafka project to stream bidding data from a Python Flask website. Data is fetched using multiple consumers in parallel and stored in a MySQL database. The dashboard is created using Power BI.

Previous part : https://medium.com/@subham-sahoo/live-data-streaming-project-using-kafka-part-2-2df7ed529413

Disclaimer : If you get stuck, do not get demotivated and leave things halfway. Believe me, the answer to your problem is usually just one internet search away (maybe more than one 😉).

Today we will create consumer applications that add the messages to a MySQL DB and, simultaneously, create a backup file to store these raw messages.

GitHub repository : https://github.com/sksgit7/Big-data-projects/tree/main/kafka%20project1

Step 1 : Create a MySQL table

If you do not have MySQL workbench installed, kindly follow this video : https://www.youtube.com/watch?v=us1XyayQ6fU&t=673s

And then you can refer to this too : https://www.youtube.com/watch?v=Bv2MBsq70RQ&t=353s

I guess these are for Windows, but you can easily find the steps for other operating systems too.

Now connect to the MySQL DB. Click on the connection and provide the username and password. Then open a query editor; we will create a database and a table.

create database test;
use test;

Then we will create the table “bid”.

create table bid(
id int auto_increment,
name varchar(20) not null,
price int not null,
bid_ts timestamp not null,
constraint pk_id primary key (id)
);

Step 2 : Creating first consumer

Now let’s create a consumer program to get the data from the Kafka topic and write it to the DB table.

Create a db_writer1.py file in “kafka project1” and put in the code from the GitHub repository. This code first creates a connection to the Confluent Kafka topic and polls for records/messages every second. It then connects to the MySQL instance on the local system and inserts the values into the DB table “bid”.

Code link : https://github.com/sksgit7/Big-data-projects/blob/main/kafka%20project1/db_writer1.py

Note : We can use other DB engines like PostgreSQL, Oracle or even a DB hosted on the cloud. We just need to know the host URL/endpoint and the credentials.
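Below is a minimal sketch of what such a consumer can look like, using the confluent-kafka and mysql-connector-python packages. The topic name, credentials and JSON message format here are placeholders and assumptions; the actual db_writer1.py in the repository (which talks to Confluent Cloud) differs in its details, so refer to the code link above for the real thing.

# Minimal sketch of a Kafka-to-MySQL consumer (not the exact repo code).
# Assumptions: topic name "bids", JSON-encoded messages with name/price/bid_ts,
# placeholder Confluent Cloud credentials and a local MySQL "test" database.
import json

import mysql.connector
from confluent_kafka import Consumer

consumer = Consumer({
    'bootstrap.servers': '<CONFLUENT_BOOTSTRAP_SERVER>',
    'security.protocol': 'SASL_SSL',
    'sasl.mechanisms': 'PLAIN',
    'sasl.username': '<API_KEY>',
    'sasl.password': '<API_SECRET>',
    'group.id': 'bid_writers',
    'auto.offset.reset': 'earliest',
})
consumer.subscribe(['bids'])

db = mysql.connector.connect(
    host='localhost', user='root', password='<password>', database='test'
)
cursor = db.cursor()

try:
    while True:
        msg = consumer.poll(1.0)  # poll the topic roughly every second
        if msg is None:
            continue
        if msg.error():
            print('Consumer error:', msg.error())
            continue
        record = json.loads(msg.value().decode('utf-8'))
        cursor.execute(
            'insert into bid (name, price, bid_ts) values (%s, %s, %s)',
            (record['name'], record['price'], record['bid_ts']),
        )
        db.commit()
        print('Inserted:', record)
finally:
    consumer.close()
    db.close()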

Run and test the consumer application

python db_writer1.py

As we had already produced a record earlier and our retention time is 1 week (the default), the message was still in the Kafka topic, and the consumer has now pulled it.

Let’s run the consumer and producer code simultaneously. Open two terminal windows, navigate to the correct directory and activate the virtual environment in each. Then run the producer code with “flask run” on one terminal and the consumer code with “python db_writer1.py” on the other.

Here I have sent three entries from the website.

On the Kafka dashboard we can also see the published messages.

And now let’s check on the second terminal where our consumer code is running.

Here we can see the 3 records that the consumer application fetched. It fetched the records in order (first in, first out), since we have set this parameter in the consumer code:

‘auto.offset.reset’: “earliest”

Note : It takes only a fraction of a second to insert a record into the MySQL table, because both the Flask code and the MySQL engine run on our local machine. Most of the time is spent producing and consuming the message. This might be because I configured my Kafka cluster and Schema Registry in the Ohio region (which is quite far away from me).

Now, in your SQL editor, run the query below to get the records, latest first.

select * from bid order by id desc;

Step 3 : Using multiple consumer applications

We saw above that it takes around 5–6 seconds from the moment a user places a bid until it is produced to the Kafka topic and consumed. So, to reduce the total time for a group of entries, we can run multiple consumer applications in parallel. If we produce around 20 records and have 2 consumers, each one will fetch roughly half of the records and insert them into the DB table. While one consumer might take 20*5 i.e. 100 seconds, two consumers will take approximately 50 seconds each, and since they run in parallel the total time drops from about 100 seconds to about 50.

Note : In general, multiple consumers running in parallel will not fetch exactly the same number of records. Consumer 1 might fetch 11 records while consumer 2 fetches the remaining 9. But together they will get all the records.

Now we will create another file, db_writer2.py, which contains the same code as db_writer1.py and the same “group.id” value.

Code link : https://github.com/sksgit7/Big-data-projects/blob/main/kafka%20project1/db_writer2.py

Note : If we give the two scripts different group.id values, each of them will fetch all the records. We will see this later.
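To make the difference concrete, here is a rough illustration (the group name “bid_writers” is just an assumption for this sketch, not necessarily what the repo uses):

# db_writer1.py and db_writer2.py share the same group.id, so Kafka splits the
# topic's partitions between them and each message goes to only one of the two.
shared_group_conf = {'group.id': 'bid_writers', 'auto.offset.reset': 'earliest'}

# If db_writer2.py used a different group.id instead, it would form its own
# consumer group and receive every message again, duplicating the DB inserts:
# other_group_conf = {'group.id': 'bid_writers_2', 'auto.offset.reset': 'earliest'}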

Then we will run the producer code and both consumer scripts in three different terminals.

I used three different browser windows to place bids within a few seconds of each other. Here you can see it also shows the current highest bid (I changed the code a bit); the latest highest bid appears on each refresh.

Also we can add code to see each distinct individual’s latest/highest bid.

Note : We can implement auto-refresh using the HTML meta refresh tag, or use some other technique to update the latest bid without manual intervention.
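For example, a small Flask view with a meta refresh could look like the sketch below. The page template and the get_highest_bid() helper are made up for illustration; the real app renders its own template and tracks the highest bid itself.

# Sketch of auto-refresh via the HTML meta tag; the names here are illustrative only.
from flask import Flask, render_template_string

app = Flask(__name__)

PAGE = """
<html>
  <head>
    <!-- reload the page every 5 seconds so the highest bid stays current -->
    <meta http-equiv="refresh" content="5">
  </head>
  <body>
    <h3>Current highest bid: {{ highest_bid }}</h3>
  </body>
</html>
"""

def get_highest_bid():
    # placeholder: the real app would keep this in memory or query the DB
    return 150

@app.route('/')
def index():
    return render_template_string(PAGE, highest_bid=get_highest_bid())

if __name__ == '__main__':
    app.run()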

Let’s start our Flask server and run both consumer scripts.

Then I placed 10 bids by directly clicking the “Bid” button, which has code to randomly generate a name like Aa, Bb, etc. along with a random bid.
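That random-bid helper is roughly along these lines (a guess at its shape for illustration, not the repo’s exact code):

import random
import string
from datetime import datetime

def random_bid():
    # two-letter name like 'Aa', 'Bb', ... plus a random price and a timestamp
    letter = random.choice(string.ascii_uppercase)
    return {
        'name': letter + letter.lower(),
        'price': random.randint(100, 1000),
        'bid_ts': datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
    }

print(random_bid())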

Producer :

Consumer 1 :

Consumer 2 :

Here we can see that while consumer 1 (db_writer1.py) fetched 4 records out of 10, consumer 2 (db_writer2.py) fetched the remaining 6.

The web pages also get updated with the highest bid.

If you check, the same messages will be there on the Kafka dashboard. Let’s check the DB table.

Here we see that id=15 (Dd) was inserted first and the latest one is id=24 (Ff). We can see the same order in the producer terminal.

By comparing the timestamp of the first bid (when it was submitted) with the latest timestamp across both consumers, we can see that it took around 2 seconds for 10 records. If we had used a single consumer, it would have taken 50+ seconds. Similarly, we can have even more consumer applications running in parallel.

Next we will work on another consumer application that backs up the messages to a CSV file for future use or audit purposes.

Next part : https://medium.com/@subham-sahoo/live-data-streaming-project-using-kafka-part-4-51df2ada5a5a

If you liked this project, kindly do clap. Feel free to share it with others.

👉 Follow me on Medium for more such interesting content and projects.

Check out my other projects here : https://medium.com/@subham-sahoo/

Connect with me on LinkedIn. ✨

Thank you!!

References

https://docs.confluent.io/platform/current/clients/confluent-kafka-python/html/index.html

https://superuser.com/questions/283673/how-to-get-windows-command-prompt-to-display-time
