Option 2: Automating Snowpipe with AWS Lambda

AWS Lambda is a compute service that runs code you have uploaded whenever a defined event triggers it. You can adapt the sample Python code provided in this topic to create a Lambda function that calls the Snowpipe REST API to load data from your external stage (i.e. an S3 bucket; Azure containers are not supported). The function is deployed to, and hosted in, your AWS account. Events you define in Lambda (e.g. when files in your S3 bucket are updated) invoke the Lambda function and run the Python code.

This topic describes the steps necessary to configure a Lambda function to automatically load data in micro-batches continuously using Snowpipe.
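
To make the event flow concrete, the following minimal sketch builds a trimmed-down S3 notification payload and pulls out the bucket and object key, the two values the sample handler in Step 1 reads. The bucket name, key, and the extract_objects helper are hypothetical and exist only for illustration. The sketch assumes that object keys arrive URL-encoded in S3 notifications (a space is delivered as "+"), so it decodes them before use.

```python
from urllib.parse import unquote_plus

# Trimmed-down S3 event; real notifications carry many more fields.
# The bucket and key below are made-up values.
sample_event = {
    "Records": [
        {
            "s3": {
                "bucket": {"name": "my-example-bucket"},
                "object": {"key": "data/2020/sales+report.csv.gz"},
            }
        }
    ]
}


def extract_objects(event):
    """Return a (bucket, key) pair for every record in an S3 event."""
    pairs = []
    for record in event.get("Records", []):
        bucket = record["s3"]["bucket"]["name"]
        # Decode the URL-encoded key so it matches the real object name.
        key = unquote_plus(record["s3"]["object"]["key"])
        pairs.append((bucket, key))
    return pairs


if __name__ == "__main__":
    for bucket, key in extract_objects(sample_event):
        print("Bucket: " + bucket + " Key: " + key)
```

Running the handler logic against a hand-built event like this is a quick way to check key handling before any AWS resources exist.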

Note

This topic assumes you have configured Snowpipe using the instructions in Preparing to Load Data Using the Snowpipe REST API.

Step 1: Write Python Code Invoking the Snowpipe REST API

Sample Python code

from urllib.parse import unquote_plus

from snowflake.ingest import SimpleIngestManager
from snowflake.ingest import StagedFile

# Assume the public key has been registered in Snowflake
# private key in PEM format
private_key="""
-----BEGIN RSA PRIVATE KEY-----
...
-----END RSA PRIVATE KEY-----
"""

# Proxy object that abstracts the Snowpipe REST API.
# Created once at module load so that warm invocations reuse it.
ingest_manager = SimpleIngestManager(account='<account_name>',
                   host='<account_name>.<region_id>.snowflakecomputing.com',
                   user='<user_login_name>',
                   pipe='<db_name>.<schema_name>.<pipe_name>',
                   private_key=private_key)

def handler(event, context):
  # One S3 notification can describe several objects
  for record in event['Records']:
    bucket = record['s3']['bucket']['name']
    # S3 URL-encodes object keys in event notifications; decode before use
    key = unquote_plus(record['s3']['object']['key'])

    print("Bucket: " + bucket + " Key: " + key)
    # List of files in the stage specified in the pipe definition
    # wrapped into a class
    staged_file_list = []
    staged_file_list.append(StagedFile(key, None))

    print('Pushing file list to ingest REST API')
    resp = ingest_manager.ingest_files(staged_file_list)

Note

The sample code does not account for error handling. For example, it does not retry failed ingest_manager calls.
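
As one possible way to close that gap, the sketch below wraps an arbitrary call in a retry loop with exponential backoff. The call_with_retries helper and its parameters are hypothetical and are not part of the snowflake-ingest package; which exceptions are worth retrying depends on your setup, so the broad except clause is only a placeholder.

```python
import time


def call_with_retries(func, max_attempts=3, base_delay=1.0, sleep=time.sleep):
    """Call func() until it succeeds or max_attempts is reached.

    Waits base_delay, 2 * base_delay, 4 * base_delay, ... seconds between
    attempts and re-raises the last exception if every attempt fails.
    """
    for attempt in range(1, max_attempts + 1):
        try:
            return func()
        except Exception as exc:  # placeholder: narrow to the errors you expect
            if attempt == max_attempts:
                raise
            delay = base_delay * 2 ** (attempt - 1)
            print("Attempt %d failed (%s); retrying in %.1fs" % (attempt, exc, delay))
            sleep(delay)


# Inside the handler, the ingest call could then be wrapped like this:
#   resp = call_with_retries(lambda: ingest_manager.ingest_files(staged_file_list))
```

With the defaults, the helper can sleep for up to 3 seconds in total, so keep the retry budget below the function timeout you configure for Lambda.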

Before using the sample code, make the following changes:

  1. Update the security parameter:

    private_key=""" / -----BEGIN RSA PRIVATE KEY----- / ... / -----END RSA PRIVATE KEY----- """

    Specifies the content of the private key file you created in Using Key Pair Authentication (in Preparing to Load Data Using the Snowpipe REST API).

  2. Update the session parameters:

    account='<account_name>'

    Specifies the name of your account (provided by Snowflake).

    host='<account_name>.<region_id>.snowflakecomputing.com'

    Specifies your host information in the form of a URL. Note that the format of the URL is different depending on the region where your account is located:

    US West          <account_name>.snowflakecomputing.com

    Other regions    <account_name>.<region_id>.snowflakecomputing.com

    Where <account_name> is the name of your account (provided by Snowflake) and <region_id> is:

    Region                                    Region ID                 Notes

    Amazon Web Services (AWS)
    US West (Oregon)                          us-west-2                 Required only when configuring AWS PrivateLink for accounts in US West.
    US East (Ohio)                            us-east-2.aws
    US East (N. Virginia)                     us-east-1
    US East (Commercial Gov - N. Virginia)    us-east-1-gov.aws         Available only for accounts on Business Critical (or higher); not located in AWS GovCloud (US), which is a separate, dedicated cloud not yet supported by Snowflake.
    Canada (Central)                          ca-central-1.aws
    EU (Ireland)                              eu-west-1
    EU (Frankfurt)                            eu-central-1
    Asia Pacific (Tokyo)                      ap-northeast-1.aws
    Asia Pacific (Mumbai)                     ap-south-1.aws
    Asia Pacific (Singapore)                  ap-southeast-1
    Asia Pacific (Sydney)                     ap-southeast-2

    Google Cloud Platform (GCP)
    US Central1 (Iowa)                        us-central1.gcp
    Europe West2 (London)                     europe-west2.gcp
    Europe West4 (Netherlands)                europe-west4.gcp

    Microsoft Azure
    West US 2 (Washington)                    west-us-2.azure
    East US 2 (Virginia)                      east-us-2.azure
    US Gov Virginia                           us-gov-virginia.azure     Available only for accounts on Business Critical (or higher).
    Canada Central (Toronto)                  canada-central.azure
    West Europe (Netherlands)                 west-europe.azure
    Switzerland North (Zurich)                switzerland-north.azure
    Southeast Asia (Singapore)                southeast-asia.azure
    Australia East (New South Wales)          australia-east.azure

    user='<user_login_name>'

    Specifies the login name of the Snowflake user that will run the Snowpipe code.

    pipe='<db_name>.<schema_name>.<pipe_name>'

    Specifies the fully-qualified name of the pipe to use to load the data, in the form of <db_name>.<schema_name>.<pipe_name>.

  3. Specify the path to your files to import in the file objects list:

    staged_file_list = []

    The path you specify must be relative to the stage where the files are located. Include the complete name for each file, including the file extension. For example, a CSV file that is gzip-compressed might have the extension .csv.gz.

  4. Save the file in a convenient location.

The remaining instructions in this topic assume the file name to be SnowpipeLambdaCode.py.
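
The host format from change 2 and the stage-relative path from change 3 can be sketched as two small helpers. Both function names are hypothetical. The second helper additionally assumes that your stage points at a folder inside the bucket (for example s3://mybucket/load/), in which case that folder prefix has to be removed from the S3 object key before the key is passed to StagedFile; if the stage points at the bucket root, the key can be used unchanged.

```python
def build_host(account_name, region_id=None):
    """Return the Snowflake host name for an account.

    Pass region_id=None for US West accounts, whose host names omit the
    region segment.
    """
    if region_id:
        return "%s.%s.snowflakecomputing.com" % (account_name, region_id)
    return "%s.snowflakecomputing.com" % account_name


def stage_relative_path(object_key, stage_prefix=""):
    """Strip the stage's folder prefix from an S3 object key.

    stage_prefix is the folder part of the stage URL, e.g. 'load/' for a
    stage defined on s3://mybucket/load/ (hypothetical names).
    """
    prefix = stage_prefix.lstrip("/")
    if prefix and not prefix.endswith("/"):
        prefix += "/"
    if prefix and object_key.startswith(prefix):
        return object_key[len(prefix):]
    return object_key


if __name__ == "__main__":
    print(build_host("xy12345", "us-east-2.aws"))
    print(stage_relative_path("load/data/file1.csv.gz", "load/"))
```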

Step 2: Create a Lambda Function Deployment Package

Complete the following instructions to build a Python runtime environment for Lambda and add the Snowpipe code you adapted in Step 1: Write Python Code Invoking the Snowpipe REST API (in this topic). For more information about these steps, see the AWS Lambda deployment package documentation (see the instructions for Python).

Important

The scripts in the following steps are a representative example and assume that you are creating an AWS EC2 Linux instance based on an Amazon Machine Image (AMI) that uses the YUM package manager, which depends on RPM. If you select a Debian-based Linux AMI, update the scripts accordingly.

  1. Create an AWS EC2 Linux instance by completing the AWS EC2 instructions. This instance provides the compute resources to build the deployment package for the Snowpipe code.

  2. Copy the Snowpipe code file to your new AWS EC2 instance using SCP (Secure Copy):

    scp -i key.pem /<path>/SnowpipeLambdaCode.py ec2-user@<machine>.<region_id>.compute.amazonaws.com:~/SnowpipeLambdaCode.py
    

    Where:

    • <path> is the path to your local SnowpipeLambdaCode.py file.

    • <machine>.<region_id> is the DNS name of the EC2 instance (e.g. ec2-54-244-54-199.us-west-2.compute.amazonaws.com).

      The DNS name is displayed on the Instances screen in the Amazon EC2 console.

  3. Connect to the EC2 instance using SSH (Secure Shell):

    ssh -i key.pem ec2-user@<machine>.<region_id>.compute.amazonaws.com
    
  4. Install Python and related libraries on the EC2 instance:

    sudo yum install -y gcc zlib zlib-devel openssl openssl-devel
    
    wget https://www.python.org/ftp/python/3.6.1/Python-3.6.1.tgz
    
    tar -xzvf Python-3.6.1.tgz
    
    cd Python-3.6.1 && ./configure && make
    
    sudo make install
    
    sudo /usr/local/bin/pip3 install virtualenv
    
    /usr/local/bin/virtualenv ~/shrink_venv
    
    source ~/shrink_venv/bin/activate
    
    pip install Pillow
    
    pip install boto3
    
    pip install requests
    
    pip install snowflake-ingest
    
  5. Create the .zip deployment package (Snowpipe.zip):

    cd $VIRTUAL_ENV/lib/python3.6/site-packages
    
    zip -r9 ~/Snowpipe.zip .
    
    cd ~
    
    zip -g Snowpipe.zip SnowpipeLambdaCode.py
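
Before uploading the package, it can be worth confirming that the code file ended up at the top level of the archive, because the handler value used in Step 4 (SnowpipeLambdaCode.handler) refers to a module at the root of the package. The check below is a minimal sketch using only the Python standard library; the function name is hypothetical.

```python
import zipfile


def handler_module_at_root(zip_path, handler="SnowpipeLambdaCode.handler"):
    """Return True if the handler's module file sits at the top level of the zip.

    The handler string has the form <module>.<function>, so the archive
    must contain <module>.py without any leading directory.
    """
    module_name = handler.rsplit(".", 1)[0]
    with zipfile.ZipFile(zip_path) as archive:
        return (module_name + ".py") in archive.namelist()
```

For example, handler_module_at_root("/home/ec2-user/Snowpipe.zip") should return True for a package built with the commands above, assuming the default ec2-user home directory.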
    

Step 3: Create an AWS IAM Role for Lambda

Follow the AWS Lambda documentation to create an IAM role to execute the Lambda function.

Record the IAM Amazon Resource Name (ARN) for the role. You will use it in the next step.

Step 4: Create the Lambda Function

Create the Lambda function by uploading the .zip deployment package you created in Step 2: Create a Lambda Function Deployment Package (in this topic):

aws lambda create-function \
--region us-west-2 \
--function-name IngestFile \
--zip-file fileb://~/Snowpipe.zip \
--role arn:aws:iam::<aws_account_id>:role/lambda-s3-execution-role \
--handler SnowpipeLambdaCode.handler \
--runtime python3.6 \
--profile adminuser \
--timeout 10 \
--memory-size 1024

For --role, specify the role ARN you recorded in Step 3: Create an AWS IAM Role for Lambda (in this topic).

Record the ARN for the new function from the output. You will use it in a later step.
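
If you prefer to script this step in Python, the same call can be sketched with boto3 (installed in Step 2). The helper names are hypothetical, and the values simply mirror the CLI command above. The boto3 import is deferred so that the argument builder can be inspected without AWS credentials.

```python
import os


def build_create_function_args(zip_bytes, role_arn):
    """Assemble keyword arguments for the boto3 Lambda create_function call."""
    return {
        "FunctionName": "IngestFile",
        "Runtime": "python3.6",
        "Role": role_arn,
        "Handler": "SnowpipeLambdaCode.handler",  # <module>.<function>
        "Code": {"ZipFile": zip_bytes},
        "Timeout": 10,        # seconds
        "MemorySize": 1024,   # MB
    }


def create_function(zip_path, role_arn, region="us-west-2", profile="adminuser"):
    """Upload the deployment package and return the new function's ARN.

    Requires boto3 and valid AWS credentials for the given profile.
    """
    import boto3  # imported lazily; not needed by the builder above

    with open(os.path.expanduser(zip_path), "rb") as package:
        args = build_create_function_args(package.read(), role_arn)
    session = boto3.Session(profile_name=profile, region_name=region)
    response = session.client("lambda").create_function(**args)
    return response["FunctionArn"]
```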

Step 5: Allow Calls to the Lambda Function

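Grant S3 the permissions required to invoke your function.

For --source-arn, specify the ARN of the S3 bucket that triggers the function (i.e. arn:aws:s3:::<SourceBucket>, where <SourceBucket> is the name of your bucket). For --source-account, specify the ID of the AWS account that owns the bucket.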

aws lambda add-permission \
--function-name IngestFile \
--region us-west-2 \
--statement-id enable-ingest-calls \
--action "lambda:InvokeFunction" \
--principal s3.amazonaws.com \
--source-arn arn:aws:s3:::<SourceBucket> \
--source-account <aws_account_id> \
--profile adminuser
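
The same permission can be sketched in Python with boto3. The helper names are hypothetical; the values mirror the command above, with SourceArn built as the ARN of the source bucket, matching the --source-arn value shown in the command.

```python
def build_add_permission_args(source_bucket, aws_account_id,
                              function_name="IngestFile"):
    """Assemble keyword arguments for the boto3 Lambda add_permission call."""
    return {
        "FunctionName": function_name,
        "StatementId": "enable-ingest-calls",
        "Action": "lambda:InvokeFunction",
        "Principal": "s3.amazonaws.com",
        # ARN of the bucket that is allowed to invoke the function
        "SourceArn": "arn:aws:s3:::" + source_bucket,
        "SourceAccount": aws_account_id,
    }


def allow_s3_invocation(source_bucket, aws_account_id,
                        region="us-west-2", profile="adminuser"):
    """Grant S3 permission to invoke the function; needs boto3 and credentials."""
    import boto3  # imported lazily; not needed by the builder above

    session = boto3.Session(profile_name=profile, region_name=region)
    args = build_add_permission_args(source_bucket, aws_account_id)
    return session.client("lambda").add_permission(**args)
```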

Step 6: Register the Lambda Notification Event

Register a Lambda notification event by completing the Amazon S3 Event Notifications instructions. In the input field, specify the function ARN you recorded in Step 4: Create the Lambda Function (in this topic).
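
As an alternative to the console, the notification can be registered with boto3. This is a sketch under assumptions: the helper names are hypothetical, and the s3:ObjectCreated:* event type is only one reasonable choice for triggering a load when files arrive. Note that put_bucket_notification_configuration replaces the bucket's existing notification configuration, so merge in any notifications you already have before calling it.

```python
def build_notification_config(function_arn, events=("s3:ObjectCreated:*",)):
    """Build a notification configuration that sends S3 events to a Lambda function."""
    return {
        "LambdaFunctionConfigurations": [
            {"LambdaFunctionArn": function_arn, "Events": list(events)}
        ]
    }


def register_notification(bucket, function_arn, profile="adminuser"):
    """Attach the notification to the bucket; needs boto3 and credentials.

    This call overwrites any notification configuration already on the bucket.
    """
    import boto3  # imported lazily; not needed by the builder above

    s3 = boto3.Session(profile_name=profile).client("s3")
    s3.put_bucket_notification_configuration(
        Bucket=bucket,
        NotificationConfiguration=build_notification_config(function_arn),
    )
```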