In the previous post, we discussed a Change Data Capture (CDC) solution with a schema registry. A local development environment is set up using Docker Compose. The Debezium and Confluent S3 connectors are deployed with the Confluent Avro converter and the Apicurio registry is used as the schema registry service. A quick example is shown to illustrate how schema evolution can be managed by the schema registry. In this post, we’ll build the solution on AWS using MSK, MSK Connect, Aurora PostgreSQL and ECS.
Architecture
The diagram below shows an updated CDC architecture with a schema registry. The Debezium connector talks to the schema registry first and checks if the schema is available. If it doesn't exist, the schema is registered and cached in the schema registry. Then the producer serializes the data with the schema and sends it to the topic together with the schema ID. When the sink connector consumes a message, it reads the schema by its ID and deserializes the message. The schema registry uses a PostgreSQL database as an artifact store where multiple versions of schemas are kept. In this post, we'll build the solution on AWS. An MSK cluster will be created, and data will be pushed from a database deployed using Aurora PostgreSQL. The database has a schema called registry where the schema metadata will be stored. The Apicurio registry will be deployed as an ECS service behind an internal load balancer.
Infrastructure
The main AWS resources will be deployed to private subnets of a VPC, and connectivity between them will be managed by updating security group inbound rules. For example, the MSK connectors should have access to the registry service, so the connectors' security group ID should be added to an inbound rule of the registry service. As multiple resources are deployed to private subnets, it is convenient to set up a VPN so that they can be accessed from a developer machine, which improves the developer experience significantly. We'll use Terraform for managing the resources on AWS, and how to set up the VPC, VPN and Aurora PostgreSQL is discussed in detail in one of my earlier posts. In this post, I'll only illustrate the resources that are not covered in that article. The Terraform source can be found in the GitHub repository for this post.
MSK Cluster
As discussed in one of the earlier posts, we'll create an MSK cluster with 2 brokers of the kafka.m5.large instance type in order to prevent the failed authentication error. Two inbound rules are configured for the MSK cluster's security group. The first allows all access from its own security group, which is required for the MSK connectors to have access to the MSK cluster. Note that, when we create a connector from the AWS console, the cluster's subnets and security group are selected for the connector by default. The second inbound rule allows access from the VPN's security group on port 9098, which is the port of the bootstrap servers for IAM authentication. Also an IAM role is created that can be assumed by the MSK connectors so that they have permission on the cluster, topics and consumer groups. The Terraform file for the MSK cluster and related resources can be found in infra/msk.tf.
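As a rough sketch, the two inbound rules can be expressed as follows. This is not a complete listing of infra/msk.tf, and the resource and security group names are illustrative.

```hcl
# allow all access from the cluster's own security group (used by MSK connectors)
resource "aws_security_group_rule" "msk_self_inbound" {
  type                     = "ingress"
  description              = "Allow connectors to access the MSK cluster"
  security_group_id        = aws_security_group.msk.id
  protocol                 = "-1"
  from_port                = 0
  to_port                  = 0
  source_security_group_id = aws_security_group.msk.id
}

# allow the VPN to reach the bootstrap servers for IAM authentication
resource "aws_security_group_rule" "msk_vpn_inbound" {
  type                     = "ingress"
  description              = "Allow VPN access on the IAM bootstrap port"
  security_group_id        = aws_security_group.msk.id
  protocol                 = "tcp"
  from_port                = 9098
  to_port                  = 9098
  source_security_group_id = aws_security_group.vpn.id
}
```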
Schema Registry
The schema registry is deployed via ECS as a Fargate task. Two tasks are served by an ECS service, and they can be accessed via an internal load balancer. The load balancer is configured to allow inbound traffic from the MSK cluster and the VPN, and it has access to the individual tasks. Normally the tasks should accept inbound traffic from the load balancer only but, for testing, they are configured to accept inbound traffic from the VPN as well. The Terraform file for the schema registry and related resources can be found in infra/registry.tf.
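For reference, the container definition part of the task can be sketched as below. It assumes the Apicurio SQL storage image and its standard datasource environment variables; the image tag, endpoints and credentials are placeholders to be adjusted.

```hcl
# a sketch of the registry task definition (values are illustrative)
resource "aws_ecs_task_definition" "registry" {
  family                   = "schema-registry"
  requires_compatibilities = ["FARGATE"]
  network_mode             = "awsvpc"
  cpu                      = 512
  memory                   = 1024
  container_definitions = jsonencode([
    {
      name         = "apicurio-registry"
      image        = "apicurio/apicurio-registry-sql:2.2.0.Final"
      essential    = true
      portMappings = [{ containerPort = 8080, protocol = "tcp" }]
      environment = [
        { name = "REGISTRY_DATASOURCE_URL", value = "jdbc:postgresql://<aurora-endpoint>:5432/<db-name>?currentSchema=registry" },
        { name = "REGISTRY_DATASOURCE_USERNAME", value = "<db-username>" },
        { name = "REGISTRY_DATASOURCE_PASSWORD", value = "<db-password>" }
      ]
    }
  ])
}
```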
Setup Database
In order for the schema registry to work properly, the database should have a schema named registry. The database also needs to have the sample data loaded into the ods schema. Therefore it is not possible to create all resources at once, and we need to skip creating the registry service at first. This can be done by setting the registry_create variable to false.
# infra/variables.tf
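A minimal sketch of the variable definition is shown below; only the default value matters for this step.

```hcl
variable "registry_create" {
  description = "Whether to create the Apicurio schema registry service"
  type        = bool
  default     = false
}
```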
A simple Python application is created to set up the database, and it can be run as shown below. Note that the VPN should be connected before executing the command.
(venv) $ python connect/data/load/main.py --help
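In essence, the application creates the registry schema and loads the sample data into the ods schema. A stripped-down sketch of the idea, assuming psycopg2 and illustrative connection details, is shown below; the actual application can be checked in the repository.

```python
# a simplified sketch of what the setup application does (connection details are placeholders)
import psycopg2

conn = psycopg2.connect(
    host="<aurora-cluster-endpoint>",
    dbname="<db-name>",
    user="<db-username>",
    password="<db-password>",
)
with conn, conn.cursor() as cur:
    # schema used by the Apicurio registry as its artifact store
    cur.execute("CREATE SCHEMA IF NOT EXISTS registry;")
    # schema that holds the sample tables captured by the source connector
    cur.execute("CREATE SCHEMA IF NOT EXISTS ods;")
    # ... create the sample tables and insert records into the ods schema here
conn.close()
```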
Deploy Schema Registry
Once the database setup is complete, we can apply the Terraform stack with the registry_create variable set to true. When it's deployed, we can check the APIs that the registry service supports as shown below. In line with the previous post, we'll use the Confluent schema registry compatible API.
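For example, assuming the Confluent compatible API is served under the /apis/ccompat/v6 path, it can be checked from a VPN-connected machine as follows; the registry host is a placeholder for the internal load balancer's DNS name.

```bash
# list schema subjects via the Confluent compatible API
curl http://<registry-host>/apis/ccompat/v6/subjects
```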
Kafka UI
The Kafka UI supports MSK IAM authentication, and we can use it to monitor and manage MSK clusters and related objects/resources. My AWS credentials are mapped to the container, and my AWS profile (cevo) is added to the SASL config environment variable. Note that environment variables are used for the bootstrap server endpoint and the registry host. It can be started with docker-compose -f kafka-ui.yml up.
# kafka-ui.yml
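A minimal sketch of the compose file is shown below. It assumes the provectuslabs/kafka-ui image and its environment variable naming convention, and the BOOTSTRAP_SERVERS and REGISTRY_HOST values are expected to be exported beforehand.

```yaml
version: "3.5"
services:
  kafka-ui:
    image: provectuslabs/kafka-ui:latest
    container_name: kafka-ui
    ports:
      - "8080:8080"
    volumes:
      # map local AWS credentials so the IAM login module can use the named profile
      - $HOME/.aws:/root/.aws
    environment:
      KAFKA_CLUSTERS_0_NAME: msk
      KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: $BOOTSTRAP_SERVERS
      KAFKA_CLUSTERS_0_PROPERTIES_SECURITY_PROTOCOL: SASL_SSL
      KAFKA_CLUSTERS_0_PROPERTIES_SASL_MECHANISM: AWS_MSK_IAM
      KAFKA_CLUSTERS_0_PROPERTIES_SASL_CLIENT_CALLBACK_HANDLER_CLASS: software.amazon.msk.auth.iam.IAMClientCallbackHandler
      KAFKA_CLUSTERS_0_PROPERTIES_SASL_JAAS_CONFIG: software.amazon.msk.auth.iam.IAMLoginModule required awsProfileName="cevo";
      KAFKA_CLUSTERS_0_SCHEMAREGISTRY: http://$REGISTRY_HOST/apis/ccompat/v6
```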
The UI can be checked on a browser as shown below.
Create Connectors
Creating custom plugins and connectors is illustrated in detail in one of my earlier posts, so I'll only sketch the key points here. The custom plugins for the source and sink connectors should include the Kafka Connect Avro Converter as well. Version 6.0.3 is used, and how the plugins are packaged can be checked in connect/local/download-connectors.sh.
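The gist of the packaging can be sketched as below; the download URLs are placeholders, and the actual script can be checked in the repository.

```bash
#!/usr/bin/env bash
# a rough sketch of assembling a custom plugin archive (URLs and paths are illustrative)
SRC_PATH=plugins
rm -rf $SRC_PATH && mkdir -p $SRC_PATH

# download and unpack the connector (e.g. the Confluent S3 sink connector)
curl -L -o /tmp/kafka-connect-s3.zip "<confluent-hub-url-for-kafka-connect-s3>"
unzip /tmp/kafka-connect-s3.zip -d $SRC_PATH

# download and unpack the Kafka Connect Avro Converter (version 6.0.3) into the same folder
curl -L -o /tmp/kafka-connect-avro-converter.zip "<confluent-hub-url-for-kafka-connect-avro-converter-6.0.3>"
unzip /tmp/kafka-connect-avro-converter.zip -d $SRC_PATH

# zip the combined folder so it can be uploaded to S3 and registered as an MSK Connect custom plugin
zip -r kafka-connect-plugin.zip $SRC_PATH
```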
The Debezium Postgres Connector is used as the source connector. The main differences from the earlier post are using the Confluent Avro Converter class for the key and value converter properties and adding the schema registry URL.
# connect/msk/debezium.properties
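A sketch of the connector configuration is shown below. The converter-related properties are the relevant part; the database connection details are placeholders, and the remaining properties follow a typical Debezium Postgres setup.

```properties
connector.class=io.debezium.connector.postgresql.PostgresConnector
tasks.max=1
plugin.name=pgoutput
database.hostname=<aurora-cluster-endpoint>
database.port=5432
database.user=<db-username>
database.password=<db-password>
database.dbname=<db-name>
database.server.name=ods
schema.include.list=ods
key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=http://<registry-host>/apis/ccompat/v6
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://<registry-host>/apis/ccompat/v6
```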
The sink connector also uses the Confluent Avro Converter class for the key and value converter properties, and the schema registry URL is added accordingly.
# connect/msk/confluent.properties
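A sketch of the sink connector configuration is shown below; the bucket, region and topics values are placeholders.

```properties
connector.class=io.confluent.connect.s3.S3SinkConnector
tasks.max=1
topics.regex=ods.*
s3.bucket.name=<bucket-name>
s3.region=<aws-region>
storage.class=io.confluent.connect.s3.storage.S3Storage
format.class=io.confluent.connect.s3.format.avro.AvroFormat
flush.size=100
key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=http://<registry-host>/apis/ccompat/v6
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://<registry-host>/apis/ccompat/v6
```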
As with the previous post, we can check that the key and value schemas are created once the source connector is deployed. Note that we can check the details of a schema by clicking the relevant schema item.
We can also see in the UI that the message keys and values are properly deserialized, because the schema registry URL is added as an environment variable and the UI can access the registry.
Schema Evolution
The schema registry keeps multiple versions of schemas, and we can check this by adding a column to the source table and updating some records, as shown below.
--// add a column with a default value
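The queries can be sketched as below; the table and column names are illustrative.

```sql
--// add a column with a default value
ALTER TABLE ods.cities ADD COLUMN created_at TIMESTAMP DEFAULT now();

--// update records so that change events include the new field
UPDATE ods.cities SET created_at = now();
```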
Once the above queries are executed, we can see that a new version is added to the topic's value schema and that it includes the new field.
Summary
In this post, we continued the discussion of a Change Data Capture (CDC) solution with a schema registry and deployed it to AWS. Multiple services including MSK, MSK Connect, Aurora PostgreSQL and ECS are used to build the solution. All major resources are deployed in private subnets, and a VPN is used to access them in order to improve the developer experience. The Apicurio registry is used as the schema registry service, and it is deployed as an ECS service. In order for the connectors to have access to the registry, the Confluent Avro Converter is packaged together with the connector sources. The post ended by illustrating how schema evolution is managed by the schema registry.