Data pipeline using Kafka, Hive, Python and Power BI

Subham Kumar Sahoo
12 min readOct 22, 2022

Kafka project to stream data from Python Flask website. Data is fetched using Kafka consumer (Python) and stored in Hive table. Dashboard created using Power BI tool.

Architecture diagram

GitHub repository : https://github.com/sksgit7/Big-data-projects/tree/main/kafka%20project1

Disclaimer

This mini-project is like an extension to our previous project i.e.

💡Live data streaming project using Kafka💡

I will be just mentioning the important parts and the new components. To get a better understanding of the concepts and working kindly refer to my previous four parts series on Kafka. I will strongly suggest to at-least have a quick look at these blog pages.

What are we going to build ??

We will build an end-to-end pipeline that will fetch the bid submissions from a website and insert into a DB table. And there will be a Power BI dashboard over it.

There will be 5 components like

  • Flask web application (producer) that will fetch the data from a web form and push the records to Kafka topic on Cloud.
  • Kafka topic (on Confluent-Kafka cloud).
  • Consumer (hive_writer.py) will insert records into Hive table.
  • Hive table to store the bidding data.
  • Power BI dashboard to visualize our data.

🗂️Hive

Apache Hive is a data warehouse software project built on top of Apache Hadoop for providing data query and analysis. Hive gives an SQL-like interface to query data stored in various databases and file systems that integrate with Hadoop.

The data we store in a hive table get stored as files in HDFS (Hadoop Distributed File System). It is actually not a DB but just an engine on HDFS to enable query operations under underlying data. The query language we use here is called HQL (Hive Query Language) which is quite similar to SQL.

👉Note : Actually the Hadoop ecosystem is made to process large amount of data at a time and there we can see the actual performance of it. But for transactional/real-time loads, it might look slow. This mini-project is just to show how the components like Kafka, Hive and Power BI work together.

Kindly see the part-1 of Kafka blog series for a detailed explanation of other components like Flask, Kafka etc.

Pre-requisites :

  • Go through the above mentioned blogs on Kafka for better understanding about concepts and components.
  • Kafka-confluent account with a topic created. With API credentials of the Kafka cluster and schema registry. Refer the part-2 of the blog mentioned above.
  • Python installed on your system.
  • Access to Hadoop and hive ecosystem. Else go for a Cloudera or HortonWorks distribution in Virtual-box or VMWare. Enable Network adapters on your Virtualbox (Bridged adapter) to connect through Power BI.
  • Power BI desktop application.

Let me know in comments if you face any issue, feel free to let me know in the comments.

Now let’s jump into Hands on !!

Step 1 : Creating Hive table

Let’s open up the Cloudera terminal and start hive shell. If you are on any other platform, you can start a hive shell from it’s terminal or use Hue to create a Hive database and table.

Then create a database using:

create database <your DB name>
use database <your DB name>

For ex, my DB name is “hive_class_b1”.

Then create a table ex: bid2. It will have 3 columns :

  • name : name of the bidder (varchar(20))
  • price : price of the bid (int)
  • bid_ts : timestamp of bid (timestamp)
create table bid2 (name varchar(20),price int,bid_ts timestamp)

👉Note : NOT NULL and UNIQUE constraint is being supported from Hive 3.0.0 (mine is 1.1.0 :( ) — https://community.cloudera.com/t5/Support-Questions/hive-create-table-with-not-null-constraint/td-p/51172#:~:text=Check%20out%20this-,link,-https%3A//issues.apache.

👉Note : I was not able to find how to add a surrogate key (column with auto-increment) in a hive table. Will update if I find something and you guys can let me know here in comments if you find anything.

We can add partitions and bucketing too on Hive tables.

  • Partitions for better/faster performance while reading the data using where condition. We can partition the table based on the columns on which it is most likely to use WHERE condition.
  • Bucketing for better/faster performance while joining multiple Hive tables. This also results in low data transfer cost over the network when the data moves from one data node to another.

As it is a very simple table and we are not planning to run lots of adhoc queries over it, I did not add partitioning or bucketing over it. But feel free to explore these concepts (these are really interesting!!).

👉Note : If you want feel free to play with this table. Like inserting records one-by-one into the table or using a file in HDFS to load the table etc.

Refer : https://docs.cloudera.com/HDPDocuments/HDP2/HDP-2.6.0/bk_data-access/content/new-feature-insert-values-update-delete.html

INSERT INTO TABLE bid2 VALUES (‘Abcde’, 100, ‘2022–10–21 18:07:21’);

If you execute this command in Hive shell, you can see there will be a map-reduce job starting (by default one mapper and one reducer) which will insert the record. And this will take some time.

Step 2 : Modifying the consumer code

Install ODBC driver

First we need to install Cloudera ODBC Driver for Apache Hive.

Here select the driver version, OS and OS version. Then put some details and download the setup file.

Then run the setup file till Finish.

Install pyodbc module

But first we need to install pyodbc which is a Python module to connect with ODBC databases like MySQL, SQL server and even Hive.

Ensure you have Python installed and PATH is added to environment variable. If not just do a quick search on YouTube.

Create one Python virtual environment and install the required modules. Open the terminal and run :

#create python virtual environment
python -m venv venv-kafka-project-1
#activate virtual-env
.\venv-kafka-project-1\Scripts\activate
#then install the following libraries
pip3 install Flask
pip3 install requests
pip3 install mysql-connector-python
pip3 install confluent_kafka
pip3 install pyodbc

If you have only Python 3 installed, only pip will work. As I have both Python 2 and 3, I tend to specify pip3.

The “mysql” module is not required if you are planning to use only Hive table to store data.

Consumer code in Python

Let’s create a file hive_writer.py inside our main directory i.e. “kafka project1”. This is quite similar to db_writer.py where we connected to Kafka-confluent topic and pulled the data. Then we connected to MySQL database and inserted the records.

The only difference is instead of writing the data to MySQL database, we will create a connection to our cloudera environment and insert the data to hive table.

Code : https://github.com/sksgit7/Big-data-projects/blob/main/kafka%20project1/hive_writer.py

This is the part where we connect to Hive and insert data to the table.

pyodbc.autocommit = True
conn = pyodbc.connect('DRIVER={Cloudera ODBC Driver for Apache Hive};HOST='+host_name+';PORT='+port+';UID='+user+';PWD='+password, autocommit=True)
cursor = conn.cursor()

query = “insert into table “+database+”.bid2 (name, price, bid_ts) values ({}, {}, {});”.format(“‘“+name+”’”, price, “‘“+bid_ts+”’”)
print(query)
cursor.execute(query)cursor.close()
conn.close()

So, here first we ask pyodbc to keep autocommit ON because drivers/DBs like hive which does not support transactions will throw you error if not done. Here is a good explanation — https://stackoverflow.com/questions/29664537/python-connection-to-hive#:~:text=13-,Slight,-clarification%2C%20you%27re%20aren%27t

Then we create the connection providing parameters like driver, host, port, username and password. Then we instantiate a cursor and execute the query.

Step 3 : Producer code

There is not much of a change in producer code. It is simply getting the data from Flask web application. And in the backend it is connecting to Confluent Kafka topic on the cloud and publishing the data. Then the data get stored in different partitions of the topic.

Code : https://github.com/sksgit7/Big-data-projects/blob/main/kafka%20project1/app.py

The only change that I did was to remove the code which connects to the DB (Hive table) to get the maximum bidding price till now and show us on the web page. It was taking around 1 minute for the script to connect to Hive and fetch the value as max(price) statement is starting a Map-reduce job for it which is taking some time due to low configuration of cloudera setup.

Remove/comment these lines from the code : (At start of main function)

cnx = conn.connect(host = “localhost”, user = “root”,
passwd = “mysql”, database = “test”)
cur = cnx.cursor()
query = “select max(price) from bid;”
cur.execute(query)
result = cur.fetchone()[0]

Just add, “result = ‘NA’ ”above, for the code to function well.

And comment these lines too (At end of main function).

if int(price) > result:   result=int(price)

Step 4 : Running producer and consumer

Let’s start the producer Flask application (activate the virtual environment first). Go into the directory (Kafka project1) where app.py is there.

flask run

Now ping this localhost (127.0.0.1:5000) address in web browser.

Put a name and a price and click on “Bid”. This will send the values to backend which will publish this data into the Kafka topic.

We can also check for the same record on Confluent Kafka dashboard by searching on the mentioned partition and offset. For better guide through, refer this blog of mine — https://subham-sahoo.medium.com/live-data-streaming-project-using-kafka-part-2-2df7ed529413.

And in parallel let’s run hive_writer.py on a different terminal (activate the virtual environment and go to the project directory).

py hive_writer.py

We can use python hive_writer.py too.

In few seconds we will see that our consumer has fetched the record from Kafka topic and it will take some time to insert into Hive table.

Then I inserted two more records (Aa, 5500) and (Sam, 6000) from web application.

And on the consumer terminal we can see the records are inserted into Hive table now.

Now let’s check the Hive table.

#Enable header while viewing dataset hive.cli.print.header = true;#select the database (if not done till now)use <your DB name>select * from bid2;

Actually I have already inserted few records into the table. But at the bottom we can see our recently inserted records.

👉Note : One of the issue is that it is taking 1–2 minutes to connect and insert a records into the Hive table. Also while inserting a map-reduce job is started at background which takes some time. This shows that Hadoop ecosystem is built for large data processing rather than transactional and real-time processes.

👉Note : As we have done before, we can use multiple consumers in parallel, that will improve the speed of the process in-case we have a lot of data coming from our web-page simultaneously. We can keep the group id same for these consumers and they will divide the messages/records among themselves for further operations (e.g. inserting into hive table).

So, in this way we can publish and fetch messages into Kafka topic. Then insert these records into hive table.

📊Bonus : Power BI dashboard

Previously we have created a dashboard on MySQL table. Now let’s go ahead and try that out with Hive table.

You need to have “Power BI desktop application” installed to follow along. It is free of cost, just download it from the official website.

Creating ODBC data source for Hive

On your hadoop terminal (for me it is cloudera terminal), use “ifconfig” command to get the IP address.

At the highlighted area, you will find an IP address.

If you do not find any then you might have to enable Network adapters for your system/environment. Also you might find multiple IP addresses for eth0, eth1 etc. if multiple adapters have been enabled.

👉Note : If you are using Cloudera or HDP on virtual box, you can enable Bridged adapters under Settings > Network.

I hope you have installed the ODBC driver for Hive as mentioned under Step-2. Based on the OS version you have used, open the ODBC Datasource Administrator tool on your system. For example, if you have installed 64-bit driver on windows, search for 64-bit version of ODBC Admin tool.

Under User DSN click Add.

Choose Cloudera ODBC Driver for Apache Hive and click on Finish.

Then we have to fill the name of the data source, Host (IP), port, authentication details etc. Then click on Test. On success just click on Ok to add the data source.

Now we can see a data source added under USER DSN.

Power BI connection

Now let’s open Power BI desktop application and connect to the Hive database.

Click on Get data > More > Others > ODBC and Connect.

Choose the data source and Ok.

Then if it asks for username and password, then provide the same. For Cloudera distribution, by default it is “cloudera” for both. Then this window will open where we can see all our tables.

Let’s choose our table bid2 and click on Transform. As we do not have any transformations to apply over it, we can just the data type of the columns and click on Close and Apply.

Then on the Visualization window we can see our table has been loaded and there will be the list of columns on the right side.

Then I used some of my Power BI skills to create this beautiful dashboard 😜.

Maybe not so beautiful, but this is something I was able to pull off with small dataset and shot time span. You guys can play around with more data and different visuals.

Done!!

If you liked this project, kindly do clap. Feel free to share it with others.

👉Follow me on medium for more such interesting contents and projects.

Check out my other projects here : https://medium.com/@subham-sahoo/

Connect with me at LinkedIn. ✨

Thank you!!

References

https://stackoverflow.com/questions/29664537/python-connection-to-hive

https://stackoverflow.com/questions/54363369/how-to-connect-in-python-3-7-to-a-hadoop-hive-database-that-requires-authentica

https://learn.microsoft.com/en-us/sql/connect/python/pyodbc/step-3-proof-of-concept-connecting-to-sql-using-pyodbc?view=sql-server-ver16

https://docs.cloudera.com/HDPDocuments/HDP2/HDP-2.6.0/bk_data-access/content/new-feature-insert-values-update-delete.html

https://community.cloudera.com/t5/Support-Questions/hive-create-table-with-not-null-constraint/td-p/51172

--

--