Integrate Glue Schema Registry with Your Python Kafka App – Part 1

As Kafka producer and consumer apps are decoupled, they operate on Kafka topics rather than communicating with each other directly. As described in the Confluent documentation, Schema Registry provides a centralized repository for managing and validating schemas for topic message data, and for serialization and deserialization of the data over the network. Producers and consumers of Kafka topics can use schemas to ensure data consistency and compatibility as schemas evolve. In AWS, the Glue Schema Registry supports features to manage and enforce schemas on data streaming applications using convenient integrations with Apache Kafka, Amazon Managed Streaming for Apache Kafka, Amazon Kinesis Data Streams, Amazon Kinesis Data Analytics for Apache Flink, and AWS Lambda. In this series, we discuss how to integrate Python Kafka producer and consumer apps in AWS Lambda with the Glue Schema Registry. In part 1, I illustrate the infrastructure and the Kafka apps; the apps will be deployed and their behaviour discussed in part 2.

Architecture

Fake online order data is generated by a Kafka producer Lambda function that is invoked by an EventBridge schedule rule. The schedule is set to run every minute, and the associated rule has a configurable number (e.g. 5) of targets, each pointing to the same producer function. In this way we are able to generate test data with multiple concurrent Lambda invocations according to the desired volume of messages. Note that, before sending a message, the producer validates the schema and registers a new one if it is not registered yet. Then it serializes the message and sends it to the cluster.

Once messages are sent to a Kafka topic, they can be consumed by Lambda where Amazon MSK is configured as an event source. The serialized record (message key or value) includes the schema ID so that the consumer can request the schema from the schema registry (if not cached) in order to deserialize it.

The infrastructure is built by Terraform and the AWS SAM CLI is used to develop the producer Lambda function locally before deploying to AWS.

Infrastructure

A VPC with 3 public and 3 private subnets is created using the AWS VPC Terraform module (vpc.tf). Also, a SoftEther VPN server is deployed in order to access the resources in the private subnets from the developer machine (vpn.tf). It is particularly useful for monitoring and managing the MSK cluster and Kafka topics, as well as for developing the Kafka producer Lambda function locally. The details about how to configure the VPN server can be found in an earlier post. The source can be found in the GitHub repository of this post.

MSK

An MSK cluster with 3 brokers is created. The broker nodes are deployed with the kafka.m5.large instance type in the private subnets, and IAM authentication is used as the client authentication method. Finally, additional server configurations are added, such as enabling auto creation and deletion of topics.

 

# glue-schema-registry/infra/variable.tf
locals {
  ...
  msk = {
    version          = "3.3.1"
    instance_size    = "kafka.m5.large"
    ebs_volume_size  = 20
    log_retention_ms = 604800000 # 7 days
  }
  ...
}

# glue-schema-registry/infra/msk.tf
resource "aws_msk_cluster" "msk_data_cluster" {
  cluster_name           = "${local.name}-msk-cluster"
  kafka_version          = local.msk.version
  number_of_broker_nodes = length(module.vpc.private_subnets)
  configuration_info {
    arn      = aws_msk_configuration.msk_config.arn
    revision = aws_msk_configuration.msk_config.latest_revision
  }

  broker_node_group_info {
    instance_type   = local.msk.instance_size
    client_subnets  = module.vpc.private_subnets
    security_groups = [aws_security_group.msk.id]
    storage_info {
      ebs_storage_info {
        volume_size = local.msk.ebs_volume_size
      }
    }
  }

  client_authentication {
    sasl {
      iam = true
    }
  }

  logging_info {
    broker_logs {
      cloudwatch_logs {
        enabled   = true
        log_group = aws_cloudwatch_log_group.msk_cluster_lg.name
      }
      s3 {
        enabled = true
        bucket  = aws_s3_bucket.default_bucket.id
        prefix  = "logs/msk/cluster-"
      }
    }
  }

  tags = local.tags

  depends_on = [aws_msk_configuration.msk_config]
}

resource "aws_msk_configuration" "msk_config" {
  name = "${local.name}-msk-configuration"

  kafka_versions = [local.msk.version]

  server_properties = <<PROPERTIES
    auto.create.topics.enable = true
    delete.topic.enable = true
    log.retention.ms = ${local.msk.log_retention_ms}
  PROPERTIES
}
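
The logging configuration references a CloudWatch log group and an S3 bucket that are defined elsewhere in the infra stack. For reference, a minimal sketch of what those resources could look like is shown below; the resource names match the references above, while the retention period and bucket naming are assumptions rather than the exact definitions from the repository.

# glue-schema-registry/infra/msk.tf (sketch of the referenced resources)
# Retention and bucket settings below are assumptions for illustration only.
resource "aws_cloudwatch_log_group" "msk_cluster_lg" {
  name              = "/msk/cluster/${local.name}-msk-cluster" # assumed name
  retention_in_days = 14                                       # assumed retention

  tags = local.tags
}

resource "aws_s3_bucket" "default_bucket" {
  bucket        = "${local.name}-default-bucket" # assumed bucket name
  force_destroy = true

  tags = local.tags
}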

Security Groups

Two security groups are created – one for the MSK cluster and the other for the Lambda apps.

The inbound/outbound rules of the former are created for accessing the cluster by

  • Event Source Mapping (ESM) for Lambda
    • This is for the Lambda consumer that subscribes to the MSK cluster. As described in the AWS re:Post doc, when a Lambda function is configured with an Amazon MSK trigger or a self-managed Kafka trigger, an ESM resource is automatically created. An ESM is separate from the Lambda function, and it continuously polls records from the topic in the Kafka cluster. The ESM bundles those records into a payload. Then, it calls the Lambda Invoke API to deliver the payload to your Lambda function for processing. Note it doesn’t inherit the VPC network settings of the Lambda function but uses the subnet and security group settings that are configured on the target MSK cluster. Therefore, the MSK cluster’s security group must include a rule that grants ingress traffic from itself and egress traffic to itself. For us, these rules are created on port 9098 as the cluster only supports IAM authentication. Also, an additional egress rule is created to access the Glue Schema Registry.
  • Other Resources
    • Two ingress rules are created for the VPN server and Lambda. The latter is only for the Lambda producer because the consumer doesn’t rely on the Lambda network setting.

 

The second security group is for the Lambda apps. Although the Lambda functions are created in a different Terraform stack, the security group is created here because that makes it easier to add it to the inbound rule of the MSK security group. Later we will discuss how to make use of it with the Lambda function. The outbound rule allows all outbound traffic, although only port 9098 for the MSK cluster and port 443 for the Glue Schema Registry would be sufficient.

 

# glue-schema-registry/infra/msk.tf
resource "aws_security_group" "msk" {
  name   = "${local.name}-msk-sg"
  vpc_id = module.vpc.vpc_id

  lifecycle {
    create_before_destroy = true
  }

  tags = local.tags
}

# for lambda event source mapping
resource "aws_security_group_rule" "msk_ingress_self_broker" {
  type                     = "ingress"
  description              = "msk ingress self"
  security_group_id        = aws_security_group.msk.id
  protocol                 = "tcp"
  from_port                = 9098
  to_port                  = 9098
  source_security_group_id = aws_security_group.msk.id
}

resource "aws_security_group_rule" "msk_egress_self_broker" {
  type                     = "egress"
  description              = "msk egress self"
  security_group_id        = aws_security_group.msk.id
  protocol                 = "tcp"
  from_port                = 9098
  to_port                  = 9098
  source_security_group_id = aws_security_group.msk.id
}

resource "aws_security_group_rule" "msk_all_outbound" {
  type              = "egress"
  description       = "allow outbound all"
  security_group_id = aws_security_group.msk.id
  protocol          = "-1"
  from_port         = 0
  to_port           = 0
  cidr_blocks       = ["0.0.0.0/0"]
}

# for other resources
resource "aws_security_group_rule" "msk_vpn_inbound" {
  count                    = local.vpn.to_create ? 1 : 0
  type                     = "ingress"
  description              = "VPN access"
  security_group_id        = aws_security_group.msk.id
  protocol                 = "tcp"
  from_port                = 9098
  to_port                  = 9098
  source_security_group_id = aws_security_group.vpn[0].id
}

resource "aws_security_group_rule" "msk_lambda_inbound" {
  type                     = "ingress"
  description              = "lambda access"
  security_group_id        = aws_security_group.msk.id
  protocol                 = "tcp"
  from_port                = 9098
  to_port                  = 9098
  source_security_group_id = aws_security_group.kafka_app_lambda.id
}

...

# lambda security group
resource "aws_security_group" "kafka_app_lambda" {
  name   = "${local.name}-lambda-msk-access"
  vpc_id = module.vpc.vpc_id

  lifecycle {
    create_before_destroy = true
  }

  tags = local.tags
}

resource "aws_security_group_rule" "kafka_app_lambda_msk_egress" {
  type              = "egress"
  description       = "allow outbound all"
  security_group_id = aws_security_group.kafka_app_lambda.id
  protocol          = "-1"
  from_port         = 0
  to_port           = 0
  cidr_blocks       = ["0.0.0.0/0"]
}

Kafka Apps

The resources related to the Kafka producer and consumer Lambda functions are managed in a separate Terraform stack. This makes it easier to build the relevant resources iteratively: the SAM CLI builds the whole Terraform stack even for a small code change, which wouldn't be convenient if all the resources were managed in a single stack.

Producer App

The fake order data is generated using the Faker package, and the dataclasses_avroschema package is used to automatically generate the Avro schema from the class attributes. A mixin class called InjectCompatMixin is added to the Order class, which injects a schema compatibility mode into the generated schema. The auto() class method is used to instantiate the class automatically. Finally, the OrderMore class is created for the schema evolution demo, which will be discussed later.

# glue-schema-registry/app/producer/src/order.py
import datetime
import string
import json
import typing
import dataclasses
import enum

from faker import Faker
from dataclasses_avroschema import AvroModel


class Compatibility(enum.Enum):
    NONE = "NONE"
    DISABLED = "DISABLED"
    BACKWARD = "BACKWARD"
    BACKWARD_ALL = "BACKWARD_ALL"
    FORWARD = "FORWARD"
    FORWARD_ALL = "FORWARD_ALL"
    FULL = "FULL"
    FULL_ALL = "FULL_ALL"


class InjectCompatMixin:
    @classmethod
    def updated_avro_schema_to_python(cls, compat: Compatibility = Compatibility.BACKWARD):
        schema = cls.avro_schema_to_python()
        schema["compatibility"] = compat.value
        return schema

    @classmethod
    def updated_avro_schema(cls, compat: Compatibility = Compatibility.BACKWARD):
        schema = cls.updated_avro_schema_to_python(compat)
        return json.dumps(schema)


@dataclasses.dataclass
class OrderItem(AvroModel):
    product_id: int
    quantity: int


@dataclasses.dataclass
class Order(AvroModel, InjectCompatMixin):
    "Online fake order item"
    order_id: str
    ordered_at: datetime.datetime
    user_id: str
    order_items: typing.List[OrderItem]

    class Meta:
        namespace = "Order V1"

    def asdict(self):
        return dataclasses.asdict(self)

    @classmethod
    def auto(cls, fake: Faker = Faker()):
        rand_int = fake.random_int(1, 1000)
        user_id = "".join(
            [string.ascii_lowercase[int(s)] if s.isdigit() else s for s in hex(rand_int)]
        )[::-1]
        order_items = [
            OrderItem(fake.random_int(1, 9999), fake.random_int(1, 10))
            for _ in range(fake.random_int(1, 4))
        ]
        return cls(fake.uuid4(), datetime.datetime.utcnow(), user_id, order_items)

    def create(self, num: int):
        return [self.auto() for _ in range(num)]


@dataclasses.dataclass
class OrderMore(Order):
    is_prime: bool

    @classmethod
    def auto(cls, fake: Faker = Faker()):
        o = Order.auto()
        return cls(o.order_id, o.ordered_at, o.user_id, o.order_items, fake.pybool())

The generated schema of the Order class can be found below.

{
    "doc": "Online fake order item",
    "namespace": "Order V1",
    "name": "Order",
    "compatibility": "BACKWARD",
    "type": "record",
    "fields": [
        {
            "name": "order_id",
            "type": "string"
        },
        {
            "name": "ordered_at",
            "type": {
                "type": "long",
                "logicalType": "timestamp-millis"
            }
        },
        {
            "name": "user_id",
            "type": "string"
        },
        {
            "name": "order_items",
            "type": {
                "type": "array",
                "items": {
                    "type": "record",
                    "name": "OrderItem",
                    "fields": [
                        {
                            "name": "product_id",
                            "type": "long"
                        },
                        {
                            "name": "quantity",
                            "type": "long"
                        }
                    ]
                },
                "name": "order_item"
            }
        }
    ]
}

 

An example order record is shown below.

 

{
  "order_id": "53263c42-81b3-4a53-8067-fcdb44fa5479",
  "ordered_at": 1680745813045,
  "user_id": "dicxa",
  "order_items": [
    {
      "product_id": 5947,
      "quantity": 8
    }
  ]
}

 

Producer

The aws-glue-schema-registry package is used to serialize the value of order messages. It provides the KafkaSerializer class, which validates, registers and serializes the relevant records. It supports JSON and AVRO schemas, and we can pass it to the value_serializer argument of the KafkaProducer class. By default, the schemas are named as <topic>-key and <topic>-value, and this can be changed via the schema_naming_strategy argument. Note that, when sending a message, the value should be a tuple of data and schema. Note also that the stable version of the kafka-python package does not support the IAM authentication method. Therefore, we need to install the package from a forked repository as discussed in this GitHub issue.

 

# glue-schema-registry/app/producer/src/producer.py
import os
import datetime
import json
import typing

import boto3
import botocore.exceptions
from kafka import KafkaProducer
from aws_schema_registry import SchemaRegistryClient
from aws_schema_registry.avro import AvroSchema
from aws_schema_registry.adapter.kafka import KafkaSerializer
from aws_schema_registry.exception import SchemaRegistryException

from .order import Order


class Producer:
    def __init__(self, bootstrap_servers: list, topic: str, registry: str, is_local: bool = False):
        self.bootstrap_servers = bootstrap_servers
        self.topic = topic
        self.registry = registry
        self.glue_client = boto3.client(
            "glue", region_name=os.getenv("AWS_DEFAULT_REGION", "ap-southeast-2")
        )
        self.is_local = is_local
        self.producer = self.create()

    @property
    def serializer(self):
        client = SchemaRegistryClient(self.glue_client, registry_name=self.registry)
        return KafkaSerializer(client)

    def create(self):
        params = {
            "bootstrap_servers": self.bootstrap_servers,
            "key_serializer": lambda v: json.dumps(v, default=self.serialize).encode("utf-8"),
            "value_serializer": self.serializer,
        }
        if not self.is_local:
            params = {
                **params,
                **{"security_protocol": "SASL_SSL", "sasl_mechanism": "AWS_MSK_IAM"},
            }
        return KafkaProducer(**params)

    def send(self, orders: typing.List[Order], schema: AvroSchema):
        if not self.check_registry():
            print(f"registry not found, create {self.registry}")
            self.create_registry()
        for order in orders:
            data = order.asdict()
            try:
                self.producer.send(
                    self.topic, key={"order_id": data["order_id"]}, value=(data, schema)
                )
            except SchemaRegistryException as e:
                raise RuntimeError("fails to send a message") from e
        self.producer.flush()

    def serialize(self, obj):
        if isinstance(obj, datetime.datetime):
            return obj.isoformat()
        if isinstance(obj, datetime.date):
            return str(obj)
        return obj

    def check_registry(self):
        try:
            self.glue_client.get_registry(RegistryId={"RegistryName": self.registry})
            return True
        except botocore.exceptions.ClientError as e:
            if e.response["Error"]["Code"] == "EntityNotFoundException":
                return False
            else:
                raise e

    def create_registry(self):
        try:
            self.glue_client.create_registry(RegistryName=self.registry)
            return True
        except botocore.exceptions.ClientError as e:
            if e.response["Error"]["Code"] == "AlreadyExistsException":
                return True
            else:
                raise e

Lambda Handler

The Lambda function sends 100 records at a time, followed by sleeping for 1 second. It repeats until the elapsed time exceeds the value of the MAX_RUN_SEC environment variable (e.g. 60 seconds). The last conditional block is for demonstrating a schema evolution example, which will be discussed later.

 

# glue-schema-registry/app/producer/lambda_handler.py
import os
import datetime
import time

from aws_schema_registry.avro import AvroSchema

from src.order import Order, OrderMore, Compatibility
from src.producer import Producer


def lambda_function(event, context):
    producer = Producer(
        bootstrap_servers=os.environ["BOOTSTRAP_SERVERS"].split(","),
        topic=os.environ["TOPIC_NAME"],
        registry=os.environ["REGISTRY_NAME"],
    )
    s = datetime.datetime.now()
    ttl_rec = 0
    while True:
        orders = Order.auto().create(100)
        schema = AvroSchema(Order.updated_avro_schema(Compatibility.BACKWARD))
        producer.send(orders, schema)
        ttl_rec += len(orders)
        print(f"sent {len(orders)} messages")
        elapsed_sec = (datetime.datetime.now() - s).seconds
        if elapsed_sec > int(os.getenv("MAX_RUN_SEC", "60")):
            print(f"{ttl_rec} records are sent in {elapsed_sec} seconds ...")
            break
        time.sleep(1)


if __name__ == "__main__":
    producer = Producer(
        bootstrap_servers=os.environ["BOOTSTRAP_SERVERS"].split(","),
        topic=os.environ["TOPIC_NAME"],
        registry=os.environ["REGISTRY_NAME"],
        is_local=True,
    )
    use_more = os.getenv("USE_MORE") is not None
    if not use_more:
        orders = Order.auto().create(1)
        schema = AvroSchema(Order.updated_avro_schema(Compatibility.BACKWARD))
    else:
        orders = OrderMore.auto().create(1)
        schema = AvroSchema(OrderMore.updated_avro_schema(Compatibility.BACKWARD))
    print(orders)
    producer.send(orders, schema)

Lambda Resource

The VPC, subnets, Lambda security group and MSK cluster are created in the infra Terraform stack, and they need to be referenced from the Kafka app stack. This can be achieved using Terraform data sources as shown below. Note that the private subnets can be filtered by a specific tag (Tier: Private), which is added when creating them.

# glue-schema-registry/infra/vpc.tf
module "vpc" {
  source  = "terraform-aws-modules/vpc/aws"
  version = "~> 3.14"

  name = "${local.name}-vpc"
  cidr = local.vpc.cidr

  azs             = local.vpc.azs
  public_subnets  = [for k, v in local.vpc.azs : cidrsubnet(local.vpc.cidr, 3, k)]
  private_subnets = [for k, v in local.vpc.azs : cidrsubnet(local.vpc.cidr, 3, k + 3)]
  ...
  private_subnet_tags = {
    "Tier" = "Private"
  }

  tags = local.tags
}

# glue-schema-registry/app/variables.tf
data "aws_caller_identity" "current" {}

data "aws_region" "current" {}

data "aws_vpc" "selected" {
  filter {
    name   = "tag:Name"
    values = ["${local.infra_prefix}"]
  }
}

data "aws_subnets" "private" {
  filter {
    name   = "vpc-id"
    values = [data.aws_vpc.selected.id]
  }

  tags = {
    Tier = "Private"
  }
}

data "aws_msk_cluster" "msk_data_cluster" {
  cluster_name = "${local.infra_prefix}-msk-cluster"
}

data "aws_security_group" "kafka_app_lambda" {
  name = "${local.infra_prefix}-lambda-msk-access"
}

locals {
  ...
  infra_prefix = "glue-schema-registry"
  ...
}

 

The AWS Lambda Terraform module is used to create the producer Lambda function. Note that, in order to develop a Lambda function using AWS SAM, we need to create a SAM metadata resource, which provides the AWS SAM CLI with the information it needs to locate Lambda functions and layers, along with their source code, build dependencies, and build logic within the Terraform project. It is created by default by the Terraform module, which is convenient. Also, we need to give the EventBridge rule permission to invoke the Lambda function, which is granted by the aws_lambda_permission resource.

 

# glue-schema-registry/app/variables.tf
locals {
  name        = local.infra_prefix
  region      = data.aws_region.current.name
  environment = "dev"

  infra_prefix = "glue-schema-registry"

  producer = {
    src_path          = "producer"
    function_name     = "kafka_producer"
    handler           = "lambda_handler.lambda_function"
    concurrency       = 5
    timeout           = 90
    memory_size       = 128
    runtime           = "python3.8"
    schedule_rate     = "rate(1 minute)"
    to_enable_trigger = true
    environment = {
      topic_name    = "orders"
      registry_name = "customer"
      max_run_sec   = 60
    }
  }
  ...
}

# glue-schema-registry/app/main.tf
module "kafka_producer_lambda" {
  source = "terraform-aws-modules/lambda/aws"

  function_name          = local.producer.function_name
  handler                = local.producer.handler
  runtime                = local.producer.runtime
  timeout                = local.producer.timeout
  memory_size            = local.producer.memory_size
  source_path            = local.producer.src_path
  vpc_subnet_ids         = data.aws_subnets.private.ids
  vpc_security_group_ids = [data.aws_security_group.kafka_app_lambda.id]
  attach_network_policy  = true
  attach_policies        = true
  policies               = [aws_iam_policy.msk_lambda_producer_permission.arn]
  number_of_policies     = 1

  environment_variables = {
    BOOTSTRAP_SERVERS = data.aws_msk_cluster.msk_data_cluster.bootstrap_brokers_sasl_iam
    TOPIC_NAME        = local.producer.environment.topic_name
    REGISTRY_NAME     = local.producer.environment.registry_name
    MAX_RUN_SEC       = local.producer.environment.max_run_sec
  }

  tags = local.tags
}

resource "aws_lambda_function_event_invoke_config" "kafka_producer_lambda" {
  function_name          = module.kafka_producer_lambda.lambda_function_name
  maximum_retry_attempts = 0
}

resource "aws_lambda_permission" "allow_eventbridge" {
  count         = local.producer.to_enable_trigger ? 1 : 0
  statement_id  = "InvokeLambdaFunction"
  action        = "lambda:InvokeFunction"
  function_name = local.producer.function_name
  principal     = "events.amazonaws.com"
  source_arn    = module.eventbridge.eventbridge_rule_arns["crons"]

  depends_on = [
    module.eventbridge
  ]
}

IAM Permission

The producer Lambda function needs permission to send messages to the orders topic of the MSK cluster. It also needs permission on the Glue schema registry and schema so that it can register and retrieve schemas.

 

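A minimal sketch of such a producer policy is shown below. It mirrors the structure of the consumer policy shown later in this post; the specific kafka-cluster and glue actions, as well as the resource ARNs, are assumptions based on MSK IAM access control and the Glue Schema Registry permissions rather than the exact policy in the repository.

# glue-schema-registry/app/main.tf
# A minimal sketch of the producer policy - actions and ARNs are assumptions
# modeled on the consumer policy shown later in this post.
resource "aws_iam_policy" "msk_lambda_producer_permission" {
  name = "${local.producer.function_name}-msk-lambda-producer-permission"

  policy = jsonencode({
    Version = "2012-10-17"
    Statement = [
      {
        Sid = "PermissionOnKafkaCluster"
        Action = [
          "kafka-cluster:Connect",
          "kafka-cluster:DescribeTopic",
          "kafka-cluster:CreateTopic",
          "kafka-cluster:WriteData"
        ]
        Effect = "Allow"
        Resource = [
          "arn:aws:kafka:${local.region}:${data.aws_caller_identity.current.account_id}:cluster/${local.infra_prefix}-msk-cluster/*",
          "arn:aws:kafka:${local.region}:${data.aws_caller_identity.current.account_id}:topic/${local.infra_prefix}-msk-cluster/*"
        ]
      },
      {
        Sid = "PermissionOnGlueSchema"
        Action = [
          # The producer registers schemas and creates the registry if missing.
          "glue:*Schema*",
          "glue:GetRegistry",
          "glue:CreateRegistry",
          "glue:ListRegistries"
        ]
        Effect   = "Allow"
        Resource = "*"
      }
    ]
  })
}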

EventBridge Rule

The AWS EventBridge Terraform module is used to create the EventBridge schedule rule and targets. Note that 5 targets that point to the Kafka producer Lambda function are created so that it is invoked concurrently every minute.

module "eventbridge" {
  source = "terraform-aws-modules/eventbridge/aws"

  create_bus = false

  rules = {
    crons = {
      description         = "Kafka producer lambda schedule"
      schedule_expression = local.producer.schedule_rate
    }
  }

  targets = {
    crons = [for i in range(local.producer.concurrency) : {
      name = "lambda-target-${i}"
      arn  = module.kafka_producer_lambda.lambda_function_arn
    }]
  }

  depends_on = [
    module.kafka_producer_lambda
  ]

  tags = local.tags
}

Consumer App

Lambda Handler

The Lambda event includes records, which is a dictionary where each key is a topic partition (topic_name-partition_number) and each value is a list of consumer records. A consumer record includes the message metadata (topic, partition, offset, timestamp ...) as well as the key and value. An example payload is shown below.

 

{
    "eventSource": "aws:kafka",
    "eventSourceArn": "<msk-cluster-arn>",
    "bootstrapServers": "<bootstrap-server-addresses>",
    "records": {
        "orders-2": [
            {
                "topic": "orders",
                "partition": 2,
                "offset": 10293,
                "timestamp": 1680631941838,
                "timestampType": "CREATE_TIME",
                "key": "eyJvcmRlcl9pZCI6ICJkNmQ4ZDJjNi1hODYwLTQyNTYtYWY1Yi04ZjU3NDkxZmM4YWYifQ==",
                "value": "AwDeD/rgjCxCeawN/ZaIO6VuSGQ2ZDhkMmM2LWE4NjAtNDI1Ni1hZjViLThmNTc0OTFmYzhhZu6pwtfpYQppYWJ4YQa8UBSEHgbkVAYA",
                "headers": []
            }
        ]
    }
}

 
The ConsumerRecord class parses and formats a consumer record. As the key and value are returned as base64-encoded strings, they are decoded into bytes and then decoded or deserialized appropriately. The LambdaDeserializer class is created to deserialize the value. Also, the message timestamp is converted into a datetime object. The parse_record() method returns the consumer record with parsed/formatted values.

# glue-schema-registry/app/consumer/lambda_handler.py
import os
import json
import base64
import datetime

import boto3
from aws_schema_registry import SchemaRegistryClient
from aws_schema_registry.adapter.kafka import Deserializer, KafkaDeserializer


class LambdaDeserializer(Deserializer):
    def __init__(self, registry: str):
        self.registry = registry

    @property
    def deserializer(self):
        glue_client = boto3.client(
            "glue", region_name=os.getenv("AWS_DEFAULT_REGION", "ap-southeast-2")
        )
        client = SchemaRegistryClient(glue_client, registry_name=self.registry)
        return KafkaDeserializer(client)

    def deserialize(self, topic: str, bytes_: bytes):
        return self.deserializer.deserialize(topic, bytes_)


class ConsumerRecord:
    def __init__(self, record: dict):
        self.topic = record["topic"]
        self.partition = record["partition"]
        self.offset = record["offset"]
        self.timestamp = record["timestamp"]
        self.timestamp_type = record["timestampType"]
        self.key = record["key"]
        self.value = record["value"]
        self.headers = record["headers"]

    def parse_key(self):
        return base64.b64decode(self.key).decode()

    def parse_value(self, deserializer: LambdaDeserializer):
        parsed = deserializer.deserialize(self.topic, base64.b64decode(self.value))
        return parsed.data

    def format_timestamp(self, to_str: bool = True):
        ts = datetime.datetime.fromtimestamp(self.timestamp / 1000)
        if to_str:
            return ts.isoformat()
        return ts

    def parse_record(
        self, deserializer: LambdaDeserializer, to_str: bool = True, to_json: bool = True
    ):
        rec = {
            **self.__dict__,
            **{
                "key": self.parse_key(),
                "value": self.parse_value(deserializer),
                "timestamp": self.format_timestamp(to_str),
            },
        }
        if to_json:
            return json.dumps(rec, default=self.serialize)
        return rec

    def serialize(self, obj):
        if isinstance(obj, datetime.datetime):
            return obj.isoformat()
        if isinstance(obj, datetime.date):
            return str(obj)
        return obj


def lambda_function(event, context):
    deserializer = LambdaDeserializer(os.getenv("REGISTRY_NAME", "customer"))
    for _, records in event["records"].items():
        for record in records:
            cr = ConsumerRecord(record)
            print(cr.parse_record(deserializer))

Lambda Resource

The AWS Lambda Terraform module is used to create the consumer Lambda function as well. A Lambda event source mapping is created so that it polls messages from the orders topic and invokes the consumer function. Also, we need to give the MSK cluster permission to invoke the Lambda function, which is granted by the aws_lambda_permission resource.

 

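A minimal sketch of these resources is shown below. The consumer attributes in the locals block, the module arguments, the resource names and the event source mapping settings are assumptions that mirror the producer function shown earlier; the exact definitions can be found in the GitHub repository of this post.

# glue-schema-registry/app/variables.tf
# Assumed consumer attributes - they mirror the producer block shown earlier.
locals {
  ...
  consumer = {
    src_path      = "consumer"
    function_name = "kafka_consumer"
    handler       = "lambda_handler.lambda_function"
    timeout       = 90
    memory_size   = 128
    runtime       = "python3.8"
    environment = {
      registry_name = "customer"
    }
  }
  ...
}

# glue-schema-registry/app/main.tf
module "kafka_consumer_lambda" {
  source = "terraform-aws-modules/lambda/aws"

  function_name      = local.consumer.function_name
  handler            = local.consumer.handler
  runtime            = local.consumer.runtime
  timeout            = local.consumer.timeout
  memory_size        = local.consumer.memory_size
  source_path        = local.consumer.src_path
  attach_policies    = true
  policies           = [aws_iam_policy.msk_lambda_consumer_permission.arn]
  number_of_policies = 1

  environment_variables = {
    REGISTRY_NAME = local.consumer.environment.registry_name
  }

  tags = local.tags
}

# The event source mapping polls the orders topic and invokes the consumer.
# It uses the MSK cluster's subnet and security group settings rather than
# the function's own network configuration.
resource "aws_lambda_event_source_mapping" "kafka_consumer_lambda" {
  event_source_arn  = data.aws_msk_cluster.msk_data_cluster.arn
  function_name     = module.kafka_consumer_lambda.lambda_function_name
  topics            = [local.producer.environment.topic_name]
  starting_position = "LATEST"
}

# Allow the MSK cluster to invoke the consumer function.
resource "aws_lambda_permission" "allow_msk" {
  statement_id  = "InvokeLambdaFunction"
  action        = "lambda:InvokeFunction"
  function_name = local.consumer.function_name
  principal     = "kafka.amazonaws.com"
  source_arn    = data.aws_msk_cluster.msk_data_cluster.arn
}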

IAM Permission

As the Lambda event source mapping uses the permissions of the Lambda function's execution role, we need to add permissions related to the Kafka cluster, Kafka and networking; see the AWS documentation for details. Finally, permission on the Glue schema registry and schema is added, as the consumer should be able to request the relevant schemas.

 

# glue-schema-registry/app/main.tf
resource "aws_iam_policy" "msk_lambda_consumer_permission" {
  name = "${local.consumer.function_name}-msk-lambda-consumer-permission"

  policy = jsonencode({
    Version = "2012-10-17"
    Statement = [
      {
        Sid = "PermissionOnKafkaCluster"
        Action = [
          "kafka-cluster:Connect",
          "kafka-cluster:DescribeGroup",
          "kafka-cluster:AlterGroup",
          "kafka-cluster:DescribeTopic",
          "kafka-cluster:ReadData",
          "kafka-cluster:DescribeClusterDynamicConfiguration"
        ]
        Effect = "Allow"
        Resource = [
          "arn:aws:kafka:${local.region}:${data.aws_caller_identity.current.account_id}:cluster/${local.infra_prefix}-msk-cluster/*",
          "arn:aws:kafka:${local.region}:${data.aws_caller_identity.current.account_id}:topic/${local.infra_prefix}-msk-cluster/*",
          "arn:aws:kafka:${local.region}:${data.aws_caller_identity.current.account_id}:group/${local.infra_prefix}-msk-cluster/*"
        ]
      },
      {
        Sid = "PermissionOnKafka"
        Action = [
          "kafka:DescribeCluster",
          "kafka:GetBootstrapBrokers"
        ]
        Effect   = "Allow"
        Resource = "*"
      },
      {
        Sid = "PermissionOnNetwork"
        Action = [
          # The first three actions also exist in the network policy attachment in the lambda module
          # "ec2:CreateNetworkInterface",
          # "ec2:DescribeNetworkInterfaces",
          # "ec2:DeleteNetworkInterface",
          "ec2:DescribeVpcs",
          "ec2:DescribeSubnets",
          "ec2:DescribeSecurityGroups"
        ]
        Effect   = "Allow"
        Resource = "*"
      },
      {
        Sid = "PermissionOnGlueSchema"
        Action = [
          "glue:*Schema*",
          "glue:ListRegistries"
        ]
        Effect   = "Allow"
        Resource = "*"
      }
    ]
  })
}

Summary

Schema registry provides a centralized repository for managing and validating schemas for topic message data. In AWS, the Glue Schema Registry supports features to manage and enforce schemas on data streaming applications using convenient integrations with a range of AWS services. In this series, we discuss how to integrate Python Kafka producer and consumer apps in AWS Lambda with the Glue Schema Registry. In this post, I illustrated the infrastructure and the Kafka apps in detail.
