AWS Change Data Capture project part 3

Subham Kumar Sahoo
6 min readJun 14, 2022

We will be implementing a Change Data Capture project using AWS RDS, DMS, S3, Lambda, Glue job with IAM.

Let’s continue this project.

Previous part can be found here : https://subham-sahoo.medium.com/aws-change-data-capture-project-part-2-4c1fde84f85c

Step 5 : Connecting and loading data to DB

We will be connecting to our RDS DB instance using MySQL workbench (on our local system). Then we will create schema, table and load the data too.

Add inbound rule to default security group

We need to allow access to our DB instance from MySQL workbench. For that we need to add inbound access rule to the security group attached to our RDS instance (i.e. default security group).

Click on the database instance and click on the attached SG.

Go to inbound rules tab and Edit.

Click http://checkip.amazonaws.com/ to find your system’s IP address.

Then click on Add and create new SG with these configs. Under Source choose custom and put your IP with “/32” at the end. “/32” means the network has a single IPv4 address. For more info you can refer https://www.quora.com/What-does-32-mean-after-an-IP-address.

Connect from MySQL workbench

Before this kindly ensure that you have installed MySQL workbench on your system. There are many resources you can refer for this setup.

Open MySQL workbench and click on the “+” button to add connection. Then provide a connection name, method : Standard (TCP/IP).

Click on your database instance on AWS and get the endpoint. Put it as hostname.

Port : 3306. Then put the username and password (Store in Valut > put password > Save).

Test the connection and click OK.

Create schema, table and load dummy data

Open a query editor page and run following code. This will create a schema “sks_schema” and table “Persons”. Our table will have 3 columns PersonID, FullName and City with Id being Primary key.

CREATE SCHEMA sks_schema;CREATE TABLE sks_schema.Persons (
PersonID int,
FullName varchar(255),
City varchar(255),
PRIMARY KEY (PersonID)
);

The use insert statements like this to insert few records into the table.

INSERT INTO sks_schema.Persons VALUES (160,’Ram Chopra’,’Denver’);
INSERT INTO sks_schema.Persons VALUES (161,’Sam Mallik’,’New York’);

Check number of records using -

Select count(*) from sks_schema.Persons;

So, now our initial load into the table is done.

Step 6 : DMS task

An AWS Database Migration Service (AWS DMS) task is where all the work happens. It connects to the data source and targets using DMS endpoints and migrates the data.

On DMS cnsole, click on migration tasks (left navigation pane) and create task.

Provide a name to task, replication instance and endpoints.

Migration type : migrate existing data and replicate on-going changes.

Here you can see the message that MySQL binary log need to be enabled (which we did before using parameter group).

Under Task setting

As we have one table, we specified 1 here.

Under table mappings

Provide schema, table name.

Create task.

As soon as we create, the task starts. And we can see a file like LOAD0000001.csv under “full-load..” bucket sks_schema/Persons folder. This file will contain the existing data in our table.

Then you can manually perform insert, update and delete operations from MySQL workbench. Wait for few 3–4 mins after initial file loaded and then make changes.

update sks_schema.Persons set FullName = ‘TestXYZ’ where PersonId = 55;
INSERT INTO sks_schema.Persons VALUES (155,’Alan Truman’,’Denver’);
DELETE FROM sks_schema.Persons where PersonId = 56;

Then you can see another file in that folder like “20220613–19100..csv” which will contain data like:

U,55,TestXYX,Denver
I,155,Alan Truman,Denver
D,56,Alex Groot,New York

I- Insert, U- Update, D-Delete.

Note : You can stop your database instance and DMS task for now. Will start again after lambda function and glue job are ready. You can delete all folders and files in full_load bucket.

Step 7 : Lambda function

This lambda function will we triggered as soon as new file lands in our full_load bucket (sks_schema/Persons path). And this will start our glue job for data processing there.

Create IAM role for lambda

This role will grant lambda function the access to S3, Glue and CloudWatch (for monitoring function logs and we can check logs in case of failures there).

On IAM dashboard > Roles > create role.

Search and select these policies.

Give a name to the role and create.

Create function

Go to the lambda console on AWS and create function.

Choose the created role and create the function.

Add trigger to lambda function

We want it to trigger as soon as a file lands in our full-load S3 bucket.

Click on the function and click “add trigger”.

Configure the trigger as following:

Note — remove extra space in Prefix — It should be sks_schema/Persons/.

Getting S3 bucket and file name in lambda (using events)

So, when a new file lands in S3 path, it triggers the function. Through lambda “events” we can get the bucket name and file name.

Just put this code in your function and click on deploy (to save the code).

import json
import boto3
def lambda_handler(event, context):
print(event)

Note : You can increase the timeout and memory of lambda function if you have a lot of files and folders in the bucket and it is failing due to timeout or memory issues there.

Then drop a random file in the full-load S3 bucket, under sks_schema/Persons path. This will trigger the lambda function.

Go to the “Monitor” tab and click on “View logs in CloudWatch”. Else you can go to CloudWatch dashboard > Logs > Log groups and find the suitable log group for this function.

Then click on the latest log.

So, if you are aware about Python lists and dictionaries then you can traverse this event dict to get bucket and file name i.e.

bucketName = event[“Records”][0][“s3”][“bucket”][“name”]
fileName = event[“Records”][0][“s3”][“object”][“key”]

Further steps in the next part..

Follow me for more such interesting contents and projects.

Connect with me at LinkedIn.

References

AWS documentations.

PySpark and AWS: Master Big Data with PySpark and AWS by Muhammad Ahmad.

--

--