As Kafka producer and consumer apps are decoupled, they operate on Kafka topics rather than communicating with each other directly. As described in the Confluent documentation, Schema Registry provides a centralized repository for managing and validating schemas for topic message data, and for serialization and deserialization of the data over the network. Producers and consumers of Kafka topics can use schemas to ensure data consistency and compatibility as schemas evolve. In AWS, the Glue Schema Registry supports features to manage and enforce schemas on data streaming applications using convenient integrations with Apache Kafka, Amazon Managed Streaming for Apache Kafka, Amazon Kinesis Data Streams, Amazon Kinesis Data Analytics for Apache Flink, and AWS Lambda. In this series, we discuss how to integrate Python Kafka producer and consumer apps in AWS Lambda with the Glue Schema Registry. In part 1, I illustrate the infrastructure and the Kafka apps; in part 2, the apps will be deployed and their behaviour discussed.
- Part 1 – Develop Kafka Producer and Consumer Apps (this post)
- Part 2 – Deploy Kafka Apps
Fake online order data is generated by multiple invocations of a Lambda function triggered by an EventBridge schedule rule. The schedule runs every minute, and the associated rule has a configurable number (e.g. 5) of targets. Each target points to the same Kafka producer Lambda function, so the function is invoked that many times concurrently. In this way, we can generate test data with multiple Lambda invocations according to the desired volume of messages. Note that, before sending a message, the producer validates the schema and registers a new one if it is not registered yet. Then it serializes the message and sends it to the cluster.
Once messages are sent to a Kafka topic, they can be consumed by a Lambda function for which Amazon MSK is configured as an event source. The serialized record (message key or value) includes the schema ID, so the consumer can request the schema from the schema registry (if it is not cached) in order to deserialize the record.
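To make the "schema ID inside the record" idea concrete, here is a minimal stdlib sketch of how a serialized record can carry a schema version ID in front of the payload. It assumes the framing used by the Glue Schema Registry serializers as I understand it (a header version byte of 3, a compression byte of 0 for no compression, then the 16-byte schema version UUID); treat these constants as assumptions and check the aws-glue-schema-registry source before relying on them. The frame() and unframe() helpers are hypothetical.

```python
import struct
import uuid
from typing import Tuple

# Assumed framing (verify against the library source):
# 1 header-version byte (3), 1 compression byte (0 = none), 16-byte schema version UUID.
HEADER_VERSION_BYTE = 3
COMPRESSION_NONE_BYTE = 0
_HEADER = struct.Struct(">BB16s")


def frame(schema_version_id: uuid.UUID, payload: bytes) -> bytes:
    """Prefix an uncompressed, already-encoded payload with the schema version ID."""
    header = _HEADER.pack(HEADER_VERSION_BYTE, COMPRESSION_NONE_BYTE, schema_version_id.bytes)
    return header + payload


def unframe(data: bytes) -> Tuple[uuid.UUID, bytes]:
    """Split a framed record into its schema version ID and the remaining payload."""
    if len(data) < _HEADER.size:
        raise ValueError("record too short to contain a schema header")
    version, compression, id_bytes = _HEADER.unpack_from(data)
    if version != HEADER_VERSION_BYTE:
        raise ValueError(f"unexpected header version byte: {version}")
    if compression != COMPRESSION_NONE_BYTE:
        raise ValueError("compressed payloads are not handled in this sketch")
    return uuid.UUID(bytes=id_bytes), data[_HEADER.size:]
```

A consumer would conceptually do what unframe() does, look the returned ID up in a local cache, fall back to the registry on a miss, and decode the remaining bytes with that schema. In the apps discussed here, the library handles all of this.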
The infrastructure is built by Terraform and the AWS SAM CLI is used to develop the producer Lambda function locally before deploying to AWS.
A VPC with 3 public and 3 private subnets is created using the AWS VPC Terraform module (vpc.tf). Also, a SoftEther VPN server is deployed in order to access the resources in the private subnets from the developer machine (vpn.tf). It is particularly useful for monitoring and managing the MSK cluster and Kafka topic, as well as for developing the Kafka producer Lambda function locally. The details about how to configure the VPN server can be found in an earlier post. The source can be found in the GitHub repository of this post.
An MSK cluster with 3 brokers is created. The broker nodes are deployed with the kafka.m5.large instance type in private subnets, and IAM authentication is used as the client authentication method. Finally, additional server configurations are added, such as enabling automatic topic creation and topic deletion.
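MSK accepts custom broker settings as text in the server.properties format, one key=value pair per line; in this post they are defined in Terraform. As a rough illustration only, the Python sketch below renders such a block from a dict. auto.create.topics.enable and delete.topic.enable are the standard Kafka broker settings for the two options mentioned above, while the render_server_properties() helper is hypothetical.

```python
from typing import Any, Dict


def render_server_properties(settings: Dict[str, Any]) -> str:
    """Render broker settings as server.properties text (one key=value per line)."""
    lines = []
    for key, value in sorted(settings.items()):
        # Kafka expects lowercase true/false for boolean settings.
        text = str(value).lower() if isinstance(value, bool) else str(value)
        lines.append(f"{key}={text}")
    return "\n".join(lines) + "\n"


server_properties = render_server_properties({
    "auto.create.topics.enable": True,
    "delete.topic.enable": True,
})
```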
Two security groups are created – one for the MSK cluster and the other for the Lambda apps.
The inbound and outbound rules of the former are created for access to the cluster by:
- Event Source Mapping (ESM) for Lambda
  - This is for the Lambda consumer that subscribes to the MSK cluster. As described in the AWS re:Post doc, when a Lambda function is configured with an Amazon MSK trigger or a self-managed Kafka trigger, an ESM resource is automatically created. An ESM is separate from the Lambda function, and it continuously polls records from the topic in the Kafka cluster. The ESM bundles those records into a payload and then calls the Lambda Invoke API to deliver the payload to the Lambda function for processing. Note that it doesn't inherit the VPC network settings of the Lambda function but uses the subnet and security group settings that are configured on the target MSK cluster. Therefore, the MSK cluster's security group must include a rule that grants ingress traffic from itself and egress traffic to itself. In our case, these rules are on port 9098, as the cluster only supports IAM authentication. Also, an additional egress rule is created to access the Glue Schema Registry.
- Other Resources
  - Two ingress rules are created, one for the VPN server and one for Lambda. The latter is only for the Lambda producer, because the consumer doesn't rely on the Lambda network settings.
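The ESM requirements above can be expressed as a small checklist. This is a hypothetical, stdlib-only sketch: it models rules as plain records (not the Terraform or EC2 API shapes) and reports which of the three rules described above are missing from the MSK cluster's security group.

```python
from dataclasses import dataclass
from typing import List


@dataclass(frozen=True)
class Rule:
    direction: str  # "ingress" or "egress"
    port: int
    peer: str       # a security group ID or a CIDR block


def esm_rule_gaps(msk_sg_id: str, rules: List[Rule]) -> List[str]:
    """List the ESM-related rules described above that the MSK security group lacks."""
    def has(direction: str, port: int, peer: str) -> bool:
        return Rule(direction, port, peer) in rules

    gaps = []
    # The ESM runs with the cluster's own security group, so the group must allow itself on 9098 (IAM auth).
    if not has("ingress", 9098, msk_sg_id):
        gaps.append("ingress 9098 from self")
    if not has("egress", 9098, msk_sg_id):
        gaps.append("egress 9098 to self")
    # HTTPS egress is needed to reach the Glue Schema Registry endpoint.
    if not any(r.direction == "egress" and r.port == 443 for r in rules):
        gaps.append("egress 443 for Glue Schema Registry")
    return gaps
```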
The second security group is created here, while the Lambda function is created in a different Terraform stack. This makes it easy to add the group to the inbound rules of the MSK cluster's security group. Later we will discuss how to use it with the Lambda function. The outbound rule allows all outbound traffic, although only port 9098 for the MSK cluster and port 443 for the Glue Schema Registry would be sufficient.
The resources related to the Kafka producer and consumer Lambda functions are managed in a separate Terraform stack, as this makes it easier to build them iteratively. Note that the SAM CLI builds the whole Terraform stack even for a small code change, so it would be inconvenient if all resources were managed in the same stack.
The fake order data is generated using the Faker package, and the dataclasses_avroschema package is used to generate the Avro schema automatically from the class attributes. A mixin class called InjectCompatMixin is mixed into the Order class, and it adds a schema compatibility mode to the generated schema. The auto() class method is used to instantiate the class with generated values. Finally, the OrderMore class is created for the schema evolution demo, which will be discussed later.
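The mixin pattern can be illustrated without third-party packages. The sketch below is a hypothetical stand-in: AvroSchemaMixin replaces dataclasses_avroschema with a tiny field-to-Avro mapping for primitive types, random/uuid replace Faker, and the Order fields are made up for illustration (they are not the post's fields). It only shows how a mixin placed earlier in the MRO can inject a "compatibility" key into the generated schema.

```python
import dataclasses
import json
import random
import uuid
from typing import Any, Dict

# Minimal Python-to-Avro mapping for this sketch only (stand-in for dataclasses_avroschema).
_AVRO_TYPES = {str: "string", int: "long", float: "double", bool: "boolean"}


class AvroSchemaMixin:
    """Hypothetical stand-in: build a record schema from dataclass fields."""

    @classmethod
    def avro_schema_dict(cls) -> Dict[str, Any]:
        return {
            "type": "record",
            "name": cls.__name__,
            "fields": [{"name": f.name, "type": _AVRO_TYPES[f.type]} for f in dataclasses.fields(cls)],
        }

    @classmethod
    def avro_schema(cls) -> str:
        return json.dumps(cls.avro_schema_dict())


class InjectCompatMixin:
    """Adds a compatibility mode on top of whatever schema the next class in the MRO builds."""
    compat_mode = "BACKWARD"

    @classmethod
    def avro_schema_dict(cls) -> Dict[str, Any]:
        schema = super().avro_schema_dict()
        schema["compatibility"] = cls.compat_mode
        return schema


@dataclasses.dataclass
class Order(InjectCompatMixin, AvroSchemaMixin):
    order_id: str  # hypothetical fields for illustration
    user_id: str
    quantity: int

    @classmethod
    def auto(cls) -> "Order":
        # The post uses Faker; random/uuid keep this sketch dependency-free.
        return cls(
            order_id=str(uuid.uuid4()),
            user_id=f"user-{random.randint(1, 100)}",
            quantity=random.randint(1, 5),
        )
```

Putting InjectCompatMixin first in the base list is what makes its avro_schema_dict() run before, and wrap, the schema builder's version.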
The generated schema of the Order class can be found below.
An example order record is shown below.
The aws-glue-schema-registry package is used to serialize the value of order messages. It provides the KafkaSerializer, which validates, registers and serializes the relevant records. It supports JSON and Avro schemas, and we can pass it to the value_serializer argument of the KafkaProducer class. By default, the schemas are named <topic>-key and <topic>-value, and this can be changed by updating the schema_naming_strategy argument. Note that, when sending a message, the value should be a tuple of data and schema. Note also that the stable version of the kafka-python package does not support the IAM authentication method. Therefore, we need to install the package from a forked repository, as discussed in this GitHub issue.
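The sketch below mimics the contract described above without contacting AWS: a serializer object that receives the topic and a (data, schema) tuple, derives the schema name with a naming strategy, and returns bytes. SketchValueSerializer is hypothetical and JSON-encodes the data instead of doing Avro encoding, and the strategy signature (topic, is_key, schema) is an assumption modeled on the default <topic>-key / <topic>-value naming, so check the package source before writing a custom strategy.

```python
import json
from typing import Any, Callable, Dict, Tuple


def topic_name_strategy(topic: str, is_key: bool, schema: Any) -> str:
    """Default-style naming described above: <topic>-key or <topic>-value."""
    return f"{topic}-{'key' if is_key else 'value'}"


class SketchValueSerializer:
    """Hypothetical stand-in showing the (data, schema) contract of a registry-aware serializer."""

    def __init__(self, naming: Callable[[str, bool, Any], str] = topic_name_strategy):
        self.naming = naming
        # Schema name -> schema; stands in for the registry in this sketch.
        self.registered: Dict[str, Any] = {}

    def serialize(self, topic: str, value: Tuple[Dict[str, Any], Any]) -> bytes:
        data, schema = value  # the producer sends a (data, schema) tuple
        name = self.naming(topic, False, schema)
        # A real serializer would validate the data and register the schema if needed.
        self.registered.setdefault(name, schema)
        return json.dumps(data).encode("utf-8")
```

With the real package, the KafkaSerializer instance is what goes into value_serializer, and each message is sent with a (data, schema) tuple as its value.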
The Lambda function sends 100 records at a time, followed by sleeping for 1 second. It repeats this until the elapsed time reaches the value of the MAX_RUN_SEC environment variable (e.g. 60). The last conditional block is for demonstrating a schema evolution example, which will be discussed later.
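The batching loop can be sketched independently of Kafka. In this hypothetical helper, send_batch and make_record are injected callables, and clock/sleep are injectable so the loop can be tested without waiting; the defaults mirror the 100-record batches, the 1-second pause and the MAX_RUN_SEC variable described above.

```python
import os
import time
from typing import Any, Callable, Dict, List, Optional


def run_producer(
    send_batch: Callable[[List[Dict[str, Any]]], None],
    make_record: Callable[[], Dict[str, Any]],
    max_run_sec: Optional[float] = None,
    batch_size: int = 100,
    pause_sec: float = 1.0,
    clock: Callable[[], float] = time.monotonic,
    sleep: Callable[[float], None] = time.sleep,
) -> int:
    """Send batches of records with a pause between them until max_run_sec elapses.

    Returns the number of batches sent.
    """
    if max_run_sec is None:
        # Read at call time so the environment variable can change between invocations.
        max_run_sec = float(os.getenv("MAX_RUN_SEC", "60"))
    start = clock()
    batches = 0
    while clock() - start < max_run_sec:
        send_batch([make_record() for _ in range(batch_size)])
        batches += 1
        sleep(pause_sec)
    return batches
```

In a handler, send_batch could call producer.send() for each record and then producer.flush(), and make_record could wrap Order.auto(); both wirings are assumptions. Because the time check happens before each batch, the last batch can finish slightly after max_run_sec, so the Lambda timeout needs some headroom.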
The VPC, subnets, Lambda security group and MSK cluster are created in the infra Terraform stack, and they need to be referenced from the Kafka app stack. This can be achieved using Terraform data sources as shown below. Note that the private subnets can be filtered by a specific tag (Tier: Private), which is added when creating them.
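For comparison, the same tag-based lookup from Python would typically be a boto3 call such as ec2.describe_subnets(Filters=[{"Name": "tag:Tier", "Values": ["Private"]}]). To stay offline, the sketch below applies the filter client-side to a dict shaped like a DescribeSubnets response; the helper name is hypothetical.

```python
from typing import Any, Dict, List


def subnet_ids_by_tag(response: Dict[str, Any], key: str = "Tier", value: str = "Private") -> List[str]:
    """Return IDs of subnets carrying the given tag in a DescribeSubnets-shaped response."""
    ids = []
    for subnet in response.get("Subnets", []):
        # Tags come back as a list of {"Key": ..., "Value": ...} pairs; subnets may have none.
        tags = {t["Key"]: t["Value"] for t in subnet.get("Tags", [])}
        if tags.get(key) == value:
            ids.append(subnet["SubnetId"])
    return ids
```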
The AWS Lambda Terraform module is used to create the producer Lambda function. Note that, in order to develop a Lambda function using AWS SAM, we need to create a SAM metadata resource, which provides the AWS SAM CLI with the information it needs to locate Lambda functions and layers, along with their source code, build dependencies, and build logic from within your Terraform project. The Terraform module creates it by default, which is convenient. Also, we need to give the EventBridge rule permission to invoke the Lambda function.
The producer Lambda function needs permission to send messages to the orders topic of the MSK cluster. It also needs permission on the Glue Schema Registry and schema. The following IAM policy is added to the Lambda function.
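As a rough sketch of what such a policy can contain, the helper below builds a policy document as a Python dict. The action list is my assumption of a reasonable set for IAM-authenticated writes plus schema registration, not the policy used in this post. CreateTopic is included only because automatic topic creation is enabled, and some Glue calls may need broader resources in practice. Region, account ID, cluster and registry names are placeholders.

```python
import json
from typing import Any, Dict


def producer_policy(region: str, account_id: str, cluster_name: str, registry_name: str) -> Dict[str, Any]:
    """Sketch of an IAM policy for writing to the 'orders' topic and using a Glue registry."""
    kafka_arn = f"arn:aws:kafka:{region}:{account_id}"
    glue_arn = f"arn:aws:glue:{region}:{account_id}"
    return {
        "Version": "2012-10-17",
        "Statement": [
            {   # Cluster ARNs end with a cluster UUID, matched here with a wildcard.
                "Effect": "Allow",
                "Action": ["kafka-cluster:Connect"],
                "Resource": [f"{kafka_arn}:cluster/{cluster_name}/*"],
            },
            {   # Topic ARNs follow topic/<cluster-name>/<cluster-uuid>/<topic-name>.
                "Effect": "Allow",
                "Action": ["kafka-cluster:DescribeTopic", "kafka-cluster:CreateTopic", "kafka-cluster:WriteData"],
                "Resource": [f"{kafka_arn}:topic/{cluster_name}/*/orders"],
            },
            {   # Registry and schema access for validating and registering schemas.
                "Effect": "Allow",
                "Action": [
                    "glue:GetRegistry", "glue:CreateSchema", "glue:GetSchema",
                    "glue:GetSchemaByDefinition", "glue:RegisterSchemaVersion", "glue:GetSchemaVersion",
                ],
                "Resource": [f"{glue_arn}:registry/{registry_name}", f"{glue_arn}:schema/{registry_name}/*"],
            },
        ],
    }


def producer_policy_json(**kwargs: str) -> str:
    """Serialize the policy for tools or APIs that expect a JSON string."""
    return json.dumps(producer_policy(**kwargs), indent=2)
```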
The AWS EventBridge Terraform module is used to create the EventBridge schedule rule and targets. Note that 5 targets that point to the Kafka producer Lambda function are created so that it is invoked concurrently every minute.
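The Terraform module manages the targets for us, but the shape is easy to see in Python. The sketch below builds a list that matches the Targets parameter of EventBridge PutTargets (for example, boto3's events.put_targets(Rule=..., Targets=...)), plus keyword arguments for a Lambda add_permission call that lets the rule invoke the producer function. The target IDs and the statement ID are hypothetical.

```python
from typing import Dict, List


def build_targets(function_arn: str, count: int = 5) -> List[Dict[str, str]]:
    """Build `count` EventBridge targets that all invoke the same Lambda function."""
    # Target IDs must be unique within a rule; the function ARN is shared by all targets.
    return [{"Id": f"producer-{i}", "Arn": function_arn} for i in range(count)]


def invoke_permission(function_name: str, rule_arn: str) -> Dict[str, str]:
    """Keyword arguments for lambda add_permission, allowing the rule to invoke the function."""
    return {
        "FunctionName": function_name,
        "StatementId": "AllowEventBridgeInvoke",  # hypothetical statement ID
        "Action": "lambda:InvokeFunction",
        "Principal": "events.amazonaws.com",
        "SourceArn": rule_arn,
    }
```

Since all targets belong to the same rule, a single permission scoped to the rule's ARN covers every target.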
The Lambda event includes records, a dictionary where the key is a topic partition (topic_name-partition_number) and the value is a list of consumer records. The consumer records include the message metadata (topic, partition, offset, timestamp…) as well as the key and value. An example payload is shown below.
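To make the structure concrete, here is a synthetic event with the same general shape (all values are made up, and some top-level fields are omitted), together with a small hypothetical helper that flattens the per-partition lists into a single stream of records.

```python
import base64
from typing import Any, Dict, Iterator

# Synthetic MSK-triggered event; values are illustrative only.
SAMPLE_EVENT: Dict[str, Any] = {
    "eventSource": "aws:kafka",
    "records": {
        "orders-0": [
            {
                "topic": "orders", "partition": 0, "offset": 15,
                "timestamp": 1700000000000, "timestampType": "CREATE_TIME",
                "key": base64.b64encode(b"order-1").decode(),
                "value": base64.b64encode(b"<serialized value>").decode(),
                "headers": [],
            }
        ],
        "orders-1": [
            {
                "topic": "orders", "partition": 1, "offset": 7,
                "timestamp": 1700000000500, "timestampType": "CREATE_TIME",
                "key": base64.b64encode(b"order-2").decode(),
                "value": base64.b64encode(b"<serialized value>").decode(),
                "headers": [],
            }
        ],
    },
}


def iter_records(event: Dict[str, Any]) -> Iterator[Dict[str, Any]]:
    """Yield consumer records from every "<topic>-<partition>" entry in the event."""
    for records in event.get("records", {}).values():
        yield from records
```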
The ConsumerRecord class parses and formats a consumer record. As the key and value are returned as base64-encoded strings, they are decoded into bytes and then decoded or deserialized as appropriate. The LambdaDeserializer class is created to deserialize the value. Also, the message timestamp is converted into a datetime object. The parse_record() method returns the consumer record with parsed and formatted values.
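A simplified, hypothetical take on these steps is sketched below. Field names follow the event keys so that ConsumerRecord(**record) works on each record, and the value deserializer is passed in as a plain callable that stands in for LambdaDeserializer. It assumes the key is UTF-8 text and that timestamps are epoch milliseconds.

```python
import base64
import datetime
from dataclasses import dataclass, field, replace
from typing import Any, Callable, List, Optional


@dataclass
class ConsumerRecord:
    """One record from the Lambda event; field names mirror the event's keys."""
    topic: str
    partition: int
    offset: int
    timestamp: Any  # epoch milliseconds in the event, a datetime after parsing
    timestampType: str
    key: Optional[Any] = None
    value: Optional[Any] = None
    headers: List[Any] = field(default_factory=list)

    def parse_record(self, deserialize_value: Callable[[bytes], Any]) -> "ConsumerRecord":
        """Return a copy with the key decoded, the value deserialized and the timestamp as a datetime."""
        key = base64.b64decode(self.key).decode("utf-8") if self.key else None
        value = deserialize_value(base64.b64decode(self.value)) if self.value else None
        ts = datetime.datetime.fromtimestamp(self.timestamp / 1000, tz=datetime.timezone.utc)
        return replace(self, key=key, value=value, timestamp=ts)
```

Returning a copy via dataclasses.replace keeps the raw record intact, which can help when logging a record that failed to deserialize.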
The AWS Lambda Terraform module is used to create the consumer Lambda function as well. A Lambda event source mapping is created so that it polls messages from the orders topic and invokes the consumer function. Also, we need to give the MSK cluster permission to invoke the Lambda function.
As the Lambda event source mapping uses the permissions of the Lambda function, we need to add permissions related to the Kafka cluster, Kafka and networking (see the AWS documentation for details). Finally, permission on the Glue Schema Registry and schema is added, as the consumer must be able to request the relevant schemas.
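One way to keep track of these permissions is a small checklist against the actions granted to the consumer's role. The action list below reflects my reading of the AWS documentation for MSK event sources with IAM authentication, plus glue:GetSchemaVersion for fetching schemas by version ID. Treat it as an assumption and verify it against the current docs. The helper matches IAM-style * wildcards and ignores case, as IAM action names are case-insensitive.

```python
import fnmatch
from typing import Iterable, List

# Assumed actions for the consumer role (ESM with IAM-authenticated MSK, plus schema lookups).
REQUIRED_CONSUMER_ACTIONS = [
    "kafka:DescribeClusterV2",
    "kafka:GetBootstrapBrokers",
    "kafka-cluster:Connect",
    "kafka-cluster:DescribeGroup",
    "kafka-cluster:AlterGroup",
    "kafka-cluster:DescribeTopic",
    "kafka-cluster:ReadData",
    "kafka-cluster:DescribeClusterDynamicConfiguration",
    "ec2:CreateNetworkInterface",
    "ec2:DescribeNetworkInterfaces",
    "ec2:DescribeVpcs",
    "ec2:DeleteNetworkInterface",
    "ec2:DescribeSubnets",
    "ec2:DescribeSecurityGroups",
    "glue:GetSchemaVersion",
]


def missing_actions(granted: Iterable[str], required: Iterable[str] = REQUIRED_CONSUMER_ACTIONS) -> List[str]:
    """Return required actions not covered by any granted action pattern (e.g. 'ec2:Describe*')."""
    patterns = [g.lower() for g in granted]

    def covered(action: str) -> bool:
        return any(fnmatch.fnmatchcase(action.lower(), p) for p in patterns)

    return [a for a in required if not covered(a)]
```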
Schema registry provides a centralised repository for managing and validating schemas for topic message data. In AWS, the Glue Schema Registry supports features to manage and enforce schemas on data streaming applications using convenient integrations with a range of AWS services. In this series, we discuss how to integrate Python Kafka producer and consumer apps in AWS Lambda with the Glue Schema Registry. In this post, I illustrated the infrastructure and Kafka apps in detail.