Automação do Snowpipe para Google Cloud Storage¶
Este tópico fornece instruções para acionar cargas de dados do Snowpipe automaticamente mensagens do Google Cloud Pub/Sub para eventos do Google Cloud Storage (GCS).
Note que apenas os eventos OBJECT_FINALIZE
acionam o Snowpipe para carregar arquivos. A Snowflake recomenda que você envie apenas eventos suportados para o Snowpipe para reduzir custos, ruído de eventos e latência.
Neste tópico:
Suporte para a plataforma de nuvem¶
O acionamento de cargas de dados automatizadas do Snowpipe usando mensagens de evento GCS Pub/Sub conta com o suporte por contas Snowflake hospedadas em todas as plataformas de nuvem compatíveis.
Configuração de acesso seguro ao armazenamento em nuvem¶
Nota
Se você já tiver configurado o acesso seguro ao bucket GCS que armazena seus arquivos de dados, pode pular esta seção.
Esta seção descreve como configurar um objeto de integração de armazenamento Snowflake para delegar a responsabilidade pela autenticação do armazenamento em nuvem a uma entidade de gerenciamento de identidade e acesso (IAM) do Snowflake.
Esta seção descreve como usar as integrações de armazenamento para permitir que o Snowflake leia e grave dados em um bucket Google Cloud Storage referenciado em um estágio externo (ou seja, o armazenamento em nuvem). As integrações são objetos Snowflake nomeados e de primeira classe que evitam a necessidade de passar credenciais explícitas de provedores de nuvens, tais como chaves secretas ou tokens de acesso; em vez disso, os objetos de integração fazem referência a uma conta do serviço de armazenamento em nuvem. Um administrador em sua organização concede as permissões da conta de serviço na conta de armazenamento em nuvem.
Os administradores também podem restringir os usuários a um conjunto específico de buckets de armazenamento em nuvem (e caminhos opcionais) acessados por estágios externos que utilizam a integração.
Nota
Completar as instruções nesta seção requer acesso ao seu projeto de armazenamento em nuvem como um editor de projeto. Se você não for um editor de projeto, peça a seu administrador do armazenamento em nuvem para realizar estas tarefas.
O diagrama a seguir mostra o fluxo de integração para um estágio do armazenamento em nuvem:
Um estágio externo (ou seja, o armazenamento em nuvem) faz referência a um objeto de integração de armazenamento em sua definição.
O Snowflake associa automaticamente a integração do armazenamento com uma conta de serviço de armazenamento em nuvem criada para sua conta. O Snowflake cria uma única conta de serviço que é referenciada por todas as integrações de armazenamento GCS em sua conta Snowflake.
Um editor de projeto para seu projeto de armazenamento em nuvem concede permissões à conta de serviço para acessar o bucket referenciado na definição do estágio. Observe que muitos objetos de preparação externos podem fazer referência a diferentes buckets e caminhos e usar a mesma integração para autenticação.
Quando um usuário carrega ou descarrega dados de ou para um estágio, o Snowflake verifica as permissões concedidas à conta de serviço no bucket antes de permitir ou negar o acesso.
Nesta seção:
Etapa 1: Criar uma integração de armazenamento em nuvem no Snowflake¶
Crie uma integração usando o comando CREATE STORAGE INTEGRATION. Uma integração é um objeto Snowflake que delega a responsabilidade pela autenticação do armazenamento externo em nuvem a uma entidade gerada pelo Snowflake (ou seja, uma conta de serviço de armazenamento em nuvem). Para acessar os buckets de armazenamento em nuvem, o Snowflake cria uma conta de serviço que pode receber permissões para acessar os buckets que armazenam seus arquivos de dados.
Uma única integração de armazenamento pode oferecer suporte a vários estágios externos (ou seja, GCS). A URL na definição do estágio deve estar alinhada com os buckets GCS (e caminhos opcionais) especificados para o parâmetro STORAGE_ALLOWED_LOCATIONS.
Nota
Somente administradores de conta (usuários com a função ACCOUNTADMIN) ou uma função com o privilégio global CREATE INTEGRATION podem executar este comando SQL.
CREATE STORAGE INTEGRATION <integration_name>
TYPE = EXTERNAL_STAGE
STORAGE_PROVIDER = 'GCS'
ENABLED = TRUE
STORAGE_ALLOWED_LOCATIONS = ('gcs://<bucket>/<path>/', 'gcs://<bucket>/<path>/')
[ STORAGE_BLOCKED_LOCATIONS = ('gcs://<bucket>/<path>/', 'gcs://<bucket>/<path>/') ]
Onde:
integration_name
é o nome da nova integração.bucket
é o nome de um bucket de armazenamento em nuvem que armazena seus arquivos de dados (por exemplo,mybucket
). O parâmetro STORAGE_ALLOWED_LOCATIONS obrigatório e o parâmetro STORAGE_BLOCKED_LOCATIONS opcional restringem ou bloqueiam o acesso a estes buckets, respectivamente, quando os estágios que fazem referência a esta integração são criados ou modificados.path
é um caminho opcional que pode ser usado para proporcionar um controle granular sobre objetos no bucket.
O exemplo a seguir cria uma integração que limita explicitamente os estágios externos que utilizam a integração para referenciar um de dois buckets ou caminhos. Em uma etapa posterior, criaremos um estágio externo que referencia um desses buckets e caminhos.
Os estágios externos adicionais que também utilizam esta integração podem fazer referência aos buckets e caminhos permitidos:
CREATE STORAGE INTEGRATION gcs_int TYPE = EXTERNAL_STAGE STORAGE_PROVIDER = 'GCS' ENABLED = TRUE STORAGE_ALLOWED_LOCATIONS = ('gcs://mybucket1/path1/', 'gcs://mybucket2/path2/') STORAGE_BLOCKED_LOCATIONS = ('gcs://mybucket1/path1/sensitivedata/', 'gcs://mybucket2/path2/sensitivedata/');
Etapa 2: Recuperar a conta de serviço de armazenamento em nuvem para sua conta Snowflake¶
Execute o comando DESCRIBE INTEGRATION para recuperar a ID para a conta de serviço de armazenamento em nuvem que foi criada automaticamente para sua conta Snowflake:
DESC STORAGE INTEGRATION <integration_name>;
Onde:
integration_name
é o nome da integração que você criou em Etapa 1: Criar uma integração de armazenamento em nuvem no Snowflake (neste tópico).
Por exemplo:
DESC STORAGE INTEGRATION gcs_int; +-----------------------------+---------------+-----------------------------------------------------------------------------+------------------+ | property | property_type | property_value | property_default | +-----------------------------+---------------+-----------------------------------------------------------------------------+------------------| | ENABLED | Boolean | true | false | | STORAGE_ALLOWED_LOCATIONS | List | gcs://mybucket1/path1/,gcs://mybucket2/path2/ | [] | | STORAGE_BLOCKED_LOCATIONS | List | gcs://mybucket1/path1/sensitivedata/,gcs://mybucket2/path2/sensitivedata/ | [] | | STORAGE_GCP_SERVICE_ACCOUNT | String | service-account-id@project1-123456.iam.gserviceaccount.com | | +-----------------------------+---------------+-----------------------------------------------------------------------------+------------------+
A propriedade STORAGE_GCP_SERVICE_ACCOUNT na saída mostra a conta do serviço de armazenamento em nuvem criada para sua conta Snowflake (por exemplo, service-account-id@project1-123456.iam.gserviceaccount.com
). Fornecemos uma única conta de serviço de armazenamento em nuvem para toda a sua conta Snowflake. Todas as integrações de armazenamento em nuvem utilizam essa conta de serviço.
Etapa 3: Conceder as permissões da conta de serviço para acessar os objetos do bucket¶
As seguintes instruções passo a passo descrevem como configurar as permissões de acesso IAM ao Snowflake em seu Console Google Cloud Platform para que você possa usar um bucket de armazenamento em nuvem para carregar e descarregar dados:
Criação de uma função IAM personalizada¶
Crie uma função personalizada que tenha as permissões necessárias para acessar o bucket e obter objetos.
Acesse o Console Google Cloud Platform como editor de projeto.
No painel inicial, escolha IAM & admin » Roles.
Clique em Create Role.
Digite um nome e uma descrição para a função personalizada.
Clique em Add Permissions.
Filtre a lista de permissões e adicione o seguinte da lista:
Ação(ões)
Permissões necessárias
Somente carregamento de dados
storage.buckets.get
storage.objects.get
storage.objects.list
Carregamento de dados com opção de limpeza, executando o comando REMOVE no estágio
storage.buckets.get
storage.objects.delete
storage.objects.get
storage.objects.list
Carregamento e descarregamento de dados
storage.buckets.get
(para calcular custos de transferência de dados)storage.objects.create
storage.objects.delete
storage.objects.get
storage.objects.list
Apenas descarregamento de dados
storage.buckets.get
storage.objects.create
storage.objects.delete
storage.objects.list
Clique em Create.
Atribuição da função personalizada à conta de serviço de armazenamento em nuvem¶
Acesse o Console Google Cloud Platform como editor de projeto.
No painel inicial, escolha Cloud Storage » Browser:
Selecione um bucket a ser configurado para acesso.
Clique em SHOW INFO PANEL no canto superior direito. O painel de informações do bucket é exibido.
Clique no botão ADD PRINCIPAL.
No campo New principals, procure o nome da conta de serviço a partir da saída DESCRIBE INTEGRATION da Etapa 2: Recuperar a conta de serviço de armazenamento em nuvem para sua conta Snowflake (neste tópico).
No menu suspenso Select a role, selecione Custom »
<função>
, em que<função>
é a função de armazenamento em nuvem personalizada que você criou em Criação de uma função IAM personalizada (neste tópico).Clique no botão Save. O nome da conta de serviço é adicionado ao menu suspenso da função Storage Object Viewer no painel de informações.
Concessão de permissões de conta do serviço de armazenamento em nuvem para chaves criptográficas do Cloud Key Management Service¶
Nota
Esta etapa é necessária apenas se seu bucket GCS estiver criptografado usando uma chave armazenada no Google Cloud Key Management Service (Cloud KMS).
Acesse o Console Google Cloud Platform como editor de projeto.
No painel inicial, escolha Security » Cryptographic keys.
Selecione o chaveiro que é atribuído ao seu bucket GCS.
Clique em SHOW INFO PANEL no canto superior direito. O painel de informações do chaveiro é exibido.
Clique no botão ADD PRINCIPAL.
No campo New principals, procure o nome da conta de serviço a partir da saída DESCRIBE INTEGRATION da Etapa 2: Recuperar a conta de serviço de armazenamento em nuvem para sua conta Snowflake (neste tópico).
A partir do menu suspenso Select a role, selecione a função
Cloud KMS CrytoKey Encryptor/Decryptor
.Clique no botão Save. O nome da conta de serviço é adicionado ao menu suspenso da função Cloud KMS CrytoKey Encryptor/Decryptor no painel de informações.
Configuração da automação usando GCS Pub/Sub¶
Pré-requisitos¶
As instruções neste tópico consideram que os seguintes itens foram criados e configurados:
- Conta GCP:
Tópico Pub/Sub que recebe mensagens de eventos do bucket GCS. Para obter mais informações, consulte Criação de tópico Pub/Sub (neste tópico).
Assinatura que recebe mensagens de eventos do tópico Pub/Sub. Para obter mais informações, consulte Criação de assinatura Pub/Sub (neste tópico).
Para instruções, consulte a documentação Pub/Sub.
- Snowflake:
Tabela de destino no banco de dados do Snowflake onde você deseja carregar os dados.
Criação de tópico Pub/Sub¶
Crie um tópico Pub/Sub usando Cloud Shell ou Cloud SDK.
Execute o seguinte comando para criar o tópico e habilitá-lo a escutar a atividade no bucket GCS especificado:
$ gsutil notification create -t <topic> -f json gs://<bucket-name> -e OBJECT_FINALIZE
Onde:
<tópico>
é o nome para o tópico.<nome-bucket>
é o nome do seu bucket GCS.
Se o tópico já existir, o comando o utilizará; caso contrário, o comando criará um novo tópico.
Para obter mais informações, consulte Uso de notificações Pub/Sub para armazenamento em nuvem na documentação Pub/Sub.
Criação de assinatura Pub/Sub¶
Crie uma assinatura com entrega por Pull para o tópico Pub/Sub usando a ferramenta de linha de comando gcloud
ou a API Cloud Pub/Sub. Para obter instruções, consulte Gerenciamento de tópicos e assinaturas na documentação do Pub/Sub.
Nota
Somente as assinaturas Pub/Sub que utilizam a entrega por Pull padrão têm suporte com o Snowflake. A entrega por Push não tem suporte.
Recuperação da ID da assinatura Pub/Sub¶
A ID de assinatura de tópico Pub/Sub é usada nestas instruções para permitir o acesso do Snowflake às mensagens de eventos.
Acesse o Console Google Cloud Platform como editor de projeto.
No painel inicial, escolha Big Data » Pub/Sub » Subscriptions.
Copiar o ID na coluna Subscription ID para a assinatura do tópico.
Etapa 1: Criar uma integração de notificação no Snowflake¶
Crie uma integração de notificação usando o comando CREATE NOTIFICATION INTEGRATION. A integração da notificação faz referência à sua assinatura Pub/Sub. O Snowflake associa a integração da notificação com uma conta de serviço GCS criada para sua conta. O Snowflake cria uma única conta de serviço que é referenciada por todas as integrações de notificação GCS em sua conta Snowflake.
Nota
Somente administradores de conta (usuários com a função ACCOUNTADMIN) ou uma função com o privilégio global CREATE INTEGRATION podem executar este comando SQL.
A conta de serviço GCS para integrações de notificação é diferente da conta de serviço criada para integrações de armazenamento.
Uma única integração de notificação oferece suporte para uma única assinatura do Google Cloud Pub/Sub. A referência a mesma assinatura Pub/Sub em múltiplas integrações de notificação pode resultar na ausência de dados nas tabelas de destino porque as notificações de eventos são divididas entre as integrações de notificação. Portanto, a criação do canal será bloqueada se um canal fizer referência à mesma assinatura Pub/Sub que um canal existente.
CREATE NOTIFICATION INTEGRATION <integration_name>
TYPE = QUEUE
NOTIFICATION_PROVIDER = GCP_PUBSUB
ENABLED = true
GCP_PUBSUB_SUBSCRIPTION_NAME = '<subscription_id>';
Onde:
integration_name
é o nome da nova integração.subscription_id
é o nome da assinatura que você registrou em Recuperação da ID da assinatura Pub/Sub.
Por exemplo:
CREATE NOTIFICATION INTEGRATION my_notification_int
TYPE = QUEUE
NOTIFICATION_PROVIDER = GCP_PUBSUB
ENABLED = true
GCP_PUBSUB_SUBSCRIPTION_NAME = 'projects/project-1234/subscriptions/sub2';
Etapa 2: Conceder acesso do Snowflake à assinatura Pub/Sub¶
Execute o comando DESCRIBE INTEGRATION para recuperar a ID da conta de serviço Snowflake:
DESC NOTIFICATION INTEGRATION <integration_name>;
Onde:
integration_name
é o nome da integração que você criou em Etapa 1: Criar uma integração de armazenamento no Snowflake.
Por exemplo:
DESC NOTIFICATION INTEGRATION my_notification_int;
Registre o nome da conta de serviço na coluna GCP_PUBSUB_SERVICE_ACCOUNT, que tem o seguinte formato:
<service_account>@<project_id>.iam.gserviceaccount.com
Acesse o Console Google Cloud Platform como editor de projeto.
No painel inicial, escolha Big Data » Pub/Sub » Subscriptions.
Selecione a assinatura a ser configurada para acesso.
Clique em SHOW INFO PANEL no canto superior direito. O painel de informações da assinatura é exibido.
Clique no botão ADD PRINCIPAL.
No campo New principals, procure pelo nome da conta de serviço que você registrou.
A partir do menu suspenso Select a role, selecione Pub/Sub Subscriber.
Clique no botão Save. O nome da conta de serviço é adicionado ao menu suspenso da função Pub/Sub Subscriber no painel de informações.
Navegue até a página Dashboard no Console Cloud e selecione seu projeto na lista suspensa.
Clique no botão ADD PEOPLE TO THIS PROJECT.
Adicione o nome da conta de serviço que você registrou.
A partir do menu suspenso Select a role, selecione Monitoring Viewer.
Clique no botão Save. O nome da conta de serviço é adicionado à função Monitoring Viewer.
Etapa 3: criar um estágio (se necessário)¶
Crie um estágio externo que faça referência ao seu bucket GCS usando o comando CREATE STAGE. O Snowflake lê seus arquivos de dados preparados nos metadados da tabela externa. Você também pode usar um estágio externo já existente.
Nota
Para configurar o acesso seguro ao local de armazenamento na nuvem, consulte Configuração de acesso seguro ao armazenamento em nuvem (neste tópico).
Para fazer referência a uma integração de armazenamento na instrução CREATE STAGE, a função deve ter o privilégio USAGE para o objeto de integração de armazenamento.
O exemplo a seguir cria um estágio chamado mystage
no esquema ativo para a sessão do usuário. O URL de armazenamento em nuvem inclui o caminho files
. O estágio faz referência a uma integração de armazenamento chamada my_storage_int
.
USE SCHEMA mydb.public; CREATE STAGE mystage URL='gcs://load/files/' STORAGE_INTEGRATION = my_storage_int;
Etapa 4: criar um canal com ingestão automática habilitada¶
Crie um canal usando o comando CREATE PIPE. O canal define a instrução COPY INTO <tabela> usada pelo Snowpipe para carregar os dados da fila de ingestão na tabela de destino.
CREATE PIPE <pipe_name>
AUTO_INGEST = true
INTEGRATION = '<notification_integration_name>'
AS
<copy_statement>;
Onde:
<nome_do_canal>
Identificador para o canal; deve ser único para o esquema no qual o canal é criado.
O identificador deve começar com um caractere alfabético e não pode conter espaços ou caracteres especiais a menos que toda a cadeia de caracteres do identificador esteja entre aspas duplas (por exemplo,
"My object"
). Os identificadores delimitados por aspas duplas também diferenciam letras maiúsculas de minúsculas.INTEGRATION = '<nome_integração_notificação>'
Nome da integração de notificação usada para atualizar automaticamente os metadados da tabela de diretório usando notificações Google Cloud Pub/Sub. Uma integração de notificação é um objeto Snowflake que fornece uma interface entre o Snowflake e serviços de enfileiramento de mensagens de terceiros.
copy_statement
Instrução COPY INTO <tabela> usada para carregar dados de arquivos em fila em uma tabela do Snowflake. Esta instrução serve como texto/definição para o canal e é exibida na saída SHOW PIPES.
Por exemplo, crie um canal no esquema snowpipe_db.public
que carrega dados de arquivos preparados em um estágio externo (GCS) chamado mystage
em uma tabela de destino chamada mytable
:
CREATE PIPE snowpipe_db.public.mypipe
AUTO_INGEST = true
INTEGRATION = 'MY_NOTIFICATION_INT'
AS
COPY INTO snowpipe_db.public.mytable
FROM @snowpipe_db.public.mystage/path2;
O parâmetro INTEGRATION faz referência à integração de notificação my_notification_int
que você criou na Etapa 1: Criar uma integração de notificação no Snowflake. O nome da integração deve ser fornecido em letras maiúsculas.
Importante
Verifique se a referência do local de armazenamento na instrução COPY INTO <tabela> instrução não se sobrepõe à referência em canais existentes na conta. Caso contrário, vários canais poderiam carregar o mesmo conjunto de arquivos de dados nas tabelas de destino. Por exemplo, esta situação pode ocorrer quando múltiplas definições de canais fazem referência ao mesmo local de armazenamento com diferentes níveis de granularidade, como <local_de_armazenamento>/path1/
e <local_de_armazenamento>/path1/path2/
. Neste exemplo, se os arquivos fossem preparados em <local_armazenamento>/path1/path2/
, ambos os canais carregariam uma cópia dos arquivos.
Veja as instruções COPY INTO <tabela> nas definições de todos os canais da conta executando SHOW PIPES ou consultando a exibição PIPES em Account Usage ou a exibição PIPES no Information Schema.
O Snowpipe com ingestão automática está configurado!
Quando novos arquivos de dados são adicionados ao bucket GCS, a mensagem de evento informa o Snowpipe para carregá-los na tabela de destino definida no canal.
Etapa 5: carregar arquivos históricos¶
Para carregar qualquer lista de pendências de arquivos de dados que existiam no estágio externo antes que as mensagens do Pub/Sub fossem configuradas, execute uma instrução ALTERPIPE … REFRESH.
Etapa 6: excluir arquivos preparados¶
Exclua os arquivos preparados depois de carregar os dados com sucesso e não precisar mais dos arquivos. Para obter instruções, consulte Exclusão de arquivos preparados depois que o Snowpipe carrega os dados.
Saída SYSTEM$PIPE_STATUS¶
A função SYSTEM$PIPE_STATUS recupera uma representação JSON do status atual de um canal.
Para canais com AUTO_INGEST definido como TRUE, a função retorna um objeto JSON contendo os seguintes pares nome/valor (se aplicável ao status atual do canal):
{"executionState":"<value>","oldestFileTimestamp":<value>,"pendingFileCount":<value>,"notificationChannelName":"<value>","numOutstandingMessagesOnChannel":<value>,"lastReceivedMessageTimestamp":"<value>","lastForwardedMessageTimestamp":"<value>","error":<value>,"fault":<value>}
Para descrições dos valores de saída, consulte o tópico de referência para a função SQL.