Habilitação de notificações de erro do Snowpipe para o Google Pub/Sub

Este tópico fornece instruções para enviar as notificações de erro do Snowpipe para o serviço Google Cloud Pub/Sub (Pub/Sub).

Este recurso pode enviar notificações de erro para os seguintes tipos de cargas:

  • Snowpipe com ingestão automática.

  • Chamadas para o ponto de extremidade insertFiles da API REST Snowpipe.

  • Carregamentos do Apache Kafka usando o conector Snowflake para Kafka somente com o método de ingestão Snowpipe.

Neste tópico:

Suporte para a plataforma de nuvem

Atualmente, esse recurso está limitado às contas Snowflake hospedadas no Google Cloud Platform (GCP). O Snowpipe pode carregar dados de arquivos em qualquer serviço de armazenamento em nuvem suportado; no entanto, notificações por push para o Pub/Sub só são suportadas em contas Snowflake hospedadas no GCP.

Notas

  • Snowflake garante a entrega de notificações de erro pelo menos uma vez (ou seja, são feitas várias tentativas de entrega de mensagens para garantir pelo menos uma tentativa bem sucedida, o que pode resultar em mensagens duplicadas).

  • Esse recurso é implementado utilizando o objeto de integração de notificação. Uma integração de notificação é um objeto Snowflake que fornece uma interface entre o Snowflake e serviços de enfileiramento de mensagens de terceiros. Uma única integração de notificação pode oferecer suporte a múltiplos canais.

Habilitação de notificações de erro

Criação da integração de notificação

Consulte Criação de uma integração de notificação para enviar notificações para um tópico do Google Cloud Pub/Sub.

Ativação das notificações de erro em canais

Uma única integração de notificação pode ser compartilhada por vários canais. O corpo das mensagens de erro identifica o canal, o caminho e o estágio externo, o arquivo onde o erro se originou, entre outros detalhes.

Para permitir notificações de erro de um canal, especifique um valor de parâmetro ERROR_INTEGRATION.

Nota

A criação ou modificação de um canal que faz referência a uma integração de notificação requer uma função que tenha o privilégio USAGE sobre a integração de notificação. Além disso, a função deve ter ou o privilégio CREATE PIPE sobre o esquema ou o privilégio OWNERSHIP sobre o canal, respectivamente.

Observe que operar em qualquer objeto de um esquema também requer o privilégio USAGE no banco de dados e esquema principais.

Para instruções sobre como criar uma função personalizada com um conjunto específico de privilégios, consulte Criação de funções personalizadas.

Para informações gerais sobre concessões de funções e privilégios para executar ações de SQL em objetos protegíveis, consulte Visão geral do controle de acesso.

Novo canal

Crie um novo canal usando CREATE PIPE:

CREATE PIPE <name>
  AUTO_INGEST = TRUE
  [ INTEGRATION = '<string>' ]
  ERROR_INTEGRATION = <integration_name>
  AS <copy_statement>
Copy

Onde:

ERROR_INTEGRATION = <nome_integração>

Nome da integração de notificação que você criou em Criar uma integração de notificação no Snowflake.

Por exemplo:

CREATE PIPE mypipe
  AUTO_INGEST = TRUE
  INTEGRATION = 'my_storage_int'
  ERROR_INTEGRATION = my_notification_int
  AS
  COPY INTO mydb.public.mytable
  FROM @mydb.public.mystage;
Copy

Canal existente

Modifique um canal existente usando ALTER PIPE.

Nota

Se uma integração de notificação foi especificada quando o canal foi criado, é necessário primeiramente remover o parâmetro ERROR_INTEGRATION (usando ALTER PIPE … UNSET ERROR_INTEGRATION) e depois definir o parâmetro.

ALTER PIPE <name> SET ERROR_INTEGRATION = <integration_name>;
Copy

Onde <nome_integração> é o nome da integração de notificação que você criou em Criar uma integração de notificação no Snowflake.

Por exemplo:

ALTER PIPE mypipe SET ERROR_INTEGRATION = my_notification_int;
Copy

Carga útil de mensagem de notificação de erro

O corpo de mensagens de erro identifica o canal e os erros encontrados durante um carregamento.

O seguinte é uma mensagem de exemplo de carga útil descrevendo um erro do Snowpipe. A carga útil pode incluir uma ou mais mensagens de erro.

{\"version\":\"1.0\",\"messageId\":\"a62e34bc-6141-4e95-92d8-f04fe43b43f5\",\"messageType\":\"INGEST_FAILED_FILE\",\"timestamp\":\"2021-10-22T19:15:29.471Z\",\"accountName\":\"MYACCOUNT\",\"pipeName\":\"MYDB.MYSCHEMA.MYPIPE\",\"tableName\":\"MYDB.MYSCHEMA.MYTABLE\",\"stageLocation\":\"gcs://mybucket/mypath\",\"messages\":[{\"fileName\":\"/file1.csv_0_0_0.csv.gz\",\"firstError\":\"Numeric value 'abc' is not recognized\"}]}
Copy

Observe que você deve analisar a cadeia de caracteres em um objeto JSON para processar valores na carga útil.