Automação do Snowpipe para Google Cloud Storage

Este tópico fornece instruções para acionar cargas de dados do Snowpipe automaticamente mensagens do Google Cloud Pub/Sub para eventos do Google Cloud Storage (GCS).

Note que apenas os eventos OBJECT_FINALIZE acionam o Snowpipe para carregar arquivos. A Snowflake recomenda que você envie apenas eventos suportados para o Snowpipe para reduzir custos, ruído de eventos e latência.

Neste tópico:

Suporte para a plataforma de nuvem

O acionamento de cargas de dados automatizadas do Snowpipe usando mensagens de evento GCS Pub/Sub conta com o suporte por contas Snowflake hospedadas em todas as plataformas de nuvem compatíveis.

Configuração de acesso seguro ao armazenamento em nuvem

Nota

Se você já tiver configurado o acesso seguro ao bucket GCS que armazena seus arquivos de dados, pode pular esta seção.

Esta seção descreve como configurar um objeto de integração de armazenamento Snowflake para delegar a responsabilidade pela autenticação do armazenamento em nuvem a uma entidade de gerenciamento de identidade e acesso (IAM) do Snowflake.

Esta seção descreve como usar as integrações de armazenamento para permitir que o Snowflake leia e grave dados em um bucket Google Cloud Storage referenciado em um estágio externo (ou seja, o armazenamento em nuvem). As integrações são objetos Snowflake nomeados e de primeira classe que evitam a necessidade de passar credenciais explícitas de provedores de nuvens, tais como chaves secretas ou tokens de acesso; em vez disso, os objetos de integração fazem referência a uma conta do serviço de armazenamento em nuvem. Um administrador em sua organização concede as permissões da conta de serviço na conta de armazenamento em nuvem.

Os administradores também podem restringir os usuários a um conjunto específico de buckets de armazenamento em nuvem (e caminhos opcionais) acessados por estágios externos que utilizam a integração.

Nota

Completar as instruções nesta seção requer acesso ao seu projeto de armazenamento em nuvem como um editor de projeto. Se você não for um editor de projeto, peça a seu administrador do armazenamento em nuvem para realizar estas tarefas.

O diagrama a seguir mostra o fluxo de integração para um estágio do armazenamento em nuvem:

Fluxo de integração do estágio do Google Cloud Storage
  1. Um estágio externo (ou seja, o armazenamento em nuvem) faz referência a um objeto de integração de armazenamento em sua definição.

  2. O Snowflake associa automaticamente a integração do armazenamento com uma conta de serviço de armazenamento em nuvem criada para sua conta. O Snowflake cria uma única conta de serviço que é referenciada por todas as integrações de armazenamento GCS em sua conta Snowflake.

  3. Um editor de projeto para seu projeto de armazenamento em nuvem concede permissões à conta de serviço para acessar o bucket referenciado na definição do estágio. Observe que muitos objetos de preparação externos podem fazer referência a diferentes buckets e caminhos e usar a mesma integração para autenticação.

Quando um usuário carrega ou descarrega dados de ou para um estágio, o Snowflake verifica as permissões concedidas à conta de serviço no bucket antes de permitir ou negar o acesso.

Nesta seção:

Etapa 1: Criar uma integração de armazenamento em nuvem no Snowflake

Crie uma integração usando o comando CREATE STORAGE INTEGRATION. Uma integração é um objeto Snowflake que delega a responsabilidade pela autenticação do armazenamento externo em nuvem a uma entidade gerada pelo Snowflake (ou seja, uma conta de serviço de armazenamento em nuvem). Para acessar os buckets de armazenamento em nuvem, o Snowflake cria uma conta de serviço que pode receber permissões para acessar os buckets que armazenam seus arquivos de dados.

Uma única integração de armazenamento pode oferecer suporte a vários estágios externos (ou seja, GCS). A URL na definição do estágio deve estar alinhada com os buckets GCS (e caminhos opcionais) especificados para o parâmetro STORAGE_ALLOWED_LOCATIONS.

Nota

Somente administradores de conta (usuários com a função ACCOUNTADMIN) ou uma função com o privilégio global CREATE INTEGRATION podem executar este comando SQL.

CREATE STORAGE INTEGRATION <integration_name>
  TYPE = EXTERNAL_STAGE
  STORAGE_PROVIDER = 'GCS'
  ENABLED = TRUE
  STORAGE_ALLOWED_LOCATIONS = ('gcs://<bucket>/<path>/', 'gcs://<bucket>/<path>/')
  [ STORAGE_BLOCKED_LOCATIONS = ('gcs://<bucket>/<path>/', 'gcs://<bucket>/<path>/') ]
Copy

Onde:

  • integration_name é o nome da nova integração.

  • bucket é o nome de um bucket de armazenamento em nuvem que armazena seus arquivos de dados (por exemplo, mybucket). O parâmetro STORAGE_ALLOWED_LOCATIONS obrigatório e o parâmetro STORAGE_BLOCKED_LOCATIONS opcional restringem ou bloqueiam o acesso a estes buckets, respectivamente, quando os estágios que fazem referência a esta integração são criados ou modificados.

  • path é um caminho opcional que pode ser usado para proporcionar um controle granular sobre objetos no bucket.

O exemplo a seguir cria uma integração que limita explicitamente os estágios externos que utilizam a integração para referenciar um de dois buckets ou caminhos. Em uma etapa posterior, criaremos um estágio externo que referencia um desses buckets e caminhos.

Os estágios externos adicionais que também utilizam esta integração podem fazer referência aos buckets e caminhos permitidos:

CREATE STORAGE INTEGRATION gcs_int
  TYPE = EXTERNAL_STAGE
  STORAGE_PROVIDER = 'GCS'
  ENABLED = TRUE
  STORAGE_ALLOWED_LOCATIONS = ('gcs://mybucket1/path1/', 'gcs://mybucket2/path2/')
  STORAGE_BLOCKED_LOCATIONS = ('gcs://mybucket1/path1/sensitivedata/', 'gcs://mybucket2/path2/sensitivedata/');
Copy

Etapa 2: Recuperar a conta de serviço de armazenamento em nuvem para sua conta Snowflake

Execute o comando DESCRIBE INTEGRATION para recuperar a ID para a conta de serviço de armazenamento em nuvem que foi criada automaticamente para sua conta Snowflake:

DESC STORAGE INTEGRATION <integration_name>;
Copy

Onde:

Por exemplo:

DESC STORAGE INTEGRATION gcs_int;

+-----------------------------+---------------+-----------------------------------------------------------------------------+------------------+
| property                    | property_type | property_value                                                              | property_default |
+-----------------------------+---------------+-----------------------------------------------------------------------------+------------------|
| ENABLED                     | Boolean       | true                                                                        | false            |
| STORAGE_ALLOWED_LOCATIONS   | List          | gcs://mybucket1/path1/,gcs://mybucket2/path2/                               | []               |
| STORAGE_BLOCKED_LOCATIONS   | List          | gcs://mybucket1/path1/sensitivedata/,gcs://mybucket2/path2/sensitivedata/   | []               |
| STORAGE_GCP_SERVICE_ACCOUNT | String        | service-account-id@project1-123456.iam.gserviceaccount.com                  |                  |
+-----------------------------+---------------+-----------------------------------------------------------------------------+------------------+
Copy

A propriedade STORAGE_GCP_SERVICE_ACCOUNT na saída mostra a conta do serviço de armazenamento em nuvem criada para sua conta Snowflake (por exemplo, service-account-id@project1-123456.iam.gserviceaccount.com). Fornecemos uma única conta de serviço de armazenamento em nuvem para toda a sua conta Snowflake. Todas as integrações de armazenamento em nuvem utilizam essa conta de serviço.

Etapa 3: Conceder as permissões da conta de serviço para acessar os objetos do bucket

As seguintes instruções passo a passo descrevem como configurar as permissões de acesso IAM ao Snowflake em seu Console Google Cloud Platform para que você possa usar um bucket de armazenamento em nuvem para carregar e descarregar dados:

Criação de uma função IAM personalizada

Crie uma função personalizada que tenha as permissões necessárias para acessar o bucket e obter objetos.

  1. Acesse o Console Google Cloud Platform como editor de projeto.

  2. No painel inicial, escolha IAM & admin » Roles.

  3. Clique em Create Role.

  4. Digite um nome e uma descrição para a função personalizada.

  5. Clique em Add Permissions.

  6. Filtre a lista de permissões e adicione o seguinte da lista:

    Ação(ões)

    Permissões necessárias

    Somente carregamento de dados

    • storage.buckets.get

    • storage.objects.get

    • storage.objects.list

    Carregamento de dados com opção de limpeza, executando o comando REMOVE no estágio

    • storage.buckets.get

    • storage.objects.delete

    • storage.objects.get

    • storage.objects.list

    Carregamento e descarregamento de dados

    • storage.buckets.get (para calcular custos de transferência de dados)

    • storage.objects.create

    • storage.objects.delete

    • storage.objects.get

    • storage.objects.list

    Apenas descarregamento de dados

    • storage.buckets.get

    • storage.objects.create

    • storage.objects.delete

    • storage.objects.list

  7. Clique em Create.

Atribuição da função personalizada à conta de serviço de armazenamento em nuvem

  1. Acesse o Console Google Cloud Platform como editor de projeto.

  2. No painel inicial, escolha Cloud Storage » Browser:

    Lista de buckets no console do Google Cloud Platform
  3. Selecione um bucket a ser configurado para acesso.

  4. Clique em SHOW INFO PANEL no canto superior direito. O painel de informações do bucket é exibido.

  5. Clique no botão ADD PRINCIPAL.

  6. No campo New principals, procure o nome da conta de serviço a partir da saída DESCRIBE INTEGRATION da Etapa 2: Recuperar a conta de serviço de armazenamento em nuvem para sua conta Snowflake (neste tópico).

    Painel de informações do bucket no console do Google Cloud Platform
  7. No menu suspenso Select a role, selecione Custom » <função>, em que <função> é a função de armazenamento em nuvem personalizada que você criou em Criação de uma função IAM personalizada (neste tópico).

  8. Clique no botão Save. O nome da conta de serviço é adicionado ao menu suspenso da função Storage Object Viewer no painel de informações.

    Lista de funções do visualizador de objetos de armazenamento no console do Google Cloud Platform

Concessão de permissões de conta do serviço de armazenamento em nuvem para chaves criptográficas do Cloud Key Management Service

Nota

Esta etapa é necessária apenas se seu bucket GCS estiver criptografado usando uma chave armazenada no Google Cloud Key Management Service (Cloud KMS).

  1. Acesse o Console Google Cloud Platform como editor de projeto.

  2. No painel inicial, escolha Security » Cryptographic keys.

  3. Selecione o chaveiro que é atribuído ao seu bucket GCS.

  4. Clique em SHOW INFO PANEL no canto superior direito. O painel de informações do chaveiro é exibido.

  5. Clique no botão ADD PRINCIPAL.

  6. No campo New principals, procure o nome da conta de serviço a partir da saída DESCRIBE INTEGRATION da Etapa 2: Recuperar a conta de serviço de armazenamento em nuvem para sua conta Snowflake (neste tópico).

  7. A partir do menu suspenso Select a role, selecione a função Cloud KMS CrytoKey Encryptor/Decryptor.

  8. Clique no botão Save. O nome da conta de serviço é adicionado ao menu suspenso da função Cloud KMS CrytoKey Encryptor/Decryptor no painel de informações.

Configuração da automação usando GCS Pub/Sub

Pré-requisitos

As instruções neste tópico consideram que os seguintes itens foram criados e configurados:

Conta GCP:
  • Tópico Pub/Sub que recebe mensagens de eventos do bucket GCS. Para obter mais informações, consulte Criação de tópico Pub/Sub (neste tópico).

  • Assinatura que recebe mensagens de eventos do tópico Pub/Sub. Para obter mais informações, consulte Criação de assinatura Pub/Sub (neste tópico).

Para instruções, consulte a documentação Pub/Sub.

Snowflake:
  • Tabela de destino no banco de dados do Snowflake onde você deseja carregar os dados.

Criação de tópico Pub/Sub

Crie um tópico Pub/Sub usando Cloud Shell ou Cloud SDK.

Execute o seguinte comando para criar o tópico e habilitá-lo a escutar a atividade no bucket GCS especificado:

$ gsutil notification create -t <topic> -f json gs://<bucket-name> -e OBJECT_FINALIZE
Copy

Onde:

  • <tópico> é o nome para o tópico.

  • <nome-bucket> é o nome do seu bucket GCS.

Se o tópico já existir, o comando o utilizará; caso contrário, o comando criará um novo tópico.

Para obter mais informações, consulte Uso de notificações Pub/Sub para armazenamento em nuvem na documentação Pub/Sub.

Criação de assinatura Pub/Sub

Crie uma assinatura com entrega por Pull para o tópico Pub/Sub usando a ferramenta de linha de comando gcloud ou a API Cloud Pub/Sub. Para obter instruções, consulte Gerenciamento de tópicos e assinaturas na documentação do Pub/Sub.

Nota

  • Somente as assinaturas Pub/Sub que utilizam a entrega por Pull padrão têm suporte com o Snowflake. A entrega por Push não tem suporte.

Recuperação da ID da assinatura Pub/Sub

A ID de assinatura de tópico Pub/Sub é usada nestas instruções para permitir o acesso do Snowflake às mensagens de eventos.

  1. Acesse o Console Google Cloud Platform como editor de projeto.

  2. No painel inicial, escolha Big Data » Pub/Sub » Subscriptions.

  3. Copiar o ID na coluna Subscription ID para a assinatura do tópico.

Etapa 1: Criar uma integração de notificação no Snowflake

Crie uma integração de notificação usando o comando CREATE NOTIFICATION INTEGRATION. A integração da notificação faz referência à sua assinatura Pub/Sub. O Snowflake associa a integração da notificação com uma conta de serviço GCS criada para sua conta. O Snowflake cria uma única conta de serviço que é referenciada por todas as integrações de notificação GCS em sua conta Snowflake.

Nota

  • Somente administradores de conta (usuários com a função ACCOUNTADMIN) ou uma função com o privilégio global CREATE INTEGRATION podem executar este comando SQL.

  • A conta de serviço GCS para integrações de notificação é diferente da conta de serviço criada para integrações de armazenamento.

  • Uma única integração de notificação oferece suporte para uma única assinatura do Google Cloud Pub/Sub. A referência a mesma assinatura Pub/Sub em múltiplas integrações de notificação pode resultar na ausência de dados nas tabelas de destino porque as notificações de eventos são divididas entre as integrações de notificação. Portanto, a criação do canal será bloqueada se um canal fizer referência à mesma assinatura Pub/Sub que um canal existente.

CREATE NOTIFICATION INTEGRATION <integration_name>
  TYPE = QUEUE
  NOTIFICATION_PROVIDER = GCP_PUBSUB
  ENABLED = true
  GCP_PUBSUB_SUBSCRIPTION_NAME = '<subscription_id>';
Copy

Onde:

Por exemplo:

CREATE NOTIFICATION INTEGRATION my_notification_int
  TYPE = QUEUE
  NOTIFICATION_PROVIDER = GCP_PUBSUB
  ENABLED = true
  GCP_PUBSUB_SUBSCRIPTION_NAME = 'projects/project-1234/subscriptions/sub2';
Copy

Etapa 2: Conceder acesso do Snowflake à assinatura Pub/Sub

  1. Execute o comando DESCRIBE INTEGRATION para recuperar a ID da conta de serviço Snowflake:

    DESC NOTIFICATION INTEGRATION <integration_name>;
    
    Copy

    Onde:

    Por exemplo:

    DESC NOTIFICATION INTEGRATION my_notification_int;
    
    Copy
  2. Registre o nome da conta de serviço na coluna GCP_PUBSUB_SERVICE_ACCOUNT, que tem o seguinte formato:

    <service_account>@<project_id>.iam.gserviceaccount.com
    
    Copy
  3. Acesse o Console Google Cloud Platform como editor de projeto.

  4. No painel inicial, escolha Big Data » Pub/Sub » Subscriptions.

  5. Selecione a assinatura a ser configurada para acesso.

  6. Clique em SHOW INFO PANEL no canto superior direito. O painel de informações da assinatura é exibido.

  7. Clique no botão ADD PRINCIPAL.

  8. No campo New principals, procure pelo nome da conta de serviço que você registrou.

  9. A partir do menu suspenso Select a role, selecione Pub/Sub Subscriber.

  10. Clique no botão Save. O nome da conta de serviço é adicionado ao menu suspenso da função Pub/Sub Subscriber no painel de informações.

  11. Navegue até a página Dashboard no Console Cloud e selecione seu projeto na lista suspensa.

  12. Clique no botão ADD PEOPLE TO THIS PROJECT.

  13. Adicione o nome da conta de serviço que você registrou.

  14. A partir do menu suspenso Select a role, selecione Monitoring Viewer.

  15. Clique no botão Save. O nome da conta de serviço é adicionado à função Monitoring Viewer.

Etapa 3: criar um estágio (se necessário)

Crie um estágio externo que faça referência ao seu bucket GCS usando o comando CREATE STAGE. O Snowflake lê seus arquivos de dados preparados nos metadados da tabela externa. Você também pode usar um estágio externo já existente.

Nota

  • Para configurar o acesso seguro ao local de armazenamento na nuvem, consulte Configuração de acesso seguro ao armazenamento em nuvem (neste tópico).

  • Para fazer referência a uma integração de armazenamento na instrução CREATE STAGE, a função deve ter o privilégio USAGE para o objeto de integração de armazenamento.

O exemplo a seguir cria um estágio chamado mystage no esquema ativo para a sessão do usuário. O URL de armazenamento em nuvem inclui o caminho files. O estágio faz referência a uma integração de armazenamento chamada my_storage_int.

USE SCHEMA mydb.public;

CREATE STAGE mystage
  URL='gcs://load/files/'
  STORAGE_INTEGRATION = my_storage_int;
Copy

Etapa 4: criar um canal com ingestão automática habilitada

Crie um canal usando o comando CREATE PIPE. O canal define a instrução COPY INTO <tabela> usada pelo Snowpipe para carregar os dados da fila de ingestão na tabela de destino.

CREATE PIPE <pipe_name>
  AUTO_INGEST = true
  INTEGRATION = '<notification_integration_name>'
  AS
<copy_statement>;
Copy

Onde:

<nome_do_canal>

Identificador para o canal; deve ser único para o esquema no qual o canal é criado.

O identificador deve começar com um caractere alfabético e não pode conter espaços ou caracteres especiais a menos que toda a cadeia de caracteres do identificador esteja entre aspas duplas (por exemplo, "My object"). Os identificadores delimitados por aspas duplas também diferenciam letras maiúsculas de minúsculas.

INTEGRATION = '<nome_integração_notificação>'

Nome da integração de notificação usada para atualizar automaticamente os metadados da tabela de diretório usando notificações Google Cloud Pub/Sub. Uma integração de notificação é um objeto Snowflake que fornece uma interface entre o Snowflake e serviços de enfileiramento de mensagens de terceiros.

copy_statement

Instrução COPY INTO <tabela> usada para carregar dados de arquivos em fila em uma tabela do Snowflake. Esta instrução serve como texto/definição para o canal e é exibida na saída SHOW PIPES.

Por exemplo, crie um canal no esquema snowpipe_db.public que carrega dados de arquivos preparados em um estágio externo (GCS) chamado mystage em uma tabela de destino chamada mytable:

CREATE PIPE snowpipe_db.public.mypipe
  AUTO_INGEST = true
  INTEGRATION = 'MY_NOTIFICATION_INT'
  AS
COPY INTO snowpipe_db.public.mytable
  FROM @snowpipe_db.public.mystage/path2;
Copy

O parâmetro INTEGRATION faz referência à integração de notificação my_notification_int que você criou na Etapa 1: Criar uma integração de notificação no Snowflake. O nome da integração deve ser fornecido em letras maiúsculas.

Importante

Verifique se a referência do local de armazenamento na instrução COPY INTO <tabela> instrução não se sobrepõe à referência em canais existentes na conta. Caso contrário, vários canais poderiam carregar o mesmo conjunto de arquivos de dados nas tabelas de destino. Por exemplo, esta situação pode ocorrer quando múltiplas definições de canais fazem referência ao mesmo local de armazenamento com diferentes níveis de granularidade, como <local_de_armazenamento>/path1/ e <local_de_armazenamento>/path1/path2/. Neste exemplo, se os arquivos fossem preparados em <local_armazenamento>/path1/path2/, ambos os canais carregariam uma cópia dos arquivos.

Veja as instruções COPY INTO <tabela> nas definições de todos os canais da conta executando SHOW PIPES ou consultando a exibição PIPES em Account Usage ou a exibição PIPES no Information Schema.

O Snowpipe com ingestão automática está configurado!

Quando novos arquivos de dados são adicionados ao bucket GCS, a mensagem de evento informa o Snowpipe para carregá-los na tabela de destino definida no canal.

Etapa 5: carregar arquivos históricos

Para carregar qualquer lista de pendências de arquivos de dados que existiam no estágio externo antes que as mensagens do Pub/Sub fossem configuradas, execute uma instrução ALTERPIPE … REFRESH.

Etapa 6: excluir arquivos preparados

Exclua os arquivos preparados depois de carregar os dados com sucesso e não precisar mais dos arquivos. Para obter instruções, consulte Exclusão de arquivos preparados depois que o Snowpipe carrega os dados.

Saída SYSTEM$PIPE_STATUS

A função SYSTEM$PIPE_STATUS recupera uma representação JSON do status atual de um canal.

Para canais com AUTO_INGEST definido como TRUE, a função retorna um objeto JSON contendo os seguintes pares nome/valor (se aplicável ao status atual do canal):

{"executionState":"<value>","oldestFileTimestamp":<value>,"pendingFileCount":<value>,"notificationChannelName":"<value>","numOutstandingMessagesOnChannel":<value>,"lastReceivedMessageTimestamp":"<value>","lastForwardedMessageTimestamp":"<value>","error":<value>,"fault":<value>}
Copy

Para descrições dos valores de saída, consulte o tópico de referência para a função SQL.