
Data Ingestion using Database Migration Service (DMS) and Lambda


Overview

AWS Database Migration Service (DMS) is a managed service for migrating data into AWS. It can replicate data from operational databases and data warehouses (on premises or on AWS) to a variety of targets, including S3 data lakes. In this architecture, DMS is used to capture changed records from relational databases running on RDS or EC2 and write them to S3. AWS Lambda, a serverless compute service, is used to transform and partition the datasets based on their arrival time in S3 for better query performance.
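
As a rough illustration of that arrival-time partitioning (the bucket, table, and file names below are hypothetical), each object copied into the data lake lands under an hourly Hive-style partition path:

from datetime import datetime, timezone

# Hypothetical example: an object that DMS wrote to the staging bucket at this time...
arrival_time = datetime(2020, 3, 14, 9, 26, tzinfo=timezone.utc)

# ...ends up in the data lake under an hourly, Hive-style partition path.
partition = arrival_time.strftime("year=%Y/month=%m/day=%d/hour=%H/")
print("s3://my-datalake-bucket/orders/" + partition + "LOAD00000001.csv")
# s3://my-datalake-bucket/orders/year=2020/month=03/day=14/hour=09/LOAD00000001.csv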

Architecture Component Walkthrough

  1. Create a relational database on EC2 or RDS within a VPC.

  2. Create a staging S3 location to store the changes captured by DMS.

  3. Create a Replication Instance using the DMS APIs or console (a minimal boto3 sketch of steps 3, 4, and 7 follows this list).

  4. Specify the Source & Target Endpoints for the Replication Instance.

  5. Create an IAM role for AWS Lambda that has read access to the staging S3 bucket and write access to the target data lake location.

  6. Create a Lambda function with the custom code shown below, triggered by s3:ObjectCreated:* events on the staging S3 bucket. The function writes the same objects to the target data lake location on S3, partitioned based on the LastModified metadata attribute of the S3 objects.

  7. Create a DMS Task to migrate data from your source system to the target location.
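
The sketch below is one possible boto3 version of steps 3, 4, and 7 (Replication Instance, S3 target endpoint, and DMS Task). Every identifier, ARN, bucket name, and instance size is a placeholder assumption, and the source endpoint, networking, and IAM setup are omitted.

import json
import boto3

dms = boto3.client('dms')

# Step 3: a small replication instance (placeholder identifier and size).
instance = dms.create_replication_instance(
    ReplicationInstanceIdentifier='datalake-dms-instance',
    ReplicationInstanceClass='dms.t3.medium',
    AllocatedStorage=50
)['ReplicationInstance']

# Step 4 (target side only): an S3 target endpoint pointing at the staging bucket.
# ServiceAccessRoleArn must allow DMS to write to that bucket.
target = dms.create_endpoint(
    EndpointIdentifier='datalake-staging-s3',
    EndpointType='target',
    EngineName='s3',
    S3Settings={
        'BucketName': 'my-dms-staging-bucket',  # hypothetical staging bucket
        'ServiceAccessRoleArn': 'arn:aws:iam::111122223333:role/dms-s3-access'  # placeholder
    }
)['Endpoint']

# Step 7: a full-load-and-cdc task replicating every table from a pre-created source endpoint.
# In practice, wait for the replication instance to become available before creating the task.
task = dms.create_replication_task(
    ReplicationTaskIdentifier='datalake-ingest-task',
    SourceEndpointArn='arn:aws:dms:us-east-1:111122223333:endpoint:SOURCE',  # placeholder
    TargetEndpointArn=target['EndpointArn'],
    ReplicationInstanceArn=instance['ReplicationInstanceArn'],
    MigrationType='full-load-and-cdc',
    TableMappings=json.dumps({
        'rules': [{
            'rule-type': 'selection',
            'rule-id': '1',
            'rule-name': 'include-all',
            'object-locator': {'schema-name': '%', 'table-name': '%'},
            'rule-action': 'include'
        }]
    })
)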

Sample Lambda Function Using Python

import os

import boto3

s3 = boto3.client('s3')
target_bucket = os.environ['TARGET_S3_BUCKET']
target_s3_prefix = os.environ['TARGET_S3_PREFIX']  # Do not use '/' at the end of the TARGET_S3_PREFIX variable

def lambda_handler(event, context):

    print("Initializing S3 copy utility for DMS...")
    for record in event['Records']:
        try:
            input_bucket_name = record['s3']['bucket']['name']
            input_key = record['s3']['object']['key']
            print("Initializing copy of input file: s3://{}/{}".format(input_bucket_name, input_key))
            input_file_basename = os.path.basename(input_key)

            # The S3 HeadObject API is used to fetch the LastModified metadata of the object,
            # which drives the year/month/day/hour partitioning of the target key.
            last_modified = s3.head_object(Bucket=input_bucket_name, Key=input_key)['LastModified']
            partition_path = last_modified.strftime("year=%Y/month=%m/day=%d/hour=%H/")
            if target_s3_prefix:
                partitioned_prefix = target_s3_prefix + '/' + partition_path
            else:
                partitioned_prefix = partition_path

            print("Starting copy of input S3 object to s3://{}/{}{}".format(target_bucket, partitioned_prefix, input_file_basename))
            s3.copy_object(CopySource={'Bucket': input_bucket_name, 'Key': input_key},
                           Bucket=target_bucket,
                           Key=partitioned_prefix + input_file_basename)
            print("S3 key was successfully copied to {}".format(target_bucket))

        except Exception as e:
            print(e)
            print('Error copying object to {}'.format(target_bucket))
            raise e
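
For local experimentation, the snippet below mirrors the shape of an S3 ObjectCreated notification record that the handler above consumes. The bucket and key are made-up examples, and actually running it still requires AWS credentials, the TARGET_S3_BUCKET and TARGET_S3_PREFIX environment variables, and real staging and target buckets.

# Hypothetical S3 event for trying out the handler above.
sample_event = {
    "Records": [
        {
            "s3": {
                "bucket": {"name": "my-dms-staging-bucket"},
                "object": {"key": "dmsdata/public/orders/LOAD00000001.csv"}
            }
        }
    ]
}

if __name__ == "__main__":
    lambda_handler(sample_event, None)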

The DMS Replication Instance will then connect to the source via an elastic network interface (ENI) and write the captured changes to the S3 staging location. AWS Lambda will receive the resulting PutObject events and use the S3 copy API to reorganise the data into your data lake.
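
Subscribing the function to the staging bucket's s3:ObjectCreated:* events (step 6) can also be scripted. The sketch below is one way to do it with boto3; the bucket name, account ID, and function ARN are placeholder assumptions.

import boto3

lambda_client = boto3.client('lambda')
s3_client = boto3.client('s3')

staging_bucket = 'my-dms-staging-bucket'  # hypothetical staging bucket
function_arn = 'arn:aws:lambda:us-east-1:111122223333:function:dms-s3-repartition'  # hypothetical function

# Allow S3 to invoke the function for events originating from the staging bucket.
lambda_client.add_permission(
    FunctionName=function_arn,
    StatementId='AllowS3Invoke',
    Action='lambda:InvokeFunction',
    Principal='s3.amazonaws.com',
    SourceArn='arn:aws:s3:::' + staging_bucket
)

# Subscribe the function to all ObjectCreated events on the staging bucket.
s3_client.put_bucket_notification_configuration(
    Bucket=staging_bucket,
    NotificationConfiguration={
        'LambdaFunctionConfigurations': [{
            'LambdaFunctionArn': function_arn,
            'Events': ['s3:ObjectCreated:*']
        }]
    }
)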

Have suggestions? Join our Slack channel to share feedback.
