Habilitação de notificações de erro do Snowpipe para o Google Pub/Sub¶
Este tópico fornece instruções para enviar as notificações de erro do Snowpipe para o serviço Google Cloud Pub/Sub (Pub/Sub).
Este recurso pode enviar notificações de erro para os seguintes tipos de cargas:
Snowpipe com ingestão automática.
Chamadas para o ponto de extremidade
insertFiles
da API REST Snowpipe.Carregamentos do Apache Kafka usando o conector Snowflake para Kafka somente com o método de ingestão Snowpipe.
Neste tópico:
Suporte para a plataforma de nuvem¶
Atualmente, esse recurso está limitado às contas Snowflake hospedadas no Google Cloud Platform (GCP). O Snowpipe pode carregar dados de arquivos em qualquer serviço de armazenamento em nuvem suportado; no entanto, notificações por push para o Pub/Sub só são suportadas em contas Snowflake hospedadas no GCP.
Notas¶
Snowflake garante a entrega de notificações de erro pelo menos uma vez (ou seja, são feitas várias tentativas de entrega de mensagens para garantir pelo menos uma tentativa bem sucedida, o que pode resultar em mensagens duplicadas).
Esse recurso é implementado utilizando o objeto de integração de notificação. Uma integração de notificação é um objeto Snowflake que fornece uma interface entre o Snowflake e serviços de enfileiramento de mensagens de terceiros. Uma única integração de notificação pode oferecer suporte a múltiplos canais.
Habilitação de notificações de erro¶
Etapa 1: Criar o tópico Pub/Sub¶
Crie um tópico Pub/Sub que possa receber mensagens de notificação de erro do Snowflake ou reutilizar um tópico existente. Você pode criar o tópico usando SDK Cloud Shell ou Cloud. Para obter mais informações, consulte Criação e uso de tópicos na documentação do Pub/Sub.
Por exemplo, execute o seguinte comando para criar um tópico vazio:
$ gsutil notification create -t <topic>
Se o tópico já existe, o comando o utiliza; caso contrário, é criado um novo tópico.
Etapa 2: Criar a assinatura Pub/Sub¶
Opcionalmente, crie uma assinatura para o tópico Pub/Sub para recuperar notificações de erro. Você pode criar uma assinatura com entrega por Pull usando a ferramenta de linha de comando gcloud
ou a API Cloud Pub/Sub. Para obter instruções, consulte Gerenciamento de tópicos e assinaturas na documentação do Pub/Sub.
Etapa 3: Criar uma integração de notificação no Snowflake¶
Crie uma integração de notificação usando o comando CREATE NOTIFICATION INTEGRATION. A integração de notificação faz referência ao seu tópico Pub/Sub. O Snowflake associa a integração de notificação com uma conta de serviço Goodle Cloud Platform (GCP) criada para sua conta. O Snowflake cria uma única conta de serviço que é referenciada por todas as integrações de notificação GCP em sua conta Snowflake.
Nota
Somente administradores de conta (usuários com a função ACCOUNTADMIN) ou uma função com o privilégio global CREATE INTEGRATION podem executar este comando SQL.
A conta de serviço GCP para integrações de notificação é diferente da conta de serviço criada para integrações de armazenamento.
CREATE NOTIFICATION INTEGRATION <integration_name>
ENABLED = TRUE
TYPE = QUEUE
DIRECTION = OUTBOUND
NOTIFICATION_PROVIDER = GCP_PUBSUB
GCP_PUBSUB_TOPIC_NAME = '<topic_id>'
Onde:
integration_name
é o nome da nova integração.topic_id
é o tópico Pub/Sub para o qual o Snowflake envia notificações de erro. Para obter mais informações, consulte Etapa 1: Criar o tópico Pub/Sub (neste tópico).
Por exemplo:
CREATE NOTIFICATION INTEGRATION my_notification_int
TYPE = QUEUE
DIRECTION = OUTBOUND
NOTIFICATION_PROVIDER = GCP_PUBSUB
ENABLED = true
GCP_PUBSUB_TOPIC_NAME = 'projects/sdm-prod/topics/mytopic';
Etapa 4: Conceder acesso do Snowflake à assinatura Pub/Sub¶
Execute o comando DESCRIBE INTEGRATION para recuperar a ID da conta de serviço Snowflake:
DESC NOTIFICATION INTEGRATION <integration_name>;
Onde:
integration_name
é o nome da integração que você criou em “Etapa 1: Criar uma integração de notificação no Snowflake”.
Por exemplo:
DESC NOTIFICATION INTEGRATION my_notification_int;
Registre o nome da conta de serviço na coluna GCP_PUBSUB_SERVICE_ACCOUNT, que tem o seguinte formato:
<service_account>@<project_id>.iam.gserviceaccount.com
Acesse o Console Google Cloud Platform como editor de projeto.
No painel inicial, escolha Big Data » Pub/Sub » Subscriptions.
Selecione a assinatura a ser configurada para acesso.
Clique em SHOW INFO PANEL no canto superior direito. O painel de informações da assinatura é exibido.
No campo Add members, procure pelo nome da conta de serviço que você registrou.
A partir do menu suspenso Select a role, selecione Pub/Sub Publisher.
Clique no botão Add. O nome da conta de serviço é adicionado ao menu suspenso da função Pub/Sub Publisher no painel de informações.
Etapa 5: habilitar notificações de erro em canais¶
Uma única integração de notificação pode ser compartilhada por vários canais. O corpo das mensagens de erro identifica o canal, o caminho e o estágio externo, o arquivo onde o erro se originou, entre outros detalhes.
Para permitir notificações de erro de um canal, especifique um valor de parâmetro ERROR_INTEGRATION.
Nota
A criação ou modificação de um canal que faz referência a uma integração de notificação requer uma função que tenha o privilégio USAGE sobre a integração de notificação. Além disso, a função deve ter ou o privilégio CREATE PIPE sobre o esquema ou o privilégio OWNERSHIP sobre o canal, respectivamente.
Observe que operar em qualquer objeto de um esquema também requer o privilégio USAGE no banco de dados e esquema principais.
Para instruções sobre como criar uma função personalizada com um conjunto específico de privilégios, consulte Criação de funções personalizadas.
Para informações gerais sobre concessões de funções e privilégios para executar ações de SQL em objetos protegíveis, consulte Visão geral do controle de acesso.
Novo canal¶
Crie um novo canal usando CREATE PIPE:
CREATE PIPE <name>
AUTO_INGEST = TRUE
[ INTEGRATION = '<string>' ]
ERROR_INTEGRATION = <integration_name>
AS <copy_statement>
Onde:
ERROR_INTEGRATION = <nome_integração>
Nome da integração de notificação que você criou em Etapa 4: Criar a integração de notificação (neste tópico).
Por exemplo:
CREATE PIPE mypipe
AUTO_INGEST = TRUE
INTEGRATION = 'my_storage_int'
ERROR_INTEGRATION = my_notification_int
AS
COPY INTO mydb.public.mytable
FROM @mydb.public.mystage;
Canal existente¶
Modifique um canal existente usando ALTER PIPE.
Nota
Se uma integração de notificação foi especificada quando o canal foi criado, é necessário primeiramente remover o parâmetro ERROR_INTEGRATION (usando ALTER PIPE … UNSET ERROR_INTEGRATION) e depois definir o parâmetro.
ALTER PIPE <name> SET ERROR_INTEGRATION = <integration_name>;
Em que <integration_name>
é o nome da integração de notificação que você criou em Etapa 4: Criar a integração de notificação (neste tópico).
Por exemplo:
ALTER PIPE mypipe SET ERROR_INTEGRATION = my_notification_int;
Carga útil de mensagem de notificação de erro¶
O corpo de mensagens de erro identifica o canal e os erros encontrados durante um carregamento.
O seguinte é uma mensagem de exemplo de carga útil descrevendo um erro do Snowpipe. A carga útil pode incluir uma ou mais mensagens de erro.
{\"version\":\"1.0\",\"messageId\":\"a62e34bc-6141-4e95-92d8-f04fe43b43f5\",\"messageType\":\"INGEST_FAILED_FILE\",\"timestamp\":\"2021-10-22T19:15:29.471Z\",\"accountName\":\"MYACCOUNT\",\"pipeName\":\"MYDB.MYSCHEMA.MYPIPE\",\"tableName\":\"MYDB.MYSCHEMA.MYTABLE\",\"stageLocation\":\"gcs://mybucket/mypath\",\"messages\":[{\"fileName\":\"/file1.csv_0_0_0.csv.gz\",\"firstError\":\"Numeric value 'abc' is not recognized\"}]}
Observe que você deve analisar a cadeia de caracteres em um objeto JSON para processar valores na carga útil.