Opção 2: Automatização do Snowpipe com AWS Lambda¶
AWS Lambda é um serviço de computação que funciona quando acionado por um evento e executa o código que foi carregado no sistema. Você pode adaptar o código Python de amostra fornecido neste tópico e criar uma função Lambda que chama a API REST Snowpipe para carregar dados a partir de seu estágio externo (ou seja, bucket S3; contêineres Azure não são suportados). A função é implantada em sua conta AWS, onde ela é hospedada. Eventos que você define no Lambda (por exemplo, quando os arquivos em seu bucket S3 são atualizados) invocam a função Lambda e executam o código Python.
Este tópico descreve as etapas necessárias para configurar uma função Lambda para carregar dados em microlotes de forma contínua e automática usando o Snowpipe.
Nota
Este tópico pressupõe que você tenha configurado o Snowpipe usando as instruções em Preparação para carregamento de dados usando a API REST Snowpipe.
Neste tópico:
Etapa 1: escrever código Python invocando a API REST Snowpipe¶
Código Python de amostra
from __future__ import print_function
from snowflake.ingest import SimpleIngestManager
from snowflake.ingest import StagedFile
from requests import HTTPError
from cryptography.hazmat.primitives import serialization
from cryptography.hazmat.primitives.serialization import load_pem_private_key
from cryptography.hazmat.primitives.serialization import Encoding
from cryptography.hazmat.primitives.serialization import PrivateFormat
from cryptography.hazmat.primitives.serialization import NoEncryption
from cryptography.hazmat.backends import default_backend
import os
with open("./rsa_key.p8", 'rb') as pem_in:
pemlines = pem_in.read()
private_key_obj = load_pem_private_key(pemlines,
os.environ['PRIVATE_KEY_PASSPHRASE'].encode(),
default_backend())
private_key_text = private_key_obj.private_bytes(
Encoding.PEM, PrivateFormat.PKCS8, NoEncryption()).decode('utf-8')
# Assume the public key has been registered in Snowflake:
# private key in PEM format
# List of files in the stage specified in the pipe definition
ingest_manager = SimpleIngestManager(account='<account_identifier>',
host='<account_identifier>.snowflakecomputing.com',
user='<user_login_name>',
pipe='<db_name>.<schema_name>.<pipe_name>',
private_key=private_key_text)
def handler(event, context):
for record in event['Records']:
bucket = record['s3']['bucket']['name']
key = record['s3']['object']['key']
print("Bucket: " + bucket + " Key: " + key)
# List of files in the stage specified in the pipe definition
# wrapped into a class
staged_file_list = []
staged_file_list.append(StagedFile(key, None))
print('Pushing file list to ingest REST API')
resp = ingest_manager.ingest_files(staged_file_list)
Nota
O código de amostra não leva em conta o tratamento de erros. Por exemplo, ele não tenta novamente chamadas ingest_manager
com falha.
Antes de usar o código de amostra, faça as seguintes alterações:
Atualize o parâmetro de segurança:
private_key=""" / -----BEGIN RSA PRIVATE KEY----- / ... / -----END RSA PRIVATE KEY----- """
Especifica o conteúdo do arquivo de chave privada que você criou em Uso da autenticação de par de chaves e rodízio de chaves (em Preparação para carregamento de dados usando a API REST Snowpipe).
Especifique a senha para decodificar o arquivo de chave privada usando a variável de ambiente
PRIVATE_KEY_PASSPHRASE
:Linux ou macOS:
export PRIVATE_KEY_PASSPHRASE='<passphrase>'
Windows:
set PRIVATE_KEY_PASSPHRASE='<passphrase>'
Atualize os parâmetros da sessão:
account='<identificador_de_conta>'
Especifique o identificador único para sua conta (fornecido pela Snowflake). Consulte a descrição
host
.host='<identificador_de_conta>.snowflakecomputing.com'
Especifique o nome de host exclusivo de sua conta Snowflake.
O formato preferido do identificador de conta é o seguinte:
organization_name-account_name
Nomes de sua conta e organização no Snowflake. Para obter mais detalhes, consulte Formato 1 (preferido): Nome da conta em sua organização.
Alternativamente, especifique seu localizador de contas, juntamente com a região e a plataforma de nuvem onde a conta é hospedada, se necessário. Para obter mais detalhes, consulte Formato 2 (legado): Localizador de conta em uma região.
user='<nome_de_login_de_usuário>'
Especifica o nome de login do usuário do Snowflake que executará o código do Snowpipe.
pipe='<nome_db>.<nome_de_esquema>.<nome_de_canal>'
Especifica o nome totalmente qualificado do canal a ser usado para carregar os dados, na forma de
<nome_db>.<nome_de_esquema>.<nome_de_canal>
.
Especifique o caminho para seus arquivos a serem importados na lista de objetos de arquivo.
staged_file_list = []
O caminho que você especificar deve ser relativo ao estágio onde os arquivos estão localizados. Inclua o nome completo de cada arquivo, incluindo a extensão do arquivo. Por exemplo, um arquivo CSV que é comprimido com gzip pode ter a extensão
.csv.gz
.
Salve o arquivo em um local conveniente.
As demais instruções neste tópico assumem que o nome do arquivo seja SnowpipeLamdbaCode.py
.
Etapa 2: criar um pacote de implantação de função Lambda¶
Conclua as instruções a seguir para criar um ambiente de tempo de execução Python para Lambda e adicione o código do Snowpipe que você adaptou em Etapa 1: Escrever código Python invocando a API REST Snowpipe (neste tópico). Para obter mais informações sobre estas etapas, consulte a documentação do pacote de implantação do AWS Lambda (consulte as instruções para Python).
Importante
Os scripts nas etapas a seguir são um exemplo representativo e supõem que você esteja criando uma instância do AWS EC2 Linux com base em uma instância de máquina da Amazon (AMI) que usa o gerenciador de pacotes YUM, que depende do RPM. Se você selecionar um AMI Linux baseado em Debian, atualize seus scripts.
Crie uma instância do AWS EC2 Linux completando as instruções AWS EC2. Essa instância fornecerá os recursos computacionais para executar o código do Snowpipe.
Copie o arquivo de código do Snowpipe para sua nova instância do AWS EC2 usando SCP (Secure Copy):
scp -i key.pem /<path>/SnowpipeLambdaCode.py ec2-user@<machine>.<region_id>.compute.amazonaws.com:~/SnowpipeLambdaCode.py
Onde:
O caminho
<>
é o caminho de seu arquivo localSnowpipeLambdaCode.py
.<máquina>.<id_de_região>
é o nome DNS da instância do EC2 (por exemplo,ec2-54-244-54-199.us-west-2.compute.amazonaws.com
).O nome do DNS é exibido na tela Instances no console do Amazon EC2.
Conecte-se à instância do EC2 usando SSH (Secure SHell):
ssh -i key.pem ec2-user@<machine>.<region_id>.compute.amazonaws.com
Instale o Python e bibliotecas relacionadas na instância do EC2:
sudo yum install -y gcc zlib zlib-devel openssl openssl-devel wget https://www.python.org/ftp/python/3.6.1/Python-3.6.1.tgz tar -xzvf Python-3.6.1.tgz cd Python-3.6.1 && ./configure && make sudo make install sudo /usr/local/bin/pip3 install virtualenv /usr/local/bin/virtualenv ~/shrink_venv source ~/shrink_venv/bin/activate pip install Pillow pip install boto3 pip install requests pip install snowflake-ingest
Crie o pacote de implantação de .zip (
Snowpipe.zip
):cd $VIRTUAL_ENV/lib/python3.6/site-packages zip -r9 ~/Snowpipe.zip . cd ~ zip -g Snowpipe.zip SnowpipeLambdaCode.py
Etapa 3: criar uma função AWS IAM para Lambda¶
Siga a documentação do AWS Lambda para criar uma função IAM para executar a função Lambda.
Registre o IAM Amazon Resource Name (ARN) para a função. Você o utilizará na próxima etapa.
Etapa 4: criar a função Lambda¶
Crie a função Lambda carregando o pacote de implantação .zip
que você criou em Etapa 2: Criar um pacote de implantação de função Lambda (neste tópico):
aws lambda create-function \ --region us-west-2 \ --function-name IngestFile \ --zip-file fileb://~/Snowpipe.zip \ --role arn:aws:iam::<aws_account_id>:role/lambda-s3-execution-role \ --handler SnowpipeLambdaCode.handler \ --runtime python3.6 \ --profile adminuser \ --timeout 10 \ --memory-size 1024
Para --role
, especifique a função ARN que você registrou em Etapa 3: Criar uma função AWS IAM para Lambda (neste tópico).
Registre o ARN para a nova função a partir da saída. Você o utilizará na próxima etapa.
Etapa 5: permitir chamadas para a função Lambda¶
Conceda ao S3 as permissões necessárias para invocar sua função.
Para --source-arn
, especifique a função ARN que você registrou em Etapa 4: Criar a função Lambda (neste tópico).
aws lambda add-permission \ --function-name IngestFile \ --region us-west-2 \ --statement-id enable-ingest-calls \ --action "lambda:InvokeFunction" \ --principal s3.amazonaws.com \ --source-arn arn:aws:s3:::<SourceBucket> \ --source-account <aws_account_id> \ --profile adminuser
Etapa 6: registrar o evento de notificação Lambda¶
Registre um evento de notificação Lambda completando as instruções do Amazon S3 Event Notifications . No campo de entrada, especifique a função ARN que você registrou em Etapa 4: Criar a função Lambda (neste tópico).