AWS Change Data Capture project part 4

Subham Kumar Sahoo
Jun 14, 2022

We are implementing a Change Data Capture (CDC) project using AWS RDS, DMS, S3, Lambda and Glue, with IAM for access control.

Let’s continue this project.

Previous part can be found here: https://subham-sahoo.medium.com/aws-change-data-capture-project-part-3-ccf745c2ca00

Step 8 : Glue job

Create IAM role for Glue job

The role needs access to S3 and CloudWatch.

IAM console > Roles > Create role

Attach the two policies (S3 and CloudWatch access), give the role a name (like "Glue-S3-CDC-pyspark") and create it.
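If you prefer setting this up with boto3 instead of the console, here is a minimal sketch. It assumes the two policies are the AWS-managed AmazonS3FullAccess and CloudWatchFullAccess; for anything beyond a demo you would scope these down.

import json
import boto3

iam = boto3.client('iam')

# trust policy that lets the Glue service assume this role
trust_policy = {
    "Version": "2012-10-17",
    "Statement": [{
        "Effect": "Allow",
        "Principal": {"Service": "glue.amazonaws.com"},
        "Action": "sts:AssumeRole"
    }]
}

iam.create_role(
    RoleName='Glue-S3-CDC-pyspark',
    AssumeRolePolicyDocument=json.dumps(trust_policy)
)

# assumption: the two policies are the AWS-managed S3 and CloudWatch full-access ones
for policy_arn in [
    'arn:aws:iam::aws:policy/AmazonS3FullAccess',
    'arn:aws:iam::aws:policy/CloudWatchFullAccess',
]:
    iam.attach_role_policy(RoleName='Glue-S3-CDC-pyspark', PolicyArn=policy_arn)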

Create Glue job

The Glue job will transform the data it reads from the files in the "full_load.." S3 bucket. We will use a PySpark DataFrame for the transformations.

Go to the Glue console > Jobs (not legacy). It will open Glue studio.

Then create job as follows.

Add name and IAM role.

Choose Language: Python. Keep the number of workers at 2, as we are not performing complex tasks on large datasets.

Just keep “continuous logging” checked.

Set Max retries to 1 and uncheck "Use Glue data catalog as the Hive metastore".

Click on the “Save” button at top.
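The same job can also be created with boto3 if you prefer code over the console. This is just a sketch of the settings above; the script location and Glue version are assumptions you would adjust to your setup.

import boto3

glue = boto3.client('glue')

# rough boto3 equivalent of the console settings above;
# ScriptLocation and GlueVersion are assumptions -- adjust to your setup
glue.create_job(
    Name='glue-s3-cdc-pyspark',
    Role='Glue-S3-CDC-pyspark',
    Command={
        'Name': 'glueetl',
        'ScriptLocation': 's3://your-scripts-bucket/glue-s3-cdc-pyspark.py',
        'PythonVersion': '3'
    },
    GlueVersion='3.0',
    WorkerType='G.1X',
    NumberOfWorkers=2,
    MaxRetries=1,
    DefaultArguments={'--enable-continuous-cloudwatch-log': 'true'}
)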

Go to the “Code editor” tab and put this code:

from awsglue.utils import getResolvedOptions
import sys
from pyspark.sql.functions import when
from pyspark.sql import SparkSession
from datetime import datetime

# get arguments passed in by the Lambda function
args = getResolvedOptions(sys.argv, ['s3_target_path_key', 's3_target_path_bucket', 's3_latest_folder'])
bucket = args['s3_target_path_bucket']
fileName = args['s3_target_path_key']
latest_folder = args['s3_latest_folder']

# get current time (UTC) to name the output folder
now = datetime.now()
dt_string = now.strftime("%Y-%m-%d-%H-%M-%S")
print(bucket, fileName)

Save.

target path key: key (path) of the S3 file that landed in the full_load bucket and triggered the Lambda.

target path bucket: bucket name (full_load bucket)

latest folder: name of the latest folder in the cdc-pyspark-result bucket. So, our Glue job will:

  • read the update files (like 20200613–1900..csv, containing info about the modified records) from the full_load bucket as a DataFrame,
  • read the existing data from the latest folder (folders are named as yyyy-mm-dd-hh-mm-ss) in "cdc-pyspark-result" as another DataFrame, and
  • apply the insert, update and delete changes to the existing-data DataFrame and store the result in the "cdc-pyspark-result" bucket, in a new folder named with the current date-time.

We will add further code later. For now, note that to read the latest full data the job needs the name of the latest folder.
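Because the folders are named yyyy-mm-dd-hh-mm-ss, a plain lexicographic sort already puts the most recent one last. A tiny sketch with made-up folder names:

# made-up folder names, just to show why sorted()[-1] picks the latest one
folders = ["2022-06-13-18-05-10/", "2022-06-14-19-10-34/", "2022-06-13-09-45-02/"]
latest = sorted(folders)[-1]
print(latest)  # 2022-06-14-19-10-34/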

Add code to lambda function

Final code for lambda function:

import json
import boto3

def lambda_handler(event, context):

    # get bucket name and file name from the S3 event
    bucketName = event["Records"][0]["s3"]["bucket"]["name"]
    fileName = event["Records"][0]["s3"]["object"]["key"]

    print(bucketName, fileName)

    # boto3 glue client to connect to Glue
    glue = boto3.client('glue')

    ############# Get latest folder name in cdc-pyspark-result bucket #####
    bucket = 'cdc-pyspark-result'
    # the '/' delimiter makes list_objects return only top-level "folders" as CommonPrefixes

    client = boto3.client('s3')
    result = client.list_objects(Bucket=bucket, Prefix='', Delimiter='/')

    # if there is no folder in the bucket yet (no DMS run till now) we still need to pass
    # some string as the latest-folder parameter, else the Glue job will throw an error
    # about the missing required argument s3_latest_folder
    latest_folder = 'a'
    if result.get('CommonPrefixes') is not None:
        folder_list = [x['Prefix'] for x in result.get('CommonPrefixes')]
        latest_folder = sorted(folder_list)[-1]
    print('latest_folder', latest_folder)
    ###################

    # start the Glue job with the below parameters
    response = glue.start_job_run(
        # Glue job name
        JobName='glue-s3-cdc-pyspark',
        Arguments={
            '--s3_target_path_key': fileName,
            '--s3_target_path_bucket': bucketName,
            '--s3_latest_folder': latest_folder
        }
    )

    # just to show status as all OK (200), can be removed
    return {
        'statusCode': 200,
        'body': json.dumps('Hello from Lambda!')
    }

Then put any random file under the sks_schema/Persons folder of the full_load bucket.
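For example, with boto3 (the local file and bucket names here are placeholders for your own):

import boto3

s3 = boto3.client('s3')

# placeholders: use any local file and your actual full_load bucket name
s3.upload_file('test.csv', 'your-full-load-bucket', 'sks_schema/Persons/test.csv')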

Then go to CloudWatch Logs and check the Glue jobs/output log group.

Choose the latest log.

As per the code in the Glue job, we will see the bucket and file names in the log.

You can also check the log for the Lambda function as we did before. So, our Lambda function and Glue job are now working fine.
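If you want to pull the Glue output logs from code instead of the console, a small sketch (assuming Glue writes driver output to the default /aws-glue/jobs/output log group):

import boto3

logs = boto3.client('logs')

# assumption: the default Glue output log group is /aws-glue/jobs/output
events = logs.filter_log_events(logGroupName='/aws-glue/jobs/output', limit=20)
for e in events['events']:
    print(e['message'])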

Adding transformation code to glue job

After print(bucket, fileName) add this code:

# create spark session
spark = SparkSession.builder.appName("CDC").getOrCreate()

inputFilePath = f"s3://{bucket}/{fileName}"                  # full_load bucket file path
resultFilePath = f"s3://cdc-pyspark-result/{latest_folder}"  # latest folder in the cdc-pyspark-result bucket
finalFilePath = f"s3://cdc-pyspark-result/{dt_string}"       # final path in the result bucket (folder named as date-time)

# if there is existing data in the RDS table when the DMS task runs,
# the first (full) load produces a file like "LOAD000001.csv";
# in that case just write it to the result bucket
if "LOAD" in fileName:
    fldf = spark.read.csv(inputFilePath)  # read the LOAD00*.csv file from the full_load bucket
    fldf = fldf.withColumnRenamed("_c0", "id").withColumnRenamed("_c1", "FullName").withColumnRenamed("_c2", "City")  # rename headers
    fldf.write.mode("overwrite").csv(finalFilePath)  # write file
# if the file contains update info (not a LOAD file) then read the existing final file,
# merge the changes and write the result to a different folder in the result bucket
else:
    udf = spark.read.csv(inputFilePath)  # read the update file from the full_load bucket
    udf = udf.withColumnRenamed("_c0", "action").withColumnRenamed("_c1", "id").withColumnRenamed("_c2", "FullName").withColumnRenamed("_c3", "City")
    ffdf = spark.read.csv(resultFilePath)  # read the latest existing data from the result bucket
    ffdf = ffdf.withColumnRenamed("_c0", "id").withColumnRenamed("_c1", "FullName").withColumnRenamed("_c2", "City")

    # go through the update rows (I, U or D) and perform insert, update or delete
    for row in udf.collect():
        if row["action"] == 'U':
            ffdf = ffdf.withColumn("FullName", when(ffdf["id"] == row["id"], row["FullName"]).otherwise(ffdf["FullName"]))
            ffdf = ffdf.withColumn("City", when(ffdf["id"] == row["id"], row["City"]).otherwise(ffdf["City"]))
        if row["action"] == 'I':
            insertedRow = [list(row)[1:]]
            columns = ['id', 'FullName', 'City']
            newdf = spark.createDataFrame(insertedRow, columns)
            ffdf = ffdf.union(newdf)
        if row["action"] == 'D':
            ffdf = ffdf.filter(ffdf.id != row["id"])

    # coalesce() reduces the number of Spark output partitions before writing to Amazon S3;
    # coalesce(1) produces a single output file
    ffdf.coalesce(1).write.mode("overwrite").csv(finalFilePath)

Note: You can put .options(header=True) before .mode() to write headers too.
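For example, the final write could become (just a sketch of the same line with the header option added):

# same write as above, but with a header row in the output CSV
ffdf.coalesce(1).write.options(header=True).mode("overwrite").csv(finalFilePath)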

Save.

Step 9 : Run the loads

Full load (initial load)

First start your database instance.

Then, after it shows "Available" (around 4–5 minutes), start the DMS task. Make sure both of your buckets are empty first.

Choose Restart, so that it also loads existing data (full initial load).
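If you want to start the task from code instead of the console, here is a boto3 sketch; the task ARN is a placeholder, and 'reload-target' is the API equivalent of the Restart (full reload) option.

import boto3

dms = boto3.client('dms')

# placeholder ARN -- use your own replication task ARN;
# 'reload-target' restarts the task with a full reload of existing data
dms.start_replication_task(
    ReplicationTaskArn='arn:aws:dms:us-east-1:123456789012:task:EXAMPLE',
    StartReplicationTaskType='reload-target'
)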

After a few minutes, DMS will load the initial LOAD*.csv file into the Persons folder of the full_load bucket.

This file will trigger the Lambda function, which in turn will trigger the Glue job. You can check the Lambda function and Glue job runs in CloudWatch Logs. Then click on the Glue job and go to the Monitor tab to see the job status. It should take 1–2 minutes for a dataset this small and these transformations. After the job succeeds, go to the "cdc-pyspark-result" bucket and you will see a folder like this:

It will have a file like this:

Both the LOAD*.csv file and this part-000* file will have the same data, as both are results of the initial load.
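If you prefer checking the job status programmatically rather than in the Monitor tab, a small boto3 sketch using the job name we created earlier:

import boto3

glue = boto3.client('glue')

# most recent run of our job and its state (e.g. RUNNING, SUCCEEDED, FAILED)
runs = glue.get_job_runs(JobName='glue-s3-cdc-pyspark', MaxResults=1)
print(runs['JobRuns'][0]['JobRunState'])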

Updated load

We will now modify the Persons table manually through MySQL Workbench. After 4–5 minutes, run these queries in the query editor:

update sks_schema.Persons set FullName = 'TestXYZ' where PersonId = 60;
INSERT INTO sks_schema.Persons VALUES (157,'Alex Hunter','Denver');
DELETE FROM sks_schema.Persons where PersonId = 61;

We will see another file like "2022….csv" in the Persons folder of the full_load bucket. After the Lambda function and Glue job succeed, you can see another folder in the cdc-pyspark-result bucket with a file like "part..csv".

This “2022*.csv” file will have:

U,60,TestXYZ,Philadelphia
I,157,Alex Hunter,Denver
D,61,Lois Parker,Fort Worth

In cdc-pyspark-result bucket:

The new folder "..19–10–34/" will have a file like "part-00000-*.csv" containing the existing data with the changes applied.

Row with Id 61 is deleted.

Similarly, we can perform insert, update or delete operations on our table data; every time, the change will be captured by DMS (as a file in S3) and the Glue job will create new files with the latest changes.

Congrats!! We are done with our CDC project.

Note: After you are done with testing and exploring, stop and/or remove the resources you created to avoid extra charges.

Follow me for more such interesting content and projects.

Connect with me at LinkedIn.

References

AWS documentation.

PySpark and AWS: Master Big Data with PySpark and AWS by Muhammad Ahmad.
