Simplify Streaming Ingestion on AWS – Part 1: MSK and Redshift

Apache Kafka is a popular distributed event store and stream processing platform. Previously, loading data from Kafka into Redshift and Athena usually required Kafka connectors (e.g. the Amazon Redshift Sink Connector and the Amazon S3 Sink Connector). Recently these AWS services have added features to ingest data from Kafka directly, which enables a simpler architecture with low-latency, high-throughput ingestion of streaming data. In part 1 of the Simplify Streaming Ingestion on AWS series, we discuss how to develop an end-to-end streaming ingestion solution using EventBridge, Lambda, MSK and Redshift Serverless on AWS.

 

Architecture

Fake online order data is generated by multiple Lambda invocations triggered by an EventBridge schedule rule. The schedule is set to run every minute and the associated rule has a configurable number (e.g. 5) of targets. Each target points to the same Kafka producer Lambda function. In this way we are able to generate test data at the desired message volume. Once the messages are sent to a Kafka topic, they can be consumed by a materialized view in an external schema that sources data from the MSK cluster. The infrastructure is built with Terraform, and the AWS SAM CLI is used to develop the producer Lambda function locally before deploying it to AWS.

Infrastructure

A VPC with 3 public and 3 private subnets is created using the AWS VPC Terraform module (vpc.tf). Also a SoftEther VPN server is deployed in order to access the resources in the private subnets from the developer machine (vpn.tf). It is particularly useful for monitoring and managing the MSK cluster and Kafka topic as well as for developing the Kafka producer Lambda function locally. The details about how to configure the VPN server can be found in an earlier post. The source can be found in the GitHub repository of this post.

MSK

An MSK cluster with 3 brokers is created. The broker nodes are deployed with the kafka.m5.large instance type in the private subnets. IAM authentication is used as the client authentication method. Note IAM is effectively the only secure option for Redshift because the external schema supports either no authentication or IAM authentication only.

 

# integration-redshift/infra/variable.tf
locals {
  ...
  msk = {
    version          = "3.3.1"
    instance_size    = "kafka.m5.large"
    ebs_volume_size  = 20
    log_retention_ms = 604800000 # 7 days
  }
  ...
}

# integration-redshift/infra/msk.tf
resource "aws_msk_cluster" "msk_data_cluster" {
  cluster_name           = "${local.name}-msk-cluster"
  kafka_version          = local.msk.version
  number_of_broker_nodes = length(module.vpc.private_subnets)
  configuration_info {
    arn      = aws_msk_configuration.msk_config.arn
    revision = aws_msk_configuration.msk_config.latest_revision
  }

  broker_node_group_info {
    instance_type   = local.msk.instance_size
    client_subnets  = module.vpc.private_subnets
    security_groups = [aws_security_group.msk.id]
    storage_info {
      ebs_storage_info {
        volume_size = local.msk.ebs_volume_size
      }
    }
  }

  client_authentication {
    sasl {
      iam = true
    }
  }

  logging_info {
    broker_logs {
      cloudwatch_logs {
        enabled   = true
        log_group = aws_cloudwatch_log_group.msk_cluster_lg.name
      }
      s3 {
        enabled = true
        bucket  = aws_s3_bucket.default_bucket.id
        prefix  = "logs/msk/cluster-"
      }
    }
  }

  tags = local.tags

  depends_on = [aws_msk_configuration.msk_config]
}

resource "aws_msk_configuration" "msk_config" {
  name = "${local.name}-msk-configuration"

  kafka_versions = [local.msk.version]

  server_properties = <<PROPERTIES
    auto.create.topics.enable = true
    delete.topic.enable = true
    log.retention.ms = ${local.msk.log_retention_ms}
  PROPERTIES
}

Inbound Rules for MSK Cluster

We need to allow access to the MSK cluster from multiple AWS resources. Specifically, the VPN server needs access for monitoring/managing the cluster and topic as well as for developing the producer Lambda function locally. Also the Lambda function and Redshift cluster need access for producing and consuming messages respectively. Only port 9098 is added to the inbound/outbound rules because client access is enabled exclusively via IAM authentication. Note that the security group and outbound rule of the Lambda function are created here, while the Lambda function itself is created in a different Terraform stack. This makes it easier to reference it in the inbound rule of the MSK security group, and later we will discuss how to attach it to the Lambda function.

 

# integration-redshift/infra/msk.tf
resource "aws_security_group" "msk" {
  name   = "${local.name}-msk-sg"
  vpc_id = module.vpc.vpc_id

  lifecycle {
    create_before_destroy = true
  }

  tags = local.tags
}

resource "aws_security_group_rule" "msk_vpn_inbound" {
  count                    = local.vpn.to_create ? 1 : 0
  type                     = "ingress"
  description              = "VPN access"
  security_group_id        = aws_security_group.msk.id
  protocol                 = "tcp"
  from_port                = 9098
  to_port                  = 9098
  source_security_group_id = aws_security_group.vpn[0].id
}

resource "aws_security_group_rule" "msk_lambda_inbound" {
  type                     = "ingress"
  description              = "lambda access"
  security_group_id        = aws_security_group.msk.id
  protocol                 = "tcp"
  from_port                = 9098
  to_port                  = 9098
  source_security_group_id = aws_security_group.kafka_producer_lambda.id
}

resource "aws_security_group_rule" "msk_redshift_inbound" {
  type                     = "ingress"
  description              = "redshift access"
  security_group_id        = aws_security_group.msk.id
  protocol                 = "tcp"
  from_port                = 9098
  to_port                  = 9098
  source_security_group_id = aws_security_group.redshift_serverless.id
}

...

resource "aws_security_group" "kafka_producer_lambda" {
  name   = "${local.name}-lambda-msk-access"
  vpc_id = module.vpc.vpc_id

  lifecycle {
    create_before_destroy = true
  }

  tags = local.tags
}

resource "aws_security_group_rule" "kafka_producer_lambda_msk_egress" {
  type              = "egress"
  description       = "lambda msk access"
  security_group_id = aws_security_group.kafka_producer_lambda.id
  protocol          = "tcp"
  from_port         = 9098
  to_port           = 9098
  cidr_blocks       = ["0.0.0.0/0"]
}

# integration-redshift/infra/redshift.tf
resource "aws_security_group" "redshift_serverless" {
  name   = "${local.name}-redshift-serverless"
  vpc_id = module.vpc.vpc_id

  lifecycle {
    create_before_destroy = true
  }

  tags = local.tags
}

resource "aws_security_group_rule" "redshift_msk_egress" {
  type              = "egress"
  description       = "redshift msk access"
  security_group_id = aws_security_group.redshift_serverless.id
  protocol          = "tcp"
  from_port         = 9098
  to_port           = 9098
  cidr_blocks       = ["0.0.0.0/0"]
}

Redshift Serverless

A namespace and workgroup are created to deploy a Redshift Serverless cluster. As explained in the Redshift user guide, a namespace is a collection of database objects and users, while a workgroup is a collection of compute resources.

 

# integration-redshift/infra/redshift.tf
resource "aws_redshiftserverless_namespace" "namespace" {
  namespace_name = "${local.name}-namespace"

  admin_username       = local.redshift.admin_username
  admin_user_password  = random_password.redshift_admin_pw.result
  db_name              = local.redshift.db_name
  default_iam_role_arn = aws_iam_role.redshift_serverless_role.arn
  iam_roles            = [aws_iam_role.redshift_serverless_role.arn]

  tags = local.tags
}

resource "aws_redshiftserverless_workgroup" "workgroup" {
  namespace_name = aws_redshiftserverless_namespace.namespace.id
  workgroup_name = "${local.name}-workgroup"

  base_capacity      = local.redshift.base_capacity
  subnet_ids         = module.vpc.private_subnets
  security_group_ids = [aws_security_group.redshift_serverless.id]

  tags = local.tags
}

IAM Permission for MSK Access

As illustrated in the AWS documentation, we need an IAM policy that provides permission for communication with the Amazon MSK cluster. The applicable policy is added to the default IAM role of the cluster.

 

# integration-redshift/infra/redshift.tf

resource "aws_iam_role" "redshift_serverless_role" {
  name = "${local.name}-redshift-serverless-role"

  assume_role_policy = data.aws_iam_policy_document.redshift_serverless_assume_role_policy.json
  managed_policy_arns = [
    "arn:aws:iam::aws:policy/AmazonS3FullAccess",
    "arn:aws:iam::aws:policy/AWSGlueConsoleFullAccess",
    "arn:aws:iam::aws:policy/AmazonRedshiftFullAccess",
    "arn:aws:iam::aws:policy/AmazonSageMakerFullAccess",
    aws_iam_policy.msk_redshift_permission.arn
  ]
}

data "aws_iam_policy_document" "redshift_serverless_assume_role_policy" {
  statement {
    actions = ["sts:AssumeRole"]

    principals {
      type = "Service"
      identifiers = [
        "redshift.amazonaws.com",
        "sagemaker.amazonaws.com",
        "events.amazonaws.com",
        "scheduler.redshift.amazonaws.com"
      ]
    }

    principals {
      type        = "AWS"
      identifiers = ["arn:aws:iam::${data.aws_caller_identity.current.account_id}:root"]
    }
  }
}

resource "aws_iam_policy" "msk_redshift_permission" {
  name = "${local.name}-msk-redshift-permission"

  policy = jsonencode({
    Version = "2012-10-17"
    Statement = [
      {
        Sid = "PermissionOnCluster"
        Action = [
          "kafka-cluster:ReadData",
          "kafka-cluster:DescribeTopic",
          "kafka-cluster:Connect",
        ]
        Effect = "Allow"
        Resource = [
          "arn:aws:kafka:*:${data.aws_caller_identity.current.account_id}:cluster/*/*",
          "arn:aws:kafka:*:${data.aws_caller_identity.current.account_id}:topic/*/*"
        ]
      },
      {
        Sid = "PermissionOnGroups"
        Action = [
          "kafka:GetBootstrapBrokers"
        ]
        Effect   = "Allow"
        Resource = "*"
      }
    ]
  })
}

Kafka Producer

The resources related to the Kafka producer Lambda function are managed in a separate Terraform stack because it is easier to build them iteratively. Note the SAM CLI builds the whole Terraform stack even for a small code change, and it would be inconvenient if all the resources were managed in a single stack.

Producer Source

The Kafka producer is created to send messages to a topic named orders, where fake order data is generated using the Faker package. The Order class generates one or more fake order records via its create method; an order record includes an order ID, order timestamp, user ID and order items. The Lambda function sends 100 records at a time, then sleeps for 1 second, and repeats until the elapsed time exceeds the value of the MAX_RUN_SEC environment variable (e.g. 60 seconds). A Kafka message is made up of an order ID as the key and an order record as the value. Both the key and value are serialised as JSON. Note that the stable version of the kafka-python package does not support the IAM authentication method, so we need to install the package from a forked repository as discussed in this GitHub issue.

 

# integration-redshift/kafka_producer/src/app.py

import os
import datetime
import string
import json
import time
from kafka import KafkaProducer
from faker import Faker


class Order:
    def __init__(self, fake: Faker = None):
        self.fake = fake or Faker()

    def order(self):
        # derive a short alphabetic user id from a random integer
        rand_int = self.fake.random_int(1, 1000)
        user_id = "".join(
            [string.ascii_lowercase[int(s)] if s.isdigit() else s for s in hex(rand_int)]
        )[::-1]
        return {
            "order_id": self.fake.uuid4(),
            "ordered_at": datetime.datetime.utcnow(),
            "user_id": user_id,
        }

    def items(self):
        # each order has 1 to 4 items
        return [
            {
                "product_id": self.fake.random_int(1, 9999),
                "quantity": self.fake.random_int(1, 10),
            }
            for _ in range(self.fake.random_int(1, 4))
        ]

    def create(self, num: int):
        return [{**self.order(), **{"items": self.items()}} for _ in range(num)]


class Producer:
    def __init__(self, bootstrap_servers: list, topic: str):
        self.bootstrap_servers = bootstrap_servers
        self.topic = topic
        self.producer = self.create()

    def create(self):
        return KafkaProducer(
            security_protocol="SASL_SSL",
            sasl_mechanism="AWS_MSK_IAM",
            bootstrap_servers=self.bootstrap_servers,
            value_serializer=lambda v: json.dumps(v, default=self.serialize).encode("utf-8"),
            key_serializer=lambda v: json.dumps(v, default=self.serialize).encode("utf-8"),
        )

    def send(self, orders: list):
        for order in orders:
            self.producer.send(self.topic, key={"order_id": order["order_id"]}, value=order)
        self.producer.flush()

    def serialize(self, obj):
        # make datetime/date values JSON-serialisable
        if isinstance(obj, datetime.datetime):
            return obj.isoformat()
        if isinstance(obj, datetime.date):
            return str(obj)
        return obj


def lambda_function(event, context):
    if os.getenv("BOOTSTRAP_SERVERS", "") == "":
        return
    fake = Faker()
    producer = Producer(
        bootstrap_servers=os.getenv("BOOTSTRAP_SERVERS").split(","), topic=os.getenv("TOPIC_NAME")
    )
    s = datetime.datetime.now()
    ttl_rec = 0
    while True:
        # send 100 records, then sleep for a second until MAX_RUN_SEC is exceeded
        orders = Order(fake).create(100)
        producer.send(orders)
        ttl_rec += len(orders)
        print(f"sent {len(orders)} messages")
        elapsed_sec = (datetime.datetime.now() - s).seconds
        if elapsed_sec > int(os.getenv("MAX_RUN_SEC", "60")):
            print(f"{ttl_rec} records are sent in {elapsed_sec} seconds ...")
            break
        time.sleep(1)

A sample order record is shown below.

{
  "order_id": "fc72ccf4-8e98-42b1-9a48-4a6222996be4",
  "ordered_at": "2023-02-05T04:30:58.722158",
  "user_id": "hfbxa",
  "items": [
    {
      "product_id": 8576,
      "quantity": 5
    },
    {
      "product_id": 3101,
      "quantity": 8
    }
  ]
}

Lambda Function

As the VPC, subnets, Lambda security group and MSK cluster are created in the infra Terraform stack, they need to be looked up from the producer Lambda stack. This can be achieved using Terraform data sources as shown below. Note that the private subnets can be filtered by a specific tag (Tier: Private), which is added while creating them.

 

# integration-redshift/infra/vpc.tf

module "vpc" {
  source  = "terraform-aws-modules/vpc/aws"
  version = "~> 3.14"

  name = "${local.name}-vpc"
  cidr = local.vpc.cidr

  azs             = local.vpc.azs
  public_subnets  = [for k, v in local.vpc.azs : cidrsubnet(local.vpc.cidr, 3, k)]
  private_subnets = [for k, v in local.vpc.azs : cidrsubnet(local.vpc.cidr, 3, k + 3)]

  ...

  private_subnet_tags = {
    "Tier" = "Private"
  }

  tags = local.tags
}

# integration-redshift/kafka_producer/variables.tf
data "aws_caller_identity" "current" {}

data "aws_region" "current" {}

data "aws_vpc" "selected" {
  filter {
    name   = "tag:Name"
    values = ["${local.infra_prefix}"]
  }
}

data "aws_subnets" "private" {
  filter {
    name   = "vpc-id"
    values = [data.aws_vpc.selected.id]
  }

  tags = {
    Tier = "Private"
  }
}

data "aws_msk_cluster" "msk_data_cluster" {
  cluster_name = "${local.infra_prefix}-msk-cluster"
}

data "aws_security_group" "kafka_producer_lambda" {
  name = "${local.infra_prefix}-lambda-msk-access"
}

locals {
  ...

  infra_prefix = "integration-redshift"
}

The AWS Lambda Terraform module is used to create the producer Lambda function. Note that, in order to develop a Lambda function using AWS SAM, we need to create a SAM metadata resource, which provides the AWS SAM CLI with the information it needs to locate Lambda functions and layers, along with their source code, build dependencies and build logic, from within the Terraform project. Conveniently, it is created by default by the Terraform module. Also, the EventBridge rule needs permission to invoke the Lambda function, which is granted by the aws_lambda_permission resource.

# integration-redshift/kafka_producer/variables.tf

locals {
  name        = local.infra_prefix
  region      = data.aws_region.current.name
  environment = "dev"

  infra_prefix = "integration-redshift"

  producer = {
    src_path          = "src"
    function_name     = "kafka_producer"
    handler           = "app.lambda_function"
    concurrency       = 5
    timeout           = 90
    memory_size       = 128
    runtime           = "python3.8"
    schedule_rate     = "rate(1 minute)"
    to_enable_trigger = false
    environment = {
      topic_name  = "orders"
      max_run_sec = 60
    }
  }
  ...
}


# integration-redshift/kafka_producer/main.tf
module "kafka_producer_lambda" {
  source = "terraform-aws-modules/lambda/aws"

  function_name          = local.producer.function_name
  handler                = local.producer.handler
  runtime                = local.producer.runtime
  timeout                = local.producer.timeout
  memory_size            = local.producer.memory_size
  source_path            = local.producer.src_path
  vpc_subnet_ids         = data.aws_subnets.private.ids
  vpc_security_group_ids = [data.aws_security_group.kafka_producer_lambda.id]
  attach_network_policy  = true
  attach_policies        = true
  policies               = [aws_iam_policy.msk_lambda_permission.arn]
  number_of_policies     = 1
  environment_variables = {
    BOOTSTRAP_SERVERS = data.aws_msk_cluster.msk_data_cluster.bootstrap_brokers_sasl_iam
    TOPIC_NAME        = local.producer.environment.topic_name
    MAX_RUN_SEC       = local.producer.environment.max_run_sec
  }

  tags = local.tags
}

resource "aws_lambda_function_event_invoke_config" "kafka_producer_lambda" {
  function_name          = module.kafka_producer_lambda.lambda_function_name
  maximum_retry_attempts = 0
}

resource "aws_lambda_permission" "allow_eventbridge" {
  count         = local.producer.to_enable_trigger ? 1 : 0
  statement_id  = "InvokeLambdaFunction"
  action        = "lambda:InvokeFunction"
  function_name = local.producer.function_name
  principal     = "events.amazonaws.com"
  source_arn    = module.eventbridge.eventbridge_rule_arns["crons"]

  depends_on = [
    module.eventbridge
  ]
}

IAM Permission for MSK

The producer Lambda function needs permission to send messages to the orders topic of the MSK cluster. The following IAM policy is added to the Lambda function according to the AWS documentation.

 

# integration-redshift/kafka_producer/main.tf

resource "aws_iam_policy" "msk_lambda_permission" {
  name = "${local.name}-msk-lambda-permission"

  policy = jsonencode({
    Version = "2012-10-17"
    Statement = [
      {
        Sid = "PermissionOnCluster"
        Action = [
          "kafka-cluster:Connect",
          "kafka-cluster:AlterCluster",
          "kafka-cluster:DescribeCluster"
        ]
        Effect   = "Allow"
        Resource = "arn:aws:kafka:${local.region}:${data.aws_caller_identity.current.account_id}:cluster/${local.infra_prefix}-msk-cluster/*"
      },
      {
        Sid = "PermissionOnTopics"
        Action = [
          "kafka-cluster:*Topic*",
          "kafka-cluster:WriteData",
          "kafka-cluster:ReadData"
        ]
        Effect   = "Allow"
        Resource = "arn:aws:kafka:${local.region}:${data.aws_caller_identity.current.account_id}:topic/${local.infra_prefix}-msk-cluster/*"
      },
      {
        Sid = "PermissionOnGroups"
        Action = [
          "kafka-cluster:AlterGroup",
          "kafka-cluster:DescribeGroup"
        ]
        Effect   = "Allow"
        Resource = "arn:aws:kafka:${local.region}:${data.aws_caller_identity.current.account_id}:group/${local.infra_prefix}-msk-cluster/*"
      }
    ]
  })
}

EventBridge Rule

The AWS EventBridge Terraform module is used to create the EventBridge schedule rule and targets. Note that 5 targets pointing to the Kafka producer Lambda function are created so that the function is invoked 5 times concurrently every minute.

 

# integration-redshift/kafka_producer/main.tf

module "eventbridge" {
  source = "terraform-aws-modules/eventbridge/aws"

  create_bus = false

  rules = {
    crons = {
      description         = "Kafka producer lambda schedule"
      schedule_expression = local.producer.schedule_rate
    }
  }

  targets = {
    crons = [for i in range(local.producer.concurrency) : {
      name = "lambda-target-${i}"
      arn  = module.kafka_producer_lambda.lambda_function_arn
    }]
  }

  depends_on = [
    module.kafka_producer_lambda
  ]

  tags = local.tags
}

Deployment

Topic Creation

We first need to create the Kafka topic, which is done using kafka-ui. The UI can be started via docker-compose with the following compose file. Note the VPN connection has to be established in order to access the cluster from the developer machine.

 

# integration-redshift/docker-compose.yml

version: "3"

services:
  kafka-ui:
    image: provectuslabs/kafka-ui:master
    container_name: kafka-ui
    ports:
      - "8080:8080"
    networks:
      - kafkanet
    environment:
      AWS_ACCESS_KEY_ID: $AWS_ACCESS_KEY_ID
      AWS_SECRET_ACCESS_KEY: $AWS_SECRET_ACCESS_KEY
      AWS_SESSION_TOKEN: $AWS_SESSION_TOKEN
      KAFKA_CLUSTERS_0_NAME: msk
      KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: $BOOTSTRAP_SERVERS
      KAFKA_CLUSTERS_0_PROPERTIES_SECURITY_PROTOCOL: SASL_SSL
      KAFKA_CLUSTERS_0_PROPERTIES_SASL_MECHANISM: AWS_MSK_IAM
      KAFKA_CLUSTERS_0_PROPERTIES_SASL_CLIENT_CALLBACK_HANDLER_CLASS: software.amazon.msk.auth.iam.IAMClientCallbackHandler
      KAFKA_CLUSTERS_0_PROPERTIES_SASL_JAAS_CONFIG: software.amazon.msk.auth.iam.IAMLoginModule required;

networks:
  kafkanet:
    name: kafka-network

A topic named orders is created with 3 partitions and a replication factor of 3. It is also set to retain data for 4 weeks.

Once created, it redirects to the overview section of the topic.

Local Testing with SAM

To simplify development, the EventBridge permission is disabled by setting to_enable_trigger to false. Also, the function's loop is shortened by reducing max_run_sec to 10.

 

# integration-redshift/kafka_producer/variables.tf

locals {
  producer = {
    ...
    to_enable_trigger = false
    environment = {
      topic_name  = "orders"
      max_run_sec = 10
    }
  }

  ...
}

The Lambda function can be built with the SAM build command while specifying the hook name as terraform and enabling beta features. Once completed, it stores the build artifacts and template in the .aws-sam folder.

$ sam build --hook-name terraform --beta-features
#
#
# Apply complete! ...

# Build Succeeded

# Built Artifacts  : .aws-sam/build
# Built Template   : .aws-sam/build/template.yaml

# Commands you can use next
# =========================
# [*] Invoke Function: sam local invoke --hook-name terraform
# [*] Emulate local Lambda functions: sam local start-lambda --hook-name terraform

We can invoke the Lambda function locally using the SAM local invoke command. The Lambda function is invoked in a Docker container and the invocation logs are printed in the terminal as shown below.

$ sam local invoke --hook-name terraform module.kafka_producer_lambda.aws_lambda_function.this[0] --beta-features
# Experimental features are enabled for this session.
# Visit the docs page to learn more about the AWS Beta terms https://aws.amazon.com/service-terms/.

# Skipped prepare hook. Current application is already prepared.
# Invoking app.lambda_function (python3.8)
# Skip pulling image and use local one: public.ecr.aws/sam/emulation-python3.8:rapid-1.70.0-x86_64.

# Mounting …/kafka-pocs/integration-redshift/kafka_producer/.aws-sam/build/ModuleKafkaProducerLambdaAwsLambdaFunctionThis069E06354 as /var/task:ro,delegated inside runtime container
# START RequestId: fbfc11be-362a-48df-b894-232cc88234ee Version: $LATEST
# sent 100 messages
# sent 100 messages
# sent 100 messages
# sent 100 messages
# sent 100 messages
# sent 100 messages
# sent 100 messages
# sent 100 messages
# sent 100 messages
# sent 100 messages
# sent 100 messages
# 1100 records are sent in 11 seconds …
# END RequestId: fbfc11be-362a-48df-b894-232cc88234ee
# REPORT RequestId: fbfc11be-362a-48df-b894-232cc88234ee  Init Duration: 0.18 ms  Duration: 12494.86 ms   Billed Duration: 12495 ms       Memory Size: 128 MB     Max Memory Used: 128 MB
# null

We can also check the messages using kafka-ui.

External Schema and Materialized View Creation

As we have messages in the orders topic, we can create a materialized view to consume data from it. First we need to create an external schema that sources data from the MSK cluster. We can use the default IAM role as we have already added the necessary IAM permission to it. Also we should specify the IAM authentication method as it is the only allowed method for this MSK cluster. The materialized view selects the Kafka metadata columns and parses the Kafka value into a data column. The JSON_PARSE function converts the JSON string into the SUPER type, which makes it easy to select individual attributes. It is also configured to refresh automatically so that it ingests up-to-date data without a manual refresh.

 

CREATE EXTERNAL SCHEMA msk_orders
FROM MSK
IAM_ROLE default
AUTHENTICATION iam
CLUSTER_ARN '<MSK Cluster ARN>';

CREATE MATERIALIZED VIEW orders AUTO REFRESH YES AS
SELECT
  "kafka_partition",
  "kafka_offset",
  "kafka_timestamp_type",
  "kafka_timestamp",
  "kafka_key",
  JSON_PARSE("kafka_value") AS data,
  "kafka_headers"
FROM msk_orders.orders;

We can see the ingested Kafka messages as shown below.
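For reference, a simple query over the materialized view can be used to inspect a few ingested messages. The sketch below selects some Kafka metadata columns along with attributes navigated out of the SUPER data column; the actual values will differ per run.

-- Inspect a few recently ingested messages from the materialized view.
-- Individual attributes are navigated out of the SUPER data column with dot notation.
SELECT
  kafka_partition,
  kafka_offset,
  kafka_timestamp,
  data.order_id,
  data.user_id,
  data.ordered_at
FROM orders
ORDER BY kafka_timestamp DESC
LIMIT 10;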

Order Items View Creation

The materialized view keeps the entire order data in a single column, which makes it hard to build analytic queries. As mentioned earlier, we can easily select individual attributes from the data column, but the issue is that each record has an array of order items with a variable length. Redshift doesn't have a function to explode an array into rows, but we can achieve the same result using a recursive CTE. The view below converts the order items array into rows recursively.

 

CREATE OR REPLACE VIEW order_items AS
  WITH RECURSIVE exploded_items (order_id, ordered_at, user_id, idx, product_id, quantity) AS (
      WITH cte AS (
          SELECT
              data.order_id::character(36) AS order_id,
              data.ordered_at::timestamp AS ordered_at,
              data.user_id::varchar(10) AS user_id,
              data.items AS items,
              get_array_length(data.items) AS num_items
          FROM orders
      )
      SELECT
          order_id,
          ordered_at,
          user_id,
          0 AS idx,
          items[0].product_id::int AS product_id,
          items[0].quantity::int AS quantity
      FROM cte
      UNION ALL
      SELECT
          cte.order_id,
          cte.ordered_at,
          cte.user_id,
          idx + 1,
          cte.items[idx + 1].product_id::int,
          cte.items[idx + 1].quantity::int
      FROM cte
      JOIN exploded_items ON cte.order_id = exploded_items.order_id
      WHERE idx < cte.num_items - 1
  )
  SELECT *
  FROM exploded_items
  ORDER BY order_id;

We can see the exploded order items as shown below.

Kafka Producer Deployment

Now we can deploy the Kafka producer Lambda function and EventBridge scheduler using Terraform as usual after resetting the configuration variables. Once deployed, we can see that the scheduler rule has 5 targets of the same Lambda function.

We can check if the Kafka producer sends messages correctly using kafka-ui. After about 30 minutes, we see about 840,000 messages are created in the orders topic.
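The ingested volume can also be cross-checked from the Redshift side. A minimal sketch of such a sanity check, grouping the rows of the materialized view by partition, is shown below; the exact counts depend on how long the producer has been running.

-- Count ingested messages per Kafka partition in the materialized view.
SELECT
  kafka_partition,
  COUNT(*) AS num_messages,
  MAX(kafka_timestamp) AS latest_message_at
FROM orders
GROUP BY kafka_partition
ORDER BY kafka_partition;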

Query Order Items

As the materialized view is set to refresh automatically, we don't have to refresh it manually. Using the order items view, we can query the top 10 popular products as shown below.
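The following is a minimal sketch of such a query; it assumes popularity is measured by total ordered quantity (ranking by the number of distinct orders would work similarly).

-- Top 10 products by total ordered quantity.
SELECT
  product_id,
  SUM(quantity) AS total_quantity,
  COUNT(DISTINCT order_id) AS num_orders
FROM order_items
GROUP BY product_id
ORDER BY total_quantity DESC
LIMIT 10;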

Summary

Streaming ingestion from Kafka (MSK) into Redshift and Athena has become much simpler now that these services support direct integration. In part 1 of the Simplify Streaming Ingestion on AWS series, we discussed an end-to-end streaming ingestion solution using EventBridge, Lambda, MSK and Redshift. We also used AWS SAM integrated with Terraform to develop a Lambda function locally.
