Kafka, Flink and DynamoDB for Real Time Fraud Detection – Part 1 Local Development

Apache Flink is an open-source, unified stream-processing and batch-processing framework. Its core is a distributed streaming data-flow engine that you can use to run real-time stream processing on high-throughput data sources. Currently, it is widely used to build applications for fraud/anomaly detection, rule-based alerting, business process monitoring, and continuous ETL, to name a few. On AWS, we can deploy a Flink application via Amazon Kinesis Data Analytics (KDA), Amazon EMR and Amazon EKS. Among those, KDA is the easiest option as it provides the underlying infrastructure for your Apache Flink applications.

 

There are a number of AWS workshops and blog posts where we can learn Flink development on AWS, and one of them is AWS Kafka and DynamoDB for real time fraud detection. While the workshop targets a Flink application on KDA, it would have been easier if it had illustrated local development before moving on to deployment via KDA. In this series of posts, we re-implement the fraud detection application of the workshop for those who are new to Flink and KDA. Specifically, the app will be developed locally using Docker in part 1, and it will be deployed via KDA in part 2.

 

Architecture

There are two Python applications that send transaction and flagged account records into the corresponding topics – the transaction app sends records indefinitely in a loop. Both topics are consumed by a Flink application, which filters the transactions from the flagged accounts and sends them into an output topic of flagged transactions. Finally, the flagged transaction records are sent into a DynamoDB table by the Camel DynamoDB sink connector in order to serve real-time requests from an API.

Infrastructure

The Kafka cluster, Kafka Connect and the management app (Kpow) are created using Docker, while the Python apps, including the Flink app, run in a virtual environment. The source can be found in the GitHub repository of this post.

Preparation

As discussed later, the Flink application needs the Kafka connector artifact (flink-sql-connector-kafka-1.15.2.jar) in order to connect to a Kafka cluster. Also, the source of the Camel DynamoDB sink connector should be available in the Kafka Connect service. They can be downloaded by executing the following script.

 

# build.sh
#!/usr/bin/env bash
PKG_ALL=${PKG_ALL:-no}

SCRIPT_DIR=$(cd $(dirname $0); pwd)

#### Steps to package the flink app
SRC_PATH=$SCRIPT_DIR/package
rm -rf $SRC_PATH && mkdir -p $SRC_PATH/lib

## Download flink sql connector kafka
echo "download flink sql connector kafka..."
VERSION=1.15.2
FILE_NAME=flink-sql-connector-kafka-$VERSION
FLINK_SRC_DOWNLOAD_URL=https://repo.maven.apache.org/maven2/org/apache/flink/flink-sql-connector-kafka/$VERSION/flink-sql-connector-kafka-$VERSION.jar
curl -L -o $SRC_PATH/lib/$FILE_NAME.jar ${FLINK_SRC_DOWNLOAD_URL}

## Install pip packages
echo "install and zip pip packages..."
pip3 install -r requirements.txt --target $SRC_PATH/site_packages

if [ $PKG_ALL == "yes" ]; then
  ## Package pyflink app
  echo "package pyflink app"
  zip -r kda-package.zip processor.py package/lib package/site_packages
fi

#### Steps to create the sink connector
CONN_PATH=$SCRIPT_DIR/connectors
rm -rf $CONN_PATH && mkdir $CONN_PATH

## Download camel dynamodb sink connector
echo "download camel dynamodb sink connector..."
CONNECTOR_SRC_DOWNLOAD_URL=https://repo.maven.apache.org/maven2/org/apache/camel/kafkaconnector/camel-aws-ddb-sink-kafka-connector/3.20.3/camel-aws-ddb-sink-kafka-connector-3.20.3-package.tar.gz

## decompress and zip contents to create custom plugin of msk connect later
curl -o $CONN_PATH/camel-aws-ddb-sink-kafka-connector.tar.gz $CONNECTOR_SRC_DOWNLOAD_URL \
  && tar -xvzf $CONN_PATH/camel-aws-ddb-sink-kafka-connector.tar.gz -C $CONN_PATH \
  && cd $CONN_PATH/camel-aws-ddb-sink-kafka-connector \
  && zip -r camel-aws-ddb-sink-kafka-connector.zip . \
  && mv camel-aws-ddb-sink-kafka-connector.zip $CONN_PATH \
  && rm $CONN_PATH/camel-aws-ddb-sink-kafka-connector.tar.gz

Once downloaded, they can be found in the corresponding folders as shown below. Although the Flink app doesn't need the kafka-python package, it is included in the site_packages folder in order to check whether the --pyFiles option works in KDA – it'll be checked in part 2.

Kafka and Related Services

A Kafka cluster with a single broker and ZooKeeper node is used in this post. The broker has two listeners, and ports 9092 and 29092 are used for internal and external communication respectively. The default number of topic partitions is set to 2. Details about the Kafka cluster setup can be found in this post.
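
Once the Compose services shown further below are up, we can quickly confirm that the external listener is reachable from the host. The following is a minimal sketch that uses the kafka-python package already listed in requirements.txt to list topics on localhost:29092; the file name is purely illustrative.

# check_broker.py (illustrative)
from kafka import KafkaAdminClient

# connect via the external listener advertised as localhost:29092
admin = KafkaAdminClient(bootstrap_servers="localhost:29092")
print(admin.list_topics())  # returns a list of topic names
admin.close()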

 

A Kafka Connect service is configured to run in distributed mode. The connect properties file and the source of the Camel DynamoDB sink connector are volume-mapped. Also, AWS credentials are added to the environment variables because the sink connector needs permission to put items into a DynamoDB table. Details about the Kafka Connect setup can be found in this post.
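
We can also verify that the Connect worker is up and the Camel DynamoDB sink plugin has been loaded from the volume-mapped folder. Below is a small sketch using only the Python standard library; it assumes the worker's plugin.path in connect-distributed.properties covers /opt/connectors.

# check_connect_plugins.py (illustrative)
import json
import urllib.request

# the Connect REST API lists all plugins visible on the worker's plugin path
with urllib.request.urlopen("http://localhost:8083/connector-plugins") as resp:
    plugins = json.load(resp)

print([p["class"] for p in plugins if "Awsddbsink" in p["class"]])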

 

Finally, Kpow CE is used for ease of monitoring Kafka topics and related resources. The bootstrap server address and connect REST URL are added as environment variables. See this post for details about Kafka management apps.

 

# docker-compose.yml
version: "3.5"

services:
  zookeeper:
    image: bitnami/zookeeper:3.5
    container_name: zookeeper
    ports:
      - "2181"
    networks:
      - kafkanet
    environment:
      - ALLOW_ANONYMOUS_LOGIN=yes
    volumes:
      - zookeeper_data:/bitnami/zookeeper
  kafka-0:
    image: bitnami/kafka:2.8.1
    container_name: kafka-0
    expose:
      - 9092
    ports:
      - "29092:29092"
    networks:
      - kafkanet
    environment:
      - ALLOW_PLAINTEXT_LISTENER=yes
      - KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper:2181
      - KAFKA_CFG_BROKER_ID=0
      - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT
      - KAFKA_CFG_LISTENERS=INTERNAL://:9092,EXTERNAL://:29092
      - KAFKA_CFG_ADVERTISED_LISTENERS=INTERNAL://kafka-0:9092,EXTERNAL://localhost:29092
      - KAFKA_CFG_INTER_BROKER_LISTENER_NAME=INTERNAL
      - KAFKA_CFG_NUM_PARTITIONS=2
    volumes:
      - kafka_0_data:/bitnami/kafka
    depends_on:
      - zookeeper
  kafka-connect:
    image: bitnami/kafka:2.8.1
    container_name: connect
    command: >
      /opt/bitnami/kafka/bin/connect-distributed.sh
      /opt/bitnami/kafka/config/connect-distributed.properties
    ports:
      - "8083:8083"
    networks:
      - kafkanet
    environment:
      AWS_ACCESS_KEY_ID: $AWS_ACCESS_KEY_ID
      AWS_SECRET_ACCESS_KEY: $AWS_SECRET_ACCESS_KEY
      AWS_SESSION_TOKEN: $AWS_SESSION_TOKEN
    volumes:
      - "./configs/connect-distributed.properties:/opt/bitnami/kafka/config/connect-distributed.properties"
      - "./connectors/camel-aws-ddb-sink-kafka-connector:/opt/connectors/camel-aws-ddb-sink-kafka-connector"
    depends_on:
      - zookeeper
      - kafka-0
  kpow:
    image: factorhouse/kpow-ce:91.2.1
    container_name: kpow
    ports:
      - "3000:3000"
    networks:
      - kafkanet
    environment:
      BOOTSTRAP: kafka-0:9092
      CONNECT_REST_URL: http://kafka-connect:8083
    depends_on:
      - zookeeper
      - kafka-0
      - kafka-connect

networks:
  kafkanet:
    name: kafka-network

volumes:
  zookeeper_data:
    driver: local
    name: zookeeper_data
  kafka_0_data:
    driver: local
    name: kafka_0_data

DynamoDB Table

The destination table is named flagged-transactions, and its primary key is composed of transaction_id (hash key) and transaction_date (range key). It also has a global secondary index (GSI) where account_id and transaction_date constitute the primary key; the GSI makes it easy to query transactions by account ID. The table can be created using the AWS CLI as shown below.

 

aws dynamodb create-table \
  --cli-input-json file://configs/ddb.json

// configs/ddb.json
{
  "TableName": "flagged-transactions",
  "KeySchema": [
    { "AttributeName": "transaction_id", "KeyType": "HASH" },
    { "AttributeName": "transaction_date", "KeyType": "RANGE" }
  ],
  "AttributeDefinitions": [
    { "AttributeName": "transaction_id", "AttributeType": "S" },
    { "AttributeName": "account_id", "AttributeType": "N" },
    { "AttributeName": "transaction_date", "AttributeType": "S" }
  ],
  "ProvisionedThroughput": {
    "ReadCapacityUnits": 2,
    "WriteCapacityUnits": 2
  },
  "GlobalSecondaryIndexes": [
    {
      "IndexName": "account",
      "KeySchema": [
        { "AttributeName": "account_id", "KeyType": "HASH" },
        { "AttributeName": "transaction_date", "KeyType": "RANGE" }
      ],
      "Projection": { "ProjectionType": "ALL" },
      "ProvisionedThroughput": {
        "ReadCapacityUnits": 2,
        "WriteCapacityUnits": 2
      }
    }
  ]
}
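
Once the table is populated later, the GSI can be queried by account ID as illustrated in the following boto3 sketch. Note that boto3 is not part of requirements.txt, the region simply mirrors the sink connector config shown further below, and the account ID is only an example, so treat this as a hedged illustration of how the index would be used.

# query_gsi.py (illustrative; assumes boto3 and AWS credentials are configured)
import boto3
from boto3.dynamodb.conditions import Key

table = boto3.resource("dynamodb", region_name="ap-southeast-2").Table("flagged-transactions")
resp = table.query(
    IndexName="account",
    KeyConditionExpression=Key("account_id").eq(1000000001),  # an example flagged account ID
)
for item in resp["Items"]:
    print(item["transaction_id"], item["transaction_date"], item["transaction_amount"])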

Virtual Environment

As mentioned earlier, all Python apps run in a virtual environment, and we need the following pip packages. We use version 1.15.2 of the apache-flink package because it is the latest version supported by KDA. We also need the kafka-python package for source data generation. The pip packages can be installed by pip install -r requirements-dev.txt.

 

# requirements.txt
kafka-python==2.0.2

# requirements-dev.txt
-r requirements.txt
apache-flink==1.15.2
black==19.10b0
pytest
pytest-cov

Application

Source Data

A single Python script is created for the transaction and flagged account apps. They generate and send source data into Kafka topics. Each of the classes for the flagged account and transaction has the asdict, auto and create methods. The create method generates a list of records where each element is instantiated by the auto method. Those records are sent into the relevant Kafka topic after being converted into a dictionary by the asdict method.

 

A Kafka producer is created as an attribute of the Producer class. The source records are sent into the relevant topic by the send method. Note that both the key and value of the messages are serialized as json.

 

Whether to send flagged account or transaction records is determined by an environment variable called DATE_TYPE. We can run those apps as shown below.

* flagged account – DATE_TYPE=account python producer.py

* transaction – DATE_TYPE=transaction python producer.py

 

# producer.py
import os
import datetime
import time
import json
import typing
import random
import logging
import dataclasses

from kafka import KafkaProducer

logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s.%(msecs)03d:%(levelname)s:%(name)s:%(message)s",
    datefmt="%Y-%m-%d %H:%M:%S",
)


@dataclasses.dataclass
class FlagAccount:
    account_id: int
    flag_date: str

    def asdict(self):
        return dataclasses.asdict(self)

    @classmethod
    def auto(cls, account_id: int):
        flag_date = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f")
        return cls(account_id, flag_date)

    @staticmethod
    def create():
        return [FlagAccount.auto(account_id) for account_id in range(1000000001, 1000000010, 2)]


@dataclasses.dataclass
class Transaction:
    account_id: int
    customer_id: str
    merchant_type: str
    transaction_id: str
    transaction_type: str
    transaction_amount: float
    transaction_date: str

    def asdict(self):
        return dataclasses.asdict(self)

    @classmethod
    def auto(cls):
        account_id = random.randint(1000000001, 1000000010)
        customer_id = f"C{str(account_id)[::-1]}"
        merchant_type = random.choice(["Online", "In Store"])
        transaction_id = "".join(random.choice("0123456789ABCDEF") for i in range(16))
        transaction_type = random.choice(
            [
                "Grocery_Store",
                "Gas_Station",
                "Shopping_Mall",
                "City_Services",
                "HealthCare_Service",
                "Food and Beverage",
                "Others",
            ]
        )
        transaction_amount = round(random.randint(100, 10000) * random.random(), 2)
        transaction_date = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f")
        return cls(
            account_id,
            customer_id,
            merchant_type,
            transaction_id,
            transaction_type,
            transaction_amount,
            transaction_date,
        )

    @staticmethod
    def create(num: int):
        return [Transaction.auto() for _ in range(num)]


class Producer:
    def __init__(self, bootstrap_servers: list, account_topic: str, transaction_topic: str):
        self.bootstrap_servers = bootstrap_servers
        self.account_topic = account_topic
        self.transaction_topic = transaction_topic
        self.producer = self.create()

    def create(self):
        return KafkaProducer(
            bootstrap_servers=self.bootstrap_servers,
            key_serializer=lambda v: json.dumps(v, default=self.serialize).encode("utf-8"),
            value_serializer=lambda v: json.dumps(v, default=self.serialize).encode("utf-8"),
            api_version=(2, 8, 1),
        )

    def send(self, records: typing.Union[typing.List[FlagAccount], typing.List[Transaction]]):
        for record in records:
            try:
                key = {"account_id": record.account_id}
                topic = self.account_topic
                if hasattr(record, "transaction_id"):
                    key["transaction_id"] = record.transaction_id
                    topic = self.transaction_topic
                self.producer.send(topic=topic, key=key, value=record.asdict())
            except Exception as e:
                raise RuntimeError("fails to send a message") from e
        self.producer.flush()

    def serialize(self, obj):
        if isinstance(obj, datetime.datetime):
            return obj.isoformat()
        if isinstance(obj, datetime.date):
            return str(obj)
        return obj


if __name__ == "__main__":
    producer = Producer(
        bootstrap_servers=os.getenv("BOOTSTRAP_SERVERS", "localhost:29092").split(","),
        account_topic=os.getenv("CUSTOMER_TOPIC_NAME", "flagged-accounts"),
        transaction_topic=os.getenv("TRANSACTION_TOPIC_NAME", "transactions"),
    )
    if os.getenv("DATE_TYPE", "account") == "account":
        producer.send(FlagAccount.create())
        producer.producer.close()
    else:
        max_run = int(os.getenv("MAX_RUN", "-1"))
        logging.info(f"max run - {max_run}")
        current_run = 0
        while True:
            current_run += 1
            logging.info(f"current run - {current_run}")
            if current_run - max_run == 0:
                logging.info("reached max run, finish")
                producer.producer.close()
                break
            producer.send(Transaction.create(5))
            secs = random.randint(2, 5)
            logging.info(f"messages sent... wait {secs} seconds")
            time.sleep(secs)


Once we start the apps, we can check in Kpow that the topics for the source data are created and messages are ingested.
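
Besides Kpow, a quick way to confirm ingestion is to read a handful of records directly with the kafka-python package, e.g. as sketched below; the file name is illustrative.

# consume_check.py (illustrative)
import json
from kafka import KafkaConsumer

consumer = KafkaConsumer(
    "transactions",
    bootstrap_servers="localhost:29092",
    auto_offset_reset="earliest",
    value_deserializer=lambda v: json.loads(v.decode("utf-8")),
    consumer_timeout_ms=5000,  # stop iterating once no new message arrives for 5 seconds
)
for message in consumer:
    print(message.value)
consumer.close()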

Output Data

The Flink application is built using the Table API. We have two Kafka source topics and one output topic. Simply put, we can query the records of the topics as tables of unbounded real-time streams with the Table API. In order to read/write records from/to a Kafka topic, we need to specify the Kafka connector artifact that we downloaded earlier as the pipeline jar. Note we only need to configure the connector jar when we develop the app locally, as the jar file will be specified by the --jarfile option in KDA. We also need the application properties file (application_properties.json) in order to be compatible with KDA. The file contains the Flink runtime options of KDA as well as application-specific properties. All the properties should be specified when deploying via KDA; for local development, we keep them in a JSON file and only the application-specific properties are used.

 

The tables for the source and output topics can be created using SQL with options that are related to the Kafka connector. Key options cover the connector name (connector), topic name (topic), bootstrap server address (properties.bootstrap.servers) and format (format). See the connector documentation for more details about the connector configuration. When it comes to inserting the flagged transaction records into the output topic, we use a Python function (insert_into_stmt) that generates the INSERT statement in SQL as well.

 

In the main method, we create all the source and sink tables after mapping the relevant application properties. Then the output records are inserted into the output Kafka topic. Note that, when the app runs locally, the output records are additionally printed in the terminal for ease of checking them. We can run the app as follows – RUNTIME_ENV=LOCAL python processor.py.

 

# processor.py
import os
import json
import logging

import kafka  # check if --pyFiles works
from pyflink.table import EnvironmentSettings, TableEnvironment

logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s.%(msecs)03d:%(levelname)s:%(name)s:%(message)s",
    datefmt="%Y-%m-%d %H:%M:%S",
)

RUNTIME_ENV = os.environ.get("RUNTIME_ENV", "KDA")  # KDA, LOCAL
logging.info(f"runtime environment - {RUNTIME_ENV}...")

env_settings = EnvironmentSettings.in_streaming_mode()
table_env = TableEnvironment.create(env_settings)

APPLICATION_PROPERTIES_FILE_PATH = (
    "/etc/flink/application_properties.json"  # on kda or docker-compose
    if RUNTIME_ENV != "LOCAL"
    else "application_properties.json"
)

if RUNTIME_ENV != "KDA":
    # on non-KDA, multiple jar files can be passed after being delimited by a semicolon
    CURRENT_DIR = os.path.dirname(os.path.realpath(__file__))
    PIPELINE_JAR = "flink-sql-connector-kafka-1.15.2.jar"
    table_env.get_config().set(
        "pipeline.jars", f"file://{os.path.join(CURRENT_DIR, 'package', 'lib', PIPELINE_JAR)}"
    )
logging.info(f"app properties file path - {APPLICATION_PROPERTIES_FILE_PATH}")


def get_application_properties():
    if os.path.isfile(APPLICATION_PROPERTIES_FILE_PATH):
        with open(APPLICATION_PROPERTIES_FILE_PATH, "r") as file:
            contents = file.read()
            properties = json.loads(contents)
            return properties
    else:
        raise RuntimeError(f"A file at '{APPLICATION_PROPERTIES_FILE_PATH}' was not found")


def property_map(props: dict, property_group_id: str):
    for prop in props:
        if prop["PropertyGroupId"] == property_group_id:
            return prop["PropertyMap"]


def create_flagged_account_source_table(
    table_name: str, topic_name: str, bootstrap_servers: str, startup_mode: str
):
    stmt = f"""
    CREATE TABLE {table_name} (
        account_id BIGINT,
        flag_date TIMESTAMP(3)
    )
    WITH (
        'connector' = 'kafka',
        'topic' = '{topic_name}',
        'properties.bootstrap.servers' = '{bootstrap_servers}',
        'properties.group.id' = 'flagged-account-source-group',
        'format' = 'json',
        'scan.startup.mode' = '{startup_mode}'
    )
    """
    logging.info("flagged account source table statement...")
    logging.info(stmt)
    return stmt


def create_transaction_source_table(
    table_name: str, topic_name: str, bootstrap_servers: str, startup_mode: str
):
    stmt = f"""
    CREATE TABLE {table_name} (
        account_id BIGINT,
        customer_id VARCHAR(15),
        merchant_type VARCHAR(8),
        transaction_id VARCHAR(16),
        transaction_type VARCHAR(20),
        transaction_amount DECIMAL(10,2),
        transaction_date TIMESTAMP(3)
    )
    WITH (
        'connector' = 'kafka',
        'topic' = '{topic_name}',
        'properties.bootstrap.servers' = '{bootstrap_servers}',
        'properties.group.id' = 'transaction-source-group',
        'format' = 'json',
        'scan.startup.mode' = '{startup_mode}'
    )
    """
    logging.info("transaction source table statement...")
    logging.info(stmt)
    return stmt


def create_flagged_transaction_sink_table(table_name: str, topic_name: str, bootstrap_servers: str):
    stmt = f"""
    CREATE TABLE {table_name} (
        account_id BIGINT,
        customer_id VARCHAR(15),
        merchant_type VARCHAR(8),
        transaction_id VARCHAR(16),
        transaction_type VARCHAR(20),
        transaction_amount DECIMAL(10,2),
        transaction_date TIMESTAMP(3)
    )
    WITH (
        'connector' = 'kafka',
        'topic' = '{topic_name}',
        'properties.bootstrap.servers' = '{bootstrap_servers}',
        'format' = 'json',
        'key.format' = 'json',
        'key.fields' = 'account_id;transaction_id',
        'properties.allow.auto.create.topics' = 'true'
    )
    """
    logging.info("transaction sink table statement...")
    logging.info(stmt)
    return stmt


def create_print_table(table_name: str):
    return f"""
    CREATE TABLE {table_name} (
        account_id BIGINT,
        customer_id VARCHAR(15),
        merchant_type VARCHAR(8),
        transaction_id VARCHAR(16),
        transaction_type VARCHAR(20),
        transaction_amount DECIMAL(10,2),
        transaction_date TIMESTAMP(3)
    )
    WITH (
        'connector' = 'print'
    )
    """


def insert_into_stmt(insert_from_tbl: str, compare_with_tbl: str, insert_into_tbl: str):
    return f"""
    INSERT INTO {insert_into_tbl}
        SELECT l.*
        FROM {insert_from_tbl} AS l
        JOIN {compare_with_tbl} AS r
            ON l.account_id = r.account_id
            AND l.transaction_date > r.flag_date
    """


def main():
    ## map consumer/producer properties
    props = get_application_properties()
    # consumer for flagged account
    consumer_0_property_group_key = "consumer.config.0"
    consumer_0_properties = property_map(props, consumer_0_property_group_key)
    consumer_0_table_name = consumer_0_properties["table.name"]
    consumer_0_topic_name = consumer_0_properties["topic.name"]
    consumer_0_bootstrap_servers = consumer_0_properties["bootstrap.servers"]
    consumer_0_startup_mode = consumer_0_properties["startup.mode"]
    # consumer for transactions
    consumer_1_property_group_key = "consumer.config.1"
    consumer_1_properties = property_map(props, consumer_1_property_group_key)
    consumer_1_table_name = consumer_1_properties["table.name"]
    consumer_1_topic_name = consumer_1_properties["topic.name"]
    consumer_1_bootstrap_servers = consumer_1_properties["bootstrap.servers"]
    consumer_1_startup_mode = consumer_1_properties["startup.mode"]
    # producer
    producer_0_property_group_key = "producer.config.0"
    producer_0_properties = property_map(props, producer_0_property_group_key)
    producer_0_table_name = producer_0_properties["table.name"]
    producer_0_topic_name = producer_0_properties["topic.name"]
    producer_0_bootstrap_servers = producer_0_properties["bootstrap.servers"]
    # print
    print_table_name = "sink_print"
    ## create the source table for flagged accounts
    table_env.execute_sql(
        create_flagged_account_source_table(
            consumer_0_table_name,
            consumer_0_topic_name,
            consumer_0_bootstrap_servers,
            consumer_0_startup_mode,
        )
    )
    table_env.from_path(consumer_0_table_name).print_schema()
    ## create the source table for transactions
    table_env.execute_sql(
        create_transaction_source_table(
            consumer_1_table_name,
            consumer_1_topic_name,
            consumer_1_bootstrap_servers,
            consumer_1_startup_mode,
        )
    )
    table_env.from_path(consumer_1_table_name).print_schema()
    ## create the sink table for flagged transactions
    table_env.execute_sql(
        create_flagged_transaction_sink_table(
            producer_0_table_name, producer_0_topic_name, producer_0_bootstrap_servers
        )
    )
    table_env.from_path(producer_0_table_name).print_schema()
    table_env.execute_sql(create_print_table(print_table_name))
    ## insert into sink tables
    if RUNTIME_ENV == "LOCAL":
        statement_set = table_env.create_statement_set()
        statement_set.add_insert_sql(
            insert_into_stmt(consumer_1_table_name, consumer_0_table_name, producer_0_table_name)
        )
        statement_set.add_insert_sql(
            insert_into_stmt(consumer_1_table_name, consumer_0_table_name, print_table_name)
        )
        statement_set.execute().wait()
    else:
        table_result = table_env.execute_sql(
            insert_into_stmt(consumer_1_table_name, consumer_0_table_name, producer_0_table_name)
        )
        logging.info(table_result.get_job_client().get_job_status())


if __name__ == "__main__":
    main()

 

// application_properties.json
[
  {
    "PropertyGroupId": "kinesis.analytics.flink.run.options",
    "PropertyMap": {
      "python": "processor.py",
      "jarfile": "package/lib/flink-sql-connector-kafka-1.15.2.jar",
      "pyFiles": "package/site_packages/"
    }
  },
  {
    "PropertyGroupId": "consumer.config.0",
    "PropertyMap": {
      "table.name": "flagged_accounts",
      "topic.name": "flagged-accounts",
      "bootstrap.servers": "localhost:29092",
      "startup.mode": "earliest-offset"
    }
  },
  {
    "PropertyGroupId": "consumer.config.1",
    "PropertyMap": {
      "table.name": "transactions",
      "topic.name": "transactions",
      "bootstrap.servers": "localhost:29092",
      "startup.mode": "earliest-offset"
    }
  },
  {
    "PropertyGroupId": "producer.config.0",
    "PropertyMap": {
      "table.name": "flagged_transactions",
      "topic.name": "flagged-transactions",
      "bootstrap.servers": "localhost:29092"
    }
  }
]

The terminal on the right-hand side shows the output records of the Flink app, while the left-hand side shows the logs of the transaction app. We see that the account IDs all end with odd numbers, which matches the flagged accounts.
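
This is expected because the flagged account producer only creates odd account IDs: FlagAccount.create generates them with a step of 2, as the tiny snippet below shows.

print(list(range(1000000001, 1000000010, 2)))
# [1000000001, 1000000003, 1000000005, 1000000007, 1000000009]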

We can also see details of all the topics in Kpow as shown below.

Sink Output Data

Kafka Connect provides a REST API that manages connectors, and we can create a connector programmatically using it. The REST endpoint requires a JSON payload that includes connector configurations.

 

$ curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" \
  http://localhost:8083/connectors/ -d @configs/sink.json

The connector is configured to write messages from the flagged-transactions topic into the DynamoDB table created earlier. It requires specifying the table name, AWS region, operation, write capacity and whether to use the default credential provider – see the documentation for details. Note that, if you don't use the default credential provider, you have to specify the access key ID and secret access key. Note further that, although the current LTS version is v3.18.2, the default credential provider option didn't work for me, and I was recommended to use v3.20.3 instead. Finally, the camel.sink.unmarshal option converts data from the internal java.util.HashMap type into the required java.io.InputStream type; without this configuration, the connector fails with an org.apache.camel.NoTypeConversionAvailableException error.

 

// configs/sink.json
{
  "name": "transactions-sink",
  "config": {
    "connector.class": "org.apache.camel.kafkaconnector.awsddbsink.CamelAwsddbsinkSinkConnector",
    "tasks.max": "2",
    "key.converter": "org.apache.kafka.connect.json.JsonConverter",
    "key.converter.schemas.enable": false,
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": false,
    "topics": "flagged-transactions",

    "camel.kamelet.aws-ddb-sink.table": "flagged-transactions",
    "camel.kamelet.aws-ddb-sink.region": "ap-southeast-2",
    "camel.kamelet.aws-ddb-sink.operation": "PutItem",
    "camel.kamelet.aws-ddb-sink.writeCapacity": 1,
    "camel.kamelet.aws-ddb-sink.useDefaultCredentialsProvider": true,
    "camel.sink.unmarshal": "jackson"
  }
}
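
After the connector is registered, its status can be checked either on Kpow or via the same REST API. Below is a minimal standard-library sketch that prints the connector and task states; the file name is illustrative.

# check_connector_status.py (illustrative)
import json
import urllib.request

with urllib.request.urlopen("http://localhost:8083/connectors/transactions-sink/status") as resp:
    status = json.load(resp)

print(status["connector"]["state"], [task["state"] for task in status["tasks"]])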


Below shows the sink connector details on Kpow.

We can check the ingested records in the DynamoDB table's items view. Below is a list of scanned records.
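
Alternatively, a few items can be pulled programmatically; the following boto3 sketch mirrors what the console's scan shows (boto3 and AWS credentials are assumed, as before).

# scan_table.py (illustrative)
import boto3

table = boto3.resource("dynamodb", region_name="ap-southeast-2").Table("flagged-transactions")
for item in table.scan(Limit=5)["Items"]:
    print(item)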

Summary

 

Apache Flink is widely used for building real-time stream processing applications. On AWS, Kinesis Data Analytics (KDA) is the easiest option for developing a Flink app as it provides the underlying infrastructure. Re-implementing a solution from an AWS workshop, this series of posts discusses how to develop and deploy a fraud detection app using Kafka, Flink and DynamoDB. In this post, we covered local development using Docker; deployment via KDA will be discussed in part 2.
