Using Apache Spark and AWS Glue jobs to load Apache Iceberg tables in batch and streaming mode

In my previous blog post, we discussed the architecture of Apache Iceberg, including its benefits and existing limitations of the data table format. We also demonstrated how to create and load an Iceberg table using Amazon Athena.

In this blog, we expand on Apache Iceberg as a data table format and go into more detail on how to better utilise Iceberg tables in your data workloads. We also demonstrate how to batch and stream process records, and load them into Iceberg tables.

AWS Glue vs AWS EMR

A common question we get asked is which service should be used for data processing and analysis: AWS Glue or AWS EMR?

AWS Glue and AWS EMR are two popular services offered by AWS. And, while both services are used for similar purposes, they have some differences in their features and use cases.

AWS Glue is a fully-managed extract, transform, load (ETL) service that makes it easy for us to prepare and transform data for analytics. We can create and run ETL jobs, which extract data from various sources, transform it using customised logic, and load the transformed data into data lakes, data warehouses, or other data stores for analysis. AWS Glue also provides a number of built-in transformations.

On the other hand, AWS EMR (Elastic MapReduce) is a fully-managed big data processing service that allows users to process large amounts of data using distributed processing frameworks, including Apache Hadoop, Apache Spark, and Presto. It provides a fully-managed Hadoop and Spark cluster that can be easily provisioned and scaled up or down as per the workload. AWS EMR is designed for data processing tasks that require a high degree of parallelism and scalability, such as data analysis, machine learning, and batch processing.

As you can see from the above definitions, both services are capable of running the same job. So, the decision on the best service to use will be dependent on the workload and what we’re trying to achieve.

It’s also important to consider cost. AWS Glue is a serverless service, so you pay only for what you consume, with minimal configuration. AWS EMR, on the other hand, requires you to provision and manage a cluster, which can add maintenance overhead.

Although the code presented in this blog runs on Glue, the same code can run on an EMR cluster with minimal modifications, as it is written in PySpark, the Python interface for Apache Spark, which runs on both Glue and EMR.

Load Iceberg tables using Apache Spark in batch mode

Create Apache Iceberg Table v2

The following query creates an Iceberg table. Some parameters can be changed to accommodate your workload, but `table_type=iceberg` and `format-version=2` should not be changed. We partition the data by the day of the trip, so when Spark reads the incoming data, it groups all of the records from the same day into the same partition. This avoids S3 hot partitions and should suffice for most workloads, but you can create more partitions if your workload requires it.

 

CREATE TABLE glue_catalog.iceberg.nyctaxis (
  VendorID bigint,
  lpep_pickup_datetime timestamp,
  lpep_dropoff_datetime timestamp,
  store_and_fwd_flag string,
  RatecodeID double,
  PULocationID int,
  DOLocationID int,
  passenger_count double,
  trip_distance double,
  fare_amount double,
  extra double,
  mta_tax double,
  tip_amount double,
  tolls_amount double,
  ehail_fee double,
  improvement_surcharge double,
  total_amount double,
  payment_type double,
  trip_type double,
  congestion_surcharge double)
PARTITIONED BY (days(lpep_pickup_datetime))
LOCATION 's3://data-lake-apache-iceberg/greentaxis'
TBLPROPERTIES (
  'table_type'='iceberg',
  'format'='parquet',
  'optimize_rewrite_delete_file_threshold'='10',
  'write_target_data_file_size_bytes'='536870912',
  'format-version'='2'
)

Load the data

The source data for this blog comes from the TLC Trip Record dataset, which records the yellow and green taxi rides in New York City. The files are stored as CSV files in S3. In this blog, we use Apache Spark as the compute engine to extract, transform and load the data into Iceberg tables. Below is a snippet of code that instructs Spark to load the CSV files into memory and copy them into an Iceberg table.

First, we provide the CSV schema to Spark. There are a few reasons to do this:

  1. To avoid Spark scanning the files to infer the schema, since we have already determined it. Skipping inference saves an extra pass over the data before the actual load
  2. To make sure the structure stays consistent, preventing load failures or downstream issues caused by incorrect inference

Notice that the file location has a wildcard at the end of the string, meaning Spark will read and process all of the files in that folder. As part of a data ingestion pipeline, we should consider how to deal with new and already-processed files. It is recommended to move processed files to a different folder so the same data is not processed twice, and a trigger can be set up to initiate loading every time a new file is uploaded. Apache Airflow is a good example of a management platform for data engineering pipelines such as the one presented here.
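As a minimal sketch of that housekeeping step (the processed/ prefix is a hypothetical naming choice, not something created elsewhere in this blog), the input objects could be archived with boto3 after a successful load:

import boto3

s3 = boto3.client("s3")
bucket = "data-lake-apache-iceberg"           # bucket used throughout this blog
input_prefix = "nyc-csv-data/input/"          # folder read by Spark
processed_prefix = "nyc-csv-data/processed/"  # hypothetical archive folder

# List every object under the input prefix and move it to the processed prefix.
# S3 has no rename operation, so we copy each object and then delete the original.
paginator = s3.get_paginator("list_objects_v2")
for page in paginator.paginate(Bucket=bucket, Prefix=input_prefix):
    for obj in page.get("Contents", []):
        key = obj["Key"]
        new_key = processed_prefix + key[len(input_prefix):]
        s3.copy_object(Bucket=bucket, CopySource={"Bucket": bucket, "Key": key}, Key=new_key)
        s3.delete_object(Bucket=bucket, Key=key)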

To load the data into a temporary table, we use the spark.read() method. Spark provides several read options that allow you to customise how data is read from the sources. In this example we have used the following options:

  • Format: The incoming file is in a CSV format
  • Header: The first line on the CSV file contains a header
  • Schema: We provide the schema instead of letting Spark infer it automatically
  • Load: All of the files inside the folder are read
  • dataFrame: A temporary data frame is created as the result of this query
 

# Create a Schema for the CSV data
csvSchema = StructType([
    StructField("VendorID", IntegerType(), False),
    StructField("lpep_pickup_datetime", TimestampType(), False),
    StructField("lpep_dropoff_datetime", TimestampType(), False),
    StructField("store_and_fwd_flag", StringType(), False),
    StructField("RatecodeID", DoubleType(), False),
    StructField("PULocationID", IntegerType(), False),
    StructField("DOLocationID", IntegerType(), False),
    StructField("passenger_count", DoubleType(), False),
    StructField("trip_distance", DoubleType(), False),
    StructField("fare_amount", DoubleType(), False),
    StructField("extra", DoubleType(), False),
    StructField("mta_tax", DoubleType(), False),
    StructField("tip_amount", DoubleType(), False),
    StructField("tolls_amount", DoubleType(), False),
    StructField("ehail_fee", DoubleType(), False),
    StructField("improvement_surcharge", DoubleType(), False),
    StructField("total_amount", DoubleType(), False),
    StructField("payment_type", DoubleType(), False),
    StructField("trip_type", DoubleType(), False),
    StructField("congestion_surcharge", DoubleType(), False)
])


# Load CSV files
location = "s3://data-lake-apache-iceberg/nyc-csv-data/input/*"
dataFrame = spark.read.format("csv").option("header", "true").schema(csvSchema).load(location)
dataFrame.createOrReplaceTempView("tmp_data")

Write to an Iceberg v2 table

Apache Spark 3 introduced the new DataFrameWriterV2 API for writing to tables using data frames. The v2 API is recommended and has been used in this blog. The following code snippet copies the information from the data frame to the Iceberg table. For this operation we use the writeTo() Spark method. The following configuration is used:

  • Table: We read from the temporary view tmp_data created in the previous step.
  • Select: We select all of the columns. We could be selective and reduce the amount of data being sent to the Iceberg table, but in this case we send everything.
  • TableProperty: The format version of the Iceberg table.
  • Option: We enable the fanout writer. The default Iceberg writer expects incoming records to be clustered by partition and closes each partition’s data file once it moves on; because our input is not sorted by partition, fanout keeps the files for every partition open until the write task completes, avoiding errors when records arrive for a partition whose file has already been closed during the same operation.
 

# Load into Iceberg table
spark.table("tmp_data") \
  .select("*") \
  .writeTo("glue_catalog.iceberg.nyctaxis") \
  .tableProperty("format-version", "2") \
  .option("fanout-enabled", "true") \
  .append()

Results

Figure 1 shows how the files are stored in S3. Records are stored according to their partition, in this case the lpep_pickup_datetime_day field. A new data file is created for a partition each time a processed source file contains records for that partition; here, the same partition appeared in two out of the three processed files. This file structure avoids hot partitions during both the write and read processes.

Figure 1. Files are stored by the day the trip occurred to avoid hot partitions
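If you would rather not browse S3 directly, Iceberg also exposes metadata tables that can be queried from Spark. The snippet below is a small illustrative example; the exact columns available depend on your Iceberg version:

# Inspect the partition layout and the data files behind the table
spark.sql("SELECT partition, record_count, file_count FROM glue_catalog.iceberg.nyctaxis.partitions").show(truncate=False)
spark.sql("SELECT file_path, partition, record_count FROM glue_catalog.iceberg.nyctaxis.files").show(truncate=False)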

Load Iceberg tables using Apache Spark streaming

Analysing streaming data is becoming more popular as companies try to gain insights that help them identify and respond to emerging trends or issues quickly. This could involve analysing streaming data to gain valuable insights into customer behaviour, preferences, and patterns, or monitoring and analysing sensor data from manufacturing equipment or supply chain logistics to identify inefficiencies and bottlenecks, improving operational efficiency and reducing costs. In both of these examples, companies can benefit from streaming data and take corrective action more quickly than with traditional analytics methodologies.

Using the Spark Structured Streaming engine to load Apache Iceberg tables can be a powerful way to manage real-time data streams efficiently. With Structured Streaming, it is possible to process and analyse live streaming data in real time. And, with Apache Iceberg, it is possible to manage large-scale datasets in a unified way. To load Apache Iceberg tables, we first create an Iceberg table with a schema that matches the incoming streaming data. We then use the Spark Structured Streaming engine to read and process the streaming data from Kafka, and finally use the Iceberg Spark connector to write the processed data to the Iceberg table in micro-batches.

The streaming process architecture is shown in Figure 2. Dummy IoT sensor logs are created on an EC2 instance and sent to a Kafka topic that holds the messages produced. A Spark application running as a Glue job receives the messages and temporarily stores them as S3 objects while the window is open. After the window closes, the messages are processed and committed to Iceberg tables. A Jupyter Notebook on AWS Glue is used to query the Iceberg tables.

Figure 2. Spark streaming architecture

There are many challenges and considerations when designing a streaming data architecture, starting with what your business considers real time. A data streaming architecture becomes more complex and more expensive the closer you get to real time, so a workload that processes information in near-real time can be a good alternative. Spark Structured Streaming, for example, operates on a micro-batch approach; in this blog we use a processing window of 100 seconds. Spark receives the data in real time, stores it in a temporary file and then processes the records at the end of every window. Spark then commits the records to the Iceberg table.

In addition, Iceberg creates new metadata every time a transaction is committed to the data lake. Streaming queries can create new table versions quickly, and therefore a lot of table metadata to track those versions, so we need to plan for maintenance of these tables by expiring old snapshots and automatically cleaning up metadata files. The Apache Iceberg documentation recommends a minimum commit (trigger) interval of 60 seconds. Both of the values presented here are configurable if your workload requires it.
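As an illustration of that maintenance, old snapshots can be expired through an Iceberg stored procedure. The sketch below targets the tempsensors table created in the next step and assumes the Iceberg SQL extensions (configured later in this blog) are enabled; the cut-off timestamp and retention count are example values only:

# Expire snapshots older than a cut-off, keeping at least the last 5 (example values)
spark.sql("""
  CALL glue_catalog.system.expire_snapshots(
    table => 'iceberg.tempsensors',
    older_than => TIMESTAMP '2023-04-01 00:00:00',
    retain_last => 5
  )
""")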

Create Apache Iceberg Table v2

Below is the code required to create the table used in this section.

 

CREATE TABLE glue_catalog.iceberg.tempsensors (
  client_id string,
  timestamp timestamp,
  humidity int,
  temperature int,
  pressure int,
  pitch string,
  roll string,
  yaw string,
  count bigint)
PARTITIONED BY (bucket(20,client_id))
LOCATION 's3://data-lake-apache-iceberg/tempsensors'
TBLPROPERTIES (
  'table_type'='iceberg',
  'format'='parquet',
  'optimize_rewrite_delete_file_threshold'='10',
  'write_target_data_file_size_bytes'='536870912',
  'format-version'='2'
)

Streaming service

The two most popular streaming data services on AWS are Amazon Kinesis, the native service provided by AWS, and Apache Kafka, an open-source distributed event streaming platform. AWS provides a managed version of Apache Kafka: Amazon Managed Streaming for Apache Kafka (MSK). In this blog, we use the AWS-managed version of Apache Kafka, as Kafka is a good fit for streaming workloads across a range of businesses. Choosing a managed service also reduces configuration time and maintenance costs.

Below are the commands to create an MSK cluster, along with the configuration file:

# Create MSK cluster
$ aws kafka create-cluster --cli-input-json file://cluster_info.json

# File: cluster_info.json
{
  "BrokerNodeGroupInfo": {
      "BrokerAZDistribution": "DEFAULT",
      "InstanceType": "kafka.t3.small",
      "ClientSubnets": [
          "subnet-a", "subnet-b", "subnet-c"
      ],
      "SecurityGroups": [
          "sg-1", "sg-2"
      ],
      "StorageInfo": {
          "EbsStorageInfo": {
              "VolumeSize": 20
          }
      }
  },
  "ClusterName": "iceberg-stream-cluster",
  "EncryptionInfo": {
      "EncryptionAtRest": {
          "DataVolumeKMSKeyId": ""
      },
      "EncryptionInTransit": {
          "InCluster": true,
          "ClientBroker": "TLS_PLAINTEXT"
      }
  },
  "EnhancedMonitoring": "PER_TOPIC_PER_BROKER",
  "KafkaVersion": "3.3.2",
  "NumberOfBrokerNodes": 3,
  "OpenMonitoring": {
      "Prometheus": {
          "JmxExporter": {
              "EnabledInBroker": true
          },
          "NodeExporter": {
              "EnabledInBroker": true
          }
      }
  }
}

Create a Kafka topic

We create a Kafka topic that will be used as a dedicated channel for the messages produced.

 

$ kafka-topics.sh --create \
--bootstrap-server $BootstrapBrokerString \
--replication-factor 3 \
--partitions 1 \
--topic KafkaGlueIcebergTopic

Generate and stream dummy messages

We use a Python script to generate dummy IoT sensor data and send one record to a Kafka broker every second. The script below generates random values typical of a temperature IoT sensor, packages the fields as JSON, and sends each message through a Kafka producer.

 

$ python3 iot-kafka-producer.py | kafka-console-producer.sh \
--broker-list $BootstrapBrokerString --topic KafkaGlueIcebergTopic

# File: iot-kafka-producer.py
import time
import datetime
import json
import sys
import random
from kafka import KafkaProducer

topic = "KafkaGlueIcebergTopic"
client_id = "raspberrypi"

def collect_and_send_data():
    publish_count = 0
    while True:
        humidity = random.randint(0, 120)
        temp = random.randint(0, 60)
        pressure = random.randint(0, 1600)
        orientation = {"pitch": "sample", "roll": "demo", "yaw": "test"}
        timestamp = datetime.datetime.fromtimestamp(time.time()).strftime('%Y-%m-%d %H:%M:%S')
        message = {
            "client_id": client_id,
            "timestamp": timestamp,
            "humidity": humidity,
            "temperature": temp,
            "pressure": pressure,
            "pitch": orientation['pitch'],
            "roll": orientation['roll'],
            "yaw": orientation['yaw'],
            "count": publish_count
        }
        print("Publishing message to topic '{}': {}".format(topic, message))
        kafka_producer(message)

        time.sleep(1)
        publish_count += 1

def kafka_producer(message):
    producer = KafkaProducer(bootstrap_servers=['b-1.***.c3.kafka.ap-southeast-2.amazonaws.com:9092', 'b-3.***.c3.kafka.ap-southeast-2.amazonaws.com:9092', 'b-2.***.c3.kafka.ap-southeast-2.amazonaws.com:9092'],
        value_serializer=lambda m: json.dumps(m).encode('utf-8'))

    future = producer.send('KafkaGlueIcebergTopic', value=message, partition=0)
    future.get(timeout=10)

if __name__ == '__main__':
    collect_and_send_data()

Write to an Iceberg v2 table

The following script receives the data from the Kafka producer, processes and transforms it, and loads it into the Iceberg table. We use the readStream and writeStream methods from the PySpark DataFrame API to read from Kafka and write to the Iceberg table, respectively. This script runs on AWS Glue as a Glue job, but you can run the same code, albeit with some modifications to the boilerplate, on AWS EMR.

 

#!/usr/bin/env python3
# -*- encoding: utf-8 -*-
# vim: tabstop=2 shiftwidth=2 softtabstop=2 expandtab

import os
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from awsglue import DynamicFrame
from pyspark.conf import SparkConf
from pyspark.sql.types import *
from pyspark.sql.functions import (col, from_json, to_timestamp)

args = getResolvedOptions(sys.argv, ['JOB_NAME',
  'TempDir',  # needed to build the checkpoint location below
  'catalog',
  'database_name',
  'table_name',
  'primary_key',
  'kafka_topic_name',
  'starting_offsets_of_kafka_topic',
  'kafka_connection_name',
  'kafka_bootstrap_servers',
  'iceberg_s3_path',
  'lock_table_name',
  'aws_region',
  'window_size'
])

CATALOG = args['catalog']
ICEBERG_S3_PATH = args['iceberg_s3_path']
DATABASE = args['database_name']
TABLE_NAME = args['table_name']
PRIMARY_KEY = args['primary_key']
DYNAMODB_LOCK_TABLE = args['lock_table_name']
KAFKA_TOPIC_NAME = args['kafka_topic_name']
KAFKA_CONNECTION_NAME = args['kafka_connection_name']
KAFKA_BOOTSTRAP_SERVERS = args['kafka_bootstrap_servers']
STARTING_OFFSETS_OF_KAFKA_TOPIC = args.get('starting_offsets_of_kafka_topic', 'latest')
AWS_REGION = args['aws_region']
WINDOW_SIZE = args.get('window_size', '100 seconds')

def setSparkIcebergConf() -> SparkConf:
  # Configure the Glue Data Catalog as the Iceberg catalog, S3 for file IO
  # and a DynamoDB table as the lock manager
  conf_list = [
    (f"spark.sql.catalog.{CATALOG}", "org.apache.iceberg.spark.SparkCatalog"),
    (f"spark.sql.catalog.{CATALOG}.warehouse", ICEBERG_S3_PATH),
    (f"spark.sql.catalog.{CATALOG}.catalog-impl", "org.apache.iceberg.aws.glue.GlueCatalog"),
    (f"spark.sql.catalog.{CATALOG}.io-impl", "org.apache.iceberg.aws.s3.S3FileIO"),
    (f"spark.sql.catalog.{CATALOG}.lock-impl", "org.apache.iceberg.aws.glue.DynamoLockManager"),
    (f"spark.sql.catalog.{CATALOG}.lock.table", DYNAMODB_LOCK_TABLE),
    ("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions"),
    ("spark.sql.iceberg.handle-timestamp-without-timezone", "true")
  ]
  spark_conf = SparkConf().setAll(conf_list)
  return spark_conf

# Set the Spark + Glue context
conf = setSparkIcebergConf()
sc = SparkContext(conf=conf)
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)

# Kafka source options: brokers, topic and where to start reading from
options_read = {
  "kafka.bootstrap.servers": KAFKA_BOOTSTRAP_SERVERS,
  "subscribe": KAFKA_TOPIC_NAME,
  "startingOffsets": STARTING_OFFSETS_OF_KAFKA_TOPIC
}

# Schema of the JSON messages produced by the IoT simulator
schema = StructType([
  StructField("client_id", StringType(), False),
  StructField("timestamp", TimestampType(), True),
  StructField("humidity", IntegerType(), False),
  StructField("temperature", IntegerType(), False),
  StructField("pressure", IntegerType(), False),
  StructField("pitch", StringType(), False),
  StructField("roll", StringType(), False),
  StructField("yaw", StringType(), False),
  StructField("count", IntegerType(), False),
])

streaming_data = spark.readStream.format("kafka").options(**options_read).load()

# Parse the Kafka message value into typed columns
stream_data_df = streaming_data \
  .select(from_json(col("value").cast("string"), schema).alias("source_table")) \
  .select("source_table.*") \
  .withColumn('timestamp', to_timestamp(col('timestamp'), 'yyyy-MM-dd HH:mm:ss'))

table_id = f"{CATALOG}.{DATABASE}.{TABLE_NAME}"
checkpointPath = os.path.join(args["TempDir"], args["JOB_NAME"], "checkpoint/")

# Writing against partitioned table
query = stream_data_df.writeStream \
  .format("iceberg") \
  .outputMode("append") \
  .trigger(processingTime=WINDOW_SIZE) \
  .option("path", table_id) \
  .option("fanout-enabled", "true") \
  .option("checkpointLocation", checkpointPath) \
  .start()
query.awaitTermination()

Creating a Glue Streaming job

The last stage is to create and run the Glue Job. We pass a number of variables that were created in the previous steps.

 

# Create Glue Streaming Job
$ aws glue create-job --cli-input-json file://glue_config.json

# Start Glue Streaming Job
$ aws glue start-job-run --job-name load_streaming_data_glue_job

# File: glue_config.json
{
  "Name": "load_streaming_data_glue_job",
  "Role": "arn:aws:iam:::role/AWSGlueServiceRole-iceberg-data-lake",
  "ExecutionProperty": {
      "MaxConcurrentRuns": 1
  },
  "Command": {
      "Name": "gluestreaming",
      "ScriptLocation": "s3://data-lake-apache-iceberg/scripts/spark_iceberg_writes_with_dataframe.py",
      "PythonVersion": "3"
  },
  "DefaultArguments": {
      "--TempDir": "s3://data-lake-apache-iceberg/temporary/",
      "--enable-metrics": "true",
      "--job-bookmark-option": "job-bookmark-enable",
      "--job-language": "python",
      "--enable-spark-ui": "true",
      "--spark-event-logs-path": "s3://data-lake-apache-iceberg/sparkHistoryLogs/",
      "--enable-job-insights": "false",
      "--enable-glue-datacatalog": "true",
      "--enable-continuous-cloudwatch-log": "true",
      "--kafka_connection_name": "msk-connector",
      "--kafka_bootstrap_servers": "${KAFKA_BOOTSTRAP_SERVERS}",
      "--extra-jars": "s3://data-lake-iceberg/jars/aws-sdk-java-2.17.224.jar",
      "--user-jars-first": "true",
      "--catalog": "glue_catalog",
      "--database_name": "iceberg",
      "--table_name": "tempsensors",
      "--primary_key": "client_id",
      "--lock_table_name": "iceberg-lock",
      "--kafka_topic_name": "KafkaGlueIcebergTopic",
      "--starting_offsets_of_kafka_topic": "latest",
      "--aws_region": "ap-southeast-2",
      "--window_size": "100 seconds",
      "--iceberg_s3_path": "data-lake-apache-iceberg"
  },
  "MaxRetries": 0,
  "Timeout": 30,
  "WorkerType": "G.1X",
  "NumberOfWorkers": 2,
  "GlueVersion": "3.0",
  "Connections": {"Connections": ["msk-connector", "iceberg-connector"]}
}

Results

As you can see from Figure 3, a new Spark job starts every 100 seconds, and each job takes less than one second to complete. This means that, with this workload, your data lake receives new data in under two minutes. This near-real-time architecture may be perfect for your workload.

Figure 3. Glue Job logs showing the files processed.
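To check how fresh the table is while the job runs, a simple query from the Glue notebook (an illustrative example) does the trick; the timestamp column is backticked because it shares its name with a SQL keyword:

# Check the latest event time and total row count ingested so far
spark.sql("""
  SELECT max(`timestamp`) AS latest_event, count(*) AS total_rows
  FROM glue_catalog.iceberg.tempsensors
""").show(truncate=False)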

We ran the Glue job for five minutes, but the job can run for longer. In Figure 4, you can see that a new snapshot was created every 1-2 minutes during the job run. The rate of snapshot creation shown here highlights the importance of adopting the maintenance tasks discussed earlier.

Figure 4. Iceberg table snapshots created after the Glue Job running for five minutes.
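Those snapshots can also be listed directly from the Glue notebook by querying Iceberg's snapshots metadata table; the snippet below is a small example (the keys available in the summary map vary by operation and Iceberg version):

# List the snapshots committed by the streaming job, newest first
spark.sql("""
  SELECT committed_at, snapshot_id, operation, summary['added-records'] AS added_records
  FROM glue_catalog.iceberg.tempsensors.snapshots
  ORDER BY committed_at DESC
""").show(truncate=False)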

Conclusion

In this post, we have presented examples of using Amazon S3, Amazon MSK, AWS Glue, and Apache Spark to build an Iceberg data lakehouse on AWS using batch and streaming processes. Spark and Iceberg tables provide significant value for both batch and streaming workloads in data processing and analytics pipelines.

In batch processing, Spark can be used to process large volumes of data at once, using a batch-oriented processing model. This can be done on a periodic basis or as a one-time event. By using Spark with Iceberg tables, we can achieve high-performance processing of batch data with transactional guarantees, ensuring that data is consistently stored and retrieved. This can help improve the reliability and quality of data, while also reducing processing time and improving the speed of insights.

In streaming processing, Spark Streaming can be used to process data in real-time as it arrives, using a continuous processing model. This allows us to quickly respond to events as they occur and make decisions based on up-to-date information. 

In the case of a streaming workload, we need to look at our use case and decide how fresh the data needs to be. A real-time architecture (less than 10 seconds to process new information) adds cost and complexity to your workload compared with a near-real-time architecture (a few minutes). The benefit of gaining insights from fresh data may be the same whether you stream in real time or near-real time, in which case the more cost-optimised and less complex workload should be chosen.

Lastly, deploying Apache Iceberg or any other data table format in your data lakehouse architecture helps your business comply with regulatory requirements, including ‘right to be forgotten’ requests, as you can easily identify and delete individual records using a SQL query instead of browsing through a vast number of opaque S3 objects to find them.
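As a sketch of what handling such a request could look like against the sensor table from this blog (assuming client_id is the identifier tied to the individual), a single row-level delete is enough:

# Remove every record belonging to one client from the Iceberg v2 table
spark.sql("DELETE FROM glue_catalog.iceberg.tempsensors WHERE client_id = 'raspberrypi'")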

Enjoyed this blog?

Share it with your network!
