In Part 1, we discussed a streaming ingestion solution using EventBridge, Lambda, MSK and Redshift Serverless. Athena provides the Amazon MSK connector, which lets SQL queries run directly on Apache Kafka topics. This means insights can be extracted without setting up an additional pipeline to store data in S3. In this post, we discuss how to update the streaming ingestion solution so that data in the Kafka topic can be queried by Athena instead of Redshift.
Part 2 – MSK and Athena (this post)
Architecture
As in Part 1, fake online order data is generated by multiple Lambda functions that are invoked by an EventBridge schedule rule. The schedule runs every minute, and the associated rule has a configurable number (e.g. 5) of targets. Each target points to the same Kafka producer Lambda function, so we can generate test data with multiple Lambda invocations according to the desired volume of messages. Once messages are sent to a Kafka topic, they can be consumed by the Athena MSK connector, which is a Lambda function that can be installed from the AWS Serverless Application Repository. A new Athena data source needs to be created in order to deploy the connector, and the schema of the topic should be registered with the AWS Glue Schema Registry. The infrastructure is built with Terraform, and the AWS SAM CLI is used to develop the producer Lambda function locally before deploying it to AWS.
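To illustrate the fan-out, the sketch below builds a list of EventBridge targets that all reference the same producer function, in the shape accepted by the `Targets` argument of boto3's `events.put_targets`. It is only an illustration: in this solution the rule and its targets are defined in Terraform, and the helper `build_producer_targets` and the ARN are hypothetical.

```python
from typing import Dict, List


def build_producer_targets(lambda_arn: str, num_targets: int = 5) -> List[Dict[str, str]]:
    """Return one EventBridge target per desired producer invocation (hypothetical helper)."""
    if num_targets < 1:
        raise ValueError("num_targets must be at least 1")
    # All targets share the same function ARN and differ only by Id, so each
    # scheduled event results in `num_targets` invocations of the producer.
    return [{"Id": f"kafka-producer-{i}", "Arn": lambda_arn} for i in range(num_targets)]


if __name__ == "__main__":
    # Hypothetical ARN, for illustration only.
    arn = "arn:aws:lambda:us-east-1:123456789012:function:kafka-producer"
    targets = build_producer_targets(arn, num_targets=5)
    for target in targets:
        print(target)
    # With boto3 (not executed here), the list could be registered via:
    # boto3.client("events").put_targets(Rule="<rule-name>", Targets=targets)
```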
Infrastructure
The ingestion solution shares a large portion of its infrastructure with Part 1, so only the new resources are covered in this post. The source can be found in the GitHub repository of this post.
Glue Schema
The order data is in JSON format and has four attributes: order_id, ordered_at, user_id and items. Although the items attribute holds an array of objects, each including product_id and quantity, it is specified as VARCHAR because the MSK connector doesn’t support complex types.
{
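As a minimal sketch, the schema definition for the orders topic can be generated from a column-to-type mapping. It assumes the field layout (`topicName`, `message.dataFormat` and a `fields` list of `name`/`mapping`/`type` entries) described in the Athena MSK connector documentation. The helper `build_topic_schema` is hypothetical, and the types of the columns other than items are illustrative guesses rather than the values used in this post.

```python
import json
from typing import Dict


def build_topic_schema(topic: str, columns: Dict[str, str]) -> str:
    """Return a Glue schema definition (JSON string) for a JSON-formatted topic."""
    fields = [
        # `mapping` is the key in the Kafka JSON message; `name` is the column
        # name exposed to Athena. They are kept identical here.
        {"name": name, "mapping": name, "type": col_type}
        for name, col_type in columns.items()
    ]
    schema = {"topicName": topic, "message": {"dataFormat": "json", "fields": fields}}
    return json.dumps(schema, indent=2)


# Assumed column types; only `items` as VARCHAR is stated in the text.
ORDER_COLUMNS = {
    "order_id": "VARCHAR",
    "ordered_at": "VARCHAR",
    "user_id": "VARCHAR",
    # Complex types are not supported, so the array of items is a VARCHAR.
    "items": "VARCHAR",
}

if __name__ == "__main__":
    print(build_topic_schema("orders", ORDER_COLUMNS))
```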
The registry and schema can be created as shown below. Note that the registry description should include the string {AthenaFederationMSK}, because this marker string is required for AWS Glue registries used with the Amazon Athena MSK connector.
# integration-athena/infra/athena.tf
resource "aws_glue_registry" "msk_registry" {
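The marker string lets the connector discover registries instead of being pointed at a specific one. The simplified sketch below shows the idea by filtering registry entries, shaped like those returned by boto3's `glue.list_registries`, on the `{AthenaFederationMSK}` marker. It is not the connector's actual code, and the helper and registry names are hypothetical.

```python
from typing import Dict, List

# Marker that must appear in the description of registries used by the connector.
MARKER = "{AthenaFederationMSK}"


def registries_for_connector(registries: List[Dict[str, str]]) -> List[str]:
    """Return names of registries whose description contains the marker (simplified sketch)."""
    return [
        r["RegistryName"]
        for r in registries
        # Entries without a Description are treated as not marked.
        if MARKER in r.get("Description", "")
    ]


if __name__ == "__main__":
    # Hypothetical registry entries, shaped like glue.list_registries()["Registries"].
    sample = [
        {"RegistryName": "customer", "Description": "Orders registry {AthenaFederationMSK}"},
        {"RegistryName": "analytics", "Description": "Unrelated registry"},
    ]
    print(registries_for_connector(sample))  # ['customer']
```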
Athena MSK Connector
In Terraform, the MSK connector Lambda function can be created by deploying the associated CloudFormation stack from the AWS Serverless Application Repository. The stack parameters are passed to the function as environment variables, and they are mostly used to establish connections to Kafka topics.
# integration-athena/infra/athena.tf
resource "aws_serverlessapplicationrepository_cloudformation_stack" "athena_msk_connector" {
Lambda Execution Role
The AWS documentation doesn’t list the specific IAM permissions that the connector function requires, so they were worked out by trial and error. As a result, some of them are too generous and should be refined later.
First, it needs permission to access the MSK cluster and topics, and these permissions are copied from Part 1.
Next, access to the Glue registry and schema is required. The permissions could have been more specific if a particular registry or schema could be passed to the connector Lambda function. Instead, the function searches for applicable registries using a marker string, and that requires an additional set of permissions.
Then permission to access the spill S3 bucket is added. I initially included typical read/write permissions on a specific bucket and its objects, but the Lambda function failed with 403 access denied errors. Therefore I escalated the level of permissions, which is by no means acceptable in a strict environment, and further investigation is needed.
Finally, permission to get Athena query executions is added.
# integration-athena/infra/athena.tf
resource "aws_iam_role" "athena_connector_role" {
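For comparison, a narrower policy document could look like the sketch below, which groups the four permission sets described above. All ARNs and the bucket name are hypothetical placeholders, and the policy has not been verified against the connector. Since bucket-scoped S3 permissions resulted in 403 errors in this solution, the spill-bucket statement in particular may need adjusting.

```python
import json
from typing import Any, Dict


def build_connector_policy(cluster_arn: str, topic_arn: str, group_arn: str,
                           spill_bucket: str) -> Dict[str, Any]:
    """Return an IAM policy document for the connector role (unverified sketch)."""
    return {
        "Version": "2012-10-17",
        "Statement": [
            {
                # MSK cluster, topic and consumer group access with IAM auth.
                "Sid": "MskAccess",
                "Effect": "Allow",
                "Action": [
                    "kafka-cluster:Connect",
                    "kafka-cluster:DescribeCluster",
                    "kafka-cluster:DescribeTopic",
                    "kafka-cluster:ReadData",
                    "kafka-cluster:DescribeGroup",
                    "kafka-cluster:AlterGroup",
                ],
                "Resource": [cluster_arn, topic_arn, group_arn],
            },
            {
                # Discovering registries by marker string needs list actions,
                # which is why this statement is not scoped to one registry.
                "Sid": "GlueSchemaRegistryAccess",
                "Effect": "Allow",
                "Action": [
                    "glue:ListRegistries",
                    "glue:GetRegistry",
                    "glue:ListSchemas",
                    "glue:GetSchema",
                    "glue:GetSchemaVersion",
                ],
                "Resource": "*",
            },
            {
                # Read/write access to the spill bucket and its objects.
                "Sid": "SpillBucketAccess",
                "Effect": "Allow",
                "Action": [
                    "s3:ListBucket",
                    "s3:GetBucketLocation",
                    "s3:GetObject",
                    "s3:PutObject",
                    "s3:DeleteObject",
                ],
                "Resource": [f"arn:aws:s3:::{spill_bucket}", f"arn:aws:s3:::{spill_bucket}/*"],
            },
            {
                # Look up Athena query executions.
                "Sid": "AthenaQueryExecutionAccess",
                "Effect": "Allow",
                "Action": ["athena:GetQueryExecution"],
                "Resource": "*",
            },
        ],
    }


if __name__ == "__main__":
    # Hypothetical placeholder ARNs and bucket name.
    policy = build_connector_policy(
        "arn:aws:kafka:<region>:<account-id>:cluster/<cluster-name>/*",
        "arn:aws:kafka:<region>:<account-id>:topic/<cluster-name>/*",
        "arn:aws:kafka:<region>:<account-id>:group/<cluster-name>/*",
        "<spill-bucket>",
    )
    print(json.dumps(policy, indent=2))
```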
Security Group
The security group and rules are shown below. Although the outbound rule is set to allow all protocols and port ranges, allowing only TCP ports 443 and 9098 would be sufficient. The former is used to access the Glue Schema Registry, while the latter is for an MSK cluster with IAM authentication.
# integration-athena/infra/athena.tf
resource "aws_security_group" "athena_connector" {
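The tighter outbound rules suggested above can be sketched as the `IpPermissions` structure accepted by boto3's `ec2.authorize_security_group_egress`. The CIDR range is a placeholder and the helper name is hypothetical; the Terraform stack in this post keeps the allow-all outbound rule.

```python
from typing import Any, Dict, List

# TCP ports the connector needs to reach, as described in the text.
CONNECTOR_PORTS = {
    443: "HTTPS, e.g. Glue Schema Registry",
    9098: "MSK brokers with IAM authentication",
}


def build_egress_permissions(cidr: str = "0.0.0.0/0") -> List[Dict[str, Any]]:
    """Return one TCP egress rule per required port (hypothetical helper)."""
    return [
        {
            "IpProtocol": "tcp",
            "FromPort": port,
            "ToPort": port,
            "IpRanges": [{"CidrIp": cidr, "Description": purpose}],
        }
        for port, purpose in CONNECTOR_PORTS.items()
    ]


if __name__ == "__main__":
    for rule in build_egress_permissions():
        print(rule)
    # With boto3 (not executed here):
    # boto3.client("ec2").authorize_security_group_egress(
    #     GroupId="<security-group-id>", IpPermissions=build_egress_permissions())
```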
Athena Data Source
Unfortunately, creating an Athena data source for MSK is not yet supported by CloudFormation or Terraform, so it is done in the AWS console. First, we click the Create data source button.
Then we search for the Amazon MSK data source and proceed by clicking the Next button.
Next, we update the data source details and select the connector Lambda function ARN under connection details.
Once the data source connection is established, we can see the customer database we created earlier, because the Glue registry name becomes the database name.
We can also check the table details in the Athena editor.
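Once the data source exists, the Kafka topic can be referenced with a three-part name: data source (catalog), database (the Glue registry name) and table. The sketch below builds such a query string. The catalog name `msk`, the registry name `customer` and the assumption that the table is named after the Glue schema (`orders`) are illustrative.

```python
def quote_ident(name: str) -> str:
    """Quote an identifier for Athena (Trino style): wrap in double quotes, double any inner quotes."""
    return '"' + name.replace('"', '""') + '"'


def build_select(catalog: str, database: str, table: str, limit: int = 10) -> str:
    """Return a SELECT over a fully qualified catalog.database.table name."""
    if limit <= 0:
        raise ValueError("limit must be positive")
    fq_table = ".".join(quote_ident(part) for part in (catalog, database, table))
    return f"SELECT * FROM {fq_table} LIMIT {limit}"


if __name__ == "__main__":
    # Hypothetical catalog, database and table names.
    print(build_select("msk", "customer", "orders"))
    # SELECT * FROM "msk"."customer"."orders" LIMIT 10
```

The resulting string could then be submitted with boto3's `athena.start_query_execution`, which can also take the catalog and database through its `QueryExecutionContext` parameter.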
Kafka Producer
As in Part 1, the resources related to the Kafka producer Lambda function are managed in a separate Terraform stack, because this makes it easier to build the relevant resources iteratively. Note that the SAM CLI builds the whole Terraform stack even for a small code change, so it wouldn’t be convenient if all resources were managed in the same stack. The Terraform stack of the producer is the same as in Part 1 and won’t be covered here. Only the producer Lambda function source is covered, as it is modified to work with the MSK connector.
Producer Source
The Kafka producer sends messages to a topic named orders, where fake order data is generated using the Faker package. The Order class generates one or more fake order records with its create method, and an order record includes the order ID, order timestamp, user ID and order items. Note that the order items are converted into a string, because the MSK connector fails to parse them otherwise. The AWS documentation indicates that the MSK connector interprets complex types as strings, and I expected them to be converted into strings internally. However, it turned out that the list items (an array of objects) cannot be queried by Athena, so they are converted into a string in the first place.
The Lambda function sends 100 records at a time and then sleeps for 1 second. It repeats this until the elapsed time reaches the value of the MAX_RUN_SEC environment variable (e.g. 60). A Kafka message is made up of the order ID as the key and the order record as the value, and both the key and value are serialised as JSON. Note that the stable version of the kafka-python package does not support the IAM authentication method, so we need to install the package from a forked repository, as discussed in this GitHub issue.
# integration-athena/kafka_producer/src/app.py
import os
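The record generation and serialisation described above can be sketched as follows. This is not the post's app.py: the standard library's `random` and `uuid` stand in for the Faker package, and the value ranges and ID format are assumptions. The point it shows is that items is serialised with `json.dumps` before the record is sent.

```python
import datetime
import json
import random
import uuid
from typing import Any, Dict, List


class Order:
    """Stand-in for the post's Order class; random/uuid replace Faker here."""

    def __init__(self, user_id_max: int = 1000, product_id_max: int = 100) -> None:
        self.user_id_max = user_id_max
        self.product_id_max = product_id_max

    def _create_items(self) -> List[Dict[str, int]]:
        # Between one and five line items, each with a product id and quantity.
        return [
            {"product_id": random.randint(1, self.product_id_max),
             "quantity": random.randint(1, 10)}
            for _ in range(random.randint(1, 5))
        ]

    def create(self, num: int) -> List[Dict[str, Any]]:
        return [
            {
                "order_id": str(uuid.uuid4()),
                "ordered_at": datetime.datetime.now(datetime.timezone.utc).isoformat(
                    timespec="milliseconds"),
                "user_id": str(random.randint(1, self.user_id_max)),
                # Serialise the array of items as a JSON string up front, since
                # the MSK connector cannot query it as a list.
                "items": json.dumps(self._create_items()),
            }
            for _ in range(num)
        ]


def serialize(obj: Any) -> bytes:
    """JSON-encode a message key or value (usable as a kafka-python serializer)."""
    return json.dumps(obj).encode("utf-8")


if __name__ == "__main__":
    record = Order().create(1)[0]
    print(serialize(record["order_id"]), serialize(record))
```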
A sample order record is shown below.
{
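The timed send loop can be sketched in the same way. Here `send` is a hypothetical callable standing in for the Kafka producer (for example, a wrapper around kafka-python's `KafkaProducer.send`), and `make_records` stands in for `Order().create`. Injecting `clock` and `sleep` keeps the loop testable without a cluster. `MAX_RUN_SEC` is read from the environment with a default of 60, matching the example value in the text; the helper names are illustrative.

```python
import os
import time
from typing import Any, Callable, Dict, List


def get_max_run_sec(default: int = 60) -> int:
    """Read MAX_RUN_SEC from the environment, falling back to a default."""
    return int(os.environ.get("MAX_RUN_SEC", default))


def run_producer(
    make_records: Callable[[int], List[Dict[str, Any]]],
    send: Callable[[str, Dict[str, Any]], None],
    max_run_sec: int,
    batch_size: int = 100,
    clock: Callable[[], float] = time.monotonic,
    sleep: Callable[[float], None] = time.sleep,
) -> int:
    """Send batches until max_run_sec has elapsed; return the number of records sent."""
    start = clock()
    total = 0
    while clock() - start < max_run_sec:
        batch = make_records(batch_size)
        for record in batch:
            # The order id is the message key and the whole record is the value.
            send(record["order_id"], record)
        total += len(batch)
        print(f"sent {len(batch)} messages")
        sleep(1)  # pause for one second between batches
    return total
```

In a handler, this might be called as `run_producer(Order().create, lambda k, v: producer.send("orders", key=k, value=v), get_max_run_sec())`, where `producer` is a hypothetical `KafkaProducer` configured with JSON key and value serialisers.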
Deployment
In this section, we skip the steps shared with Part 1, except for local development with SAM and building the analytics query. See Part 1 for the other steps.
Local Testing with SAM
To simplify development, the EventBridge trigger permission is disabled by setting to_enable_trigger to false. Also, the function's run time before it stops is shortened by reducing max_run_sec to 10.
# integration-athena/kafka_producer/variables.tf
locals { …
The Lambda function can be built with the SAM build command while specifying the hook name as terraform and enabling beta features. Once completed, it stores the build artifacts and template in the .aws-sam folder.
$ sam build --hook-name terraform --beta-features
We can invoke the Lambda function locally using the SAM local invoke command. The Lambda function is invoked in a Docker container and the invocation logs are printed in the terminal as shown below.
$ sam local invoke --hook-name terraform module.kafka_producer_lambda.aws_lambda_function.this[0] --beta-features
# sent 100 messages
We can also check the messages using kafka-ui.
Order Items Query
Querying the orders table shows that the items column holds a JSON array, but it is stored as a string. To build analytics queries, we need to flatten the array elements into rows, as discussed below.
We can flatten the order items using the UNNEST function with a CROSS JOIN. First, the column needs to be converted into an array type, which is done in a CTE by parsing the column as JSON and then casting it to an array.
WITH parsed AS (
  ordered_at, user_id,
We can see the flattened order items as shown below.
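To make the query's semantics concrete, the sketch below performs the same flattening on in-memory rows in Python: each items string is parsed (like the JSON parsing and cast in the CTE) and one output row is produced per array element (like CROSS JOIN with UNNEST). It assumes rows are dicts shaped like the orders table, and the helper name and sample rows are hypothetical; it illustrates the logic rather than replacing the Athena query.

```python
import json
from typing import Any, Dict, Iterable, Iterator


def flatten_items(rows: Iterable[Dict[str, Any]]) -> Iterator[Dict[str, Any]]:
    """Yield one row per order item, mirroring CROSS JOIN UNNEST (hypothetical helper)."""
    for row in rows:
        # Equivalent of parsing the string as JSON and casting it to an array.
        items = json.loads(row["items"])
        # One output row per array element; as with CROSS JOIN UNNEST,
        # an order whose array is empty produces no rows.
        for item in items:
            yield {
                "order_id": row["order_id"],
                "ordered_at": row["ordered_at"],
                "user_id": row["user_id"],
                "product_id": item["product_id"],
                "quantity": item["quantity"],
            }


if __name__ == "__main__":
    # Hypothetical sample rows shaped like the orders table.
    orders = [
        {"order_id": "a", "ordered_at": "2023-01-01T00:00:00", "user_id": "1",
         "items": '[{"product_id": 10, "quantity": 2}, {"product_id": 20, "quantity": 1}]'},
        {"order_id": "b", "ordered_at": "2023-01-01T00:00:01", "user_id": "2",
         "items": '[{"product_id": 30, "quantity": 5}]'},
    ]
    for flat_row in flatten_items(orders):
        print(flat_row)
```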
The remaining steps (deploying the Kafka producer Lambda function, producing messages and executing an analytics query) are skipped in this post, as they are identical or nearly identical to those in Part 1. See Part 1 for details.
Summary
Streaming ingestion to Redshift and Athena has become much simpler thanks to new features such as the Athena MSK connector. In this series of posts, we discussed those features by building a solution using EventBridge, Lambda, MSK, Redshift and Athena. We also covered AWS SAM integrated with Terraform for developing a Lambda function locally.