옵션 2: AWS Lambda를 사용한 Snowpipe 자동화

AWS Lambda는 이벤트에 의해 트리거되면 실행되고 시스템에 로드된 코드를 실행하는 컴퓨팅 서비스입니다. 사용자는 이 항목에서 제공되는 샘플 Python 코드를 조정하고 Snowpipe REST API를 호출하는 Lambda 함수를 생성하여 외부 스테이지(즉, S3 버킷이며, Azure 컨테이너는 지원되지 않음)에서 데이터를 로드할 수 있습니다. 이 함수는 호스팅되는 AWS 계정에 배포됩니다. Lambda에서 정의한 이벤트(예: S3 버킷의 파일이 업데이트되는 경우)는 Lambda 함수를 호출하고 Python 코드를 실행합니다.

이 항목에서는 Snowpipe를 사용하여 연속적으로 마이크로 배치 방식으로 데이터를 자동으로 로드하도록 Lambda 함수를 구성하기 위해 필요한 단계를 설명합니다.

참고

이 항목에서는 Snowpipe REST API를 사용하여 데이터 로드 준비하기 의 지침을 사용하여 Snowpipe를 구성한 것으로 가정합니다.

이 항목의 내용:

1단계: Snowpipe REST API를 호출하는 Python 코드 작성

샘플 Python 코드

from __future__ import print_function
from snowflake.ingest import SimpleIngestManager
from snowflake.ingest import StagedFile
from requests import HTTPError
from cryptography.hazmat.primitives import serialization
from cryptography.hazmat.primitives.serialization import load_pem_private_key
from cryptography.hazmat.primitives.serialization import Encoding
from cryptography.hazmat.primitives.serialization import PrivateFormat
from cryptography.hazmat.primitives.serialization import NoEncryption
from cryptography.hazmat.backends import default_backend

import os

with open("./rsa_key.p8", 'rb') as pem_in:
  pemlines = pem_in.read()
  private_key_obj = load_pem_private_key(pemlines,
  os.environ['PRIVATE_KEY_PASSPHRASE'].encode(),
  default_backend())

private_key_text = private_key_obj.private_bytes(
  Encoding.PEM, PrivateFormat.PKCS8, NoEncryption()).decode('utf-8')
# Assume the public key has been registered in Snowflake:
# private key in PEM format

# List of files in the stage specified in the pipe definition
ingest_manager = SimpleIngestManager(account='<account_identifier>',
                   host='<account_identifier>.snowflakecomputing.com',
                   user='<user_login_name>',
                   pipe='<db_name>.<schema_name>.<pipe_name>',
                   private_key=private_key_text)

def handler(event, context):
  for record in event['Records']:
    bucket = record['s3']['bucket']['name']
    key = record['s3']['object']['key']

    print("Bucket: " + bucket + " Key: " + key)
    # List of files in the stage specified in the pipe definition
    # wrapped into a class
    staged_file_list = []
    staged_file_list.append(StagedFile(key, None))

    print('Pushing file list to ingest REST API')
    resp = ingest_manager.ingest_files(staged_file_list)
Copy

참고

샘플 코드에서는 오류 처리가 고려되지 않습니다. 예를 들어, 실패한 ingest_manager 호출을 다시 시도하지 않습니다.

샘플 코드를 사용하기 전, 다음과 같이 변경합니다.

  1. 보안 매개 변수를 업데이트합니다.

    private_key=""" / -----BEGIN RSA PRIVATE KEY----- / ... / -----END RSA PRIVATE KEY----- """

    키 페어 인증 및 키 순환 사용하기 에서 생성(Snowpipe REST API를 사용하여 데이터 로드 준비하기)한 개인 키 파일의 내용을 지정합니다.

    PRIVATE_KEY_PASSPHRASE 환경 변수를 사용하여 개인 키 파일의 암호를 해독하기 위한 암호 구문을 지정합니다.

    • Linux 또는 macOS:

      export PRIVATE_KEY_PASSPHRASE='<passphrase>'
      
      Copy
    • Windows:

      set PRIVATE_KEY_PASSPHRASE='<passphrase>'
      
      Copy
  2. 세션 매개 변수를 업데이트합니다.

    account='<계정_식별자>'

    계정(Snowflake에서 제공)의 고유 식별자를 지정합니다. host 설명을 참조하십시오.

    host='<계정_식별자>.snowflakecomputing.com'

    Snowflake 계정의 고유 호스트 이름을 지정합니다.

    계정 식별자의 선호 형식은 다음과 같습니다.

    organization_name-account_name

    Snowflake 조직 및 계정의 이름입니다. 자세한 내용은 형식 1(기본 설정): 조직의 계정 이름 섹션을 참조하십시오.

    아니면, 필요한 경우 계정이 호스팅되는 리전클라우드 플랫폼 과 함께 계정 로케이터 를 지정합니다. 자세한 내용은 형식 2(레거시): 리전의 계정 로케이터 섹션을 참조하십시오.

    user='<사용자_로그인_이름>'

    Snowpipe 코드를 실행할 Snowflake 사용자의 로그인 이름을 지정합니다.

    pipe='<db_이름>.<스키마_이름>.<파이프_이름>'

    데이터를 로드하기 위해 사용할 파이프의 정규화된 이름(<db_이름>.<스키마_이름>.<파이프_이름> 형식)을 지정합니다.

  3. 파일 오브젝트 목록에서 가져올 파일의 경로를 지정합니다.

    staged_file_list = []

    지정하는 경로는 파일이 위치한 스테이지에 대해 상대적 이어야 합니다. 파일 확장자를 포함하여 각 파일의 전체 이름을 포함합니다. 예를 들어, gzip으로 압축된 CSV 파일은 확장자가 .csv.gz 일 수 있습니다.

  4. 원하는 위치에 파일을 저장합니다.

이 항목의 나머지 지침에서는 파일 이름이 SnowpipeLamdbaCode.py 인 것으로 가정합니다.

2단계: Lambda 함수 배포 패키지 만들기

다음 지침을 완료하여 Lambda용 Python 런타임 환경을 구축하고 이 항목의 1단계: Snowpipe REST API를 호출하는 Python 코드 작성 에서 조정한 Snowpipe 코드를 추가합니다. 이러한 단계에 대한 자세한 내용은 AWS Lambda 배포 패키지 설명서 (Python 지침 참조)를 참조하십시오.

중요

다음 단계의 스크립트가 대표적인 예이며, RPM을 사용하는 YUM 패키지 관리자를 사용하는 Amazon Machine Instance(AMI)를 기반으로 AWS EC2 Linux 인스턴스를 생성하는 것으로 가정합니다. Debian 기반 Linux AMI를 선택한 경우에는 그에 따라 스크립트를 업데이트합니다.

  1. AWS EC2 지침 을 완료하여 AWS EC2 Linux 인스턴스를 생성합니다. 이 인스턴스는 Snowpipe 코드를 실행하기 위한 컴퓨팅 리소스를 제공합니다.

  2. SCP(안전 복사)를 사용하여 새 AWS EC2 인스턴스에 Snowpipe 코드 파일을 복사합니다.

    scp -i key.pem /<path>/SnowpipeLambdaCode.py ec2-user@<machine>.<region_id>.compute.amazonaws.com:~/SnowpipeLambdaCode.py
    
    Copy

    여기서:

    • <path> 는 로컬 SnowpipeLambdaCode.py 파일의 경로입니다.

    • <머신>.<리전_id> 는 EC2 인스턴스의 DNS 이름(예: ec2-54-244-54-199.us-west-2.compute.amazonaws.com)입니다.

      DNS 이름은 Amazon EC2 콘솔의 Instances 화면에 표시됩니다.

  3. SSH(보안 SHell)를 사용하여 EC2 인스턴스에 연결합니다.

    ssh -i key.pem ec2-user@<machine>.<region_id>.compute.amazonaws.com
    
    Copy
  4. Python 및 관련 라이브러리는 EC2 인스턴스에 설치합니다.

    sudo yum install -y gcc zlib zlib-devel openssl openssl-devel
    
    wget https://www.python.org/ftp/python/3.6.1/Python-3.6.1.tgz
    
    tar -xzvf Python-3.6.1.tgz
    
    cd Python-3.6.1 && ./configure && make
    
    sudo make install
    
    sudo /usr/local/bin/pip3 install virtualenv
    
    /usr/local/bin/virtualenv ~/shrink_venv
    
    source ~/shrink_venv/bin/activate
    
    pip install Pillow
    
    pip install boto3
    
    pip install requests
    
    pip install snowflake-ingest
    
    Copy
  5. .zip 배포 패키지(Snowpipe.zip)를 생성합니다.

    cd $VIRTUAL_ENV/lib/python3.6/site-packages
    
    zip -r9 ~/Snowpipe.zip .
    
    cd ~
    
    zip -g Snowpipe.zip SnowpipeLambdaCode.py
    
    Copy

3단계: Lambda용 AWS IAM 역할 만들기

AWS Lambda 설명서 를 따라 Lambda 함수를 실행할 IAM 역할을 생성합니다.

역할의 IAM Amazon 리소스 이름(ARN) 을 기록합니다. 이 기록은 다음 단계에서 사용합니다.

4단계: Lambda 함수 만들기

이 항목의 2단계: Lambda 함수 배포 패키지 만들기 에서 생성한 .zip 배포 패키지를 업로드하여 Lambda 함수를 생성합니다.

aws lambda create-function \
--region us-west-2 \
--function-name IngestFile \
--zip-file fileb://~/Snowpipe.zip \
--role arn:aws:iam::<aws_account_id>:role/lambda-s3-execution-role \
--handler SnowpipeLambdaCode.handler \
--runtime python3.6 \
--profile adminuser \
--timeout 10 \
--memory-size 1024
Copy

이 항목의 3단계: Lambda용 AWS IAM 역할 만들기 에서 기록한 역할 ARN을 --role 에 지정합니다.

출력에서 새 함수에 대한 ARN을 기록합니다. 이 기록은 다음 단계에서 사용합니다.

5단계: Lambda 함수 호출 허용

함수를 호출하기 위해 필요한 권한을 S3에 부여합니다.

이 항목의 4단계: Lambda 함수 만들기 에서 기록한 함수 ARN을 --source-arn 에 지정합니다.

aws lambda add-permission \
--function-name IngestFile \
--region us-west-2 \
--statement-id enable-ingest-calls \
--action "lambda:InvokeFunction" \
--principal s3.amazonaws.com \
--source-arn arn:aws:s3:::<SourceBucket> \
--source-account <aws_account_id> \
--profile adminuser
Copy

6단계: Lambda 알림 이벤트 등록

Amazon S3 이벤트 알림 지침을 완료하여 Lambda 알림 이벤트를 등록합니다. 이 항목의 4단계: Lambda 함수 만들기 에서 기록한 함수 ARN을 입력 필드에 지정합니다.