Habilitação de notificações de erro do Snowpipe para o Google Pub/Sub

Este tópico fornece instruções para enviar as notificações de erro do Snowpipe para o serviço Google Cloud Pub/Sub (Pub/Sub).

Este recurso pode enviar notificações de erro para os seguintes tipos de cargas:

  • Snowpipe com ingestão automática.

  • Chamadas para o ponto de extremidade insertFiles da API REST Snowpipe.

  • Carregamentos do Apache Kafka usando o conector Snowflake para Kafka somente com o método de ingestão Snowpipe.

Neste tópico:

Suporte para a plataforma de nuvem

Atualmente, esse recurso está limitado às contas Snowflake hospedadas no Google Cloud Platform (GCP). O Snowpipe pode carregar dados de arquivos em qualquer serviço de armazenamento em nuvem suportado; no entanto, notificações por push para o Pub/Sub só são suportadas em contas Snowflake hospedadas no GCP.

Notas

  • Snowflake garante a entrega de notificações de erro pelo menos uma vez (ou seja, são feitas várias tentativas de entrega de mensagens para garantir pelo menos uma tentativa bem sucedida, o que pode resultar em mensagens duplicadas).

  • Esse recurso é implementado utilizando o objeto de integração de notificação. Uma integração de notificação é um objeto Snowflake que fornece uma interface entre o Snowflake e serviços de enfileiramento de mensagens de terceiros. Uma única integração de notificação pode oferecer suporte a múltiplos canais.

Habilitação de notificações de erro

Etapa 1: Criar o tópico Pub/Sub

Crie um tópico Pub/Sub que possa receber mensagens de notificação de erro do Snowflake ou reutilizar um tópico existente. Você pode criar o tópico usando SDK Cloud Shell ou Cloud. Para obter mais informações, consulte Criação e uso de tópicos na documentação do Pub/Sub.

Por exemplo, execute o seguinte comando para criar um tópico vazio:

$ gsutil notification create -t <topic>
Copy

Se o tópico já existe, o comando o utiliza; caso contrário, é criado um novo tópico.

Etapa 2: Criar a assinatura Pub/Sub

Opcionalmente, crie uma assinatura para o tópico Pub/Sub para recuperar notificações de erro. Você pode criar uma assinatura com entrega por Pull usando a ferramenta de linha de comando gcloud ou a API Cloud Pub/Sub. Para obter instruções, consulte Gerenciamento de tópicos e assinaturas na documentação do Pub/Sub.

Etapa 3: Criar uma integração de notificação no Snowflake

Crie uma integração de notificação usando o comando CREATE NOTIFICATION INTEGRATION. A integração de notificação faz referência ao seu tópico Pub/Sub. O Snowflake associa a integração de notificação com uma conta de serviço Goodle Cloud Platform (GCP) criada para sua conta. O Snowflake cria uma única conta de serviço que é referenciada por todas as integrações de notificação GCP em sua conta Snowflake.

Nota

  • Somente administradores de conta (usuários com a função ACCOUNTADMIN) ou uma função com o privilégio global CREATE INTEGRATION podem executar este comando SQL.

  • A conta de serviço GCP para integrações de notificação é diferente da conta de serviço criada para integrações de armazenamento.

CREATE NOTIFICATION INTEGRATION <integration_name>
  ENABLED = TRUE
  TYPE = QUEUE
  DIRECTION = OUTBOUND
  NOTIFICATION_PROVIDER = GCP_PUBSUB
  GCP_PUBSUB_TOPIC_NAME = '<topic_id>'
Copy

Onde:

  • integration_name é o nome da nova integração.

  • topic_id é o tópico Pub/Sub para o qual o Snowflake envia notificações de erro. Para obter mais informações, consulte Etapa 1: Criar o tópico Pub/Sub (neste tópico).

Por exemplo:

CREATE NOTIFICATION INTEGRATION my_notification_int
  TYPE = QUEUE
  DIRECTION = OUTBOUND
  NOTIFICATION_PROVIDER = GCP_PUBSUB
  ENABLED = true
  GCP_PUBSUB_TOPIC_NAME = 'projects/sdm-prod/topics/mytopic';
Copy

Etapa 4: Conceder acesso do Snowflake à assinatura Pub/Sub

  1. Execute o comando DESCRIBE INTEGRATION para recuperar a ID da conta de serviço Snowflake:

    DESC NOTIFICATION INTEGRATION <integration_name>;
    
    Copy

    Onde:

    • integration_name é o nome da integração que você criou em “Etapa 1: Criar uma integração de notificação no Snowflake”.

    Por exemplo:

    DESC NOTIFICATION INTEGRATION my_notification_int;
    
    Copy
  2. Registre o nome da conta de serviço na coluna GCP_PUBSUB_SERVICE_ACCOUNT, que tem o seguinte formato:

    <service_account>@<project_id>.iam.gserviceaccount.com
    
    Copy
  3. Acesse o Console Google Cloud Platform como editor de projeto.

  4. No painel inicial, escolha Big Data » Pub/Sub » Subscriptions.

  5. Selecione a assinatura a ser configurada para acesso.

  6. Clique em SHOW INFO PANEL no canto superior direito. O painel de informações da assinatura é exibido.

  7. No campo Add members, procure pelo nome da conta de serviço que você registrou.

  8. A partir do menu suspenso Select a role, selecione Pub/Sub Publisher.

  9. Clique no botão Add. O nome da conta de serviço é adicionado ao menu suspenso da função Pub/Sub Publisher no painel de informações.

Etapa 5: Habilitar notificações de erro em canais

Uma única integração de notificação pode ser compartilhada por vários canais. O corpo das mensagens de erro identifica o canal, o caminho e o estágio externo, o arquivo onde o erro se originou, entre outros detalhes.

Para permitir notificações de erro de um canal, especifique um valor de parâmetro ERROR_INTEGRATION.

Nota

A criação ou modificação de um canal que faz referência a uma integração de notificação requer uma função que tenha o privilégio USAGE sobre a integração de notificação. Além disso, a função deve ter ou o privilégio CREATE PIPE sobre o esquema ou o privilégio OWNERSHIP sobre o canal, respectivamente.

Observe que operar em qualquer objeto de um esquema também requer o privilégio USAGE no banco de dados e esquema principais.

Para instruções sobre como criar uma função personalizada com um conjunto específico de privilégios, consulte Criação de funções personalizadas.

Para informações gerais sobre concessões de funções e privilégios para executar ações de SQL em objetos protegíveis, consulte Visão geral do controle de acesso.

Novo canal

Crie um novo canal usando CREATE PIPE:

CREATE PIPE <name>
  AUTO_INGEST = TRUE
  [ INTEGRATION = '<string>' ]
  ERROR_INTEGRATION = <integration_name>
  AS <copy_statement>
Copy

Onde:

ERROR_INTEGRATION = <nome_integração>

Nome da integração de notificação que você criou em Etapa 4: Criar a integração de notificação (neste tópico).

Por exemplo:

CREATE PIPE mypipe
  AUTO_INGEST = TRUE
  INTEGRATION = 'my_storage_int'
  ERROR_INTEGRATION = my_notification_int
  AS
  COPY INTO mydb.public.mytable
  FROM @mydb.public.mystage;
Copy

Canal existente

Modifique um canal existente usando ALTER PIPE.

Nota

Se uma integração de notificação foi especificada quando o canal foi criado, é necessário primeiramente remover o parâmetro ERROR_INTEGRATION (usando ALTER PIPE … UNSET ERROR_INTEGRATION) e depois definir o parâmetro.

ALTER PIPE <name> SET ERROR_INTEGRATION = <integration_name>;
Copy

Em que <integration_name> é o nome da integração de notificação que você criou em Etapa 4: Criar a integração de notificação (neste tópico).

Por exemplo:

ALTER PIPE mypipe SET ERROR_INTEGRATION = my_notification_int;
Copy

Carga útil de mensagem de notificação de erro

O corpo de mensagens de erro identifica o canal e os erros encontrados durante um carregamento.

O seguinte é uma mensagem de exemplo de carga útil descrevendo um erro do Snowpipe. A carga útil pode incluir uma ou mais mensagens de erro.

{\"version\":\"1.0\",\"messageId\":\"a62e34bc-6141-4e95-92d8-f04fe43b43f5\",\"messageType\":\"INGEST_FAILED_FILE\",\"timestamp\":\"2021-10-22T19:15:29.471Z\",\"accountName\":\"MYACCOUNT\",\"pipeName\":\"MYDB.MYSCHEMA.MYPIPE\",\"tableName\":\"MYDB.MYSCHEMA.MYTABLE\",\"stageLocation\":\"gcs://mybucket/mypath\",\"messages\":[{\"fileName\":\"/file1.csv_0_0_0.csv.gz\",\"firstError\":\"Numeric value 'abc' is not recognized\"}]}
Copy

Observe que você deve analisar a cadeia de caracteres em um objeto JSON para processar valores na carga útil.