Terraform pipeline to ingest streaming data into Snowflake using Amazon Kinesis

Overview

Streaming data is information that is generated continuously by data sources such as social networks, connected devices, video games and sensors. In the age of social media and the Internet of Things (IoT), streaming architectures for data capture and analysis have become extremely popular.

Snowflake is a data cloud platform that enables you to build data-intensive applications without operational burden. Snowflake’s Snowpipe is an automated service that asynchronously listens for new data as it arrives in an external stage (such as an Amazon S3 bucket) – either via its REST API or via event notifications – and loads it into Snowflake as it arrives, whenever it arrives.

Using AWS services and Snowflake’s Snowpipe data ingestion service, we can build a data pipeline that collects semi-structured streaming data from a variety of sources into the Snowflake data cloud platform. Data engineers can then run analytics on the collected data and extract valuable insights that drive important business decisions.

Terraform is an open-source infrastructure as code software tool that provides a consistent CLI workflow to manage hundreds of cloud services. Terraform codifies cloud APIs into declarative configuration files.

Using Terraform, we will demonstrate step by step how easy it is to set up a streaming data pipeline that ingests data streams (e.g. Twitter) into Snowflake using AWS services such as Kinesis Data Firehose and S3 buckets.

[Architecture diagram: Twitter stream → Kinesis Data Firehose → S3 → Snowpipe → Snowflake]

Goal

Demonstrate how terraform can be used to:

  • Create Snowflake resources
    • Warehouse
    • Database
    • Tables
    • External Stages
    • Snowpipe

  • Create AWS resources 
    • IAM Role
    • IAM Policy
    • S3 Bucket
    • S3 Bucket Event Notification
    • Kinesis Firehose Delivery Stream

  • Configure the integration between AWS S3 bucket and Snowflake’s snowpipe.

Assumptions and Prerequisites

Breaking it down piece by piece

We will write the Terraform code and deploy the resources in stages. Follow the steps below to roll out your stack incrementally.

Set your environment variables in the working shell

Ensure that you have read and met the assumptions and prerequisites.

export aws_region_id="ap-southeast-2"
#Any variable 'x' exported with the 'TF_VAR_' prefix (as in TF_VAR_x) is interpreted by Terraform as the Terraform variable 'x'
export twi_s3_bucket_name="<your_bucket_name>"
export TF_VAR_twi_s3_bucket_name="${twi_s3_bucket_name}"

export twi_kinesis_firehose_stream_name="terraform-kinesis-firehose-twi-stream"
export TF_VAR_twi_kinesis_firehose_stream_name="${twi_kinesis_firehose_stream_name}"
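
As a quick optional sanity check, you can confirm that the variables are visible in your current shell; the grep pattern below is just an illustration covering the variables exported so far.

#optional: confirm the exported variables are set in the current shell
env | grep -E 'TF_VAR_|aws_region_id|twi_'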

At every stage we shall list which additional environment variables are needed to execute the Terraform code and the client successfully.

Terraform it incrementally

Now that we have set up the environment with the required variables, let’s proceed to build our Terraform stack. The best way to see how the whole stream delivery to Snowflake is put in place is to build and deploy the stack incrementally. Follow the steps and the explanations as we progress.

This is what the code structure will look like once we have built it all:

.
├── scripts
│   └── get-twitter-streams.py
└── tf
    ├── main.tf
    ├── modules
    │   ├── iam-user-for-sf
    │   │   ├── main.tf
    │   │   ├── required_provider.tf -> ../../required_provider.tf
    │   │   └── variables.tf
    │   ├── kinesis-setup
    │   │   ├── firehose_access_policy.tf
    │   │   ├── main.tf
    │   │   ├── required_provider.tf -> ../../required_provider.tf
    │   │   └── variables.tf
    │   ├── s3bucket-for-sf
    │   │   ├── main.tf
    │   │   ├── required_provider.tf -> ../../required_provider.tf
    │   │   └── variables.tf
    │   └── snowflake-setup
    │       ├── main.tf
    │       ├── required_provider.tf -> ../../required_provider.tf
    │       └── variables.tf
    ├── provider.tf
    ├── required_provider.tf
    └── variables.tf

Choose a working folder on your machine

Since we are going to build this incrementally, let’s build the terraform modules/resources one at a time.
First start by creating a new folder. This will be our working folder.

cd ~
rm -rf ~/demo.cevo
mkdir -p ~/demo.cevo/tf
cd ~/demo.cevo

Create the AWS provider files

Terraform relies on plugins called “providers” to interact with remote systems.
Let’s create two files – one which declares the required providers and another which configures the providers.

cd ~/demo.cevo/tf
# create 'required_provider.tf' file here. See output of following cat command for the content of the file.
cat required_provider.tf
terraform {
  required_providers {
    aws = {
      source  = "hashicorp/aws"
      version = "3.20.0"
    }
  }
}
cd ~/demo.cevo/tf
# create a 'provider.tf' file here. See output of following cat command for the content of the file.
cat provider.tf
# Configure the AWS Provider
provider "aws" {
  region = "ap-southeast-2"
}

Test the provider configuration by executing terraform init.

cd ~/demo.cevo/tf
terraform init

If you get the ‘Terraform has been successfully initialized!’ message, then we can proceed further. If not, check that you have Terraform (> v0.13) installed and have defined the provider files properly.
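
If you are unsure which Terraform version you are running, the following command (assuming the terraform binary is on your PATH) will print it:

terraform version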

Create Amazon S3 bucket

We need an S3 bucket to save the incoming streams as JSON files. Here we shall define a Terraform module named s3bucket-for-sf which will be used to create the S3 bucket. In the module we shall create its variables, resource and output definitions.

Note: We will also create a soft link from within the module to the required providers file – this is something Terraform needs when working with modules. We will do this for every new module that we define.

mkdir -p ~/demo.cevo/tf/modules/s3bucket-for-sf
cd ~/demo.cevo/tf/modules/s3bucket-for-sf
#creating the soft link to required provider
ln -snf ../../required_provider.tf required_provider.tf
# create a 'variables.tf' file here. See output of following cat command for the content of the file.
cat variables.tf
variable "s3_bucket_name" { type = string }
cd ~/demo.cevo/tf/modules/s3bucket-for-sf
# create a 'main.tf' file here. See output of following cat command for the content of the file.
cat main.tf
resource "aws_s3_bucket" "twi_s3_bucket" {
  bucket = var.s3_bucket_name
  acl = "private"
  tags = {
    Name = var.s3_bucket_name
  }
  force_destroy = true
}

output "arn" {
  value = aws_s3_bucket.twi_s3_bucket.arn
}

output "id" {
  value = aws_s3_bucket.twi_s3_bucket.id
}

output "bucket" {
  value = aws_s3_bucket.twi_s3_bucket.bucket
}

Define your variable file and main file in your working directory which will invoke the s3bucket-for-sf Terraform module.

cd ~/demo.cevo/tf
# create a 'variables.tf' file here. See output of following cat command for the content of the file.
cat variables.tf
variable "twi_s3_bucket_name" {
  type = string
  default = "damz-twi-s3-bucket-1"
}
cd ~/demo.cevo/tf
# create a 'main.tf' file here. See output of following cat command for the content of the file.
cat main.tf
module "s3bucket-for-sf" {
  source = "./modules/s3bucket-for-sf"
  s3_bucket_name = var.twi_s3_bucket_name
}

Roll out the stack by executing Terraform commands.

cd ~/demo.cevo/tf
terraform init
#run terraform plan to see what resources will be created/added
terraform plan
#create the resources
terraform apply --auto-approve

If all goes well, you will see the following message:
Apply complete! Resources: 1 added, 0 changed, 0 destroyed.

At this stage your s3 bucket is created.
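
If you would like to double-check outside of Terraform, and assuming your AWS CLI is configured for the same account and region, a quick listing of the bucket should succeed (it will simply return nothing, since the bucket is still empty):

#optional: verify the bucket exists (assumes a configured AWS CLI)
aws s3 ls "s3://${twi_s3_bucket_name}"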

Create Kinesis Data Firehose Delivery Stream and link it to S3 Bucket

Amazon Kinesis Data Firehose is an automatically scalable, fully managed AWS service that allows you to reliably capture, transform, and deliver streaming data into data lakes, data stores, and analytics services. We will use Kinesis Data Firehose to capture and send data into the S3 bucket that we created above. Here we will write a Terraform module, kinesis-setup, to create and configure the Amazon Kinesis Data Firehose delivery stream.

The module will set up the following:

  • IAM role with attached policies that allow Firehose to write to the S3 bucket and send logs to CloudWatch
  • Kinesis Data Firehose Delivery Stream – which sends data to S3 & logs to CloudWatch

Let’s define the module kinesis-setup code

mkdir -p ~/demo.cevo/tf/modules/kinesis-setup
cd ~/demo.cevo/tf/modules/kinesis-setup
#creating the soft link to required provider
ln -snf ../../required_provider.tf required_provider.tf
# create a 'variables.tf' file here. See output of following cat command for the content of the file.
cat variables.tf
variable "bucket_arn" { type = string }
variable "kinesis_firehose_stream_name" { type = string }
cd ~/demo.cevo/tf/modules/kinesis-setup
# create a 'firehose_access_policy.tf' file here. See output of following cat command for the content of the file.
cat firehose_access_policy.tf
#Define a policy which will allow Kinesis Data Firehose to Assume an IAM Role
data "aws_iam_policy_document" "kinesis_firehose_stream_assume_role" {
  statement {
    effect  = "Allow"
    actions = ["sts:AssumeRole"]
    principals {
      type        = "Service"
      identifiers = ["firehose.amazonaws.com"]
    }
  }
}

#Define a policy which will allow Kinesis Data Firehose to access your S3 bucket
data "aws_iam_policy_document" "kinesis_firehose_access_bucket_assume_policy" {
  statement {
    effect = "Allow"
    actions = [
      "s3:AbortMultipartUpload",
      "s3:GetBucketLocation",
      "s3:GetObject",
      "s3:ListBucket",
      "s3:ListBucketMultipartUploads",
      "s3:PutObject",
    ]
    resources = [
      var.bucket_arn,
      "${var.bucket_arn}/*",
    ]
  }
}

#Define a policy which will allow Kinesis Data Firehose to send logs to cloudwatch

#The resource group can be further controlled ...for now allowing every action
data "aws_iam_policy_document" "kinesis_firehose_cw_policy" {
  statement {
    effect = "Allow"
    actions = [
      "logs:CreateLogGroup",
      "logs:CreateLogStream",
      "logs:PutLogEvents"
    ]
    resources = [
      "arn:aws:logs:*:*:*"
    ]
  }
}
cd ~/demo.cevo/tf/modules/kinesis-setup
# create a 'main.tf' file here. See output of following cat command for the content of the file.
cat main.tf
#creates a new iam role
resource "aws_iam_role" "firehose_role" {
  name               = "firehose_role"
  assume_role_policy = data.aws_iam_policy_document.kinesis_firehose_stream_assume_role.json
}

#attach s3 bucket access policy
resource "aws_iam_role_policy" "kinesis_firehose_access_bucket_policy" {
  name   = "kinesis_firehose_access_bucket_policy"
  role   = aws_iam_role.firehose_role.name
  policy = data.aws_iam_policy_document.kinesis_firehose_access_bucket_assume_policy.json
}

#attach send logs to cloudwatch policy
resource "aws_iam_role_policy" "kinesis_firehose_send_logs_to_cw_policy" {
  name   = "kinesis_firehose_send_logs_to_cw_policy"
  role   = aws_iam_role.firehose_role.name
  policy = data.aws_iam_policy_document.kinesis_firehose_cw_policy.json
}

resource "aws_kinesis_firehose_delivery_stream" "fh_stream" {
  name        = var.kinesis_firehose_stream_name
  destination = "s3"
  s3_configuration {
    role_arn   = aws_iam_role.firehose_role.arn
    bucket_arn = var.bucket_arn
    buffer_size        = 1
    buffer_interval    = 60
    cloudwatch_logging_options {
      enabled = true
      log_group_name = "/aws/kinesisfirehose/${var.kinesis_firehose_stream_name}"
      log_stream_name = "S3Delivery"
    }
  }
}

Update the variable file and main file in your working directory to include invocation of the kinesis-setup module.

cd ~/demo.cevo/tf
# update the 'variables.tf' file here. See output of following cat command for the content of the file.
cat variables.tf
variable "twi_s3_bucket_name" {
  type = string
  default = "damz-twi-s3-bucket-1"
}

variable "twi_kinesis_firehose_stream_name" {
  type = string
  default = "terraform-kinesis-firehose-twi-stream"
}
cd ~/demo.cevo/tf
# update the 'main.tf' file here. See output of following cat command for the content of the file.
cat main.tf
module "s3bucket-for-sf" {
  source = "./modules/s3bucket-for-sf"
  s3_bucket_name = var.twi_s3_bucket_name
}

module "kinesis-setup" {
  source = "./modules/kinesis-setup"
  bucket_arn = module.s3bucket-for-sf.arn
  kinesis_firehose_stream_name = var.twi_kinesis_firehose_stream_name
}

Update the stack by executing terraform commands.

cd ~/demo.cevo/tf
terraform init
#run terraform plan to see what resources will be created/added
terraform plan
#create the resources
terraform apply --auto-approve

If all goes well, you will see the following message:
Apply complete! Resources: 4 added, 0 changed, 0 destroyed.

At this stage your Kinesis Data Firehose delivery stream is created.
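
As an optional check, and assuming your AWS CLI is configured for the same account, you can query the delivery stream status; it should report ACTIVE once provisioning has finished:

#optional: check the delivery stream status (assumes a configured AWS CLI)
aws firehose describe-delivery-stream \
  --delivery-stream-name "${twi_kinesis_firehose_stream_name}" \
  --query 'DeliveryStreamDescription.DeliveryStreamStatus' \
  --output text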

Create an IAM service account (user)

Snowflake needs access to the S3 bucket. Ideally an IAM role would be created to allow cross-account access from Snowflake to our AWS account, but for the sake of simplicity we will use an IAM service account (user) with read-only access to the S3 bucket. We shall also create access keys for this service account, which we will later configure on the Snowflake side.

We proceed to create the iam-user-for-sf module.

mkdir -p ~/demo.cevo/tf/modules/iam-user-for-sf
cd ~/demo.cevo/tf/modules/iam-user-for-sf
#creating the soft link to required provider
ln -snf ../../required_provider.tf required_provider.tf
# create the 'variables.tf' file here. See output of following cat command for the content of the file.
cat variables.tf
variable "iam_username" { type = string }
variable "s3bucket_arn" { type = string }
cd ~/demo.cevo/tf/modules/iam-user-for-sf
# create the 'main.tf' file here. See output of following cat command for the content of the file.
cat main.tf
data "aws_iam_policy_document" "s3_read_policy" {
  statement {
    effect = "Allow"
    actions = [
      "s3:Get*",
      "s3:List*"
    ]
    resources = [
      var.s3bucket_arn,
      "${var.s3bucket_arn}/*",
    ]
  }
}

resource "aws_iam_user" "sfuser" {
  name = var.iam_username
}

resource "aws_iam_user_policy" "sfuser_s3ropolicy" {
  name = "sfuser_s3ropolicy"
  user = aws_iam_user.sfuser.name
  policy = data.aws_iam_policy_document.s3_read_policy.json
}

resource "aws_iam_access_key" "sfuseraccesskey" {
  user    = aws_iam_user.sfuser.name
}

output "id" {
  value = aws_iam_access_key.sfuseraccesskey.id
}

output "secret" {
  value = aws_iam_access_key.sfuseraccesskey.secret
}

The above code creates an IAM policy that allows read-only access to the specified S3 bucket, associates the policy with a newly created IAM user, and outputs the access key ID and secret for that user.

Update your variable file and main file in your working directory to include invocation of the iam-user-for-sf module.

cd ~/demo.cevo/tf
# update the 'variables.tf' file here. See output of following cat command for the content of the file.
cat variables.tf
variable "twi_s3_bucket_name" {
  type = string
  default = "damz-twi-s3-bucket-1"
}

variable "twi_kinesis_firehose_stream_name" {
  type = string
  default = "terraform-kinesis-firehose-twi-stream"
}

variable "iam_username_for_sf" {
  type = string
  default = "sfusers3ro"
}
cd ~/demo.cevo/tf
# update the 'main.tf' file here. See output of following cat command for the content of the file.
cat main.tf
module "s3bucket-for-sf" {
  source = "./modules/s3bucket-for-sf"
  s3_bucket_name = var.twi_s3_bucket_name
}

module "kinesis-setup" {
  source = "./modules/kinesis-setup"
  bucket_arn = module.s3bucket-for-sf.arn
  kinesis_firehose_stream_name = var.twi_kinesis_firehose_stream_name
}

module "iam-user-for-sf" {
  source = "./modules/iam-user-for-sf"
  iam_username = var.iam_username_for_sf
  s3bucket_arn = module.s3bucket-for-sf.arn
}

Update the stack by executing Terraform commands.

cd ~/demo.cevo/tf
terraform init
#run terraform plan to see what resources will be created/added
terraform plan
#create the resources
terraform apply --auto-approve

If all goes well you will see the following message:
Apply complete! Resources: 3 added, 0 changed, 0 destroyed.
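
Optionally, and assuming your AWS CLI is configured for the same account, you can confirm that the user, its inline policy and its access key were created (the user name below is the default value from variables.tf):

#optional: verify the service account (assumes a configured AWS CLI)
aws iam list-user-policies --user-name sfusers3ro
aws iam list-access-keys --user-name sfusers3ro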

Create snowflake resources

The assumption here is that you have already set up your Snowflake account. A Snowflake provider is available in the Terraform Registry for managing Snowflake resources; we can simply declare it under required_providers and providers in our Terraform code to start using it. We also assume that you have your Snowflake account ID, plus a username and password with privileges that allow you to create and manage resources in Snowflake. For simplicity we will be using an account operating under the ACCOUNTADMIN role.

Snowflake providers

First let’s update the provider files as shown below:

cd ~/demo.cevo/tf
# update the 'required_provider.tf' file here. See output of following cat command for the content of the file.
cat required_provider.tf
terraform {
  required_providers {
    aws = {
      source  = "hashicorp/aws"
      version = "~> 3.0"
    }
    snowflake = {
      source = "chanzuckerberg/snowflake"
      version = "0.20.0"
    }
  }
}
cd ~/demo.cevo/tf
# update the 'provider.tf' file here. See output of following cat command for the content of the file.
cat provider.tf
# Configure the AWS Provider
provider "aws" {
  region = "ap-southeast-2"
}

provider "snowflake" {
  region  = "ap-southeast-2"
  role = "ACCOUNTADMIN"
}

At this stage it may be a good idea to test your provider configuration by executing terraform init.

cd ~/demo.cevo/tf
terraform init

If you see something like ‘Installing chanzuckerberg/snowflake v0.20.0…’ followed by the ‘Terraform has been successfully initialized!’ message, then we can proceed further. If not, check that you have Terraform installed and have defined the provider files properly.

Snowflake account credentials

There are multiple ways you can authenticate against Snowflake, but we will use simple password-based authentication.

Please ensure that account credentials such as the account identifier, user name and password are defined in your environment. The Terraform Snowflake provider relies on these environment variables.

export SNOWFLAKE_ACCOUNT="<your account>"
export SNOWFLAKE_USER="<your user name>"
export SNOWFLAKE_PASSWORD="<your password>"
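
A small bash-only check like the one below can confirm that all three variables are set without printing the password to your console (the loop is just an illustration):

#optional: confirm the Snowflake credentials are set (bash only)
for v in SNOWFLAKE_ACCOUNT SNOWFLAKE_USER SNOWFLAKE_PASSWORD; do
  [ -n "${!v}" ] || echo "WARNING: $v is not set"
done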

Create Snowflake resources

We will now define the snowflake-setup terraform module that will create Snowflake resources. This module needs the following inputs:

  • external_s3_bucket – the S3 bucket name from which Snowflake will read data
  • aws_region_id – the AWS region ID
  • aws_access_key_id – the access key ID of the IAM service account which allows read-only access to the S3 bucket
  • aws_secret_access_key – the secret access key of the IAM service account which allows read-only access to the S3 bucket

For the sake of simplicity we will use the following hardcoded names for our snowflake resources that we will create:

  • Warehouse – TWI_WH
  • Database – TWIDB
  • Schema – PUBLIC
  • Table – TWEETS
  • External Stage – TWITTER_STAGE
  • Snowpipe – snowpipe

Proceed to create the snowflake-setup module as shown below

mkdir -p ~/demo.cevo/tf/modules/snowflake-setup
cd ~/demo.cevo/tf/modules/snowflake-setup
#creating the soft link to required provider
ln -snf ../../required_provider.tf required_provider.tf
# create the 'variables.tf' file here. See output of following cat command for the content of the file.
cat variables.tf
variable "aws_access_key_id" { type = string }
variable "aws_secret_access_key" { type = string }
variable "external_s3_bucket" { type = string }
cd ~/demo.cevo/tf/modules/snowflake-setup
# create the 'main.tf' file here. See output of following cat command for the content of the file.
cat main.tf
locals {
  file_format = " file_format=(type='JSON')"
}

resource "snowflake_warehouse" "TWI_WH" {
  name           = "TWI_WH"
  comment        = "twitter analysis wh"
  warehouse_size = "XSMALL"
  auto_suspend   = 120
  auto_resume    = true
}

resource "snowflake_database" "TWIDB" {
  name                        = "TWIDB"
  comment                     = "Twitter DB"
  data_retention_time_in_days = 1
}

#creates a snowflake table. Notice the data type of column is a variant type which allows us to store json like data
resource "snowflake_table" "TWI_TABLE" {
  database = snowflake_database.TWIDB.name
  schema   = "PUBLIC"
  name     = "TWEETS"
  comment  = "Twitter streams data table."
  column {
    name = "tweet"
    type = "VARIANT"
  }
}

#Creates the Snowflake external stage (S3) from which Snowpipe will read data files
#we are using aws access keys here to allow access to s3, but as mentioned earlier
#external IAM roles can be used to manage the cross account access control in a better way.
#Refer https://docs.snowflake.com/en/user-guide/data-load-snowpipe-auto-s3.html
resource "snowflake_stage" "external_stage_s3" {
  name        = "TWITTER_STAGE"
  url         = join("", ["s3://", var.external_s3_bucket, "/"])
  database    = snowflake_database.TWIDB.name
  schema      = "PUBLIC"
  credentials = "AWS_KEY_ID='${var.aws_access_key_id}' AWS_SECRET_KEY='${var.aws_secret_access_key}'"
}

#create pipe to copy into the tweets table from the external stage
resource "snowflake_pipe" "snowpipe" {
  name     = "snowpipe"
  database = snowflake_database.TWIDB.name
  schema   = "PUBLIC"
  comment  = "This is the snowpipe that will consume kinesis delivery stream channelled via the sqs."
  copy_statement = join("", [
    "copy into ",
    snowflake_database.TWIDB.name, ".PUBLIC.", snowflake_table.TWI_TABLE.name,
    " from @",
    snowflake_database.TWIDB.name, ".PUBLIC.", snowflake_stage.external_stage_s3.name,
    local.file_format
  ])
  auto_ingest = true
}

output "sqs_4_snowpipe" {
  value = snowflake_pipe.snowpipe.notification_channel
}

Note: The snowflake-setup module’s output sqs_4_snowpipe gives us the SQS notification channel (the queue ARN) that we will later configure on our S3 bucket for event notifications.

Update the main file in your working directory to include the invocation of the snowflake-setup module. Note how we pass the IAM user access credentials as inputs to the module.

cd ~/demo.cevo/tf
# update the 'main.tf' file here. See output of following cat command for the content of the file.
cat main.tf
module "s3bucket-for-sf" {
  source = "./modules/s3bucket-for-sf"
  s3_bucket_name = var.twi_s3_bucket_name
}

module "kinesis-setup" {
  source = "./modules/kinesis-setup"
  bucket_arn = module.s3bucket-for-sf.arn
  kinesis_firehose_stream_name = var.twi_kinesis_firehose_stream_name
}

module "iam-user-for-sf" {
  source = "./modules/iam-user-for-sf"
  iam_username = var.iam_username_for_sf
  s3bucket_arn = module.s3bucket-for-sf.arn
}

module "snowflake-setup" {
  source = "./modules/snowflake-setup"
  aws_access_key_id = module.iam-user-for-sf.id
  aws_secret_access_key = module.iam-user-for-sf.secret
  external_s3_bucket = module.s3bucket-for-sf.bucket
}

Update the stack by executing terraform commands.

cd ~/demo.cevo/tf
terraform init
#run terraform plan to see what resources will be created/added
terraform plan
#create the resources
terraform apply --auto-approve

If all goes well, you will see the following message:
Apply complete! Resources: 5 added, 0 changed, 0 destroyed.
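
If you have the SnowSQL CLI installed (an assumption; it is not required for the rest of this walkthrough), you can verify the pipe and its notification channel from the command line. It will prompt for your password unless SNOWSQL_PWD is exported.

#optional: verify the snowpipe exists (assumes SnowSQL is installed)
snowsql -a "$SNOWFLAKE_ACCOUNT" -u "$SNOWFLAKE_USER" -q "show pipes in database TWIDB;"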

Finally, configure the S3 bucket’s object-create events to notify Snowpipe’s SQS queue

Update the main file in your working directory to configure the S3 bucket’s object-create events to notify Snowpipe’s SQS queue.
Unlike the previous steps where we invoked a Terraform module, this time we shall declare the ‘aws_s3_bucket_notification’ resource directly.

cd ~/demo.cevo/tf
# update the 'main.tf' file here. See output of following cat command for the content of the file.
cat main.tf
module "s3bucket-for-sf" {
  source = "./modules/s3bucket-for-sf"
  s3_bucket_name = var.twi_s3_bucket_name
}

module "kinesis-setup" {
  source = "./modules/kinesis-setup"
  bucket_arn = module.s3bucket-for-sf.arn
  kinesis_firehose_stream_name = var.twi_kinesis_firehose_stream_name
}

module "iam-user-for-sf" {
  source = "./modules/iam-user-for-sf"
  iam_username = var.iam_username_for_sf
  s3bucket_arn = module.s3bucket-for-sf.arn
}

module "snowflake-setup" {
  source = "./modules/snowflake-setup"
  aws_access_key_id = module.iam-user-for-sf.id
  aws_secret_access_key = module.iam-user-for-sf.secret
  external_s3_bucket = module.s3bucket-for-sf.bucket
}

resource "aws_s3_bucket_notification" "bucket_notification_to_sqs" {
  bucket = module.s3bucket-for-sf.id
  queue {
    queue_arn     = module.snowflake-setup.sqs_4_snowpipe
    events        = ["s3:ObjectCreated:*"]
  }
}

Update the stack by executing terraform commands.

cd ~/demo.cevo/tf
terraform init
#run terraform plan to see what resources will be created/added
terraform plan
#create the resources
terraform apply --auto-approve

If all goes well, you will see the following message:
Apply complete! Resources: 1 added, 0 changed, 0 destroyed.
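
As a final optional check, and assuming a configured AWS CLI, you can confirm that the bucket notification now points at Snowpipe’s SQS queue:

#optional: confirm the event notification configuration (assumes a configured AWS CLI)
aws s3api get-bucket-notification-configuration --bucket "${twi_s3_bucket_name}"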

At this stage we have created the Kinesis Data Firehose delivery stream pipeline to snowflake as shown here:

[Architecture diagram: Kinesis Data Firehose delivery stream pipeline into Snowflake]

How can I test this?

We can test this by running a Python utility which reads the Twitter stream and forwards the data to the specified Kinesis Data Firehose delivery stream.
The assumptions here are that you have:

  • Installed python3 on your system
  • Twitter developer credentials for the application to connect to the Twitter APIs
  • Exported, in the environment in which the program will run, the AWS access key and secret that allow the program to talk to Kinesis Data Firehose

We will need to install a couple of Python libraries in order to run the program.

#Install the tweepy library (a Twitter API client)
python3 -m pip install tweepy

#Install AWS's boto3 library
python3 -m pip install boto3
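
Note that the script below uses the StreamListener API from tweepy 3.x, which was removed in tweepy 4.0. If pip installs a newer version, you may need to pin an older release, for example:

#pin tweepy to a 3.x release if the import of StreamListener fails
python3 -m pip install 'tweepy<4'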

In addition to the environment variables defined in earlier stages, the following environment variables should be set with proper values for the test client to run successfully. Of course there are ways to handle these values programmatically, but that is out of scope for this discussion.

#export the aws access keys (assuming that you have configured your aws cli - https://docs.aws.amazon.com/cli/latest/userguide/cli-configure-files.html)
export aws_access_key_id="<your value>"
export aws_secret_access_key="<your value>"

#export the twitter api keys
export twi_consumer_key="<your value>"
export twi_consumer_secret="<your value>"
export twi_access_token="<your value>"
export twi_access_token_secret="<your value>"

Create the python script to read twitter streams and forward the same to firehose:

mkdir ~/demo.cevo/scripts
cd ~/demo.cevo/scripts
# create the 'get-twitter-streams.py' file here. See output of following cat command for the content of the file.
cat get-twitter-streams.py
import os
import json
import boto3
from tweepy import OAuthHandler, Stream, StreamListener

def check_empty(varname, varval):
  if not varval:
    print("Unexpected empty Value: {0}".format(varname))
    exit(1)

#read env variables
kinesis_delivery_stream_name = os.getenv("twi_kinesis_firehose_stream_name")
check_empty("kinesis_delivery_stream_name", kinesis_delivery_stream_name)

twi_consumer_key = os.getenv("twi_consumer_key")
check_empty("twi_consumer_key", twi_consumer_key)

twi_consumer_secret = os.getenv("twi_consumer_secret")
check_empty("twi_consumer_secret", twi_consumer_secret)

twi_access_token = os.getenv("twi_access_token")
check_empty("twi_access_token", twi_access_token)

twi_access_token_secret = os.getenv("twi_access_token_secret")
check_empty("twi_access_token_secret", twi_access_token_secret)

aws_region_id = os.getenv("aws_region_id")
check_empty("aws_region_id", aws_region_id)

aws_access_key_id = os.getenv("aws_access_key_id")
check_empty("aws_access_key_id", aws_access_key_id)

aws_secret_access_key = os.getenv("aws_secret_access_key")
check_empty("aws_secret_access_key", aws_secret_access_key)

#In case you are using temporary token/creds you will need the session token
aws_session_token = os.getenv("aws_session_token")
if not aws_session_token:
  session = boto3.Session(
    aws_access_key_id=aws_access_key_id,
    aws_secret_access_key=aws_secret_access_key,
    region_name=aws_region_id
  )
else:
  session = boto3.Session(
    aws_access_key_id=aws_access_key_id,
    aws_secret_access_key=aws_secret_access_key,
    aws_session_token=aws_session_token,
    region_name=aws_region_id
  )

kinesis_client = session.client('firehose')

def sendToStream(data):
  response = kinesis_client.put_record(
      DeliveryStreamName = kinesis_delivery_stream_name,
      Record = {
          'Data': data.encode()
      }
  )
  return response

class StdOutListener(StreamListener):

  def on_data(self, data):
    print(data)
    response = sendToStream(data)
    return True

  def on_error(self, status):
    if status == 420:
      return False

listnr = StdOutListener()

#twitter - capture stream
auth = OAuthHandler(twi_consumer_key, twi_consumer_secret)
auth.set_access_token(twi_access_token, twi_access_token_secret)
stream = Stream(auth=auth, listener=listnr)
stream.filter(track=['cricket'])

Before we execute the program, a word of caution: do not leave it running longer than necessary, so that you do not accidentally end up paying for unintended usage.
As soon as you have finished testing, terminate the client program.

Let’s proceed to execute the above program:

cd ~/demo.cevo
python3 scripts/get-twitter-streams.py

If all goes well, you will start seeing lots of Twitter stream data on your stdout, and at the same time the data is being forwarded to Kinesis Data Firehose and in turn to the S3 bucket.
Give it a minute, then open the S3 bucket via the AWS console (or browse it using the AWS CLI) and you will see a few files being created.
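
For example, assuming a configured AWS CLI, a recursive listing should show the objects that Firehose has delivered so far:

#optional: list the delivered objects (assumes a configured AWS CLI)
aws s3 ls "s3://${twi_s3_bucket_name}/" --recursive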

Log in to your Snowflake account and switch to the ACCOUNTADMIN role, then check the TWEETS table that we created. You can use the following simple query to check the number of records in the table. If you browse the table you will see multiple rows of JSON data, which are simply the captured Twitter stream records.

select count(*) from TWIDB.PUBLIC.TWEETS;

You can perform further analysis on the data that is collected and gather insights.

Destroying the environment

It is important to delete your resources when you are not using them, so as to avoid unnecessary costs. Terminate the Python program by pressing CTRL+C in your console, and then simply execute terraform destroy.

cd ~/demo.cevo/tf
terraform init
#destroy all the resources
terraform destroy --auto-approve

Retrospect

We have seen above how we can use Terraform to set up an entire streaming data ingestion platform using AWS services and Snowflake within minutes. Traditionally, doing something like this would have meant a lot of planning, effort and upfront cost. Cloud services give us the agility and efficiency to deliver great results in a short period of time.

But it should be noted that, for any cloud platform, there are a few more things to consider when developing a production-ready solution, some of which are:

  • Restrict access control (IAM) to least privileged
  • Cost efficiency
  • Data Protection – encryption at rest & in transit
  • Backup
  • Data Transformation
  • Secure Data Handling
  • Terraform state management
  • …and more
