Data Lake Demo using Change Data Capture (CDC) on AWS – Part 1: Database and Local Development

Change data capture (CDC) is a proven data integration pattern that has a wide range of applications. Among those, data replication to data lakes is a good use case in data engineering. Coupled with best-in-breed data lake formats such as Apache Hudi, we can build an efficient data replication solution. This is the first post of the data lake demo series. Over time, we’ll build a data lake that uses CDC. As a starting point, we’ll discuss the source database and CDC streaming infrastructure in the local environment.

Architecture

As described in a Red Hat IT topics article, change data capture (CDC) is a proven data integration pattern to track when and what changes occur in data, and then alert other systems and services that must respond to those changes. Change data capture helps maintain consistency and functionality across all systems that rely on data.

 

The primary use of CDC is to enable applications to respond almost immediately whenever data in databases change. Specifically, its use cases cover microservices integration, data replication with up-to-date data, building time-sensitive analytics dashboards, auditing and compliance, cache invalidation, full-text search and so on. There are a number of approaches to CDC – polling, dual writes and log-based CDC. Among those, log-based CDC has advantages over the other approaches.

 

Both Amazon DMS and Debezium implement log-based CDC. While the former is a managed service, the latter can be deployed to a Kafka cluster as a (source) connector. Debezium uses Apache Kafka as a messaging service to deliver database change notifications to the applicable systems and applications. Note that Kafka Connect is a tool for scalably and reliably streaming data between Apache Kafka and other data systems by means of connectors. In AWS, we can use Amazon MSK and MSK Connect for building a Debezium-based CDC solution.

 

Data replication to data lakes using CDC can be much more effective if data is stored in a format that supports atomic transactions and consistent updates. Popular choices are Apache Hudi, Apache Iceberg and Delta Lake. Among those, Apache Hudi can be a good option as it is well-integrated with AWS services.

 

Below shows the architecture of the data lake solution that we will be building in this series of posts.

  1. Employing the transactional outbox pattern, the source database publishes change event records to the CDC event table. The event records are generated by triggers that listen to insert and update events on source tables.

  2. CDC is implemented in a streaming environment and Amazon MSK is used to build the streaming infrastructure. In order to process the real-time CDC event records, source and sink connectors are set up in Amazon MSK Connect. The Debezium connector for PostgreSQL is used as the source connector and the Lenses S3 connector is used as the sink connector. The sink connector pushes messages to an S3 bucket.

  3. Hudi DeltaStreamer is run on Amazon EMR. As a Spark application, it reads files from the S3 bucket and upserts Hudi records to another S3 bucket. The Hudi table is created in the AWS Glue Data Catalog.

  4. The Hudi table is queried in Amazon Athena, as the table is registered in the AWS Glue Data Catalog.

  5. Dashboards are created in Amazon Quicksight where the dataset is created using Amazon Athena.

 

As a starting point, we’ll discuss the source database and streaming infrastructure in the local environment.

Source Database

Data Model

We will use the Northwind database as the source database. It was originally created by Microsoft and used by various tutorials of their database products. It contains the sales data for a fictitious company called Northwind Traders that deals with specialty foods from around the world. As shown in the following entity relationship diagram, it includes a schema for a small business ERP with customers, products, orders, employees and so on. The version that is ported to PostgreSQL is obtained from YugabyteDB sample datasets and the SQL scripts can be found in the project GitHub repository. For local development, a service is created using docker compose and it’ll be illustrated in the next section.

Outbox Table and Event Generation

It is straightforward to capture changes from multiple tables in a database using Kafka connectors where a separate topic is created for each table. Data ingestion to Hudi, however, can be complicated if messages are stored in multiple Kafka topics. Note that we will use the DeltaStreamer utility and it maps to a single topic. In order to simplify the data ingestion process, we can employ the transactional outbox pattern. Using this pattern, we can create an outbox table (cdc_events) and upsert a record to it when a new transaction is made. In this way, all database changes can be pushed to a single topic, resulting in one DeltaStreamer process to listen to the change events.
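To make the consolidation idea concrete, below is a toy Python model (an illustration only, not part of the project code) of how changes from multiple source tables collapse into a single outbox record, so downstream consumers read one topic instead of one topic per table:

```python
# Toy model of the transactional outbox pattern (illustration only).
# Changes from multiple source tables are consolidated into a single
# outbox table keyed by order_id, so one topic serves all change events.

outbox = {}  # order_id -> denormalized event record

def upsert_outbox(order_id, **attrs):
    """Insert a new outbox record or merge attributes into an existing one."""
    record = outbox.setdefault(order_id, {"order_id": order_id})
    record.update(attrs)
    return record

# an insert on the orders table and two inserts on order_details
# all end up in the same outbox record
upsert_outbox(10248, customer_id="VINET")
upsert_outbox(10248, order_items=[{"product_id": 11, "quantity": 12}])
upsert_outbox(10248, order_items=[{"product_id": 11, "quantity": 12},
                                  {"product_id": 42, "quantity": 10}])

print(len(outbox))                        # a single record, hence a single topic
print(len(outbox[10248]["order_items"]))  # the latest state keeps both items
```

In the real implementation this upsert is performed by the trigger function shown below, and the "latest state wins" behaviour corresponds to the `ON CONFLICT ... DO UPDATE` clause.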

 

Below shows the table creation statement of the outbox table. It aims to store all details of an order entry in a row. The columns that have the JSONB data type store attributes of other entities. For example, the order_items column includes ordered product information. As multiple products can be purchased, it keeps an array of product id, unit price, quantity and discount.

 

-- ./data/sql/03_cdc_events.sql

CREATE TABLE cdc_events (
    order_id      SMALLINT NOT NULL PRIMARY KEY,
    customer_id   BPCHAR NOT NULL,
    order_date    DATE,
    required_date DATE,
    shipped_date  DATE,
    order_items   JSONB,
    products      JSONB,
    customer      JSONB,
    employee      JSONB,
    shipper       JSONB,
    shipment      JSONB,
    updated_at    TIMESTAMPTZ
);

In order to create event records, triggers are added to the orders and order_details tables. They execute the fn_insert_order_event function after an INSERT or UPDATE action occurs on the respective tables. Note the DELETE action is not considered in the event generation process for simplicity. The trigger function basically collects details of an order entry and attempts to insert a new record into the outbox table. If a record with the same order ID exists, it updates the record instead.

-- ./data/sql/03_cdc_events.sql

CREATE TRIGGER orders_triggered
  AFTER INSERT OR UPDATE
  ON orders
  FOR EACH ROW
  EXECUTE PROCEDURE fn_insert_order_event();

CREATE TRIGGER order_details_triggered
  AFTER INSERT OR UPDATE
  ON order_details
  FOR EACH ROW
  EXECUTE PROCEDURE fn_insert_order_event();

 

CREATE OR REPLACE FUNCTION fn_insert_order_event()
    RETURNS TRIGGER
    LANGUAGE PLPGSQL
AS
$$
BEGIN
    IF (TG_OP IN ('INSERT', 'UPDATE')) THEN
        WITH product_details AS (
            SELECT p.product_id,
                   row_to_json(p.*)::jsonb AS product_details
            FROM (
                SELECT *
                FROM products p
                JOIN suppliers s ON p.supplier_id = s.supplier_id
                JOIN categories c ON p.category_id = c.category_id
            ) AS p
        ), order_items AS (
            SELECT od.order_id,
                   jsonb_agg(row_to_json(od.*)::jsonb - 'order_id') AS order_items,
                   jsonb_agg(pd.product_details) AS products
            FROM order_details od
            JOIN product_details pd ON od.product_id = pd.product_id
            WHERE od.order_id = NEW.order_id
            GROUP BY od.order_id
        ), emps AS (
            SELECT employee_id,
                   row_to_json(e.*)::jsonb AS details
            FROM employees e
        ), emp_territories AS (
            SELECT et.employee_id,
                   jsonb_agg(row_to_json(t.*)) AS territories
            FROM employee_territories et
            JOIN (
                SELECT t.territory_id, t.territory_description, t.region_id, r.region_description
                FROM territories t
                JOIN region r ON t.region_id = r.region_id
            ) AS t ON et.territory_id = t.territory_id
            GROUP BY et.employee_id
        ), emp_details AS (
            SELECT e.employee_id,
                   e.details || jsonb_build_object('territories', et.territories) AS details
            FROM emps AS e
            JOIN emp_territories AS et ON e.employee_id = et.employee_id
        )
        INSERT INTO cdc_events
            SELECT o.order_id,
                   o.customer_id,
                   o.order_date,
                   o.required_date,
                   o.shipped_date,
                   oi.order_items,
                   oi.products,
                   row_to_json(c.*)::jsonb AS customer,
                   ed.details::jsonb AS employee,
                   row_to_json(s.*)::jsonb AS shipper,
                   jsonb_build_object(
                     'freight', o.freight,
                     'ship_name', o.ship_name,
                     'ship_address', o.ship_address,
                     'ship_city', o.ship_city,
                     'ship_region', o.ship_region,
                     'ship_postal_code', o.ship_postal_code,
                     'ship_country', o.ship_country
                   ) AS shipment,
                   now()
            FROM orders o
            LEFT JOIN order_items oi ON o.order_id = oi.order_id
            JOIN customers c ON o.customer_id = c.customer_id
            JOIN emp_details ed ON o.employee_id = ed.employee_id
            JOIN shippers s ON o.ship_via = s.shipper_id
            WHERE o.order_id = NEW.order_id
        ON CONFLICT (order_id)
        DO UPDATE
            SET order_id      = excluded.order_id,
                customer_id   = excluded.customer_id,
                order_date    = excluded.order_date,
                required_date = excluded.required_date,
                shipped_date  = excluded.shipped_date,
                order_items   = excluded.order_items,
                products      = excluded.products,
                customer      = excluded.customer,
                shipper       = excluded.shipper,
                shipment      = excluded.shipment,
                updated_at    = excluded.updated_at;
    END IF;
    RETURN NULL;
END
$$;

Create Initial Event Records

In order to create event records for existing order entries, a stored procedure – usp_init_order_events – is created. It is quite similar to the trigger function and can be checked in the project repository. The procedure is called at database initialization, creating a total of 829 event records.

 

-- ./data/sql/03_cdc_events.sql
CALL usp_init_order_events();

Below shows a simplified event record, converted into JSON. For the order with id 10248, 3 products are ordered by a customer whose id is VINET. 

{
  "order_id": 10248,
  "customer_id": "VINET",
  "order_date": "1996-07-04",
  "required_date": "1996-08-01",
  "shipped_date": "1996-07-16",
  "order_items": [
    { "discount": 0, "quantity": 12, "product_id": 11, "unit_price": 14 },
    { "discount": 0, "quantity": 10, "product_id": 42, "unit_price": 9.8 },
    { "discount": 0, "quantity": 5, "product_id": 72, "unit_price": 34.8 }
  ],
  "products": [
    { "product_id": 11, "product_name": "Queso Cabrales" },
    { "product_id": 42, "product_name": "Singaporean Hokkien Fried Mee" },
    { "product_id": 72, "product_name": "Mozzarella di Giovanni" }
  ],
  "customer": {
    "customer_id": "VINET",
    "company_name": "Vins et alcools Chevalier"
  },
  "employee": {
    "title": "Sales Manager",
    "last_name": "Buchanan",
    "employee_id": 5
  },
  "shipper": {
    "company_name": "Federal Shipping"
  },
  "shipment": {
    "freight": 32.38,
    "ship_name": "Vins et alcools Chevalier"
  },
  "updated_at": "2021-11-27T20:30:13.644579+11:00"
}
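Since each event record is fully denormalized, a consumer can derive order-level facts from a single message. As a quick illustrative check in Python (using the simplified record above), the order amount is the sum of unit_price * quantity * (1 - discount) over the order items:

```python
import json

# simplified event record from above (only the fields needed here)
event = json.loads("""
{
  "order_id": 10248,
  "order_items": [
    { "discount": 0, "quantity": 12, "product_id": 11, "unit_price": 14 },
    { "discount": 0, "quantity": 10, "product_id": 42, "unit_price": 9.8 },
    { "discount": 0, "quantity": 5, "product_id": 72, "unit_price": 34.8 }
  ]
}
""")

# order amount = sum of unit_price * quantity * (1 - discount) over the items
total = sum(item["unit_price"] * item["quantity"] * (1 - item["discount"])
            for item in event["order_items"])
print(round(total, 2))  # 440.0
```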

Create Publication

As discussed further in the next section, we'll be using the native pgoutput logical replication stream. By default, the Debezium source connector automatically creates a publication that contains all tables if one doesn't exist. That can cause trouble when updating a record in a table that doesn't have a primary key or replica identity. To handle this issue, a publication that contains only the outbox table is created explicitly. This publication will be referenced when configuring the source connector.

 

-- ./data/sql/03_cdc_events.sql
CREATE PUBLICATION cdc_publication
    FOR TABLE cdc_events;

CDC Development

Docker Compose

The Confluent platform can be handy for local development although we’ll be deploying the solution using Amazon MSK and MSK Connect. The quick start guide provides a docker compose file that includes its various components in separate services. It also contains the control center, a graphical user interface, which helps check brokers, topics, messages and connectors easily. 

 

Additionally we need a PostgreSQL instance for the Northwind database and a service named postgres is added. The database is initialised by a set of SQL scripts. They are executed by volume-mapping to the initialisation folder – the scripts can be found in the project repository. Also the Kafka Connect instance, running in the connect service, needs an update to include the source and sink connectors. It’ll be illustrated further below. 

 

Below shows a cut-down version of the docker compose file that we use for local development. The complete file can be found in the project repository.

 

# ./docker-compose.yml

version: "2"
services:
  postgres:
    image: debezium/postgres:13
    ...
    ports:
      - 5432:5432
    volumes:
      - ./data/sql:/docker-entrypoint-initdb.d
    environment:
      - POSTGRES_DB=devdb
      - POSTGRES_USER=devuser
      - POSTGRES_PASSWORD=password
  zookeeper:
    image: confluentinc/cp-zookeeper:6.2.1
    ...
    ports:
      - "2181:2181"
    environment:
      ...
  broker:
    image: confluentinc/cp-server:6.2.1
    ...
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
      - "9101:9101"
    environment:
      ...
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:29092,PLAINTEXT_HOST://localhost:9092
      ...
  schema-registry:
    image: confluentinc/cp-schema-registry:6.2.1
    ...
    depends_on:
      ...
    ports:
      - "8081:8081"
    environment:
      ...
  connect:
    build: ./connector/local/cp-server-connect-datagen
    ...
    depends_on:
      ...
    ports:
      - "8083:8083"
    volumes:
      - ${HOME}/.aws:/home/appuser/.aws
    environment:
      CONNECT_BOOTSTRAP_SERVERS: "broker:29092"
      CONNECT_REST_ADVERTISED_HOST_NAME: connect
      CONNECT_REST_PORT: 8083
      ...
      # include /usr/local/share/kafka/plugins for community connectors
      CONNECT_PLUGIN_PATH: "/usr/share/java,/usr/share/confluent-hub-components,/usr/local/share/kafka/plugins"
      ...
  control-center:
    image: confluentinc/cp-enterprise-control-center:6.2.1
    ...
    depends_on:
      ...
    ports:
      - "9021:9021"
    environment:
      CONTROL_CENTER_BOOTSTRAP_SERVERS: "broker:29092"
      CONTROL_CENTER_CONNECT_CONNECT-DEFAULT_CLUSTER: "connect:8083"
      ...
      PORT: 9021
  rest-proxy:
    image: confluentinc/cp-kafka-rest:6.2.1
    ...
    depends_on:
      ...
    ports:
      - 8082:8082
    environment:
      ...

Install Connectors

We use the Debezium connector for PostgreSQL as the source connector and the Lenses S3 Connector as the sink connector. The source connector is installed via the confluent hub client while the sink connector is added as a community connector. Note that the CONNECT_PLUGIN_PATH environment variable is updated to include the kafka plugins folder (/usr/local/share/kafka/plugins).

 

# ./connector/local/cp-server-connect-datagen/Dockerfile

FROM cnfldemos/cp-server-connect-datagen:0.5.0-6.2.1

# install debezium postgresql connector from confluent hub
RUN confluent-hub install --no-prompt debezium/debezium-connector-postgresql:1.7.1

# install lenses S3 connector as a community connector - https://docs.confluent.io/home/connect/community.html
USER root
RUN mkdir -p /usr/local/share/kafka/plugins/kafka-connect-aws-s3 && \
  curl -SsL https://github.com/lensesio/stream-reactor/releases/download/3.0.0/kafka-connect-aws-s3-3.0.0-2.5.0-all.tar.gz \
    | tar -C /usr/local/share/kafka/plugins/kafka-connect-aws-s3 --warning=no-unknown-keyword -xzf -

# update connect plugin path
ENV CONNECT_PLUGIN_PATH=$CONNECT_PLUGIN_PATH,/usr/local/share/kafka/plugins
USER appuser

Start Services

After starting the docker compose services, we can check a local Kafka cluster in the control center via http://localhost:9021. We can go to the cluster overview page by clicking the cluster card item.

We can check an overview of the cluster in the page. For example, it shows the cluster has 1 broker and there is no running connector. On the left side, there are menus for individual components. Among those, Topics and Connect will be our main interest in this post.

Create Connectors

Source Connector

As the Debezium documentation describes, "Debezium's PostgreSQL connector captures row-level changes in the schemas of a PostgreSQL database. PostgreSQL versions 9.6, 10, 11, 12 and 13 are supported. The first time it connects to a PostgreSQL server or cluster, the connector takes a consistent snapshot of all schemas. After that snapshot is complete, the connector continuously captures row-level changes that insert, update, and delete database content and that were committed to a PostgreSQL database. The connector generates data change event records and streams them to Kafka topics. For each table, the default behavior is that the connector streams all generated events to a separate Kafka topic for that table. Applications and services consume data change event records from that topic."

 

The connector has a number of connector properties including name, connector class, database connection details, key/value converter and so on – the full list of properties can be found on this page. The properties that need explanation are listed below.

 

  • plugin.name – Using the logical decoding feature, an output plug-in enables clients to consume changes to the transaction log in a user-friendly manner. Debezium supports the decoderbufs, wal2json and pgoutput plug-ins. Both wal2json and pgoutput are available in Amazon RDS for PostgreSQL and Amazon Aurora PostgreSQL, while decoderbufs requires a separate installation, so it is excluded as an option. Of the two remaining plug-ins, pgoutput is selected because it is the standard logical decoding output plug-in in PostgreSQL 10+ and performs better for large transactions.

  • publication.name – With the pgoutput plug-in, the Debezium connector creates a publication (if one doesn't exist) because publication.autocreate.mode defaults to all_tables. This can cause an issue when updating a record in a table that doesn't have a primary key or replica identity. We can set the mode to filtered, where the connector limits the publication to the applicable tables based on other property values. Alternatively, we can create a publication ourselves and set its name in the publication.name property. I find creating a publication explicitly easier to maintain. Note that a publication alone is not sufficient to handle the issue: every table affected by the publication should have a primary key or replica identity. In our example, the orders and order_details tables should meet that condition. In short, creating an explicit publication limits the scope of CDC event generation and prevents the event generation process from interrupting other processes.

  • key.converter/value.converter – Although Avro serialization is recommended, JSON is chosen because it can be generated without a schema registry and can be read by DeltaStreamer.

  • transforms* – A Debezium change event has a complex structure that provides a wealth of information, but such a structure can be quite difficult to process with DeltaStreamer. Debezium's event flattening single message transformation (SMT) is therefore configured to flatten the output payload.
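To show what the flattening transform does, here is a rough Python sketch of its effect (the real SMT is implemented in Java inside Kafka Connect; the field names below mirror the transforms.unwrap.add.fields configuration, with Debezium's default double-underscore prefix):

```python
# Rough sketch of Debezium's ExtractNewRecordState (event flattening) SMT.
# The envelope {before, after, source, op, ...} is replaced with the "after"
# state, with selected metadata fields appended under a "__" prefix.

def flatten(envelope, add_fields=("op", "db", "table", "schema", "lsn")):
    flat = dict(envelope["after"])
    source = envelope.get("source", {})
    for field in add_fields:
        # "op" lives at the top level of the envelope; the rest come from "source"
        flat[f"__{field}"] = envelope.get(field) if field == "op" else source.get(field)
    return flat

envelope = {
    "before": None,
    "after": {"order_id": 10248, "customer_id": "VINET"},
    "source": {"db": "devdb", "schema": "datalake", "table": "cdc_events", "lsn": 12345},
    "op": "c",
}
flat = flatten(envelope)
print(flat["order_id"], flat["__op"], flat["__table"])  # 10248 c cdc_events
```

The flattened payload is what DeltaStreamer ends up reading from the topic: the new row state plus a handful of metadata columns.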

 

Note once the connector is deployed, the CDC event records will be published to demo.datalake.cdc_events topic.
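The topic name follows Debezium's default convention of serverName.schemaName.tableName; a one-line sketch makes the convention explicit for our configuration (database.server.name demo, table datalake.cdc_events):

```python
def debezium_topic(server_name, schema, table):
    # default Debezium topic naming: <database.server.name>.<schema>.<table>
    return f"{server_name}.{schema}.{table}"

print(debezium_topic("demo", "datalake", "cdc_events"))  # demo.datalake.cdc_events
```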

 

// ./connector/local/source-debezium.json

{
  "name": "orders-source",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "tasks.max": "1",
    "plugin.name": "pgoutput",
    "publication.name": "cdc_publication",
    "database.hostname": "postgres",
    "database.port": "5432",
    "database.user": "devuser",
    "database.password": "password",
    "database.dbname": "devdb",
    "database.server.name": "demo",
    "schema.include.list": "datalake",
    "table.include.list": "datalake.cdc_events",
    "key.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "transforms": "unwrap",
    "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
    "transforms.unwrap.drop.tombstones": "false",
    "transforms.unwrap.delete.handling.mode": "rewrite",
    "transforms.unwrap.add.fields": "op,db,table,schema,lsn,source.ts_ms"
  }
}

The source connector is created using the API and its status can be checked as shown below. 

## create debezium source connector
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" \
  http://localhost:8083/connectors/ -d @connector/local/source-debezium.json

## check connector status
curl http://localhost:8083/connectors/orders-source/status

#{
#  "name": "orders-source",
#  "connector": { "state": "RUNNING", "worker_id": "connect:8083" },
#  "tasks": [{ "id": 0, "state": "RUNNING", "worker_id": "connect:8083" }],
#  "type": "source"
#}

Sink Connector

Lenses S3 Connector is a Kafka Connect sink connector for writing records from Kafka to AWS S3 buckets. It extends the standard connect config, adding a parameter for a SQL command (Lenses Kafka Connect Query Language or "KCQL"). This defines how to map data from the source (in this case Kafka) to the target (S3). Importantly, it also includes how data should be partitioned into S3, the bucket names and the serialization format (support includes JSON, Avro, Parquet, Text, CSV and binary).

 

I find the Lenses S3 connector more straightforward to configure than the Confluent S3 sink connector thanks to its SQL-like syntax. The KCQL configuration indicates that object files are set to be

 

  • moved from a Kafka topic (demo.datalake.cdc_events) to an S3 bucket (data-lake-demo-cevo) with an object prefix of cdc-events-local,

  • partitioned by customer_id and order_id, e.g. customer_id=<customer-id>/order_id=<order-id>,

  • stored in the JSON format, and

  • flushed every 60 seconds or when there are 50 records.
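The two flush settings combine as an either/or condition: an object is written as soon as either threshold is reached. A small Python sketch (an illustration of the policy, not the connector's actual implementation) makes this explicit:

```python
# Illustration of the sink's flush policy: an S3 object is written when either
# the buffered record count or the elapsed interval reaches its threshold.

FLUSH_COUNT = 50     # WITH_FLUSH_COUNT
FLUSH_INTERVAL = 60  # WITH_FLUSH_INTERVAL, in seconds

def should_flush(buffered_records, seconds_since_last_flush):
    return (buffered_records >= FLUSH_COUNT
            or seconds_since_last_flush >= FLUSH_INTERVAL)

print(should_flush(50, 10))  # True: count threshold reached
print(should_flush(3, 61))   # True: interval elapsed
print(should_flush(3, 10))   # False: keep buffering
```

This matters for low-traffic partitions: even if fewer than 50 records arrive, the interval setting bounds how stale the data in S3 can be.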

 

// ./connector/local/sink-s3.json

{
  "name": "orders-sink",
  "config": {
    "connector.class": "io.lenses.streamreactor.connect.aws.s3.sink.S3SinkConnector",
    "tasks.max": "1",
    "connect.s3.kcql": "INSERT INTO data-lake-demo-cevo:cdc-events-local SELECT * FROM demo.datalake.cdc_events PARTITIONBY customer_id,order_id STOREAS `json` WITH_FLUSH_INTERVAL = 60 WITH_FLUSH_COUNT = 50",
    "aws.region": "ap-southeast-2",
    "aws.custom.endpoint": "https://s3.ap-southeast-2.amazonaws.com/",
    "topics": "demo.datalake.cdc_events",
    "key.converter.schemas.enable": "false",
    "schema.enable": "false",
    "errors.log.enable": "true",
    "key.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter"
  }
}

The sink connector is created using the API and its status can be checked as shown below.

## create s3 sink connector
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" \
  http://localhost:8083/connectors/ -d @connector/local/sink-s3.json

## check connector status
curl http://localhost:8083/connectors/orders-sink/status

#{
#  "name": "orders-sink",
#  "connector": { "state": "RUNNING", "worker_id": "connect:8083" },
#  "tasks": [{ "id": 0, "state": "RUNNING", "worker_id": "connect:8083" }],
#  "type": "sink"
#}

 

We can also check the details of the connectors from the control center.

In the Topics menu, we are able to see that the demo.datalake.cdc_events topic is created by the Debezium connector.

We can check messages of a topic by clicking the topic name. After adding an offset value (eg 0) to the input element, we are able to see messages of the topic. We can check message fields on the left hand side or download a message in the JSON or CSV format.

Check Event Output Files

We can check the output files that are processed by the sink connector in S3. Below shows an example record where customer_id is RATTC and order_id is 11077. As configured, the objects are prefixed by cdc-events-local and further partitioned by customer_id and order_id. The naming convention of output files is <topic-name>(partition_offset).ext.
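Putting the prefix, partition values and file naming convention together, an object key can be assembled as follows (an illustrative sketch; the partition and offset values here are made up, and exact naming may differ slightly between connector versions):

```python
def s3_object_key(prefix, partitions, topic, partition, offset, ext="json"):
    # layout: <prefix>/<k>=<v>/.../<topic>(<partition>_<offset>).<ext>
    partition_path = "/".join(f"{k}={v}" for k, v in partitions)
    return f"{prefix}/{partition_path}/{topic}({partition}_{offset}).{ext}"

key = s3_object_key(
    "cdc-events-local",
    [("customer_id", "RATTC"), ("order_id", 11077)],
    "demo.datalake.cdc_events",
    0, 842,  # hypothetical partition/offset values
)
print(key)
# cdc-events-local/customer_id=RATTC/order_id=11077/demo.datalake.cdc_events(0_842).json
```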

Update Event Example

The above order record has a NULL shipped_date value. When we update it using the following SQL statement, we should be able to see a new output file with the updated value.

 

BEGIN TRANSACTION;
    UPDATE orders
    SET shipped_date = '1998-06-15'::date
    WHERE order_id = 11077;
COMMIT TRANSACTION;

In S3, we are able to see that a new output file is stored. In the new output file, the shipped_date value is updated to 10392. Note that the Debezium connector converts the DATE type to the INT32 type, which represents the number of days since the epoch.
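We can verify that INT32 value with a quick epoch-day calculation in Python:

```python
from datetime import date

# Debezium's default temporal mode encodes DATE as days since the Unix epoch
epoch_days = (date(1998, 6, 15) - date(1970, 1, 1)).days
print(epoch_days)  # 10392
```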

Insert Event Example

When a new order is created, a record is inserted into the orders table as well as one or more order items into the order_details table. Therefore, we expect multiple event records to be created for a new order. We can check this by inserting an order and its related order detail items.

 

BEGIN TRANSACTION;
    INSERT INTO orders VALUES (11075, 'RICSU', 8, '1998-05-06', '1998-06-03', NULL, 2, 6.19000006, 'Richter Supermarkt', 'Starenweg 5', 'Genève', NULL, '1204', 'Switzerland');
    INSERT INTO order_details VALUES (11075, 2, 19, 10, 0.150000006);
    INSERT INTO order_details VALUES (11075, 46, 12, 30, 0.150000006);
    INSERT INTO order_details VALUES (11075, 76, 18, 2, 0.150000006);
COMMIT TRANSACTION;

We can see the output file includes 4 JSON objects, where the first object has NULL order_items and products values. We can also see that those values are filled in gradually across the subsequent event records.
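The trigger fires once per row change, so the single transaction above produces four upserts. A toy Python trace (illustration only; the real work happens in fn_insert_order_event) shows why the first event has NULL order_items and why later events grow:

```python
# Toy trace of the outbox upserts produced by the insert transaction above:
# one trigger firing for the orders row, then one per order_details row.

events = []
order_items = []  # order_details rows visible to the trigger so far

# trigger on orders: no order_details rows exist yet -> NULL order_items
events.append({"order_id": 11075, "order_items": None})

# trigger on order_details: each firing sees one more item
for product_id in (2, 46, 76):
    order_items.append({"product_id": product_id})
    events.append({"order_id": 11075, "order_items": list(order_items)})

print(len(events))                     # 4 event records
print(events[0]["order_items"])        # None
print(len(events[-1]["order_items"]))  # 3
```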


Conclusion

We discussed a data lake solution where data ingestion is performed using change data capture (CDC) and the output files are upserted to a Hudi table. Being registered in the Glue Data Catalog, the table can be used for ad-hoc queries and report/dashboard creation. The Northwind database is used as the source database and, following the transactional outbox pattern, order-related changes are upserted to an outbox table by triggers. The data ingestion is developed using Kafka connectors in the local Confluent platform, where the Debezium connector for PostgreSQL is used as the source connector and the Lenses S3 connector is used as the sink connector. We confirmed the order creation and update events are captured as expected, and the solution is ready for production deployment. In the next post, we'll build the data ingestion part of the solution with Amazon MSK and MSK Connect.
