Use External Schema Registry with MSK Connect – Part 1 Local Development

When we discussed a Change Data Capture (CDC) solution in one of the earlier posts, we used the JSON converter that comes with Kafka Connect. We optionally enabled the key and value schemas, and the topic messages included those schemas together with the payload. It seemed convenient at first, as the messages saved into S3 are self-describing. However, it became cumbersome when we tried to use the DeltaStreamer utility. Specifically, the utility requires the schema of the files, but unfortunately we cannot use the schema generated by the default JSON converter – it returns the struct type, which is not supported by the Hudi utility. To handle this issue, we created a schema of the record type with the Confluent Avro converter and used it after saving it to S3. However, as we aimed to manage a long-running process, generating a schema manually was not an optimal solution because, for example, we are not able to handle schema evolution effectively. In this post, we’ll discuss an improved architecture that makes use of a schema registry that resides outside the Kafka cluster and allows the producers and consumers to reference schemas externally.

Architecture

Below is an updated CDC architecture with a schema registry. The Debezium connector talks to the schema registry first and checks whether the schema is available. If it doesn’t exist, the schema is registered and cached in the registry. The producer then serializes the data with the schema and sends it to the topic together with the schema ID. When the sink connector consumes a message, it looks up the schema by its ID and deserializes the message with it. The schema registry uses a PostgreSQL database as an artifact store where multiple versions of schemas are kept. In this post, we’ll build it locally using Docker Compose.
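
For illustration, the following is a minimal sketch of those two registry interactions using the Confluent-compatible API that our local Apicurio registry will expose later in this post – the endpoint (http://localhost:9090), subject name and schema fields below are placeholders rather than the actual CDC schema.

# register a (placeholder) value schema under a subject; the response contains the schema ID
curl -X POST -H "Content-Type: application/vnd.schemaregistry.v1+json" \
  http://localhost:9090/apis/ccompat/v6/subjects/ord.ods.cdc_events-value/versions \
  -d '{"schema": "{\"type\": \"record\", \"name\": \"cdc_events\", \"fields\": [{\"name\": \"order_id\", \"type\": \"int\"}]}"}'

# fetch a schema by its ID, as the consumer side does when deserializing
curl http://localhost:9090/apis/ccompat/v6/schemas/ids/1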

Local Services

Earlier we discussed a local development environment for a Change Data Capture (CDC) solution using the Confluent platform – see this post for details. While it provides a quick and easy way of developing Kafka locally, it doesn’t match MSK well. For example, its Docker image already includes the Avro converter and schema registry client libraries. Because of that, while Kafka connectors with Avro serialization work in the platform without modification, they’ll fail on MSK Connect if they are deployed with the same configuration. Therefore it is better to use Docker images from other open source projects instead of the Confluent platform, while we can still use Docker Compose to build a local development environment. The associated docker-compose file can be found in the GitHub repository for this post.

Kafka Cluster

We can create a single node Kafka cluster as a docker-compose service with Zookeeper, which is used to store metadata about the cluster. The Bitnami images are used to build the services as shown below.

 

# docker-compose.yml
services:
  zookeeper:
    image: bitnami/zookeeper:3.7.0
    container_name: zookeeper
    ports:
      - "2181:2181"
    environment:
      - ALLOW_ANONYMOUS_LOGIN=yes
  kafka:
    image: bitnami/kafka:2.8.1
    container_name: kafka
    ports:
      - "9092:9092"
    environment:
      - KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper:2181
      - ALLOW_PLAINTEXT_LISTENER=yes
    depends_on:
      - zookeeper
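
Once these two services are up, a quick sanity check can be made from inside the Kafka container – a minimal sketch, assuming the Bitnami image layout shown above.

# start the cluster services
docker-compose up -d zookeeper kafka

# list topics to confirm that the broker is reachable
docker exec kafka /opt/bitnami/kafka/bin/kafka-topics.sh \
  --bootstrap-server localhost:9092 --list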
 

Kafka Connect

The same Bitnami image can be used to create a Kafka Connect service. It is set to run in distributed mode so that multiple connectors can be deployed together. Three Kafka connector sources are mapped to the /opt/connectors folder of the container – Debezium, Confluent S3 Sink and Voluble. Note that this folder is added to the plugin path of the connector configuration file (connect-distributed.properties) so that the connectors can be discovered when we request to create them – a minimal example of the configuration file is shown after the compose snippet below. Also, a script is created to download the connector sources (and related libraries) to the connect/local/src folder – it’ll be illustrated below. Finally, my AWS account is configured with AWS SSO, so temporary AWS credentials are passed as environment variables – they are necessary for the S3 sink connector.

 

# docker-compose.yml
services:
 
  kafka-connect:
    image: bitnami/kafka:2.8.1
    container_name: connect
    command: >
      /opt/bitnami/kafka/bin/connect-distributed.sh
      /opt/bitnami/kafka/config/connect-distributed.properties
    ports:
      - "8083:8083"
    environment:
      AWS_ACCESS_KEY_ID: $AWS_ACCESS_KEY_ID
      AWS_SECRET_ACCESS_KEY: $AWS_SECRET_ACCESS_KEY
      AWS_SESSION_TOKEN: $AWS_SESSION_TOKEN
    volumes:
      - "./connect/local/src/debezium-connector-postgres:/opt/connectors/debezium-postgres"
      - "./connect/local/src/confluent-s3/lib:/opt/connectors/confluent-s3"
      - "./connect/local/src/voluble/lib:/opt/connectors/voluble"
      - "./connect/local/config/connect-distributed.properties:/opt/bitnami/kafka/config/connect-distributed.properties"
    depends_on:
      - zookeeper
      - kafka
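
For reference, the mapped connector configuration file might look like the following – a minimal sketch for a single-node local setup; the topic names and replication factors are assumptions rather than the exact file kept in the repository.

# connect/local/config/connect-distributed.properties
bootstrap.servers=kafka:9092
group.id=connect-cluster
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
offset.storage.topic=connect-offsets
offset.storage.replication.factor=1
config.storage.topic=connect-configs
config.storage.replication.factor=1
status.storage.topic=connect-status
status.storage.replication.factor=1
# folder where the mapped connector sources are discovered
plugin.path=/opt/connectors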
 

When the script (download-connectors.sh) runs, it downloads the connector sources from Maven Central and Confluent Hub and decompresses them. The Kafka Connect Avro Converter is packaged together with the connector sources, which is necessary for Avro serialization of messages and schema registry integration. Note that, if we ran our own Kafka Connect cluster, we’d add the converter to one of the folders of the connect service and update its plugin path to enable class discovery. However, we don’t have such control over MSK Connect, so we should add the converter source to the individual connectors.

# connect/local/download-connectors.sh

#!/usr/bin/env bash

echo "Add avro converter? (Y/N)"
read WITH_AVRO

SCRIPT_DIR="$(cd $(dirname "$0"); pwd)"

SRC_PATH=${SCRIPT_DIR}/src
rm -rf ${SCRIPT_DIR}/src && mkdir -p ${SRC_PATH}

## Debezium Source Connector
echo "downloading debezium postgres connector..."
DOWNLOAD_URL=https://repo1.maven.org/maven2/io/debezium/debezium-connector-postgres/1.8.1.Final/debezium-connector-postgres-1.8.1.Final-plugin.tar.gz

curl -S -L ${DOWNLOAD_URL} | tar -C ${SRC_PATH} --warning=no-unknown-keyword -xzf -

## Confluent S3 Sink Connector
echo "downloading confluent s3 connector..."
DOWNLOAD_URL=https://d1i4a15mxbxib1.cloudfront.net/api/plugins/confluentinc/kafka-connect-s3/versions/10.0.5/confluentinc-kafka-connect-s3-10.0.5.zip

curl ${DOWNLOAD_URL} -o ${SRC_PATH}/confluent.zip \
  && unzip -qq ${SRC_PATH}/confluent.zip -d ${SRC_PATH} \
  && rm ${SRC_PATH}/confluent.zip \
  && mv ${SRC_PATH}/$(ls ${SRC_PATH} | grep confluentinc-kafka-connect-s3) ${SRC_PATH}/confluent-s3

## Voluble Source Connector
echo "downloading voluble connector..."
DOWNLOAD_URL=https://d1i4a15mxbxib1.cloudfront.net/api/plugins/mdrogalis/voluble/versions/0.3.1/mdrogalis-voluble-0.3.1.zip

curl ${DOWNLOAD_URL} -o ${SRC_PATH}/voluble.zip \
  && unzip -qq ${SRC_PATH}/voluble.zip -d ${SRC_PATH} \
  && rm ${SRC_PATH}/voluble.zip \
  && mv ${SRC_PATH}/$(ls ${SRC_PATH} | grep mdrogalis-voluble) ${SRC_PATH}/voluble

if [ "${WITH_AVRO}" == "Y" ]; then
  echo "downloading kafka connect avro converter..."
  DOWNLOAD_URL=https://d1i4a15mxbxib1.cloudfront.net/api/plugins/confluentinc/kafka-connect-avro-converter/versions/6.0.3/confluentinc-kafka-connect-avro-converter-6.0.3.zip

  curl ${DOWNLOAD_URL} -o ${SRC_PATH}/avro.zip \
    && unzip -qq ${SRC_PATH}/avro.zip -d ${SRC_PATH} \
    && rm ${SRC_PATH}/avro.zip \
    && mv ${SRC_PATH}/$(ls ${SRC_PATH} | grep confluentinc-kafka-connect-avro-converter) ${SRC_PATH}/avro

  echo "copying to connectors..."
  cp -r ${SRC_PATH}/avro/lib/* ${SRC_PATH}/debezium-connector-postgres

  cp -r ${SRC_PATH}/avro/lib/* ${SRC_PATH}/confluent-s3/lib
  cp -r ${SRC_PATH}/avro/lib/* ${SRC_PATH}/voluble/lib
fi
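
After running the script (and answering Y so that the Avro converter is bundled), we can start the connect service and verify that the plugins are discovered via the Kafka Connect REST API, for example as follows.

# download the connector sources and bundle the Avro converter
./connect/local/download-connectors.sh

# start the connect service and list the discovered connector plugins
docker-compose up -d kafka-connect
curl http://localhost:8083/connector-plugins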

Schema Registry

The Confluent schema registry uses Kafka as its storage backend, and it is not clear whether it supports IAM authentication. Therefore the Apicurio Registry is used instead, as it also supports a SQL database as storage. It provides multiple APIs, and one of them is compatible with the Confluent schema registry (/apis/ccompat/v6). We’ll use this API as we plan to use the Confluent version of the Avro converter and schema registry client. The PostgreSQL database will be used as the artifact store of the registry as well as the source database of the Debezium connector. For Debezium, the pgoutput plugin is used, and logical replication is enabled on the database (wal_level=logical). The Northwind database is used as the source database – see this post for details. The registry service expects database connection details as environment variables, and it is set to wait until the source database is up and running.

 

# docker-compose.yml
services:
 
  postgres:
    image: postgres:13
    container_name: postgres
    command: ["postgres", "-c", "wal_level=logical"]
    ports:
      - 5432:5432
    volumes:
      - ./connect/data/sql:/docker-entrypoint-initdb.d
    environment:
      - POSTGRES_DB=main
      - POSTGRES_USER=master
      - POSTGRES_PASSWORD=password
  registry:
    image: apicurio/apicurio-registry-sql:2.2.0.Final
    container_name: registry
    command: bash -c 'while !</dev/tcp/postgres/5432; do sleep 1; done; /usr/local/s2i/run'
    ports:
      - "9090:8080"
    environment:
      REGISTRY_DATASOURCE_URL: "jdbc:postgresql://postgres/main?currentSchema=registry"
      REGISTRY_DATASOURCE_USERNAME: master
      REGISTRY_DATASOURCE_PASSWORD: password
    depends_on:
      - zookeeper
      - kafka
      - postgres
 

Once started, we can see the Confluent schema registry compatible API in the registry’s API list, and we’ll use it when creating the Kafka connectors.
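
We can also hit the compatible API directly to confirm it responds – for example, listing the registered subjects, which should be empty at this point.

# list registered subjects via the Confluent-compatible API
curl http://localhost:9090/apis/ccompat/v6/subjects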

Kafka UI

Having a good user interface can make development much easier and more pleasant. Kafka UI is an open source application where we can monitor and manage Kafka brokers and related objects and resources. It also supports MSK IAM authentication, which makes it a good choice for developing applications on MSK. Details of one or more Kafka clusters can be added as environment variables. We only have a single Kafka cluster, and the details of the cluster and related resources are added as shown below.

 

# docker-compose.yml
services:
 
  kafka-ui:
    image: provectuslabs/kafka-ui:0.3.3
    container_name: kafka-ui
    ports:
      - "8080:8080"
    environment:
      KAFKA_CLUSTERS_0_NAME: local
      KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka:9092
      KAFKA_CLUSTERS_0_ZOOKEEPER: zookeeper:2181
      KAFKA_CLUSTERS_0_SCHEMAREGISTRY: http://registry:8080/apis/ccompat/v6
      KAFKA_CLUSTERS_0_KAFKACONNECT_0_NAME: local
      KAFKA_CLUSTERS_0_KAFKACONNECT_0_ADDRESS: http://kafka-connect:8083
    depends_on:
      - zookeeper
      - kafka

The UI is quite intuitive and we can monitor (and manage) the Kafka cluster and related objects/resources comprehensively.

Create Connectors

The Debezium source connector can be created as shown below. The configuration details can be found in one of the earlier posts. Here the main difference is that the key and value converters are set to the Confluent Avro converter so that the key and value are serialized into the Avro format. Note that the Confluent-compatible API path is appended to the registry URL.

 

curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" \
  http://localhost:8083/connectors/ \
  -d '{
        "name": "orders-source",
        "config": {
          "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
          "tasks.max": "1",
          "plugin.name": "pgoutput",
          "publication.name": "cdc_publication",
          "slot.name": "orders",
          "database.hostname": "postgres",
          "database.port": "5432",
          "database.user": "master",
          "database.password": "password",
          "database.dbname": "main",
          "database.server.name": "ord",
          "schema.include": "ods",
          "table.include.list": "ods.cdc_events",
          "key.converter": "io.confluent.connect.avro.AvroConverter",
          "key.converter.schema.registry.url": "http://registry:8080/apis/ccompat/v6",
          "value.converter": "io.confluent.connect.avro.AvroConverter",
          "value.converter.schema.registry.url": "http://registry:8080/apis/ccompat/v6",
          "transforms": "unwrap",
          "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
          "transforms.unwrap.drop.tombstones": "false",
          "transforms.unwrap.delete.handling.mode": "rewrite",
          "transforms.unwrap.add.fields": "op,db,table,schema,lsn,source.ts_ms"
        }
  }'

The Confluent S3 sink connector is used instead of the Lenses S3 sink connector because the latter doesn’t work with the Kafka Connect Avro Converter. Here the key and value converters are also set to the Confluent Avro converter with the corresponding schema registry URL.

curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" \
  http://localhost:8083/connectors/ \
  -d '{
        "name": "orders-sink",
        "config": {
          "connector.class": "io.confluent.connect.s3.S3SinkConnector",
          "storage.class": "io.confluent.connect.s3.storage.S3Storage",
          "format.class": "io.confluent.connect.s3.format.avro.AvroFormat",
          "tasks.max": "1",
          "topics": "ord.ods.cdc_events",
          "s3.bucket.name": "analytics-data-590312749310-ap-southeast-2",
          "s3.region": "ap-southeast-2",
          "flush.size": "100",
          "rotate.schedule.interval.ms": "60000",
          "timezone": "Australia/Sydney",
          "partitioner.class": "io.confluent.connect.storage.partitioner.DefaultPartitioner",
          "key.converter": "io.confluent.connect.avro.AvroConverter",
          "key.converter.schema.registry.url": "http://registry:8080/apis/ccompat/v6",
          "value.converter": "io.confluent.connect.avro.AvroConverter",
          "value.converter.schema.registry.url": "http://registry:8080/apis/ccompat/v6",
          "errors.log.enable": "true"
        }
  }'
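
If everything is configured correctly, both connectors should report a RUNNING state, which can be checked via the Kafka Connect REST API.

# list the deployed connectors and check their status
curl http://localhost:8083/connectors
curl http://localhost:8083/connectors/orders-source/status
curl http://localhost:8083/connectors/orders-sink/status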

Once the source connector is created, we can check that the key and value schemas have been created, as shown below. Note that we can check the details of the schemas by clicking the relevant items.
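
Assuming the default subject naming strategy (one subject per topic key and value), the schemas can also be queried from the compatible API directly, for example as follows.

# the key and value schemas are registered as <topic>-key and <topic>-value subjects
curl http://localhost:9090/apis/ccompat/v6/subjects

# fetch the first version of the value schema
curl http://localhost:9090/apis/ccompat/v6/subjects/ord.ods.cdc_events-value/versions/1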

As we added the schema registry URL as an environment variable of the Kafka UI service, we can see that the records (key and value) are properly deserialized within the UI.

Schema Evolution

The schema registry keeps multiple versions of schemas, and we can check this by adding a column to the table and updating some records.

 

--// add a column with a default value
ALTER TABLE ods.cdc_events
    ADD COLUMN employee_id int DEFAULT -1;

--// update employee ID
UPDATE ods.cdc_events
    SET employee_id = (employee ->> 'employee_id')::INT
WHERE customer_id = 'VINET';

Once the above queries are executed, we can see that a new version is added to the topic’s value schema and that it includes the new field.
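
The new version can also be confirmed against the compatible API – listing the versions of the value subject and fetching the latest one, which should now include the employee_id field.

# two versions should now exist for the value subject
curl http://localhost:9090/apis/ccompat/v6/subjects/ord.ods.cdc_events-value/versions

# the latest version includes the new employee_id field
curl http://localhost:9090/apis/ccompat/v6/subjects/ord.ods.cdc_events-value/versions/latest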

Summary

In this post, we discussed an improved architecture for a Change Data Capture (CDC) solution with a schema registry. A local development environment is set up using Docker Compose. The Debezium and Confluent S3 connectors are deployed with the Confluent Avro converter, and the Apicurio registry is used as the schema registry service. A quick example is shown to illustrate how schema evolution can be managed by the schema registry. In the next post, the solution will be deployed to AWS, mainly using MSK Connect, Aurora PostgreSQL and ECS.
