Query AWS S3 data from Lambda function
Run SQL queries on AWS S3 objects from an AWS Lambda function and get the results as CSV files.
Problem statement
Let's say we have some files in an AWS S3 bucket.
- These files can be in JSON, ORC, Avro, or Parquet format.
- They are generated by some process which runs on a schedule (daily, weekly, etc.).
- The process might generate multiple files for a single run. For example, it can be a Glue Spark job that writes data in multiple parts (because multiple reducers are used).
Our requirement is that another process (in the cloud or on a local server) needs this data at a regular interval (daily, weekly, monthly, etc.), which can be different from the schedule of the parent process that generates the files.
- Our end process requires only a subset of the data, filtered by date or some other condition.
- It might require some transformations to be pre-applied on the columns or attributes.
Solution overview
We will have an AWS Glue crawler run on our bucket, which will scan the objects (files) and infer a suitable schema (attribute information such as data types) for them. It will create a table in the Glue Data Catalog that holds this schema. The table will not hold the actual data; it will act like a view over the actual data stored in the S3 bucket.
Note : We need not run the crawler every time a new file arrives in the source bucket. We only need to run it when the schema of the data changes (i.e. columns added, column names changed, etc.).
Then we will create a Lambda function that starts an Athena query on the S3 data and generates a CSV file in another bucket. It is good practice to have different buckets for the source data and the result files.
These files can then be used by other processes on AWS or transferred via FTP to local servers.
For this demo we will use state/UT-wise COVID case data (a few records).
Git repository : https://github.com/sksgit7/AWS_projects/tree/main/query_S3_from_Lambda
Step 1 : Create S3 buckets
Let’s create three S3 buckets.
- crawler-bucket-sks : Our source bucket. We will use an AWS Glue crawler to crawl this bucket's objects and create a Glue Data Catalog table.
- output-bucket-sks : Destination bucket. We will run a query on the source data and the results will be stored in this bucket in CSV format.
- athena-results-sks : We need to provide an output bucket path for Athena queries. Whenever we run a query in Athena, whether from the Athena query editor or from Lambda, it stores the results as a CSV file and a metadata file (for each query). Athena needs a bucket path for this.
On AWS S3 console, click on create bucket.
Provide the name of the bucket and the region. We can keep the other settings as default.
- Enable ACLs if you want to grant bucket access to other accounts.
- Enable versioning if the data is critical and you want to protect it from accidental deletion; it also lets us revert to a particular version of an object.
- Enable encryption to encrypt the objects (files).
- Un-check "Block all public access" only if you want to make the bucket public, i.e. allow bucket objects to be accessed via their object URLs.
Similarly create the other 2 buckets.
Note : S3 has a global namespace, but we still need to specify a region while creating a bucket, because the bucket's objects are stored across multiple Availability Zones in that particular region for high availability. Also, we cannot change the bucket's region after creating it.
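For reference, the same bucket can also be created with boto3 instead of the console; a minimal sketch, assuming the ap-south-1 region for this demo:

import boto3

# Minimal sketch: create one of the demo buckets with boto3 instead of the console.
# The region is fixed at creation time; outside us-east-1 it must be passed explicitly.
s3 = boto3.client("s3", region_name="ap-south-1")   # assumed region for this demo

s3.create_bucket(
    Bucket="crawler-bucket-sks",
    CreateBucketConfiguration={"LocationConstraint": "ap-south-1"},
)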
Add data file in source bucket
Create a file “covid1.json” on local system (or get it from Github repository) and upload the file to “crawler-bucket-sks” bucket.
{"sl_no": "1","state_ut_wise": "Andaman & Nicobar Islands","active_cases": 165,"total_cases": 3631,"cumulative_cured_discharged_cases": 3414,"total_deaths": 52}
{"sl_no": "2","state_ut_wise": "Andhra Pradesh","active_cases": 84423,"total_cases": 609558,"cumulative_cured_discharged_cases": 519891,"total_deaths": 5244}
{"sl_no": "3","state_ut_wise": "Arunachal Pradesh","active_cases": 1886,"total_cases": 7005,"cumulative_cured_discharged_cases": 5106,"total_deaths": 13}
{"sl_no": "4","state_ut_wise": "Assam","active_cases": 28631,"total_cases": 152858,"cumulative_cured_discharged_cases": 123687,"total_deaths": 540}
{"sl_no": "5","state_ut_wise": "Bihar","active_cases": 12609,"total_cases": 165218,"cumulative_cured_discharged_cases": 151750,"total_deaths": 859}
Note : The format of the JSON file should be exactly like the above: each JSON record on a single line, with no comma after each line and no enclosing array, otherwise the Athena query will fail.
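If you are generating such a file yourself, a small Python sketch like the one below (using the sample records above) produces the expected newline-delimited layout:

import json

# Two of the sample records shown above; each one becomes a single line in the file.
records = [
    {"sl_no": "1", "state_ut_wise": "Andaman & Nicobar Islands", "active_cases": 165,
     "total_cases": 3631, "cumulative_cured_discharged_cases": 3414, "total_deaths": 52},
    {"sl_no": "2", "state_ut_wise": "Andhra Pradesh", "active_cases": 84423,
     "total_cases": 609558, "cumulative_cured_discharged_cases": 519891, "total_deaths": 5244},
]

# One JSON object per line, no enclosing array and no trailing commas.
with open("covid1.json", "w") as f:
    for record in records:
        f.write(json.dumps(record) + "\n")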
Step 2 : Create and run the Glue crawler
Now we will create a Glue Crawler to crawl the “crawler-bucket-sks” objects and create the schema and a view on the data.
Go to the AWS Glue dashboard and click on Crawlers. Then click on Create crawler.
Provide a name and click Next.
Click on Add data source. Select the S3 bucket path and choose “Crawl all sub-folders”.
We can also specify "exclude patterns" to skip certain files or folders, but we will not use that here.
Then add the data source. Click Next.
Create a new IAM role for the crawler. This will contain the basic permissions required for the crawler like access to S3 bucket.
Then click on Add database. This will create a Glue database where the crawler will store the tables it creates.
Select Frequency as On demand and click on Next.
Review configuration and create crawler.
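The console flow above is all we need here; for reference, roughly the same crawler could be defined with boto3 (the crawler, role, and database names below are hypothetical):

import boto3

glue = boto3.client("glue")

# Hypothetical names; substitute the role and database created in the console steps.
glue.create_crawler(
    Name="covid-crawler",
    Role="AWSGlueServiceRole-covid-demo",
    DatabaseName="covid_db",
    Targets={"S3Targets": [{"Path": "s3://crawler-bucket-sks/"}]},
)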
Running the crawler
Select the crawler and click on Run. Wait for some time for the crawler to finish; it should show one table created and come back to the "Ready" status.
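If you prefer to trigger it programmatically, a rough boto3 equivalent (using the hypothetical crawler name from the earlier sketch) is:

import time
import boto3

glue = boto3.client("glue")
glue.start_crawler(Name="covid-crawler")   # hypothetical crawler name

# Wait until the crawler goes back to the READY state.
while glue.get_crawler(Name="covid-crawler")["Crawler"]["State"] != "READY":
    time.sleep(15)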
Click on Tables and here we can see a new table created.
Click on the table and we can see the table schema.
Step 3: Query data using Athena query editor
Open the query editor in the AWS Athena console. Click on Settings; we need to configure the output bucket location (athena-results-sks) for Athena queries.
Click on Manage and provide the bucket path.
Click Save. Come back to Editor.
Select the database and table. Write a simple query (for example, a SELECT * with a LIMIT on the crawled table) and click on Run.
We can see the data in a tabular form.
Note : For every query we run here, the results are stored as CSV and metadata files in the athena-results-sks S3 bucket (under the Unsaved folder).
Then add the 2nd file "covid2.json" to the source bucket and run a COUNT(*) query to check the total number of records.
Since each file contains 5 records, it shows 10 records in total, which means the data is being read properly.
Step 4 : Create IAM role for Lambda function
This role will provide access to
- CloudWatch (Lambda basic execution policy) : To write the execution logs.
- Athena : To run the queries on S3 data.
- S3 : To be able to access S3 data through Athena.
On the IAM dashboard, click on Roles in the left nav-bar. Then click on Create role.
Choose type : AWS service and Use case : Lambda.
Then click on Next and add necessary permissions (policies). Search and select these three.
Generally, for demo purposes I tend to use FullAccess policies, but the best practice is to grant only the required access. So either choose more limited policies or tweak the policy statements (JSON format). For example, our Lambda function only needs access to the 3 buckets above, so in the policy statement we can list just these 3 buckets (under Resources); a sketch follows.
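As an illustration, the S3 part of such a scoped-down policy could look roughly like the sketch below (bucket names from this demo, policy name hypothetical; the Athena and CloudWatch permissions would still come from their own policies):

import json
import boto3

# Sketch of a scoped-down S3 policy: the Lambda role only gets access to the
# three demo buckets instead of AmazonS3FullAccess.
s3_policy = {
    "Version": "2012-10-17",
    "Statement": [{
        "Effect": "Allow",
        "Action": ["s3:GetObject", "s3:PutObject", "s3:ListBucket", "s3:GetBucketLocation"],
        "Resource": [
            "arn:aws:s3:::crawler-bucket-sks", "arn:aws:s3:::crawler-bucket-sks/*",
            "arn:aws:s3:::athena-results-sks", "arn:aws:s3:::athena-results-sks/*",
            "arn:aws:s3:::output-bucket-sks", "arn:aws:s3:::output-bucket-sks/*",
        ],
    }],
}

iam = boto3.client("iam")
iam.create_policy(
    PolicyName="lambda-athena-s3-scoped",        # hypothetical policy name
    PolicyDocument=json.dumps(s3_policy),
)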
Then provide a suitable name to the role and create.
Step 5 : Create Lambda function and add code
Now we will create a Lambda function to run Athena queries.
Provide a suitable name and choose the runtime as Python 3. Choose the existing execution role that we created before. We can keep the other configurations at their defaults and create the Lambda function.
Get the code from the Git repository mentioned at the top and add it. Click on Deploy to save the code.
The code is explained through comments inside it.
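For orientation, the overall flow of the function looks roughly like the sketch below. This is not the repository code; the database, table, query, and bucket/key names here are assumptions for illustration.

import time
import boto3

athena = boto3.client("athena")
s3 = boto3.client("s3")

# Hypothetical names; the real values live in the repository code.
DATABASE = "covid_db"
QUERY = "SELECT * FROM crawler_bucket_sks"        # table name assigned by the crawler
ATHENA_OUTPUT = "s3://athena-results-sks/"
TARGET_BUCKET = "output-bucket-sks"

def lambda_handler(event, context):
    # 1. Start the Athena query; Athena writes its result file to ATHENA_OUTPUT.
    execution_id = athena.start_query_execution(
        QueryString=QUERY,
        QueryExecutionContext={"Database": DATABASE},
        ResultConfiguration={"OutputLocation": ATHENA_OUTPUT},
    )["QueryExecutionId"]

    # 2. Poll until the query finishes.
    while True:
        status = athena.get_query_execution(QueryExecutionId=execution_id)
        state = status["QueryExecution"]["Status"]["State"]
        if state in ("SUCCEEDED", "FAILED", "CANCELLED"):
            break
        time.sleep(2)
    if state != "SUCCEEDED":
        raise RuntimeError(f"Athena query ended in state {state}")

    # 3. Download the CSV Athena produced into /tmp (transformations could be
    #    applied here) and copy it to the output bucket as results.csv.
    local_path = "/tmp/results.csv"
    s3.download_file("athena-results-sks", f"{execution_id}.csv", local_path)
    s3.upload_file(local_path, TARGET_BUCKET, "results.csv")

    return {"statusCode": 200, "body": f"results.csv written to {TARGET_BUCKET}"}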
Note : The /tmp directory is part of the Lambda function's ephemeral storage. By default its size is 512 MB; if our file is larger, we can increase it. We can also increase the function's memory (RAM) up to 10 GB.
Step 6 : Running the Lambda function
Now let's run the Lambda function by clicking on Test. If this is your first time running the function, it will ask you to create a test event: just give the event any name and click Save, then click on Test again.
Now if we go to the athena-results-sks bucket, we can see the results of the query that we ran through the Lambda function as a .csv and a .metadata file.
If everything works fine (which should be the case ;)) we will see a file “results.csv” in our target bucket i.e. “output-bucket-sks”.
Download to view the file.
In this way we can run Athena queries on S3 data (in formats like JSON, ORC, Avro, Parquet, etc.) from our Lambda function and get a results file. Other processes can then use these files, or we can transfer them via FTP to local servers if required.
Improvements
- We can add an S3 trigger to our Lambda function, so that when a new file lands in the source bucket it invokes the function, which in turn runs the required query on the full data and generates a fresh results file. We can also configure the trigger to ignore certain file types (a sketch of such a handler follows this list).
- We can add some transformations on specific columns of the data, if required. This can be done within the Lambda function code.
- We can run the Glue crawler on a schedule. We can use an Amazon EventBridge scheduled rule for this job.
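A rough sketch of the S3-triggered handler mentioned in the first point above (run_query_and_export is a hypothetical helper wrapping the query-and-export steps from the earlier sketch):

# Sketch of an S3-triggered handler: the event carries the bucket and key of the
# new object, and we skip anything that is not a JSON file.
def lambda_handler(event, context):
    for record in event["Records"]:
        bucket = record["s3"]["bucket"]["name"]
        key = record["s3"]["object"]["key"]
        if not key.endswith(".json"):
            continue   # ignore other file types
        print(f"New object s3://{bucket}/{key}; refreshing results")
        # run_query_and_export()   # hypothetical helper with the query/export logic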
Alternatives
As we provide an output result location in start_query_execution(), the result of the query is automatically stored at that S3 path. So we could remove the steps of reading the data, creating results.csv in the Lambda /tmp folder, and storing it in a different bucket. But this approach has some drawbacks:
- We cannot apply any transformations to the attributes, because we never read the data.
- Athena creates 2 files for each query run, a CSV and a metadata file, so we might need to remove the metadata files as they will interfere with the schema of the actual data (see the cleanup sketch after this list).
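For the second point, a small cleanup sketch (using the demo results bucket) that removes the .metadata companion files could look like this:

import boto3

s3 = boto3.client("s3")

# Delete the ".metadata" companion files Athena writes next to each CSV result,
# so downstream readers only see the data files.
paginator = s3.get_paginator("list_objects_v2")
for page in paginator.paginate(Bucket="athena-results-sks"):
    for obj in page.get("Contents", []):
        if obj["Key"].endswith(".metadata"):
            s3.delete_object(Bucket="athena-results-sks", Key=obj["Key"])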
If you liked this project, kindly clap. Feel free to share it with others.
Follow me on Medium for more such interesting content and projects.
Connect with me at LinkedIn.
Thank you!!