Apache Flink is an open-source, unified stream-processing and batch-processing framework. Its core is a distributed streaming data-flow engine that you can use to run real-time stream processing on high-throughput data sources. Currently, it is widely used to build applications for fraud/anomaly detection, rule-based alerting, business process monitoring, and continuous ETL to name a few. On AWS, we can deploy a Flink application via Amazon Kinesis Data Analytics (KDA), Amazon EMR and Amazon EKS. Among those, KDA is the easiest option as it provides the underlying infrastructure for your Apache Flink applications.
There are a number of AWS workshops and blog posts where we can learn Flink development on AWS and one of those is AWS Kafka and DynamoDB for real time fraud detection. While this workshop targets a Flink application on KDA, it would have been easier if it illustrated local development before moving into deployment via KDA. In this series of posts, we will re-implement the fraud detection application of the workshop for those who are new to Flink and KDA. Specifically the app will be developed locally using Docker in part 1, and it will be deployed via KDA in part 2.
Part 1 Local Development (this post)
There are two Python applications that send transaction and flagged account records into the corresponding topics – the transaction app sends records indefinitely in a loop. Both the topics are consumed by a Flink application, and it filters the transactions from the flagged accounts followed by sending them into an output topic of flagged transactions. Finally, the flagged transaction records are sent into a DynamoDB table by the Camel DynamoDB sink connector in order to serve real-time requests from an API.
The Kafka cluster, Kafka connect and management app (kpow) are created using Docker while the python apps including the Flink app run in a virtual environment. The source can be found in the GitHub repository of this post.
As discussed later, the Flink application needs the Kafka connector artifact (flink-sql-connector-kafka-1.15.2.jar) in order to connect a Kafka cluster. Also, the source of the Camel DynamoDB sink connector should be available in the Kafka connect service. They can be downloaded by executing the following script.
Once downloaded, they can be found in the corresponding folders as shown below. Although the Flink app doesn’t need the kafka-python package, it is included in the site_packages folder in order to check if –pyFiles option works in KDA – it’ll be checked in part 2.
Kafka and Related Services
A Kafka cluster with a single broker and zookeeper node is used in this post. The broker has two listeners and the port 9092 and 29092 are used for internal and external communication respectively. The default number of topic partitions is set to 2. Details about Kafka cluster setup can be found in this post.
A Kafka Connect service is configured to run in a distributed mode. The connect properties file and the source of the Camel DynamoDB sink connector are volume-mapped. Also AWS credentials are added to environment variables as it needs permission to put items into a DynamoDB table. Details about Kafka connect setup can be found in this post.
Finally, the Kpow CE is used for ease of monitoring Kafka topics and related resources. The bootstrap server address and connect REST URL are added as environment variables. See this post for details about Kafka management apps.
The destination table is named flagged-transactions, and it has the primary key where transaction_id and transaction_date are the hash and range key respectively. It also has a global secondary index (GSI) where account_id and transaction_date constitute the primary key. The purpose of the GSI is for ease of querying transactions by account ID. The table can be created using the AWS CLI as shown below.
As mentioned earlier, all Python apps run in a virtual environment, and we need the following pip packages. We use the version 1.15.2 of the apache-flink package because it is the latest supported version by KDA. We also need the kafka-python package for source data generation. The pip packages can be installed by pip install -r requirements-dev.txt.
A single Python script is created for the transaction and flagged account apps. They generate and send source data into Kafka topics. Each of the classes for the flagged account and transaction has the asdict, auto and create methods. The create method generates a list of records where each element is instantiated by the auto method. Those records are sent into the relevant Kafka topic after being converted into a dictionary by the asdict method.
A Kafka producer is created as an attribute of the Producer class. The source records are sent into the relevant topic by the send method. Note that both the key and value of the messages are serialized as json.
Whether to send flagged account or transaction records is determined by an environment variable called DATA_TYPE. We can run those apps as shown below.
* flagged account – DATE_TYPE=account python producer.py
* transaction – DATE_TYPE=transaction python producer.py
Once we start the apps, we can check the topics for the source data are created and messages are ingested in Kpow.
The Flink application is built using the Table API. We have two Kafka source topics and one output topic. Simply put, we can query the records of the topics as tables of unbounded real-time streams with the Table API. In order to read/write records from/to a Kafka topic, we need to specify the Kafka connector artifact that we downloaded earlier as the pipeline jar. Note we only need to configure the connector jar when we develop the app locally as the jar file will be specified by the –jarfile option for KDA. We also need the application properties file (application_properties.json) in order to be comparable with KDA. The file contains the Flink runtime options in KDA as well as application specific properties. All the properties should be specified when deploying via KDA and, for local development, we keep them as a json file and only the application specific properties are used.
The tables for the source and output topics can be created using SQL with options that are related to the Kafka connector. Key options cover the connector name (connector), topic name (topic), bootstrap server address (properties.bootstrap.servers) and format (format). See the connector document for more details about the connector configuration. When it comes to inserting flagged transaction records into the output topic, we use a function written in SQL as well – insert_into_stmt.
In the main method, we create all the source and sink tables after mapping relevant application properties. Then the output records are inserted into the output Kafka topic. Note that the output records are printed in the terminal additionally when the app is running locally for ease of checking them. We can run the app as following – RUNTIME_ENV=LOCAL python processor.py.
The terminal on the right-hand side shows the output records of the Flink app while the left-hand side records logs of the transaction app. We see that the account IDs end with all odd numbers, which matches transactions from flagged accounts.
We can also see details of all the topics in Kpow as shown below.
Sink Output Data
Kafka Connect provides a REST API that manages connectors, and we can create a connector programmatically using it. The REST endpoint requires a JSON payload that includes connector configurations.
The connector is configured to write messages from the flagged-transactions topic into the DynamoDB table created earlier. It requires to specify the table name, AWS region, operation, write capacity and whether to use the default credential provider – see the documentation for details. Note that, if you don’t use the default credential provider, you have to specify the access key id and secret access key. Note further that, although the current LTS version is v3.18.2, the default credential provider option didn’t work for me, and I was recommended to use v3.20.3 instead. Finally, the camel.sink.unmarshal option is to convert data from the internal java.util.HashMap type into the required java.io.InputStream type. Without this configuration, the connector fails with org.apache.camel.NoTypeConversionAvailableException error.
Below shows the sink connector details on Kpow.
We can check the ingested records on the DynamoDB table items view. Below shows a list of scanned records.
Apache Flink is widely used for building real-time stream processing applications. On AWS, Kinesis Data Analytics (KDA) is the easiest option to develop a Flink app as it provides the underlying infrastructure. Re-implementing a solution from an AWS workshop, this series of posts discuss how to develop and deploy a fraud detection app using Kafka, Flink and DynamoDB. In this post, we covered local development using Docker, and deployment via KDA will be discussed in part 2.