In this example, we will walk through the process of curating data in a data lake backed by an append-only storage service such as Amazon S3. Because these storage services do not provide update semantics, we will run PySpark transformations on the datasets to create new snapshots of the target partitions and overwrite them. The example can be executed on Amazon EMR or AWS Glue. For simplicity, we assume that all IAM roles and/or Lake Formation permissions have been pre-configured.
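As an illustration of that overwrite pattern, here is a minimal sketch using Spark's dynamic partition overwrite mode, so that only the partitions present in the written DataFrame are replaced. The DataFrame out_df is a placeholder for the curated snapshot produced by the transformation steps described later; here it is just an empty stand-in with the target table's schema.

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# With dynamic mode enabled, an overwrite only replaces the partitions that
# appear in the DataFrame being written instead of truncating the whole table.
spark.conf.set("spark.sql.sources.partitionOverwriteMode", "dynamic")

# out_df stands in for the curated snapshot of the affected order_date
# partitions; here it is just an empty placeholder with the target schema.
out_df = spark.table("orders.c_order_output").limit(0)

(out_df.write
    .mode("overwrite")
    .insertInto("orders.c_order_output"))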
Source:
Catalog table orders.i_order_input is created on raw ingested datasets in CSV format.
The table is partitioned by feed_arrival_date. It receives change records every day in a new folder in S3, e.g. s3://<bucket_name>/input/<yyyy-mm-dd>/.
There can be duplicates due to multiple updates to the same order in a day.
order_update_timestamp represents the time when the order was updated.
Target:
Catalog table orders.c_order_output is a curated deduplicated table that is partitioned by order_date.
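For reference, deduplication of this kind is typically done with a window function that keeps only the latest record per order based on order_update_timestamp. The sketch below assumes a hypothetical order_id key column; adjust it to the real primary key of the feed.

from pyspark.sql import SparkSession, functions as F
from pyspark.sql.window import Window

spark = SparkSession.builder.getOrCreate()

stg_df = spark.table("orders.i_order_input")

# Rank records per order so the most recent update comes first;
# order_id is a hypothetical key column.
w = Window.partitionBy("order_id").orderBy(F.col("order_update_timestamp").desc())

dedup_df = (stg_df
    .withColumn("rn", F.row_number().over(w))
    .filter(F.col("rn") == 1)
    .drop("rn"))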
Example Datasets
You can use Glue crawlers to create tables in your catalog after uploading the files to S3.
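If you prefer to trigger the crawler from code rather than the console, a minimal boto3 sketch is shown below. The crawler name is hypothetical, and the crawler itself (pointing at s3://<bucket_name>/input/) must already exist.

import boto3

glue = boto3.client("glue")

# Start a pre-created crawler over the raw input prefix;
# "orders_input_crawler" is a hypothetical name.
glue.start_crawler(Name="orders_input_crawler")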
Solution Walkthrough
Import the necessary Spark libraries and enable dynamic partition overwrite mode.
import os

from pyspark.sql import SparkSession
from pyspark.sql.window import Window
from pyspark.context import SparkContext
from pyspark.sql import functions as F, types as T

# On EMR notebooks and Glue jobs a SparkSession named `spark` is usually
# pre-created; getOrCreate() reuses it, or builds one for standalone runs.
spark = SparkSession.builder.getOrCreate()

# Overwrite only the partitions present in the output DataFrame instead of
# truncating the whole target table.
spark.conf.set("spark.sql.sources.partitionOverwriteMode", "dynamic")
Read the source table orders.i_order_input and create a DataFrame, stg_df.
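A minimal sketch of this step, assuming the catalog table described above and the spark session created earlier:

# Load the raw, CSV-backed catalog table into a staging DataFrame.
stg_df = spark.table("orders.i_order_input")

# Quick sanity checks on the schema and a few rows.
stg_df.printSchema()
stg_df.show(5, truncate=False)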