Live data streaming project using Kafka — part 2
A Kafka project to stream bidding data from a Python Flask website. Data is fetched using multiple consumers in parallel and stored in a MySQL database. The dashboard is created using Power BI.
Previous part : https://medium.com/@subham-sahoo/live-data-streaming-project-using-kafka-part-1-9e7553c70b1
Hands-on time!!
Disclaimer : If you get stuck, do not get demotivated and leave things halfway. Believe me, the answers to your problems lie just one search away on the internet (maybe more than one 😉).
GitHub repository : https://github.com/sksgit7/Big-data-projects/tree/main/kafka%20project1
Step 1 : Setting up Confluent Kafka account
Go to this link https://confluent.cloud/. Enter your details and click on Submit.
Note : With a free-tier account, we get $400 in credits for 30 days, and we will only be able to use the basic configurations.
Then on the next pages you can add or skip additional details.
Create cluster
Then on the Cluster page choose the “Basic” config.
Then choose any one of the cloud service providers (AWS, GCP or Azure), or keep the default.
Note : Choose the region nearest to your location. This reduces latency while producing and consuming messages. I went ahead with the defaults, so it takes some time in my case.
Then skip the payment details.
Provide a “cluster name” (I used demo-kafka-cluster in place of cluster_0). Check the other details and launch the cluster.
The “default” environment is created.
Then click on the cluster name (demo-kafka-cluster) at the top-left to go into the cluster.
You will see this page now.
Create API keys and Schema Registry keys
Click on Data Integration > API keys and create a key. We will use this to connect to the topics inside our cluster.
Choose Global access and Next.
Add a description and click on Download. You will get the key, the secret and the bootstrap server link.
Access Schema Registry
We need to set up our Schema Registry to run in the cloud. This is where the schema and its metadata will be stored.
On your Environments page, click on “Enable now” under Stream governance.
Click on “Begin configuration” under Essentials (Free).
Choose the Cloud provider and select a region (nearby locations are better).
Then choose enable.
Now you can see the endpoint URL on the Environments page.
Copy it. Below it there will be an “Add key” button to create keys for the Schema Registry.
Click on “Create key” and download these keys too.
Step 2 : Create a Kafka topic
Click on “Topics” in the left nav bar and then “Add topic”.
Give the topic a name (I have given “auction”) and choose the number of partitions.
Then choose the clean-up policy as delete and the retention as 1 week (default). This means each message will be deleted 1 week after being added to the topic. These configurations can be changed later.
Then click on “Save and Create”.
Note : If you want to delete the messages in a topic, one of the easy ways is to select the topic and click on “configurations”.
Then click on “Edit settings”, scroll down and click on “expert mode”. There you can set the retention time to one minute (60000 ms) and save it. After one minute all the messages will be deleted, and you can then revert the retention time.
Here we also have the option to delete the topic.
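If you prefer the command line over the UI, the Confluent CLI should be able to do the same things. I have not used it in this project, so treat the commands below as an untested suggestion (check confluent kafka topic update --help and make sure you are logged in and pointed at the right cluster first):
confluent kafka topic update auction --config "retention.ms=60000"
confluent kafka topic delete auction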
Adding schema to the topic
Go to the Schema tab, click on “Value” and create a schema.
Select the format as JSON and add the following schema.
{
  "$id": "http://example.com/myURI.schema.json",
  "$schema": "http://json-schema.org/draft-07/schema#",
  "additionalProperties": false,
  "description": "Sample schema to help you get started.",
  "properties": {
    "bid_ts": {
      "description": "The type(v) type is used.",
      "type": "string"
    },
    "name": {
      "description": "The type(v) type is used.",
      "type": "string"
    },
    "price": {
      "description": "The type(v) type is used.",
      "type": "number"
    },
    "product": {
      "description": "The type(v) type is used.",
      "type": "string"
    }
  },
  "title": "SampleRecord",
  "type": "object"
}
So, here I have mentioned the column names and their respective data types. The order of the columns can be different.
Then click on “Validate” and “Save”.
When we produce messages to the Kafka topic, we will add a randomly generated key (a string) with each message. Based on this key, Kafka decides which partition to put a particular message in (a small snippet after the next step shows how such a key is attached).
Now we will add just the data type of our message keys. Under the Schema tab, select Keys. Select JSON and put “string” only.
Validate and save the schema.
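To make this concrete, here is a tiny hypothetical snippet (not the repo code) showing how such a random key could be generated and attached while producing. The bootstrap server and the sample bid value are placeholders; in the project they come from config.py and the web form.
import uuid
from confluent_kafka import Producer

# minimal producer config; in the project this comes from config.py (with the SASL settings)
producer = Producer({"bootstrap.servers": "<BOOTSTRAP_SERVER>"})

key = uuid.uuid4().hex  # randomly generated string key
value = '{"name": "test", "product": "laptop", "price": 100.0, "bid_ts": "2023-01-01 10:00:00"}'

# Kafka hashes the key to decide which partition this message lands in
producer.produce("auction", key=key, value=value)
producer.flush()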
Step 3 : Local setup
Final directory structure :
Big-data-projects\
|--kafka project1\
| |--templates\
| |--app.py
| |--db_writer1.py
| |--db_writer2.py
| |--file_writer.py
| |--config.py
|
|--venv-kafka-project-1\
First, install Python 3 on your local machine.
Create a folder “Big-data-projects”. Inside that create a virtual environment.
python -m venv venv-kafka-project-1
This will create a virtual environment folder named “venv-kafka-project-1” in current directory.
Open a command prompt and navigate to the “Big-data-projects” directory. Then activate the virtual environment using the command below.
.\venv-kafka-project-1\Scripts\activate
I am doing this on Windows; for Linux and macOS the command is different.
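For reference, on Linux or macOS the standard activation command is:
source venv-kafka-project-1/bin/activate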
Now the cmd will look like this :
Then use the below commands one at a time to install a few packages.
pip3 install Flask
pip3 install requests
pip3 install mysql-connector-python
pip3 install confluent_kafka
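Alternatively, all of them can be installed in a single command:
pip3 install Flask requests mysql-connector-python confluent_kafka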
Step 4 : Create Producer application
Create a config.py file in the “kafka project1” directory. This file will act as a module which we will import from the Producer and Consumer code.
config.py : https://github.com/sksgit7/Big-data-projects/blob/main/kafka%20project1/config.py
This file will contain our host URLs and secrets, which you can update from the key files that we downloaded earlier. The security protocol and SASL mechanism have been kept at their default values.
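If you would rather not open the repo right away, here is a minimal sketch of what such a config module might look like. The dictionary names and placeholder values are my own assumptions, so follow the actual config.py in the repo.
# config.py -- hypothetical sketch; fill in the values from the downloaded key files

# Kafka cluster connection settings (from the cluster API key file)
kafka_config = {
    "bootstrap.servers": "<BOOTSTRAP_SERVER>",   # e.g. pkc-xxxxx.<region>.<provider>.confluent.cloud:9092
    "security.protocol": "SASL_SSL",
    "sasl.mechanisms": "PLAIN",
    "sasl.username": "<CLUSTER_API_KEY>",
    "sasl.password": "<CLUSTER_API_SECRET>",
}

# Schema Registry settings (from the Schema Registry key file)
schema_registry_config = {
    "url": "<SCHEMA_REGISTRY_ENDPOINT>",
    "basic.auth.user.info": "<SR_API_KEY>:<SR_API_SECRET>",
}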
Create an app.py file under the “kafka project1” directory. This will be our Producer code: it will take the data submitted on our Flask web page and push the messages to the Kafka topic.
Add the code from GitHub repository. The code explanation has been added as comments.
app.py : https://github.com/sksgit7/Big-data-projects/blob/main/kafka%20project1/app.py
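To give a rough idea of the shape of that code, here is a simplified, hypothetical sketch of a Flask route producing to the topic with a plain JSON-encoded value. The actual app.py in the repo presumably uses the Schema Registry serializers we configured earlier, so refer to it for the full version; the config import name and the product field are assumptions.
import json
import uuid
from datetime import datetime

from flask import Flask, request, render_template
from confluent_kafka import Producer

from config import kafka_config  # hypothetical name from the config sketch above

app = Flask(__name__)
producer = Producer(kafka_config)

def delivery_report(err, msg):
    # called once per message to report success or failure
    if err is not None:
        print(f"Delivery failed: {err}")
    else:
        print(f"Produced to partition {msg.partition()} at offset {msg.offset()}")

@app.route("/", methods=["GET", "POST"])
def index():
    if request.method == "POST":
        bid = {
            "name": request.form["name"],
            "product": request.form.get("product", "demo-product"),  # placeholder; the real form/repo decides this
            "price": float(request.form["price"]),
            "bid_ts": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
        }
        producer.produce(
            "auction",
            key=uuid.uuid4().hex,          # random string key
            value=json.dumps(bid),
            on_delivery=delivery_report,
        )
        producer.flush()
        return render_template("index.html", message="Bid added!")
    return render_template("index.html")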
Then create a templates folder in the same directory; it will contain our HTML code (index.html). You can get this code from the GitHub repo too.
Run and test producer code
Then, while inside the “kafka project1” directory in the terminal, execute:
flask run
Then go to the highlighted URL in your browser to view the web page.
Put a name and a price and click on “Bid”.
After a few seconds, the page will reload and you will see the message “Bid added!”.
On the terminal you can see the values in the form of a dictionary (JSON-like format), and this is what gets published to the Kafka topic.
It also mentions that the message has been produced to partition 1 of the topic at offset 40.
Note : As mentioned above, the message can be produced to any partition based on the key of the message. The algorithm that decides this is designed to avoid message skew across partitions (i.e., an unbalanced number of messages per partition), and it also makes it difficult to guess which partition a message will be produced to.
Note : I had some messages in this topic earlier which I deleted later; because of that, the offset starts at 40. If you are adding messages to a partition for the first time, it will start from offset 0.
Now let’s go to the Kafka dashboard to check the message. Click on the “auction” topic and go to the “Messages” tab. Here we can see our messages.
We need to select the offset and partition if the messages do not load automatically. Select “Jump to offset”, put 40, and select partition 1.
In this way we are able to successfully produce a message to the topic.
In the next part we will create Kafka consumer applications to fetch the data from the topic and add it to a MySQL table.
Next part : https://medium.com/@subham-sahoo/live-data-streaming-project-using-kafka-part-3-91dda2856aa0
If you liked this project, kindly do clap. Feel free to share it with others.
👉Follow me on Medium for more such interesting content and projects.
Check out my other projects here : https://medium.com/@subham-sahoo/
Connect with me at LinkedIn. ✨
Thank you!!
References
https://docs.confluent.io/platform/current/clients/confluent-kafka-python/html/index.html
https://www.w3schools.com/html/tryit.asp?filename=tryhtml_form_submit
https://superuser.com/questions/283673/how-to-get-windows-command-prompt-to-display-time