AWS Change Data Capture project part 1
We will be implementing a Change Data Capture (CDC) project using AWS RDS, DMS, S3, Lambda, and a Glue job, with IAM handling access between the services.
What is "change data capture / ongoing replication"?
Suppose you have a data store like a database and you want to continuously replicate its data, along with ongoing changes, to another store such as a data lake (data stored as files). In that case you need a CDC pipeline that captures changes in one store and replicates them to the other.
Architecture
RDS
We will have an RDS MySQL DB instance holding our primary table (an operational table). This table will have some initial data and will undergo changes such as inserts, updates and deletes.
For this project we will make the changes manually through MySQL Workbench on our local system, but in the real world these changes can come through other means too.
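As a rough illustration, the same kinds of changes could also be scripted instead of clicked through Workbench; below is a minimal sketch assuming a pymysql client and a Persons table with hypothetical person_id, full_name and city columns (the actual schema is defined in the implementation part):

```python
# Illustrative only: the same inserts/updates/deletes we run from MySQL Workbench,
# scripted with pymysql. Column names (person_id, full_name, city) are assumed.
import pymysql

conn = pymysql.connect(
    host="<rds-endpoint>",      # RDS instance endpoint
    user="admin",
    password="<password>",
    database="<schema_name>",
)

with conn.cursor() as cur:
    # Insert a new record
    cur.execute(
        "INSERT INTO Persons (person_id, full_name, city) VALUES (%s, %s, %s)",
        (4, "Alex Hunter", "Denver"),
    )
    # Update an existing record
    cur.execute("UPDATE Persons SET full_name = %s WHERE person_id = %s", ("TestXYZ", 1))
    # Delete a record
    cur.execute("DELETE FROM Persons WHERE person_id = %s", (3,))

conn.commit()
conn.close()
```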
DMS
AWS Database Migration Service (DMS) helps migrate data from one storage location to another. The source and destination can be on AWS or even on-premises.
So, we will connect DMS to the RDS instance, read the data and any ongoing changes, and load them as files into an S3 bucket (the "full_load" bucket).
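Incidentally, besides the console, an existing DMS replication task can also be started programmatically; here is a hedged boto3 sketch with a placeholder task ARN:

```python
# Sketch only: starting an existing DMS task (full load + ongoing replication)
# with boto3 instead of the console. The task ARN is a placeholder.
import boto3

dms = boto3.client("dms")
dms.start_replication_task(
    ReplicationTaskArn="arn:aws:dms:us-east-1:123456789012:task:EXAMPLE",
    StartReplicationTaskType="start-replication",  # first run; "reload-target" redoes the full load
)
```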
S3
AWS Simple Storage Service (S3) is AWS's object storage. We can create S3 buckets and store our files in them; it can also serve as a data lake.
We will load our table data into an S3 bucket (the "full_load" bucket) under a folder named after the table. The initial load will look like:
0,Herman Zimmerman,Oklahoma City
1,Lisa Ray,Columbus
..
We can add headers too (that will be covered in the implementation part).
Then, whenever the table data changes, DMS will capture the change and write another file containing the changed rows and the type of change.
U,60,TestXYZ,Philadelphia
I,157,Alex Hunter,Denver
D,61,Lois Parker,Fort Worth
Here U = update, I = insert, and D = delete.
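To make that layout concrete, here is a hedged sketch of reading such a change file with pandas; DMS writes these CSVs without a header row, and the column names used below are assumptions for this example:

```python
# Sketch: reading a DMS change file, which has no header row.
# Column names here are assumptions; DMS adds the operation flag (I/U/D)
# as the first column during ongoing replication.
import pandas as pd

changes = pd.read_csv(
    "changes.csv",                      # e.g. a change file downloaded from the full_load bucket
    header=None,
    names=["op", "person_id", "full_name", "city"],
)
print(changes[changes["op"] == "U"])    # rows that were updated at the source
```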
We will also create another bucket, "cdc-pyspark-result", which will hold the final files written by our Glue job. On every run, the Glue job creates a folder in this bucket named with the current date-time ("yyyy-mm-dd-hh-mm-ss") and writes its output file there. Naming the folders this way makes it easy to pick the latest one just by sorting the folder names.
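A quick sketch of how such a sortable folder name can be generated and compared in Python (the format string is an assumption matching the pattern above):

```python
# Sketch: building a "yyyy-mm-dd-hh-mm-ss" folder name for the result bucket.
# Because the name is zero-padded from year down to second, a simple
# lexicographic sort (e.g. max()) gives the most recent folder.
from datetime import datetime

folder_name = datetime.now().strftime("%Y-%m-%d-%H-%M-%S")
print(folder_name)                      # e.g. 2022-06-13-19-05-42

existing = ["2022-06-13-18-10-01", "2022-06-13-19-05-42"]
latest = max(existing)                  # lexicographic max == latest timestamp
```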
Lambda
AWS Lambda is a serverless, event-driven compute service that lets you run code for virtually any type of application or backend service without provisioning or managing servers.
We will create a Lambda function that is triggered whenever a new file lands in the "Persons" folder of the "full_load" bucket. It will retrieve the bucket and file info, and also look up the name of the latest folder in the "cdc-pyspark-result" bucket. It will then start our Glue job with this information passed in as job arguments.
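A minimal sketch of what such a handler could look like with boto3; the Glue job name, result bucket name and argument keys below are assumptions for illustration (the real ones are wired up in the implementation part):

```python
# Sketch of the Lambda handler: pull bucket/key from the S3 event, find the
# latest date-time folder in the result bucket, and start the Glue job.
# Job name, bucket name and argument keys are assumptions for illustration.
import boto3

glue = boto3.client("glue")
s3 = boto3.client("s3")

RESULT_BUCKET = "cdc-pyspark-result"
GLUE_JOB_NAME = "cdc-glue-job"

def lambda_handler(event, context):
    record = event["Records"][0]
    bucket = record["s3"]["bucket"]["name"]          # full_load bucket
    key = record["s3"]["object"]["key"]              # file that just landed

    # List top-level "folders" (common prefixes) in the result bucket and pick
    # the latest one; the date-time names sort lexicographically.
    resp = s3.list_objects_v2(Bucket=RESULT_BUCKET, Delimiter="/")
    prefixes = [p["Prefix"].rstrip("/") for p in resp.get("CommonPrefixes", [])]
    latest_folder = max(prefixes) if prefixes else ""

    glue.start_job_run(
        JobName=GLUE_JOB_NAME,
        Arguments={
            "--s3_target_path_key": key,
            "--s3_target_path_bucket": bucket,
            "--latest_folder": latest_folder,
        },
    )
```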
Glue job
AWS Glue is a serverless data integration service that makes it easy to discover, prepare, and combine data. In other words, we can use it to process our data with Spark.
So, our Lambda function will start the Glue job with arguments like the following (a sketch of how the Glue script can read them comes after the list):
- target path key: key (path) of the S3 file that landed in the full_load bucket and triggered the Lambda.
- target path bucket: name of that bucket (the full_load bucket).
- latest folder: name of the latest folder in the cdc-pyspark-result bucket.
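For reference, a small sketch of how the Glue (PySpark) script could pick these up, assuming the same argument names as in the Lambda sketch above:

```python
# Sketch: reading the job arguments the Lambda passes in.
# The argument names match the Lambda sketch above and are assumptions.
import sys
from awsglue.utils import getResolvedOptions

args = getResolvedOptions(
    sys.argv,
    ["JOB_NAME", "s3_target_path_key", "s3_target_path_bucket", "latest_folder"],
)
source_path = f"s3://{args['s3_target_path_bucket']}/{args['s3_target_path_key']}"
```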
For the initial load, the Glue job will simply take the file from the "full_load" bucket and write it to the "cdc-pyspark-result" bucket. Then, for every change file that arrives in the "full_load" bucket, it will read that file along with the latest file from the "cdc-pyspark-result" bucket, apply the changes to the latest full data, and write the result as a new file (in a new folder) in the "cdc-pyspark-result" bucket.
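Here is a hedged PySpark sketch of that logic; the column names, file-name check and schemas are assumptions consistent with the samples above, not the final job script (that comes in the implementation part):

```python
# Sketch of the merge step inside the Glue job (PySpark). Column names and
# bucket/folder layout are assumptions consistent with the examples above.
import sys
from datetime import datetime
from awsglue.utils import getResolvedOptions
from pyspark.sql import SparkSession, functions as F

args = getResolvedOptions(
    sys.argv, ["JOB_NAME", "s3_target_path_key", "s3_target_path_bucket", "latest_folder"]
)
spark = SparkSession.builder.appName("cdc-merge").getOrCreate()

RESULT_BUCKET = "cdc-pyspark-result"
cdc_schema = "op STRING, person_id INT, full_name STRING, city STRING"
full_schema = "person_id INT, full_name STRING, city STRING"

source_path = f"s3://{args['s3_target_path_bucket']}/{args['s3_target_path_key']}"
new_folder = datetime.now().strftime("%Y-%m-%d-%H-%M-%S")

if "LOAD" in args["s3_target_path_key"]:
    # Initial full load: no op column, just copy it across as the first snapshot.
    result = spark.read.csv(source_path, schema=full_schema)
else:
    # CDC file: apply I/U/D changes to the latest snapshot in the result bucket.
    changes = spark.read.csv(source_path, schema=cdc_schema)
    latest = spark.read.csv(
        f"s3://{RESULT_BUCKET}/{args['latest_folder']}/", schema=full_schema
    )
    changed_keys = changes.filter(F.col("op").isin("U", "D")).select("person_id")
    upserts = changes.filter(F.col("op").isin("I", "U")).drop("op")
    # Drop updated/deleted keys from the snapshot, then add inserted/updated rows back.
    result = latest.join(changed_keys, "person_id", "left_anti").unionByName(upserts)

result.write.csv(f"s3://{RESULT_BUCKET}/{new_folder}/")
```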
IAM
AWS Identity and Access Management (IAM) provides fine-grained access control across all of AWS. With IAM, you can specify who can access which services and resources, and under which conditions.
We will use IAM roles to grant one AWS service access to another.
Process
1. We add data to the RDS MySQL instance table (initial load).
2. Start the DMS task. It will read the data and create a file like "LOAD00*.csv" in the "full_load" bucket under the <schema_name>/<table_name> path:
0,Herman Zimmerman,Oklahoma City
1,Lisa Ray,Columbus
2,Test,Jacksonville
3,Steve Goodwin,Charlotte
3. This file will trigger the Lambda function, which will start the Glue job.
4. The Glue job gets the file and bucket info from the Lambda arguments and writes the file to the "cdc-pyspark-result" bucket inside a folder named with the current date-time. This file is now our latest data file.
5. Then we make some changes to the DB table (after at least a few minutes).
6. DMS will capture the changes and create another file like "20220613-19*.csv" in the "full_load" bucket, marking the modified records as I, U or D:
U,1,TestXYZ,Columbus
I,4,Alex Hunter,Denver
D,3,Steve Goodwin,Charlotte
7. This file will also trigger the Lambda function, which will start the Glue job. The Lambda function also retrieves the latest folder in the "cdc-pyspark-result" bucket (which contains the latest data file) and passes it to the Glue job.
8. The Glue job reads this change file as a data frame. It also reads the latest file in the "cdc-pyspark-result" bucket as a data frame. It then applies the changes to the second data frame as per the change list, so the final data frame will have:
0,Herman Zimmerman,Oklahoma City
1,TestXYZ,Columbus
2,Test,Jacksonville
4,Alex Hunter,Denver
Then it will write the latest data (with the changes applied) to the "cdc-pyspark-result" bucket in a new folder.
Note: We could instead overwrite the same final file, but then the Glue job may fail with a "file not found" error for the latest file it is reading. When we read the latest file in the "cdc-pyspark-result" bucket (say ABC.csv), transform it and overwrite the same location, Spark removes the old file and writes the output under a new name (say XYZ.csv). If a Spark task fails during the transformation or write and is retried, the retry can no longer find the original ABC.csv and the job throws an error. Writing each result to a new date-time folder avoids this.
Use cases
- The final file can serve many use cases such as reporting and analytics. It can also be FTP-ed after every change, or at regular intervals, for on-premises use cases.
- At step six of the above process we get a file with change information. This can serve as a delta-detection file for the operational table, and AWS or on-premises jobs can use it to make the required changes in data warehouse tables.
- Insights on data changes over an interval can be provided through mail, dashboards, etc.
So, I hope you got the basics. Now let's move to the implementation part.
Part 2: https://subham-sahoo.medium.com/aws-change-data-capture-project-part-2-4c1fde84f85c
Connect with me on LinkedIn.
Follow me for more such interesting content and projects.
References
AWS documentation.
PySpark and AWS: Master Big Data with PySpark and AWS by Muhammad Ahmad.