Opção 2: Automatização do Snowpipe com AWS Lambda

AWS Lambda é um serviço de computação que funciona quando acionado por um evento e executa o código que foi carregado no sistema. Você pode adaptar o código Python de amostra fornecido neste tópico e criar uma função Lambda que chama a API REST Snowpipe para carregar dados a partir de seu estágio externo (ou seja, bucket S3; contêineres Azure não são suportados). A função é implantada em sua conta AWS, onde ela é hospedada. Eventos que você define no Lambda (por exemplo, quando os arquivos em seu bucket S3 são atualizados) invocam a função Lambda e executam o código Python.

Este tópico descreve as etapas necessárias para configurar uma função Lambda para carregar dados em microlotes de forma contínua e automática usando o Snowpipe.

Nota

Este tópico pressupõe que você tenha configurado o Snowpipe usando as instruções em Preparação para carregamento de dados usando a API REST Snowpipe.

Neste tópico:

Etapa 1: escrever código Python invocando a API REST Snowpipe

Código Python de amostra

from __future__ import print_function
from snowflake.ingest import SimpleIngestManager
from snowflake.ingest import StagedFile
from requests import HTTPError
from cryptography.hazmat.primitives import serialization
from cryptography.hazmat.primitives.serialization import load_pem_private_key
from cryptography.hazmat.primitives.serialization import Encoding
from cryptography.hazmat.primitives.serialization import PrivateFormat
from cryptography.hazmat.primitives.serialization import NoEncryption
from cryptography.hazmat.backends import default_backend

import os

with open("./rsa_key.p8", 'rb') as pem_in:
  pemlines = pem_in.read()
  private_key_obj = load_pem_private_key(pemlines,
  os.environ['PRIVATE_KEY_PASSPHRASE'].encode(),
  default_backend())

private_key_text = private_key_obj.private_bytes(
  Encoding.PEM, PrivateFormat.PKCS8, NoEncryption()).decode('utf-8')
# Assume the public key has been registered in Snowflake:
# private key in PEM format

# List of files in the stage specified in the pipe definition
ingest_manager = SimpleIngestManager(account='<account_identifier>',
                   host='<account_identifier>.snowflakecomputing.com',
                   user='<user_login_name>',
                   pipe='<db_name>.<schema_name>.<pipe_name>',
                   private_key=private_key_text)

def handler(event, context):
  for record in event['Records']:
    bucket = record['s3']['bucket']['name']
    key = record['s3']['object']['key']

    print("Bucket: " + bucket + " Key: " + key)
    # List of files in the stage specified in the pipe definition
    # wrapped into a class
    staged_file_list = []
    staged_file_list.append(StagedFile(key, None))

    print('Pushing file list to ingest REST API')
    resp = ingest_manager.ingest_files(staged_file_list)
Copy

Nota

O código de amostra não leva em conta o tratamento de erros. Por exemplo, ele não tenta novamente chamadas ingest_manager com falha.

Antes de usar o código de amostra, faça as seguintes alterações:

  1. Atualize o parâmetro de segurança:

    private_key=""" / -----BEGIN RSA PRIVATE KEY----- / ... / -----END RSA PRIVATE KEY----- """

    Especifica o conteúdo do arquivo de chave privada que você criou em Uso da autenticação de par de chaves e rodízio de chaves (em Preparação para carregamento de dados usando a API REST Snowpipe).

    Especifique a senha para decodificar o arquivo de chave privada usando a variável de ambiente PRIVATE_KEY_PASSPHRASE:

    • Linux ou macOS:

      export PRIVATE_KEY_PASSPHRASE='<passphrase>'
      
      Copy
    • Windows:

      set PRIVATE_KEY_PASSPHRASE='<passphrase>'
      
      Copy
  2. Atualize os parâmetros da sessão:

    account='<identificador_de_conta>'

    Especifique o identificador único para sua conta (fornecido pela Snowflake). Consulte a descrição host.

    host='<identificador_de_conta>.snowflakecomputing.com'

    Especifique o nome de host exclusivo de sua conta Snowflake.

    O formato preferido do identificador de conta é o seguinte:

    organization_name-account_name

    Nomes de sua conta e organização no Snowflake. Para obter mais detalhes, consulte Formato 1 (preferido): Nome da conta em sua organização.

    Alternativamente, especifique seu localizador de contas, juntamente com a região e a plataforma de nuvem onde a conta é hospedada, se necessário. Para obter mais detalhes, consulte Formato 2 (legado): Localizador de conta em uma região.

    user='<nome_de_login_de_usuário>'

    Especifica o nome de login do usuário do Snowflake que executará o código do Snowpipe.

    pipe='<nome_db>.<nome_de_esquema>.<nome_de_canal>'

    Especifica o nome totalmente qualificado do canal a ser usado para carregar os dados, na forma de <nome_db>.<nome_de_esquema>.<nome_de_canal>.

  3. Especifique o caminho para seus arquivos a serem importados na lista de objetos de arquivo.

    staged_file_list = []

    O caminho que você especificar deve ser relativo ao estágio onde os arquivos estão localizados. Inclua o nome completo de cada arquivo, incluindo a extensão do arquivo. Por exemplo, um arquivo CSV que é comprimido com gzip pode ter a extensão .csv.gz.

  4. Salve o arquivo em um local conveniente.

As demais instruções neste tópico assumem que o nome do arquivo seja SnowpipeLamdbaCode.py.

Etapa 2: criar um pacote de implantação de função Lambda

Conclua as instruções a seguir para criar um ambiente de tempo de execução Python para Lambda e adicione o código do Snowpipe que você adaptou em Etapa 1: Escrever código Python invocando a API REST Snowpipe (neste tópico). Para obter mais informações sobre estas etapas, consulte a documentação do pacote de implantação do AWS Lambda (consulte as instruções para Python).

Importante

Os scripts nas etapas a seguir são um exemplo representativo e supõem que você esteja criando uma instância do AWS EC2 Linux com base em uma instância de máquina da Amazon (AMI) que usa o gerenciador de pacotes YUM, que depende do RPM. Se você selecionar um AMI Linux baseado em Debian, atualize seus scripts.

  1. Crie uma instância do AWS EC2 Linux completando as instruções AWS EC2. Essa instância fornecerá os recursos computacionais para executar o código do Snowpipe.

  2. Copie o arquivo de código do Snowpipe para sua nova instância do AWS EC2 usando SCP (Secure Copy):

    scp -i key.pem /<path>/SnowpipeLambdaCode.py ec2-user@<machine>.<region_id>.compute.amazonaws.com:~/SnowpipeLambdaCode.py
    
    Copy

    Onde:

    • O caminho <> é o caminho de seu arquivo local SnowpipeLambdaCode.py.

    • <máquina>.<id_de_região> é o nome DNS da instância do EC2 (por exemplo, ec2-54-244-54-199.us-west-2.compute.amazonaws.com).

      O nome do DNS é exibido na tela Instances no console do Amazon EC2.

  3. Conecte-se à instância do EC2 usando SSH (Secure SHell):

    ssh -i key.pem ec2-user@<machine>.<region_id>.compute.amazonaws.com
    
    Copy
  4. Instale o Python e bibliotecas relacionadas na instância do EC2:

    sudo yum install -y gcc zlib zlib-devel openssl openssl-devel
    
    wget https://www.python.org/ftp/python/3.6.1/Python-3.6.1.tgz
    
    tar -xzvf Python-3.6.1.tgz
    
    cd Python-3.6.1 && ./configure && make
    
    sudo make install
    
    sudo /usr/local/bin/pip3 install virtualenv
    
    /usr/local/bin/virtualenv ~/shrink_venv
    
    source ~/shrink_venv/bin/activate
    
    pip install Pillow
    
    pip install boto3
    
    pip install requests
    
    pip install snowflake-ingest
    
    Copy
  5. Crie o pacote de implantação de .zip (Snowpipe.zip):

    cd $VIRTUAL_ENV/lib/python3.6/site-packages
    
    zip -r9 ~/Snowpipe.zip .
    
    cd ~
    
    zip -g Snowpipe.zip SnowpipeLambdaCode.py
    
    Copy

Etapa 3: criar uma função AWS IAM para Lambda

Siga a documentação do AWS Lambda para criar uma função IAM para executar a função Lambda.

Registre o IAM Amazon Resource Name (ARN) para a função. Você o utilizará na próxima etapa.

Etapa 4: criar a função Lambda

Crie a função Lambda carregando o pacote de implantação .zip que você criou em Etapa 2: Criar um pacote de implantação de função Lambda (neste tópico):

aws lambda create-function \
--region us-west-2 \
--function-name IngestFile \
--zip-file fileb://~/Snowpipe.zip \
--role arn:aws:iam::<aws_account_id>:role/lambda-s3-execution-role \
--handler SnowpipeLambdaCode.handler \
--runtime python3.6 \
--profile adminuser \
--timeout 10 \
--memory-size 1024
Copy

Para --role, especifique a função ARN que você registrou em Etapa 3: Criar uma função AWS IAM para Lambda (neste tópico).

Registre o ARN para a nova função a partir da saída. Você o utilizará na próxima etapa.

Etapa 5: permitir chamadas para a função Lambda

Conceda ao S3 as permissões necessárias para invocar sua função.

Para --source-arn, especifique a função ARN que você registrou em Etapa 4: Criar a função Lambda (neste tópico).

aws lambda add-permission \
--function-name IngestFile \
--region us-west-2 \
--statement-id enable-ingest-calls \
--action "lambda:InvokeFunction" \
--principal s3.amazonaws.com \
--source-arn arn:aws:s3:::<SourceBucket> \
--source-account <aws_account_id> \
--profile adminuser
Copy

Etapa 6: registrar o evento de notificação Lambda

Registre um evento de notificação Lambda completando as instruções do Amazon S3 Event Notifications . No campo de entrada, especifique a função ARN que você registrou em Etapa 4: Criar a função Lambda (neste tópico).