Unlike a traditional Data Lake, the new table formats (Iceberg, Hudi and Delta Lake) support features that allow data warehousing patterns to be applied, which can help rescue a Data Lake from turning into a Data Swamp. In this post, we'll discuss how to implement ETL using retail analytics data. The data set has two dimension tables (user and product) and a single fact table (order). The dimension tables have different ETL strategies depending on whether historical changes are tracked. For the fact table, the primary keys of the dimension tables are added to facilitate later queries. We'll use Iceberg for data storage/management and Spark for data processing. Instead of provisioning an EMR cluster, a local development environment will be used. Finally, the ETL results will be queried with Athena for verification.
EMR Local Environment
In one of my earlier posts, we discussed how to develop and test Apache Spark apps for EMR locally using Docker (and/or VSCode). Instead of provisioning an EMR cluster, we can quickly build an ETL app in that local environment. For this post, a new local environment is created based on the Docker image of the latest EMR 6.6.0 release. Check the GitHub repository for this post for further details.
Apache Iceberg is supported on EMR 6.5.0 or later, and it requires the iceberg-defaults configuration classification to enable Iceberg. The latest EMR Docker release (emr-6.6.0-20220411), however, doesn't support that configuration classification, and I couldn't find the Iceberg folder (/usr/share/aws/iceberg) within the Docker container. Therefore the Iceberg project's AWS integration example is used instead, and the following script (run.sh) is an update of the example script that allows launching the PySpark shell or submitting a Spark application.
```bash
# run.sh
#!/usr/bin/env bash
```
Here is an example of using the script.
```bash
# launch pyspark shell
./run.sh pyspark

# submit a spark application
./run.sh spark-submit etl.py
```
ETL Strategy
Sample Data
We use the retail analytics sample database from YugabyteDB for the ETL sample data. Records from three tables (users, products and orders) are used, and each table is processed with its own ETL strategy.
The main focus of the demo ETL application is to show how to track product price changes over time and to apply those changes to the order data. Normally ETL is performed daily, but it would be time-consuming to execute daily incremental ETL with the order data because it includes records spanning five calendar years. Moreover, as the order data is related to the user and product data, splitting the corresponding dimension records would be quite difficult. Instead I chose to run yearly incremental ETL. I first split the orders into four groups, where the first group (year 0) includes orders from 2016 and 2017 and each of the remaining groups (year 1 to 3) keeps the records of a whole year from 2018 to 2020. Then I created four product groups to match the order groups so that incremental ETL can be executed together with the order data. The first group (year 0) keeps the original data, and the product price is increased by 5% in each of the following years until the last group (year 3). Note that, with this setup, orders for a given product tend to be mapped to a higher product price over time. On the other hand, the ETL strategy for the user data is not to track historical changes, so it is used as it is. The sample data files used for the ETL app are listed below and can be found in the data folder of the GitHub repository.
```bash
$ tree data
```
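The data-preparation code is not part of the ETL app itself, but the following rough sketch illustrates how the yearly product groups could have been produced. The file paths, the CSV format and the compounding 5% price increase per year group are my own assumptions, not the repository's actual approach.

```python
from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder.appName("prepare-product-groups").getOrCreate()

# year 0 keeps the original product records (hypothetical path)
products = spark.read.option("header", True).csv("data/products/year_0")

for year in range(1, 4):
    (
        products
        # assume a compounding 5% price increase per year group
        .withColumn("price", F.round(F.col("price").cast("double") * (1.05 ** year), 2))
        .write.mode("overwrite")
        .option("header", True)
        .csv(f"data/products/year_{year}")  # hypothetical output path
    )
```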
Users
Slowly changing dimension (SCD) type 1 is implemented for the user data. This method simply upserts records by comparing the primary key values and therefore doesn't track historical changes. The data has the natural key id, and its MD5 hash is used as the surrogate key named user_sk – this column is used as the primary key of the table. The table is configured to be partitioned by the surrogate key into 20 buckets. The table creation statement can be found below.
```sql
CREATE TABLE demo.dwh.users (
  user_sk string,
```
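A minimal sketch of what the full DDL might look like is shown below. Only user_sk and the 20-bucket partitioning come from the description above; the remaining columns and their types are assumptions based on typical users data in the sample database.

```sql
CREATE TABLE demo.dwh.users (
  user_sk     string,
  id          bigint,     -- natural key (assumed type)
  name        string,     -- assumed column
  email       string,     -- assumed column
  created_at  timestamp   -- assumed column
)
USING iceberg
PARTITIONED BY (bucket(20, user_sk))
```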
Products
Slowly changing dimension (SCD) type 2 is used for the product data. This method tracks historical changes by adding multiple records for a given natural key. As with the user data, the id column is the natural key. Each record for the same natural key is given a different surrogate key; the MD5 hash of the concatenation of the id and created_at columns is used as the surrogate key, named prod_sk. Each record has its own effective period, determined by the eff_from and eff_to columns, and the latest record is marked with 1 in its curr_flag column. The table is also configured to be partitioned by the surrogate key into 20 buckets. The table creation statement is shown below.
```sql
CREATE TABLE demo.dwh.products (
```
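Again, a hedged sketch of what the full DDL could look like follows. The prod_sk, curr_flag, eff_from and eff_to columns and the bucket partitioning follow the description; the id, price and created_at columns and their types are assumptions.

```sql
CREATE TABLE demo.dwh.products (
  prod_sk     string,
  id          bigint,     -- natural key (assumed type)
  price       double,     -- assumed column
  created_at  timestamp,
  curr_flag   int,        -- 1 for the latest record of a product
  eff_from    timestamp,  -- start of the effective period
  eff_to      timestamp   -- end of the effective period ('9999-12-31 00:00:00' for the latest record)
)
USING iceberg
PARTITIONED BY (bucket(20, prod_sk))
```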
Orders
The orders table has a composite primary key made up of the surrogate keys of the dimension tables – user_sk and prod_sk. Those columns don't exist in the source data and are added during transformation. The table is configured to be partitioned by the date part of the created_at column. The table creation statement can be found below.
```sql
CREATE TABLE demo.dwh.orders (
  user_sk string,
  prod_sk string,
```
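As before, a possible shape of the full DDL is sketched below. The surrogate keys and the partitioning by the date part of created_at come from the description; the id column (and the omission of other source columns such as quantity or discount) is an assumption to keep the sketch short.

```sql
CREATE TABLE demo.dwh.orders (
  user_sk     string,
  prod_sk     string,
  id          bigint,     -- natural key of the source orders (assumed)
  created_at  timestamp
)
USING iceberg
PARTITIONED BY (days(created_at))
```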
ETL Implementation
Users
In the transformation phase, a source DataFrame is created by adding the surrogate key (user_sk), changing the data types of the relevant columns and selecting the columns in the same order as in the table definition. Then a view (users_tbl) is created from the source DataFrame, and it is used to execute a MERGE operation that compares the surrogate key values of the source view with those of the target users table.
```python
# src.py
def etl_users(file_path: str, spark_session: SparkSession):
    print("users - upsert records...")
```
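The full function lives in the repository's src.py; the following is a simplified sketch of the logic described above, assuming CSV source files and the hypothetical column set used in the earlier DDL sketch.

```python
from pyspark.sql import SparkSession, functions as F

def etl_users(file_path: str, spark_session: SparkSession):
    print("users - upsert records...")
    src_df = (
        spark_session.read.option("header", True).csv(file_path)  # assumed CSV source
        .withColumn("user_sk", F.md5(F.col("id")))                 # surrogate key from the natural key
        .withColumn("id", F.col("id").cast("bigint"))              # cast types to match the table
        .withColumn("created_at", F.to_timestamp("created_at"))
        .select("user_sk", "id", "name", "email", "created_at")    # same order as the table definition
    )
    src_df.createOrReplaceTempView("users_tbl")
    # SCD type 1 - upsert by comparing surrogate key values
    spark_session.sql("""
        MERGE INTO demo.dwh.users t
        USING users_tbl s
        ON t.user_sk = s.user_sk
        WHEN MATCHED THEN UPDATE SET *
        WHEN NOT MATCHED THEN INSERT *
    """)
```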
Products
The source DataFrame is created by adding the surrogate key, which is the MD5 hash of the concatenation of the id and created_at columns, followed by changing the data types of the relevant columns and selecting the columns in the same order as in the table definition. The view (products_tbl) created from the source DataFrame is used to query all the records of the products table whose product ids appear in the source data – see products_to_update. Note that we need the existing data from the products table in order to update the eff_from, eff_to and curr_flag column values. Then eff_lead is added to the result set, which is the created_at value of the next record for a given product id – see products_updated. The final result set is created by determining the curr_flag and eff_to column values. Note that the eff_to value of the last record of a product is set to '9999-12-31 00:00:00' in order to make it easy to query the relevant records. The updated records are then upserted by executing a MERGE operation that compares the surrogate key values with those of the target products table.
```python
# src.py
def etl_products(file_path: str, spark_session: SparkSession):
    print("products - upsert records...")
```
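A sketch of the described flow is shown below. The column set and the CSV format are assumptions, and the actual queries in the repository's src.py will differ in detail.

```python
from pyspark.sql import SparkSession, functions as F

def etl_products(file_path: str, spark_session: SparkSession):
    print("products - upsert records...")
    src_df = (
        spark_session.read.option("header", True).csv(file_path)    # assumed CSV source
        .withColumn("prod_sk", F.md5(F.concat("id", "created_at")))  # surrogate key per product version
        .withColumn("id", F.col("id").cast("bigint"))
        .withColumn("price", F.col("price").cast("double"))          # assumed column
        .withColumn("created_at", F.to_timestamp("created_at"))
        .select("prod_sk", "id", "price", "created_at")
    )
    src_df.createOrReplaceTempView("products_tbl")
    # combine existing and incoming records of the affected products,
    # then recalculate the SCD type 2 columns
    products_updated = spark_session.sql("""
        WITH products_to_update AS (
            SELECT prod_sk, id, price, created_at
            FROM demo.dwh.products
            WHERE id IN (SELECT id FROM products_tbl)
            UNION
            SELECT prod_sk, id, price, created_at FROM products_tbl
        ), products_updated AS (
            -- eff_lead is the created_at of the next record of the same product
            SELECT *, LEAD(created_at) OVER (PARTITION BY id ORDER BY created_at) AS eff_lead
            FROM products_to_update
        )
        SELECT prod_sk, id, price, created_at,
               CASE WHEN eff_lead IS NULL THEN 1 ELSE 0 END AS curr_flag,
               created_at AS eff_from,
               COALESCE(eff_lead, TO_TIMESTAMP('9999-12-31 00:00:00')) AS eff_to
        FROM products_updated
    """)
    products_updated.createOrReplaceTempView("products_updated_tbl")
    # upsert by comparing surrogate key values
    spark_session.sql("""
        MERGE INTO demo.dwh.products t
        USING products_updated_tbl s
        ON t.prod_sk = s.prod_sk
        WHEN MATCHED THEN UPDATE SET *
        WHEN NOT MATCHED THEN INSERT *
    """)
```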
Orders
After transformation, a view (orders_tbl) is created from the source DataFrame. The relevant user (user_sk) and product (prod_sk) surrogate keys are added to the source data by joining the users and products dimension tables. The users table is SCD type 1, so matching on user_id alone is enough for the join condition. On the other hand, an additional join condition based on the eff_from and eff_to columns is necessary for the products table, as it is SCD type 2 and its records have their own effective periods. Note that ideally we would apply an INNER JOIN, but the sample data is not clean and some product records are not matched by that operation. For example, the order whose id is 15 was made at 2018-06-26 02:24:38 with the product whose id is 116, but the earliest record of that product was created at 2018-09-12 15:23:05, so the order would be missed by an INNER JOIN. Therefore a LEFT JOIN is applied to create the initial result set (orders_updated) and, for those orders whose products are not matched, the surrogate keys of the earliest product records are added instead. Finally the updated order records are appended using the DataFrameWriterV2 API.
```python
# src.py
def etl_orders(file_path: str, spark_session: SparkSession):
```
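A simplified sketch of this function is shown below. The source columns, the CSV format and the exact effective-period comparison (>= eff_from and < eff_to) are assumptions; the repository's implementation will differ in detail.

```python
from pyspark.sql import SparkSession, functions as F

def etl_orders(file_path: str, spark_session: SparkSession):
    print("orders - append records...")
    src_df = (
        spark_session.read.option("header", True).csv(file_path)   # assumed CSV source
        .withColumn("id", F.col("id").cast("bigint"))
        .withColumn("user_id", F.col("user_id").cast("bigint"))
        .withColumn("product_id", F.col("product_id").cast("bigint"))
        .withColumn("created_at", F.to_timestamp("created_at"))
    )
    src_df.createOrReplaceTempView("orders_tbl")
    orders_to_append = spark_session.sql("""
        WITH orders_updated AS (
            -- LEFT JOIN because some orders predate the earliest record of their product
            SELECT o.id, o.created_at, o.product_id, u.user_sk, p.prod_sk
            FROM orders_tbl o
            JOIN demo.dwh.users u
              ON o.user_id = u.id
            LEFT JOIN demo.dwh.products p
              ON o.product_id = p.id
             AND o.created_at >= p.eff_from
             AND o.created_at < p.eff_to
        ), earliest_products AS (
            -- surrogate key of the earliest record of each product, used as a fallback
            SELECT id, prod_sk
            FROM (
                SELECT id, prod_sk,
                       ROW_NUMBER() OVER (PARTITION BY id ORDER BY eff_from) AS rn
                FROM demo.dwh.products
            ) ranked
            WHERE rn = 1
        )
        SELECT o.user_sk,
               COALESCE(o.prod_sk, e.prod_sk) AS prod_sk,
               o.id,
               o.created_at
        FROM orders_updated o
        LEFT JOIN earliest_products e ON o.product_id = e.id
    """)
    # append the records using the DataFrameWriterV2 API
    orders_to_append.writeTo("demo.dwh.orders").append()
```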
Run ETL
The ETL script begins by creating all the tables – users, products and orders. Then the ETL for the users table is executed. Note that, although it is executed as initial loading, the same code can also be applied to incremental ETL. Finally, incremental ETL is executed for the products and orders tables. The application can be submitted with ./run.sh spark-submit etl.py. Note that the following environment variables need to be created before submitting the application.
- AWS_ACCESS_KEY_ID
- AWS_SECRET_ACCESS_KEY
- AWS_SESSION_TOKEN – optional; only required if authentication is made via an assumed role
- AWS_REGION – note it is NOT AWS_DEFAULT_REGION
```python
# etl.py
from pyspark.sql import SparkSession

## products etl - assuming SCD type 2

## orders etl - relevant user_sk and prod_sk are added during transformation
```
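A simplified sketch of how the driver might tie things together is below. The module layout, file paths and the year-group loop are my assumptions based on the description above; the actual etl.py is in the GitHub repository.

```python
from pyspark.sql import SparkSession

from src import etl_users, etl_products, etl_orders  # hypothetical module layout

spark = SparkSession.builder.appName("demo-etl").getOrCreate()

# table creation omitted - see the CREATE TABLE statements above

# users etl - SCD type 1; the same code covers initial loading and incremental ETL
etl_users("data/users.csv", spark)  # hypothetical path

# products etl (SCD type 2) and orders etl, one year group at a time
for year in range(4):
    etl_products(f"data/products/year_{year}.csv", spark)  # hypothetical paths
    etl_orders(f"data/orders/year_{year}.csv", spark)
```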
Once the application completes, we're able to query the Iceberg tables with Athena. The following query returns all products whose id is 1. It shows that the price increases over time and that the SCD type 2 columns (curr_flag, eff_from and eff_to) are populated as expected.
```sql
SELECT *
```
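As a rough sketch, the query might look like the following, assuming the tables are registered under a dwh database in the Glue catalog and ordering by the effective-period start.

```sql
SELECT *
FROM dwh.products
WHERE id = 1
ORDER BY eff_from;
```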
The following query returns sample order records for that product. It can be checked that the product surrogate key of each order matches the corresponding products dimension record.
```sql
WITH src_orders AS (
```
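A hedged guess at the shape of this query, joining the orders back to the products dimension on the surrogate key, is shown below; the selected columns, database name and LIMIT are assumptions.

```sql
WITH src_orders AS (
  SELECT prod_sk, created_at AS order_created_at
  FROM dwh.orders
)
SELECT o.order_created_at, p.id, p.price, p.curr_flag, p.eff_from, p.eff_to
FROM src_orders o
JOIN dwh.products p ON o.prod_sk = p.prod_sk
WHERE p.id = 1
ORDER BY o.order_created_at
LIMIT 10;
```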
Summary
In this post, we discussed how to implement ETL using retail analytics data. In the transformation phase, SCD type 1 and SCD type 2 were applied to the user and product data respectively, and the corresponding surrogate keys of the user and product dimensions were added to the order data. A PySpark application that implements ETL against Iceberg tables was demonstrated in a local EMR environment, and the ETL results were queried with Athena for verification.