Integrate Glue Schema Registry with Your Python Kafka App – Part 2

In this series, we discuss how to integrate Python Kafka producer and consumer apps in AWS Lambda with the Glue Schema Registry. In part 1, we discussed the infrastructure and Kafka apps in detail. In this post, these apps are deployed and their behaviour is discussed.

Architecture

Fake online order data is generated by multiple Lambda functions that are invoked by an EventBridge schedule rule. The schedule is set to run every minute, and the associated rule has a configurable number (e.g. 5) of targets. Each target points to the same Kafka producer Lambda function, so we can generate test data with multiple Lambda invocations according to the desired volume of messages. Note that, before sending a message, the producer validates the schema and registers a new one if it is not registered yet. It then serializes the message and sends it to the cluster.
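The "register if not registered yet" step can be illustrated with an in-memory stand-in for a registry. This is a hypothetical sketch, not the Glue API or the producer's actual code: `InMemoryRegistry`, `SchemaVersion` and `get_or_register` are illustrative names, matching on the exact definition string is a simplification, and compatibility validation is left out here.

```python
import uuid
from dataclasses import dataclass, field
from typing import Dict, List


@dataclass(frozen=True)
class SchemaVersion:
    version_id: uuid.UUID  # the ID that gets embedded in every serialized record
    version_number: int
    definition: str


@dataclass
class InMemoryRegistry:
    """Hypothetical stand-in for a schema registry (not the Glue API)."""

    schemas: Dict[str, List[SchemaVersion]] = field(default_factory=dict)

    def get_or_register(self, schema_name: str, definition: str) -> SchemaVersion:
        versions = self.schemas.setdefault(schema_name, [])
        # Reuse an existing version when the same definition is already registered.
        for version in versions:
            if version.definition == definition:
                return version
        # Otherwise register the definition as the next version of the schema.
        new_version = SchemaVersion(uuid.uuid4(), len(versions) + 1, definition)
        versions.append(new_version)
        return new_version


registry = InMemoryRegistry()
first = registry.get_or_register("orders-value", '{"type": "record", "name": "Order", "fields": []}')
again = registry.get_or_register("orders-value", '{"type": "record", "name": "Order", "fields": []}')
print(first.version_id == again.version_id)  # True: no new version is registered
```

A producer would typically also cache the returned version ID locally so it does not call the registry for every message.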

Once messages are sent to a Kafka topic, they can be consumed by a Lambda function for which Amazon MSK is configured as an event source. The serialized record (message key or value) includes the schema version ID so that the consumer can request the schema from the schema registry (if it is not cached) in order to deserialize the record.
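To make the schema-ID mechanism concrete, here is a minimal sketch of the record framing. It assumes the header layout used by the open-source Glue Schema Registry serde libraries (a version byte of 3, a compression byte of 0 for uncompressed data, then the 16-byte schema version UUID); treat those constants as assumptions. `frame`, `unframe` and `SchemaCache` are hypothetical names, and the `fetch` callback stands in for a real registry lookup.

```python
import struct
import uuid
from typing import Callable, Dict, Tuple

# Assumed Glue Schema Registry header: version byte, compression byte, 16-byte schema version UUID.
HEADER = struct.Struct(">BB16s")
HEADER_VERSION_BYTE = 3
COMPRESSION_NONE = 0


def frame(payload: bytes, version_id: uuid.UUID) -> bytes:
    """Prefix an already Avro-encoded payload with the schema version ID (producer side)."""
    return HEADER.pack(HEADER_VERSION_BYTE, COMPRESSION_NONE, version_id.bytes) + payload


def unframe(record: bytes) -> Tuple[uuid.UUID, bytes]:
    """Split a serialized record into its schema version ID and payload (consumer side)."""
    version, compression, id_bytes = HEADER.unpack_from(record)
    if version != HEADER_VERSION_BYTE or compression != COMPRESSION_NONE:
        raise ValueError("unsupported header (compressed payloads are not handled here)")
    return uuid.UUID(bytes=id_bytes), record[HEADER.size:]


class SchemaCache:
    """Fetch each schema once per version ID; `fetch` stands in for a registry lookup."""

    def __init__(self, fetch: Callable[[uuid.UUID], str]) -> None:
        self._fetch = fetch
        self._schemas: Dict[uuid.UUID, str] = {}

    def get(self, version_id: uuid.UUID) -> str:
        if version_id not in self._schemas:
            self._schemas[version_id] = self._fetch(version_id)
        return self._schemas[version_id]
```

The cache is what keeps the consumer from calling the registry for every record: only the first record carrying a given version ID triggers a lookup.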

The infrastructure is built with Terraform, and the AWS SAM CLI is used to develop the producer Lambda function locally before deploying it to AWS.

Schema Evolution Demo

Before testing the Kafka applications, I'll quickly demonstrate how the schema registry can be used to manage and validate schemas for topic message data. Each schema has a compatibility mode (or compatibility checking can be disabled), which restricts the scope of allowed changes. For example, the default BACKWARD mode only allows you to delete fields or add optional fields (see the Confluent documentation for a quick summary). Therefore, if we add a mandatory field to an existing schema, the new version fails validation and the message cannot be sent to the topic. To illustrate this, I created a single-node Kafka cluster using docker-compose as shown below.

# glue-schema-registry/compose-demo.yml
version: "3.5"
services:
  zookeeper:
    image: docker.io/bitnami/zookeeper:3.8
    container_name: zookeeper
    ports:
      - "2181"
    networks:
      - kafkanet
    environment:
      - ALLOW_ANONYMOUS_LOGIN=yes
    volumes:
      - zookeeper_data:/bitnami/zookeeper
  kafka-0:
    image: docker.io/bitnami/kafka:3.3
    container_name: kafka-0
    expose:
      - 9092
    ports:
      - "9093:9093"
    networks:
      - kafkanet
    environment:
      - ALLOW_PLAINTEXT_LISTENER=yes
      - KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper:2181
      - KAFKA_CFG_BROKER_ID=0
      - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CLIENT:PLAINTEXT,EXTERNAL:PLAINTEXT
      - KAFKA_CFG_LISTENERS=CLIENT://:9092,EXTERNAL://:9093
      - KAFKA_CFG_ADVERTISED_LISTENERS=CLIENT://kafka-0:9092,EXTERNAL://localhost:9093
      - KAFKA_INTER_BROKER_LISTENER_NAME=CLIENT
    volumes:
      - kafka_0_data:/bitnami/kafka
    depends_on:
      - zookeeper
networks:
  kafkanet:
    name: kafka-network
volumes:
  zookeeper_data:
    driver: local
  kafka_0_data:
    driver: local

Recall that the producer Lambda handler has a conditional block that is executed when the module is run as a script. If the USE_MORE environment variable is not set, it sends messages based on the Order class. Otherwise, messages are created from the OrderMore class, which has an additional boolean attribute called is_prime. As the compatibility mode is set to BACKWARD, we can expect the second round of execution to fail. As shown below, I executed the Lambda handler twice, and the second round failed with the following error, which indicates a schema validation failure.

aws_schema_registry.exception.SchemaRegistryException: Schema Found but status is FAILURE

export BOOTSTRAP_SERVERS=localhost:9093
export TOPIC_NAME=demo
export REGISTRY_NAME=customer
cd glue-schema-registry/app/producer
## Round 1 – send message from the Order class
python lambda_handler.py
## Round 2 – send message from the OrderMore class
export USE_MORE=1
python lambda_handler.py
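The switch between the two classes can be pictured with a small sketch. The field names (`order_id`, `user_id`) and `create_order` are hypothetical and only stand in for the real classes from part 1; the point is that `is_prime` exists only on OrderMore and is assumed to map to a mandatory (no-default) Avro field.

```python
import dataclasses
import os
import random
import uuid
from dataclasses import dataclass


@dataclass
class Order:
    order_id: str  # hypothetical fields; the real ones are defined in part 1
    user_id: str


@dataclass
class OrderMore(Order):
    is_prime: bool  # extra field, assumed to become a mandatory Avro field


def create_order() -> Order:
    """Build an Order, or an OrderMore when the USE_MORE environment variable is set."""
    order_id = str(uuid.uuid4())
    user_id = f"user-{random.randint(1, 100)}"
    if os.getenv("USE_MORE"):
        return OrderMore(order_id, user_id, is_prime=random.random() < 0.5)
    return Order(order_id, user_id)


if __name__ == "__main__":
    print(dataclasses.asdict(create_order()))
```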

We can check the details of each schema version, and the second version is marked as failed.
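Why the second version fails can be modeled with a simplified BACKWARD check. This sketch covers only the add/delete-field rule summarized earlier (deleted fields are fine, added fields need a default); it is not the registry's actual algorithm, and the schemas and field names are hypothetical.

```python
from typing import Any, Dict, List

# Hypothetical Avro record schemas; the field names are illustrative only.
ORDER: Dict[str, Any] = {
    "type": "record",
    "name": "Order",
    "fields": [
        {"name": "order_id", "type": "string"},
        {"name": "user_id", "type": "string"},
    ],
}
ORDER_MORE: Dict[str, Any] = {
    **ORDER,
    "fields": ORDER["fields"] + [{"name": "is_prime", "type": "boolean"}],  # mandatory
}
ORDER_MORE_OPTIONAL: Dict[str, Any] = {
    **ORDER,
    "fields": ORDER["fields"] + [{"name": "is_prime", "type": ["null", "boolean"], "default": None}],
}


def is_backward_compatible(old: Dict[str, Any], new: Dict[str, Any]) -> bool:
    """Simplified BACKWARD check: can a reader using `new` read data written with `old`?

    Deleted fields are fine (the reader ignores them); added fields need a default.
    Type changes, aliases and nested records are not checked in this sketch.
    """
    old_names = {f["name"] for f in old["fields"]}
    added: List[Dict[str, Any]] = [f for f in new["fields"] if f["name"] not in old_names]
    return all("default" in f for f in added)


print(is_backward_compatible(ORDER, ORDER_MORE))           # False: mandatory field added
print(is_backward_compatible(ORDER, ORDER_MORE_OPTIONAL))  # True: optional field added
```

Under this rule, making `is_prime` a nullable union with a `null` default would have let the second round succeed.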

Note that schema versioning and validation are more relevant to clients that tightly couple schemas and message records. However, they are still important for a Python client that needs to work together with those clients or with Kafka Connect.

Deployment

Topic Creation

We plan to create the orders topic with multiple partitions. Although we could use the Kafka CLI tool, this can be done easily with Kpow, a Kafka monitoring and management tool that provides a web UI. It also supports the Glue Schema Registry and MSK Connect out of the box, which is quite convenient. In the docker-compose file, we added environment variables for the MSK cluster, MSK Connect and Glue Schema Registry details. Note that Kpow fails to start if the schema registry does not exist. I created the registry while demonstrating schema evolution; alternatively, it can be created as shown below.

$ aws glue create-registry --registry-name customer

# glue-schema-registry/docker-compose.yml
version: "3.5"
services:
  kpow:
    image: factorhouse/kpow-ce:91.2.1
    container_name: kpow
    ports:
      - "3000:3000"
    networks:
      - kafkanet
    environment:
      AWS_ACCESS_KEY_ID: $AWS_ACCESS_KEY_ID
      AWS_SECRET_ACCESS_KEY: $AWS_SECRET_ACCESS_KEY
      AWS_SESSION_TOKEN: $AWS_SESSION_TOKEN
      # kafka cluster
      BOOTSTRAP: $BOOTSTRAP_SERVERS
      SECURITY_PROTOCOL: SASL_SSL
      SASL_MECHANISM: AWS_MSK_IAM
      SASL_CLIENT_CALLBACK_HANDLER_CLASS: software.amazon.msk.auth.iam.IAMClientCallbackHandler
      SASL_JAAS_CONFIG: software.amazon.msk.auth.iam.IAMLoginModule required;
      # msk connect
      CONNECT_AWS_REGION: $AWS_DEFAULT_REGION
      # glue schema registry
      SCHEMA_REGISTRY_ARN: $SCHEMA_REGISTRY_ARN
      SCHEMA_REGISTRY_REGION: $AWS_DEFAULT_REGION
networks:
  kafkanet:
    name: kafka-network
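Kpow reads the registry ARN from SCHEMA_REGISTRY_ARN. The ARN can be copied from the RegistryArn field of the create-registry output, or built from the Glue registry ARN format as in this small helper. The helper name, the placeholder account ID and the fallback region are assumptions for illustration.

```python
import os


def glue_registry_arn(region: str, account_id: str, registry_name: str) -> str:
    """Build a Glue Schema Registry ARN: arn:aws:glue:<region>:<account>:registry/<name>."""
    return f"arn:aws:glue:{region}:{account_id}:registry/{registry_name}"


if __name__ == "__main__":
    # The account ID is a placeholder; substitute your own.
    arn = glue_registry_arn(os.environ.get("AWS_DEFAULT_REGION", "us-east-1"), "123456789012", "customer")
    print(f"export SCHEMA_REGISTRY_ARN={arn}")
```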

Once started, we can visit the UI on port 3000. The topic is created in the Topic menu by specifying the topic name and the number of partitions.

Once created, we can check details of the topic by selecting the topic from the drop-down menu.

Local Testing with SAM

To simplify development, the EventBridge permission is disabled by setting to_enable_trigger to false. Also, the producer's run time is shortened by reducing max_run_sec to 10, so it stops looping sooner.

# glue-schema-registry/app/variables.tf
locals {
  …
  producer = {
    …
    to_enable_trigger = false
    environment = {
      topic_name    = "orders"
      registry_name = "customer"
      max_run_sec   = 10
    }
  }
  …
}
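The effect of max_run_sec can be pictured with a time-bounded loop. This is a hypothetical sketch rather than the producer from part 1: `produce_until`, `make_batch` and `send` are illustrative names, and the clock is injectable so the loop can be tested without waiting.

```python
import time
from typing import Callable, List


def produce_until(
    max_run_sec: float,
    make_batch: Callable[[], List[dict]],
    send: Callable[[List[dict]], None],
    clock: Callable[[], float] = time.monotonic,
) -> int:
    """Send batches until max_run_sec has elapsed; return the number of records sent."""
    start = clock()
    total = 0
    while clock() - start < max_run_sec:
        batch = make_batch()
        send(batch)  # e.g. serialize each record and hand it to the Kafka producer
        total += len(batch)
        print(f"sent {len(batch)} messages")
    print(f"{total} records are sent in {round(clock() - start)} seconds")
    return total
```

Because the deadline is only checked between batches, a loop of this shape can overrun max_run_sec by up to one batch.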

The Lambda function can be built with the SAM build command by specifying terraform as the hook name and enabling beta features. Once completed, the build artifacts and template are stored in the .aws-sam folder.

$ sam build --hook-name terraform --beta-features
# Apply complete! Resources: 3 added, 0 changed, 0 destroyed.
# Build Succeeded
# Built Artifacts  : .aws-sam/build
# Built Template   : .aws-sam/build/template.yaml
# Commands you can use next
# =========================
# [*] Invoke Function: sam local invoke --hook-name terraform
# [*] Emulate local Lambda functions: sam local start-lambda --hook-name terraform
# SAM CLI update available (1.78.0); (1.70.0 installed)
# To download: https://docs.aws.amazon.com/serverless-application-model/latest/developerguide/serverless-sam-cli-install.html

We can invoke the Lambda function locally using the SAM local invoke command. The Lambda function is invoked in a Docker container and the invocation logs are printed in the terminal as shown below. Note that we should be connected to the VPN server in order to send messages into the MSK cluster, which is deployed in private subnets.

$ sam local invoke --hook-name terraform module.kafka_producer_lambda.aws_lambda_function.this[0] --beta-features
# Experimental features are enabled for this session.
# Visit the docs page to learn more about the AWS Beta terms https://aws.amazon.com/service-terms/.
# Skipped prepare hook. Current application is already prepared.
# Invoking lambda_handler.lambda_function (python3.8)
# Skip pulling image and use local one: public.ecr.aws/sam/emulation-python3.8:rapid-1.70.0-x86_64.
# Mounting /home/jaehyeon/personal/kafka-pocs/glue-schema-registry/app/.aws-sam/build/ModuleKafkaProducerLambdaAwsLambdaFunctionThis069E06354 as /var/task:ro,delegated inside runtime container
# START RequestId: fdbba255-e5b0-4e21-90d3-fe0b2ebbf629 Version: $LATEST
# sent 100 messages
# sent 100 messages
# sent 100 messages
# sent 100 messages
# sent 100 messages
# sent 100 messages
# sent 100 messages
# sent 100 messages
# sent 100 messages
# sent 100 messages
# 1000 records are sent in 11 seconds …
# END RequestId: fdbba255-e5b0-4e21-90d3-fe0b2ebbf629
# REPORT RequestId: fdbba255-e5b0-4e21-90d3-fe0b2ebbf629  Init Duration: 0.22 ms  Duration: 12146.61 ms   Billed Duration: 12147 ms       Memory Size: 128 MB     Max Memory Used: 128 MB
# null

Once completed, we can check in the Kpow UI that the value schema (orders-value) has been created.

We can also check the messages. To display them correctly, we need to select AVRO as the value deserializer and glue1 as the schema registry.
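Selecting AVRO and glue1 matters because the record value is not plain text: after the Glue header comes Avro binary data, in which field names are absent and values are written in schema order. A minimal sketch of Avro's binary encoding for a hypothetical flat record (string, long and boolean fields only) shows why the schema is needed to read the bytes back; `FIELDS`, `encode_record` and `decode_record` are illustrative names, not the deserializer Kpow uses.

```python
import io
from typing import Any, Dict, List, Tuple

# Hypothetical flat record schema as (field name, Avro primitive type) pairs.
FIELDS: List[Tuple[str, str]] = [("order_id", "string"), ("quantity", "long"), ("is_prime", "boolean")]


def _write_long(buf: io.BytesIO, n: int) -> None:
    z = (n << 1) ^ (n >> 63)  # zigzag-encode a 64-bit signed integer
    while z & ~0x7F:  # varint: 7 bits per byte, high bit set when more bytes follow
        buf.write(bytes([(z & 0x7F) | 0x80]))
        z >>= 7
    buf.write(bytes([z]))


def _read_long(buf: io.BytesIO) -> int:
    z, shift = 0, 0
    while True:
        b = buf.read(1)[0]
        z |= (b & 0x7F) << shift
        if not b & 0x80:
            break
        shift += 7
    return (z >> 1) ^ -(z & 1)  # undo zigzag


def encode_record(record: Dict[str, Any]) -> bytes:
    buf = io.BytesIO()
    for name, avro_type in FIELDS:  # fields are written in schema order, without names
        value = record[name]
        if avro_type == "string":
            data = value.encode("utf-8")
            _write_long(buf, len(data))  # strings are length-prefixed
            buf.write(data)
        elif avro_type == "long":
            _write_long(buf, value)
        elif avro_type == "boolean":
            buf.write(b"\x01" if value else b"\x00")
        else:
            raise ValueError(f"unsupported type: {avro_type}")
    return buf.getvalue()


def decode_record(payload: bytes) -> Dict[str, Any]:
    buf = io.BytesIO(payload)
    record: Dict[str, Any] = {}
    for name, avro_type in FIELDS:
        if avro_type == "string":
            record[name] = buf.read(_read_long(buf)).decode("utf-8")
        elif avro_type == "long":
            record[name] = _read_long(buf)
        elif avro_type == "boolean":
            record[name] = buf.read(1) == b"\x01"
        else:
            raise ValueError(f"unsupported type: {avro_type}")
    return record


payload = encode_record({"order_id": "a", "quantity": 1, "is_prime": True})
print(payload)  # b'\x02a\x02\x01'
```

Without the schema, the bytes `\x02a\x02\x01` carry no hint that they mean a string, a long and a boolean, which is why the deserializer must fetch the schema from the registry.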

Kafka App Deployment

Now we can deploy the Kafka applications using Terraform as usual after resetting the configuration variables. Once deployed, we can see that the schedule rule has 5 targets pointing to the same Lambda function.

We can see the Lambda consumer parses the consumer records correctly in CloudWatch logs.
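For reference, a consumer handler along these lines could walk the MSK event, in which records are grouped under "<topic>-<partition>" keys and values arrive base64-encoded. This is a hypothetical sketch, not the consumer from part 1: `iter_values` is an illustrative name, and the header layout (version byte, compression byte, 16-byte schema version UUID) is the same assumption as in the framing sketch, repeated so the block stands alone.

```python
import base64
import struct
import uuid
from typing import Any, Dict, Iterator, Tuple

HEADER = struct.Struct(">BB16s")  # assumed Glue header: version, compression, schema version UUID


def iter_values(event: Dict[str, Any]) -> Iterator[Tuple[str, int, int, uuid.UUID, bytes]]:
    """Yield (topic, partition, offset, schema version ID, Avro payload) for each record."""
    for records in event["records"].values():  # keyed by "<topic>-<partition>"
        for record in records:
            raw = base64.b64decode(record["value"])  # values arrive base64-encoded
            _, _, id_bytes = HEADER.unpack_from(raw)
            yield (record["topic"], record["partition"], record["offset"],
                   uuid.UUID(bytes=id_bytes), raw[HEADER.size:])


def lambda_handler(event: Dict[str, Any], context: Any) -> None:
    for topic, partition, offset, version_id, payload in iter_values(event):
        # A real consumer would look up the schema by version_id and decode the payload.
        print(f"{topic}-{partition}@{offset}: schema {version_id}, {len(payload)} bytes")
```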

Summary

A schema registry provides a centralized repository for managing and validating schemas for topic message data. In AWS, the Glue Schema Registry supports features to manage and enforce schemas on data streaming applications through convenient integrations with a range of AWS services. In this series, we discuss how to integrate Python Kafka producer and consumer apps in AWS Lambda with the Glue Schema Registry. In this post, the Kafka apps are deployed and their behaviour is discussed.
