Automação do Snowpipe para Amazon S3

Este tópico fornece instruções para acionar o carregamento automático de dados do Snowpipe usando notificações do Amazon SQS (Simple Queue Service) para um bucket S3.

A Snowflake recomenda que você envie apenas eventos suportados para o Snowpipe para reduzir custos, ruído de eventos e latência.

Neste tópico:

Suporte para a plataforma de nuvem

O acionamento de cargas de dados automatizadas do Snowpipe usando mensagens de evento S3 conta com o suporte por contas Snowflake hospedadas em todas as plataformas de nuvem compatíveis.

Tráfego de rede

Nota para clientes do Virtual Private Snowflake (VPS) e AWS PrivateLink:

Automatizar o Snowpipe usando notificações do Amazon SQS funciona bem. Entretanto, embora o armazenamento em nuvem AWS dentro de um VPC (incluindo VPS) possa se comunicar com seus próprios serviços de mensagens (Amazon SQS, Amazon Simple Notification Service), este tráfego flui entre servidores na rede segura da Amazon fora do VPC; portanto, este tráfego não é protegido pelo VPC.

Configuração de acesso seguro ao armazenamento em nuvem

Nota

Se você já tiver configurado o acesso seguro ao bucket S3 que armazena seus arquivos de dados, pode pular esta seção.

Esta seção descreve como usar as integrações de armazenamento para permitir que o Snowflake leia e grave dados em um bucket Amazon S3 referenciado em um estágio externo (ou seja, S3). As integrações são objetos Snowflake nomeados e de primeira classe que evitam a necessidade de passar credenciais explícitas de provedores de nuvens, tais como chaves secretas ou tokens de acesso. Os objetos de integração armazenam uma ID de usuário de gerenciamento de identidade e acesso (IAM) da AWS. Um administrador em sua organização concede ao usuário IAM da integração permissões na conta AWS.

Uma integração também pode listar buckets (e caminhos opcionais) que limitam os locais que os usuários podem especificar ao criar estágios externos que utilizam a integração.

Nota

  • Completar as instruções nesta seção requer permissões na AWS para criar e gerenciar políticas e funções de IAM. Se você não for um administrador AWS, peça a seu administrador AWS que complete estas tarefas.

  • Observe que, atualmente, o acesso ao armazenamento S3 em regiões governamentais usando uma integração de armazenamento está limitado a contas Snowflake hospedadas na AWS na mesma região governamental. Há suporte para o acesso ao seu armazenamento S3 a partir de uma conta hospedada fora da região governamental usando credenciais diretas.

O diagrama a seguir mostra o fluxo de integração para um estágio S3:

Amazon S3 Stage Integration Flow
  1. Um estágio externo (ou seja, S3) faz referência a um objeto de integração de armazenamento em sua definição.

  2. O Snowflake associa automaticamente a integração do armazenamento com um usuário IAM do S3 criado para sua conta. O Snowflake cria um único usuário IAM que é referenciado por todas as integrações de armazenamento S3 em sua conta Snowflake.

  3. Um administrador AWS em sua organização concede permissões ao usuário IAM para acessar o bucket referenciado na definição do estágio. Observe que muitos objetos de preparação externo podem fazer referência a diferentes buckets e caminhos e usar a mesma integração de armazenamento para autenticação.

Quando um usuário carrega ou descarrega dados de ou para um estágio, o Snowflake verifica as permissões concedidas ao usuário IAM no bucket antes de permitir ou negar o acesso.

Nota

Recomendamos esta opção, o que evita a necessidade de fornecer credenciais IAM ao acessar o armazenamento em nuvem. Consulte Configuração de acesso seguro ao Amazon S3 para opções adicionais de acesso ao armazenamento.

Nesta seção:

Etapa 1: Configurar permissões de acesso para o bucket S3

Requisitos de controle de acesso da AWS

O Snowflake requer as seguintes permissões em um bucket S3 e pasta para poder acessar os arquivos na pasta (e subpastas):

  • s3:GetBucketLocation

  • s3:GetObject

  • s3:GetObjectVersion

  • s3:ListBucket

Como prática recomendada, a Snowflake recomenda a criação de uma política IAM para o acesso do Snowflake ao bucket S3. Você pode então anexar a política à função e usar as credenciais de segurança geradas pela AWS para a função para acessar os arquivos no bucket.

Criação de uma política IAM

As seguintes instruções passo a passo descrevem como configurar as permissões de acesso para o Snowflake em seu Console de gerenciamento AWS para que você possa acessar seu bucket S3.

  1. Faça login no Console de gerenciamento AWS.

  2. No painel inicial, escolha Identity & Access Management (IAM):

    Identity & Access Management in AWS Management Console
  3. Escolha Account settings no painel de navegação à esquerda.

  4. Expanda a lista Security Token Service Regions, encontre a região AWS correspondente à região onde sua conta está localizada e escolha Activate se o status for Inactive.

  5. Escolha Policies no painel de navegação à esquerda.

  6. Clique em Create Policy:

    Create Policy button on Policies page
  7. Clique na guia JSON.

  8. Adicione um documento de política que permitirá ao Snowflake acessar o bucket S3 e a pasta.

    A seguinte política (em formato JSON) fornece ao Snowflake as permissões necessárias para carregar ou descarregar dados usando um único bucket e caminho de pasta.

    Copie e cole o texto no editor de políticas:

    Nota

    • Certifique-se de substituir bucket e prefix pelo nome real do bucket e prefixo do caminho da pasta.

    • Os Nomes de recurso da Amazon (ARN) para buckets em regiões governamentais têm um prefixo arn:aws-us-gov:s3:::.

    {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                "Action": [
                  "s3:GetObject",
                  "s3:GetObjectVersion"
                ],
                "Resource": "arn:aws:s3:::<bucket>/<prefix>/*"
            },
            {
                "Effect": "Allow",
                "Action": [
                    "s3:ListBucket",
                    "s3:GetBucketLocation"
                ],
                "Resource": "arn:aws:s3:::<bucket>",
                "Condition": {
                    "StringLike": {
                        "s3:prefix": [
                            "<prefix>/*"
                        ]
                    }
                }
            }
        ]
    }
    
    Copy

    Nota

    Definir a condição "s3:prefix": como ["*"] ou ["<caminho>/*"] concede acesso a todos os prefixos no bucket ou caminho no bucket especificado, respectivamente.

    Observe que as políticas AWS oferecem suporte a uma variedade de diferentes casos de uso de segurança.

  9. Clique em Review policy.

  10. Digite o nome da política (por exemplo, snowflake_access) e uma descrição opcional. Clique em Create policy.

    Create Policy button in Review Policy page

Etapa 2: Crie a função IAM na AWS

No Console de gerenciamento AWS, crie uma função AWS IAM para conceder privilégios para o bucket S3 que contém seus arquivos de dados.

  1. Faça login no Console de gerenciamento AWS.

  2. No painel inicial, escolha Identity & Access Management (IAM):

    Identity & Access Management in AWS Management Console
  3. Escolha Roles no painel de navegação à esquerda.

  4. Clique no botão Create role.

    Select Trusted Entity Page in AWS Management Console
  5. Selecione Another AWS account como o tipo de entidade confiável.

  6. No campo Account ID, digite temporariamente sua própria ID de conta AWS. Mais tarde, você modificará a relação de confiança e concederá acesso ao Snowflake.

  7. Selecione a opção Require external ID. Insira uma ID fictícia, como 0000. Mais tarde, você modificará a relação de confiança e especificará a ID externa para seu estágio do Snowflake. É necessário uma ID externa para conceder acesso a seus recursos AWS (ou seja, S3) a um terceiro (ou seja, Snowflake).

  8. Clique no botão Next.

  9. Localize a política que você criou na etapa 1: Configurar permissões de acesso para o bucket S3 (neste tópico) e selecione esta política.

  10. Clique no botão Next.

    Review Page in AWS Management Console
  11. Digite um nome e descrição para a função e clique no botão Create role.

    Agora você criou uma política IAM para um bucket, criou uma função IAM e anexou a política à função.

  12. Registre o valor Role ARN localizado na página de resumo de funções. Na próxima etapa, você criará uma integração do Snowflake que faz referência a essa função.

    IAM Role

Nota

O Snowflake armazena em cache as credenciais temporárias por um período que não pode exceder o tempo de expiração de 60 minutos. Se você revogar o acesso do Snowflake, os usuários podem conseguir listar arquivos e acessar dados a partir do local de armazenamento em nuvem até que o cache expire.

Etapa 3: Criar uma integração de armazenamento em nuvem no Snowflake

Crie uma integração de armazenamento usando o comando CREATE STORAGE INTEGRATION. Uma integração de armazenamento é um objeto Snowflake que armazena um usuário gerado de gerenciamento de identidade e acesso (IAM) para seu armazenamento em nuvem S3, junto com um conjunto opcional de locais de armazenamento permitidos ou bloqueados (ou seja, buckets). Os administradores do provedor de nuvem em sua organização concedem permissões para os locais de armazenamento ao usuário gerado. Esta opção permite que os usuários evitem fornecer credenciais ao criar estágios ou carregar dados.

Uma única integração de armazenamento pode oferecer suporte a múltiplos estágios externos (ou seja, S3). A URL na definição do estágio deve estar alinhada com os buckets S3 (e caminhos opcionais) especificados para o parâmetro STORAGE_ALLOWED_LOCATIONS.

Nota

Somente administradores de conta (usuários com a função ACCOUNTADMIN) ou uma função com o privilégio global CREATE INTEGRATION podem executar este comando SQL.

CREATE STORAGE INTEGRATION <integration_name>
  TYPE = EXTERNAL_STAGE
  STORAGE_PROVIDER = 'S3'
  ENABLED = TRUE
  STORAGE_AWS_ROLE_ARN = '<iam_role>'
  STORAGE_ALLOWED_LOCATIONS = ('s3://<bucket>/<path>/', 's3://<bucket>/<path>/')
  [ STORAGE_BLOCKED_LOCATIONS = ('s3://<bucket>/<path>/', 's3://<bucket>/<path>/') ]
Copy

Onde:

  • integration_name é o nome da nova integração.

  • iam_role é o nome do recurso Amazon (ARN) da função que você criou em Etapa 2: Criar a função IAM na AWS (neste tópico).

  • bucket é o nome de um bucket S3 que armazena seus arquivos de dados (por exemplo, mybucket). O parâmetro STORAGE_ALLOWED_LOCATIONS obrigatório e o parâmetro STORAGE_BLOCKED_LOCATIONS opcional restringem ou bloqueiam o acesso a estes buckets, respectivamente, quando os estágios que fazem referência a esta integração são criados ou modificados.

  • path é um caminho opcional que pode ser usado para proporcionar um controle granular sobre objetos no bucket.

O exemplo a seguir cria uma integração que limita explicitamente os estágios externos que utilizam a integração para referenciar um de dois buckets ou caminhos. Em uma etapa posterior, criaremos um estágio externo que referencia um desses buckets e caminhos.

Os estágios externos adicionais que também utilizam esta integração podem fazer referência aos buckets e caminhos permitidos:

CREATE STORAGE INTEGRATION s3_int
  TYPE = EXTERNAL_STAGE
  STORAGE_PROVIDER = 'S3'
  ENABLED = TRUE
  STORAGE_AWS_ROLE_ARN = 'arn:aws:iam::001234567890:role/myrole'
  STORAGE_ALLOWED_LOCATIONS = ('s3://mybucket1/mypath1/', 's3://mybucket2/mypath2/')
  STORAGE_BLOCKED_LOCATIONS = ('s3://mybucket1/mypath1/sensitivedata/', 's3://mybucket2/mypath2/sensitivedata/');
Copy

Etapa 4: Recuperar o usuário AWS IAM para sua conta Snowflake

  1. Execute o comando DESCRIBE INTEGRATION para recuperar o ARN para o usuário AWS IAM que foi criado automaticamente para sua conta Snowflake:

    DESC INTEGRATION <integration_name>;
    
    Copy

    Onde:

    Por exemplo:

    DESC INTEGRATION s3_int;
    
    +---------------------------+---------------+--------------------------------------------------------------------------------+------------------+
    | property                  | property_type | property_value                                                                 | property_default |
    +---------------------------+---------------+--------------------------------------------------------------------------------+------------------|
    | ENABLED                   | Boolean       | true                                                                           | false            |
    | STORAGE_ALLOWED_LOCATIONS | List          | s3://mybucket1/mypath1/,s3://mybucket2/mypath2/                                | []               |
    | STORAGE_BLOCKED_LOCATIONS | List          | s3://mybucket1/mypath1/sensitivedata/,s3://mybucket2/mypath2/sensitivedata/    | []               |
    | STORAGE_AWS_IAM_USER_ARN  | String        | arn:aws:iam::123456789001:user/abc1-b-self1234                                 |                  |
    | STORAGE_AWS_ROLE_ARN      | String        | arn:aws:iam::001234567890:role/myrole                                          |                  |
    | STORAGE_AWS_EXTERNAL_ID   | String        | MYACCOUNT_SFCRole=2_a123456/s0aBCDEfGHIJklmNoPq=                               |                  |
    +---------------------------+---------------+--------------------------------------------------------------------------------+------------------+
    
    Copy
  2. Registre os seguintes valores:

    Valor

    Descrição

    STORAGE_AWS_IAM_USER_ARN

    O usuário AWS IAM criado para sua conta Snowflake, arn:aws:iam::123456789001:user/abc1-b-self1234 neste exemplo. Nós fornecemos um único usuário IAM para toda a sua conta Snowflake. Todas as integrações de armazenamento S3 utilizam esse usuário IAM.

    STORAGE_AWS_EXTERNAL_ID

    A ID externa que é necessária para estabelecer uma relação de confiança.

    Você fornecerá estes valores na próxima seção.

Etapa 5: Conceder ao usuário IAM permissões do usuário para acessar objetos de bucket

As seguintes instruções passo a passo descrevem como configurar as permissões de acesso IAM ao Snowflake em seu Console de gerenciamento AWS para que você possa usar um bucket S3 para carregar e descarregar dados:

  1. Faça login no Console de gerenciamento AWS.

  2. No painel inicial, escolha Identity & Access Management (IAM):

    Identity & Access Management in AWS Management Console
  3. Escolha Roles no painel de navegação à esquerda.

  4. Clique na função que criou na Etapa 2: Criar a função IAM na AWS (neste tópico).

  5. Clique na guia Trust relationships.

  6. Clique no botão Edit trust relationship.

  7. Modifique o documento da política com os valores de saída DESC STORAGE INTEGRATION registrados na Etapa 4: Recuperar o usuário AWS IAM para sua conta Snowflake (neste tópico):

    Documento de política da função IAM

    {
      "Version": "2012-10-17",
      "Statement": [
        {
          "Sid": "",
          "Effect": "Allow",
          "Principal": {
            "AWS": "<snowflake_user_arn>"
          },
          "Action": "sts:AssumeRole",
          "Condition": {
            "StringEquals": {
              "sts:ExternalId": "<snowflake_external_id>"
            }
          }
        }
      ]
    }
    
    Copy

    Onde:

    • snowflake_user_arn é o valor STORAGE_AWS_IAM_USER_ARN que você registrou.

    • snowflake_external_id é o valor STORAGE_AWS_EXTERNAL_ID que você registrou.

      Neste exemplo, o valor snowflake_external_id é MYACCOUNT_SFCRole=2_a123456/s0aBCDEfGHIJklmNoPq=.

      Nota

      Por razões de segurança, se você criar uma nova integração de armazenamento (ou recriar uma integração de armazenamento existente usando a sintaxe CREATE OR REPLACE STORAGE INTEGRATION), a integração resultante terá uma ID externa diferente e, portanto, não poderá resolver a relação de confiança a menos que a política de confiança seja modificada.

  8. Clique no botão Update Trust Policy. As alterações são salvas.

Nota

O Snowflake armazena em cache as credenciais temporárias por um período que não pode exceder o tempo de expiração de 60 minutos. Se você revogar o acesso do Snowflake, os usuários podem ser capazes de listar arquivos e carregar dados a partir do local de armazenamento em nuvem até que o cache expire.

Determinação da opção correta

Antes de prosseguir, determine se existe uma notificação de evento S3 para o caminho de destino (ou “prefixo”, na terminologia AWS) em seu bucket S3 onde seus arquivos de dados estão localizados. As regras da AWS proíbem a criação de notificações conflitantes para o mesmo caminho.

As seguintes opções para automatizar o Snowpipe usando o Amazon SQS têm suporte:

  • Opção 1. Notificação de novo evento S3: Crie uma notificação de evento para o caminho de destino em seu bucket S3. A notificação de evento informa o Snowpipe através de uma fila SQS quando os arquivos estão prontos para serem carregados.

    Esta é a opção mais comum.

    Importante

    Se existir uma notificação de evento conflitante para seu bucket S3, use a opção 2 em seu lugar.

  • Opção 2. Notificação de evento existente: Configure o Amazon Simple Notification Service (SNS) como um transmissor para compartilhar notificações para um determinado caminho com vários pontos de extremidade (ou “assinantes”, por exemplo, filas SQS filas ou cargas de trabalho Lambda AWS), incluindo a fila SQS do Snowflake para automação do Snowpipe. Uma notificação de evento S3 publicada por SNS informa o Snowpipe através de uma fila SQS quando os arquivos estão prontos para serem carregados.

  • Opção 3. Configuração do Amazon EventBridge para automatizar o Snowpipe: Semelhante à opção 2, você também pode ativar o Amazon EventBridge para os buckets S3 e criar regras para enviar notificações para os tópicos SNS.

Opção 1: criação de uma notificação de novo evento S3 para automatizar o Snowpipe

Este tópico descreve a opção mais comum para acionar o carregamento automático de dados do Snowpipe usando notificações do Amazon SQS (Simple Queue Service) para um bucket S3. As etapas explicam como criar uma notificação de evento para o caminho de destino (ou “prefixo”, em terminologia AWS) em seu bucket S3 onde seus arquivos de dados são armazenados.

Importante

Se existir uma notificação de evento conflitante para seu bucket S3, use Opção 2: Configuração do Amazon SNS para automatizar o Snowpipe usando notificações SQS (neste tópico). As regras da AWS proíbem a criação de notificações conflitantes para o mesmo caminho de destino.

O diagrama a seguir mostra o fluxo do processo de ingestão automática do Snowpipe:

Snowpipe Auto-ingest Process Flow
  1. Os arquivos de dados são carregados em um estágio.

  2. Uma notificação de evento S3 informa o Snowpipe através de uma fila SQS que os arquivos estão prontos para serem carregados. O Snowpipe copia os arquivos em uma fila.

  3. Um warehouse virtual fornecido pelo Snowflake carrega os dados dos arquivos enfileirados na tabela de destino com base nos parâmetros definidos no canal especificado.

Nota

As instruções neste tópico presumem que uma tabela de destino já existe no banco de dados Snowflake onde seus dados serão carregados.

Etapa 1: criar um estágio (se necessário)

Crie um estágio externo que faça referência ao seu bucket S3 usando o comando CREATE STAGE. O Snowpipe busca seus arquivos de dados do estágio e os enfileira temporariamente antes de carregá-los na tabela de destino. Você também pode usar um estágio externo já existente.

Nota

  • Para configurar o acesso seguro ao local de armazenamento na nuvem, consulte Configuração de acesso seguro ao armazenamento em nuvem (neste tópico).

  • Para fazer referência a uma integração de armazenamento na instrução CREATE STAGE, a função deve ter o privilégio USAGE para o objeto de integração de armazenamento.

O exemplo a seguir cria um estágio chamado mystage no esquema ativo para a sessão do usuário. O URL de armazenamento em nuvem inclui o caminho files. O estágio faz referência a uma integração de armazenamento chamada my_storage_int:

USE SCHEMA snowpipe_db.public;

CREATE STAGE mystage
  URL = 's3://mybucket/load/files'
  STORAGE_INTEGRATION = my_storage_int;
Copy

Etapa 2: criar um canal com ingestão automática habilitada

Crie um canal usando o comando CREATE PIPE. O canal define a instrução COPY INTO <tabela> usada pelo Snowpipe para carregar os dados da fila de ingestão na tabela de destino.

O exemplo a seguir cria um canal chamado mypipe no esquema ativo para a sessão do usuário. O canal carrega os dados dos arquivos preparados no estágio mystage na tabela mytable:

create pipe snowpipe_db.public.mypipe auto_ingest=true as
  copy into snowpipe_db.public.mytable
  from @snowpipe_db.public.mystage
  file_format = (type = 'JSON');
Copy

O parâmetro AUTO_INGEST=true especifica a leitura de notificações de eventos enviadas de um bucket S3 para uma fila SQS quando novos dados estiverem prontos para serem carregados.

Importante

Compare a referência do estágio na definição do canal com os canais existentes. Verifique se os caminhos de diretório para o mesmo bucket S3 não se sobrepõem; caso contrário, vários canais poderiam carregar o mesmo conjunto de arquivos de dados várias vezes, em uma ou mais tabelas de destino. Isto pode acontecer, por exemplo, quando vários estágios fazem referência ao mesmo bucket S3 com diferentes níveis de granularidade, como s3://mybucket/path1 e s3://mybucket/path1/path2. Neste caso de uso, se os arquivos forem preparados em s3://mybucket/path1/path2, os canais para ambos os estágios carregariam uma cópia dos arquivos.

Isto é diferente da configuração manual do Snowpipe (com ingestão automática desabilitada), que requer que os usuários enviem um conjunto de arquivos nomeado a uma API REST para enfileirar os arquivos para carregamento. Com a ingestão automática habilitada, cada canal recebe uma lista de arquivos gerada a partir das notificações de eventos S3. É necessário um cuidado adicional para evitar a duplicação de dados.

Etapa 3: configurar a segurança

Para cada usuário que irá executar cargas de dados contínuas usando o Snowpipe, conceder privilégios suficientes de controle de acesso aos objetos para a carga de dados (ou seja, o banco de dados, esquema e tabela de destino; o objeto de preparação e o canal).

Nota

Para seguir o princípio geral do “menor privilégio”, recomendamos a criação de um usuário e função separados a serem usados para a ingestão de arquivos usando um canal. O usuário deve ser criado com esta função padrão.

A utilização do Snowpipe requer uma função com os seguintes privilégios:

Objeto

Privilégio

Notas

Canal nomeado

OWNERSHIP

Estágio nomeado

USAGE , READ

Formato de arquivo nomeado

USAGE

Opcional; necessário apenas se o estágio que você criou na Etapa 1: criar um estágio (se necessário) fizer referência a um formato de arquivo nomeado.

Banco de dados de destino

USAGE

Esquema de destino

USAGE

Tabela de destino

INSERT , SELECT

Use o comando GRANT <privilégios> para conceder privilégios à função.

Nota

Somente administradores de segurança (ou seja, usuários com a função SECURITYADMIN) ou superior, ou outra função com o privilégio CREATE ROLE para a conta e o privilégio global MANAGE GRANTS, podem criar funções e conceder privilégios.

Por exemplo, crie uma função chamada snowpipe_role que possa acessar um conjunto de objetos de banco de dados snowpipe_db.public, bem como um canal chamado mypipe; então, conceda a função a um usuário:

-- Create a role to contain the Snowpipe privileges
use role securityadmin;

create or replace role snowpipe_role;

-- Grant the required privileges on the database objects
grant usage on database snowpipe_db to role snowpipe_role;

grant usage on schema snowpipe_db.public to role snowpipe_role;

grant insert, select on snowpipe_db.public.mytable to role snowpipe_role;

grant usage on stage snowpipe_db.public.mystage to role snowpipe_role;

-- Pause the pipe for OWNERSHIP transfer
alter pipe mypipe SET PIPE_EXECUTION_PAUSED = true;

-- Grant the OWNERSHIP privilege on the pipe object
grant ownership on pipe snowpipe_db.public.mypipe to role snowpipe_role;

-- Grant the role to a user
grant role snowpipe_role to user jsmith;

-- Set the role as the default role for the user
alter user jsmith set default_role = snowpipe_role;

-- Resume the pipe
alter pipe mypipe SET PIPE_EXECUTION_PAUSED = false;
Copy

Etapa 4: configurar notificações de eventos

Configure notificações de eventos para seu bucket S3 para notificar o Snowpipe quando novos dados estiverem disponíveis para carregar. O recurso de ingestão automática depende de filas SQS para entregar notificações de eventos do S3 para o Snowpipe.

Para facilitar o uso, as filas SQS do Snowpipe são criadas e gerenciadas pelo Snowflake. A saída do comando SHOW PIPES exibe o Nome de recurso da Amazon (ARN) de sua fila SQS.

  1. Execute o comando SHOW PIPES:

    SHOW PIPES;
    
    Copy

    Observe o ARN da fila SQS para o estágio na coluna notification_channel. Copie o ARN para um local conveniente.

    Nota

    Seguindo as diretrizes AWS, o Snowflake designa não mais que uma fila SQS por bucket S3. Esta fila SQS pode ser compartilhada entre vários buckets na mesma conta AWS. A fila SQS coordena as notificações para todos os canais que conectam os estágios externos do bucket S3 às tabelas de destino. Quando um arquivo de dados é carregado no bucket, todos os canais que correspondem ao caminho do diretório de estágios realizam uma carga única do arquivo em suas tabelas de destino correspondentes.

  2. Entre no console Amazon S3.

  3. Configure uma notificação de evento para seu bucket S3 usando as instruções fornecidas na documentação Amazon S3. Complete os campos da seguinte forma:

    • Name: Nome da notificação de evento (por exemplo, Auto-ingest Snowflake).

    • Events: Selecione a opção ObjectCreate (All).

    • Send to: Selecione SQS Queue a partir da lista suspensa.

    • SQS: Selecione Add SQS queue ARN a partir da lista suspensa.

    • SQS queue ARN: Cole o nome da fila SQS da saída SHOW PIPES.

Nota

Estas instruções criam uma única notificação de evento que monitora a atividade para todo o bucket S3. Esta é a abordagem mais simples. Esta notificação trata de todos os canais configurados em um nível mais granular no diretório do bucket S3. O Snowpipe carrega somente arquivos de dados como especificado nas definições de canal. Observe, entretanto, que um alto volume de notificações para atividades fora de uma definição de canal pode ter um impacto negativo na velocidade com a qual o Snowpipe filtra as notificações e toma medidas.

Alternativamente, nas etapas acima, configure um ou mais caminhos e/ou extensões de arquivo (ou prefixos e sufixos, em terminologia AWS) para filtrar a atividade de eventos. Para instruções, consulte as informações sobre filtragem de nomes de chave de objeto no tópico da documentação AWS relevante. Repita essas etapas para cada caminho ou extensão de arquivo adicional que deseja que a notificação monitore.

Observe que a AWS limita o número destas configurações de fila de notificação a um máximo de 100 por bucket S3.

Observe também que a AWS não permite configurações de fila sobrepostas (em várias notificações de eventos) para o mesmo bucket S3. Por exemplo, se uma notificação existente for configurada para s3://mybucket/load/path1, então não será possível criar outra notificação em um nível superior, tal como s3://mybucket/load, ou vice-versa.

O Snowpipe com ingestão automática está configurado!

Quando novos arquivos de dados são adicionados ao bucket S3, a notificação de evento informa o Snowpipe para carregá-los na tabela de destino definida no canal.

Etapa 5: carregar arquivos históricos

Para carregar qualquer lista de pendências de arquivos de dados que existiam no estágio externo antes que as notificações SQS fossem configuradas, consulte Carregamento de dados históricos.

Etapa 6: excluir arquivos preparados

Exclua os arquivos preparados depois de carregar os dados com sucesso e não precisar mais dos arquivos. Para obter instruções, consulte Eliminação de arquivos preparados depois que o Snowpipe carrega os dados.

Opção 2: Configuração do Amazon SNS para automatizar o Snowpipe usando notificações SQS

Esta seção descreve como acionar o carregamento automático de dados do Snowpipe usando notificações do Amazon SQS (Simple Queue Service) para um bucket S3. Os passos explicam como configurar o Amazon Simple Notification Service (SNS) como um transmissor para publicar notificações de eventos de seu bucket S3 para múltiplos assinantes (por exemplo, filas SQS ou cargas de trabalho Lambda AWS), incluindo a fila SQS do Snowflake para automação do Snowpipe.

Nota

Estas instruções presumem que existe uma notificação de evento para o caminho de destino em seu bucket S3 onde seus arquivos de dados estão localizados. Se não houver nenhuma notificação de evento:

O diagrama a seguir mostra o fluxo do processo para ingestão automática do Snowpipe com Amazon SNS:

Snowpipe Auto-ingest Process Flow with Amazon SNS
  1. Os arquivos de dados são carregados em um estágio.

  2. Uma notificação de evento S3 publicada por SNS informa o Snowpipe através de uma fila SQS que os arquivos estão prontos para serem carregados. O Snowpipe copia os arquivos em uma fila.

  3. Um warehouse virtual fornecido pelo Snowflake carrega os dados dos arquivos enfileirados na tabela de destino com base nos parâmetros definidos no canal especificado.

Nota

As instruções presumem que uma tabela de destino já existe no banco de dados Snowflake onde seus dados serão carregados.

A ingestão automática do Snowpipe é compatível com tópicos SNS AWS criptografado por KMS. Para obter mais informações, consulte Criptografia em rest.

Pré-requisito: Criar um tópico e assinatura do Amazon SNS

  1. Crie um tópico do SNS em sua conta AWS para tratar todas as mensagens para o local do estágio do Snowflake em seu bucket S3.

  2. Inscreva seus destinos-alvo para as notificações de eventos S3 (por exemplo, outras filas SQS ou cargas de trabalho Lambda AWS) para este tópico. O SNS publica notificações de eventos para seu bucket para todos os assinantes do tópico.

Para instruções, consulte a documentação do SNS.

Etapa 1: Inscreva a fila SQS do Snowflake no tópico do SNS

  1. Faça login no Console de gerenciamento AWS.

  2. No painel inicial, escolha Simple Notification Service (SNS).

  3. Escolha Topics no painel de navegação à esquerda.

  4. Localize o tópico para seu bucket S3. Observe o ARN do tópico.

  5. Usando um cliente Snowflake, consulte a função SYSTEM$GET_AWS_SNS_IAM_POLICY do sistema com seu ARN do tópico SNS:

    select system$get_aws_sns_iam_policy('<sns_topic_arn>');
    
    Copy

    A função retorna uma política IAM que concede a uma fila SQS do Snowflake permissão para se inscrever no tópico SNS.

    Por exemplo:

    select system$get_aws_sns_iam_policy('arn:aws:sns:us-west-2:001234567890:s3_mybucket');
    
    +---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
    | SYSTEM$GET_AWS_SNS_IAM_POLICY('ARN:AWS:SNS:US-WEST-2:001234567890:S3_MYBUCKET')                                                                                                                                                                   |
    +---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
    | {"Version":"2012-10-17","Statement":[{"Sid":"1","Effect":"Allow","Principal":{"AWS":"arn:aws:iam::123456789001:user/vj4g-a-abcd1234"},"Action":["sns:Subscribe"],"Resource":["arn:aws:sns:us-west-2:001234567890:s3_mybucket"]}]}                 |
    +---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
    
    Copy
  6. Volte para o Console de gerenciamento AWS. Escolha Topics no painel de navegação à esquerda.

  7. Selecione o tópico para seu bucket S3 e clique no botão Edit. A página Edit é aberta.

  8. Clique em Access policy - Optional para expandir esta área da página.

  9. Mesclar a adição da política IAM da função SYSTEM$GET_AWS_SNS_IAM_POLICY resulta no documento JSON.

    Por exemplo:

    Política IAM original (abreviada):

    {
      "Version":"2008-10-17",
      "Id":"__default_policy_ID",
      "Statement":[
         {
            "Sid":"__default_statement_ID",
            "Effect":"Allow",
            "Principal":{
               "AWS":"*"
            }
            ..
         }
       ]
     }
    
    Copy

    Política IAM mesclada:

    {
      "Version":"2008-10-17",
      "Id":"__default_policy_ID",
      "Statement":[
         {
            "Sid":"__default_statement_ID",
            "Effect":"Allow",
            "Principal":{
               "AWS":"*"
            }
            ..
         },
         {
            "Sid":"1",
            "Effect":"Allow",
            "Principal":{
              "AWS":"arn:aws:iam::123456789001:user/vj4g-a-abcd1234"
             },
             "Action":[
               "sns:Subscribe"
             ],
             "Resource":[
               "arn:aws:sns:us-west-2:001234567890:s3_mybucket"
             ]
         }
       ]
     }
    
    Copy
  10. Adicione mais uma concessão de política para permitir que o S3 publique notificações de eventos para o bucket no tópico SNS.

    Por exemplo (usando o ARN do tópico SNS e o bucket S3 usado ao longo destas instruções):

    {
        "Sid":"s3-event-notifier",
        "Effect":"Allow",
        "Principal":{
           "Service":"s3.amazonaws.com"
        },
        "Action":"SNS:Publish",
        "Resource":"arn:aws:sns:us-west-2:001234567890:s3_mybucket",
        "Condition":{
           "ArnLike":{
              "aws:SourceArn":"arn:aws:s3:*:*:s3_mybucket"
           }
        }
     }
    
    Copy

    Política IAM mesclada:

    {
      "Version":"2008-10-17",
      "Id":"__default_policy_ID",
      "Statement":[
         {
            "Sid":"__default_statement_ID",
            "Effect":"Allow",
            "Principal":{
               "AWS":"*"
            }
            ..
         },
         {
            "Sid":"1",
            "Effect":"Allow",
            "Principal":{
              "AWS":"arn:aws:iam::123456789001:user/vj4g-a-abcd1234"
             },
             "Action":[
               "sns:Subscribe"
             ],
             "Resource":[
               "arn:aws:sns:us-west-2:001234567890:s3_mybucket"
             ]
         },
         {
            "Sid":"s3-event-notifier",
            "Effect":"Allow",
            "Principal":{
               "Service":"s3.amazonaws.com"
            },
            "Action":"SNS:Publish",
            "Resource":"arn:aws:sns:us-west-2:001234567890:s3_mybucket",
            "Condition":{
               "ArnLike":{
                  "aws:SourceArn":"arn:aws:s3:*:*:s3_mybucket"
               }
            }
          }
       ]
     }
    
    Copy
  11. Clique no botão Save changes.

Etapa 2: criar um estágio (se necessário)

Crie um estágio externo que faça referência ao seu bucket S3 usando o comando CREATE STAGE. O Snowpipe busca seus arquivos de dados do estágio e os enfileira temporariamente antes de carregá-los na tabela de destino.

Você também pode usar um estágio externo já existente.

Nota

Para configurar o acesso seguro ao local de armazenamento na nuvem, consulte Configuração de acesso seguro ao armazenamento em nuvem (neste tópico).

O exemplo a seguir cria um estágio chamado mystage no esquema ativo para a sessão do usuário. O URL de armazenamento em nuvem inclui o caminho files. O estágio faz referência a uma integração de armazenamento chamada my_storage_int:

CREATE STAGE mystage
  URL = 's3://mybucket/load/files'
  STORAGE_INTEGRATION = my_storage_int;
Copy

Etapa 3: criar um canal com ingestão automática habilitada

Crie um canal usando o comando CREATE PIPE. O canal define a instrução COPY INTO <tabela> usada pelo Snowpipe para carregar os dados da fila de ingestão na tabela de destino. Na instrução COPY, identifique o ARN do tópico SNS de Pré-requisito: criar um tópico e assinatura do Amazon SNS.

O exemplo a seguir cria um canal chamado mypipe no esquema ativo para a sessão do usuário. O canal carrega os dados dos arquivos preparados no estágio mystage na tabela mytable:

create pipe snowpipe_db.public.mypipe
  auto_ingest=true
  aws_sns_topic='<sns_topic_arn>'
  as
    copy into snowpipe_db.public.mytable
    from @snowpipe_db.public.mystage
  file_format = (type = 'JSON');
Copy

Onde:

AUTO_INGEST = true

Especifica a leitura de notificações de eventos enviadas de um bucket S3 para uma fila SQS quando novos dados estiverem prontos para serem carregados.

AWS_SNS_TOPIC = '<arn_tópico_sns>'

Especifica o ARN para o tópico SNS de seu bucket S3; por exemplo, arn:aws:sns:us-west-2:001234567890:s3_mybucket no exemplo atual. A instrução CREATE PIPE inscreve a fila SQS do Snowflake no tópico SNS especificado. Observe que o canal só copia arquivos na fila de ingestão acionada por notificações de eventos através do tópico SNS.

Para remover qualquer parâmetro de um canal, é necessário recriar o canal usando a sintaxe CREATE OR REPLACE PIPE.

Importante

Verifique se a referência do local de armazenamento na instrução COPY INTO <tabela> instrução não se sobrepõe à referência em canais existentes na conta. Caso contrário, vários canais poderiam carregar o mesmo conjunto de arquivos de dados nas tabelas de destino. Por exemplo, esta situação pode ocorrer quando múltiplas definições de canais fazem referência ao mesmo local de armazenamento com diferentes níveis de granularidade, como <local_de_armazenamento>/path1/ e <local_de_armazenamento>/path1/path2/. Neste exemplo, se os arquivos fossem preparados em <local_armazenamento>/path1/path2/, ambos os canais carregariam uma cópia dos arquivos.

Veja as instruções COPY INTO <tabela> nas definições de todos os canais da conta executando SHOW PIPES ou consultando a exibição PIPES em Account Usage ou a exibição PIPES no Information Schema.

Etapa 4: configurar a segurança

Para cada usuário que irá executar cargas de dados contínuas usando o Snowpipe, conceder privilégios suficientes de controle de acesso aos objetos para a carga de dados (ou seja, o banco de dados, esquema e tabela de destino; o objeto de preparação e o canal).

Nota

Para seguir o princípio geral do “menor privilégio”, recomendamos a criação de um usuário e função separados a serem usados para a ingestão de arquivos usando um canal. O usuário deve ser criado com esta função padrão.

A utilização do Snowpipe requer uma função com os seguintes privilégios:

Objeto

Privilégio

Notas

Canal nomeado

OWNERSHIP

Integração de armazenamento nomeada

USAGE

Necessário se o estágio que você criou na Etapa 2: criar um estágio (se necessário) fizer referência a uma integração de armazenamento.

Estágio nomeado

USAGE , READ

Formato de arquivo nomeado

USAGE

Necessário apenas se o estágio que você criou na Etapa 2: criar um estágio (se necessário) fizer referência a um formato de arquivo nomeado.

Banco de dados de destino

USAGE

Esquema de destino

USAGE

Tabela de destino

INSERT , SELECT

Use o comando GRANT <privilégios> para conceder privilégios à função.

Nota

Somente administradores de segurança (ou seja, usuários com a função SECURITYADMIN ou superior) podem criar funções.

Por exemplo, crie uma função chamada snowpipe_role que possa acessar um conjunto de objetos de banco de dados snowpipe_db.public, bem como um canal chamado mypipe; então, conceda a função a um usuário:

-- Create a role to contain the Snowpipe privileges
use role securityadmin;

create or replace role snowpipe_role;

-- Grant the required privileges on the database objects
grant usage on database snowpipe_db to role snowpipe_role;

grant usage on schema snowpipe_db.public to role snowpipe_role;

grant insert, select on snowpipe_db.public.mytable to role snowpipe_role;

grant usage, read on stage snowpipe_db.public.mystage to role snowpipe_role;

-- Pause the pipe for OWNERSHIP transfer
alter pipe mypipe SET PIPE_EXECUTION_PAUSED = true;

-- Grant the OWNERSHIP privilege on the pipe object
grant ownership on pipe snowpipe_db.public.mypipe to role snowpipe_role;

-- Grant the role to a user
grant role snowpipe_role to user jsmith;

-- Set the role as the default role for the user
alter user jsmith set default_role = snowpipe_role;

-- Resume the pipe
alter pipe mypipe SET PIPE_EXECUTION_PAUSED = false;
Copy

O Snowpipe com ingestão automática está configurado!

Quando novos arquivos de dados são adicionados ao bucket S3, a notificação de evento informa o Snowpipe para carregá-los na tabela de destino definida no canal.

Etapa 5: carregar arquivos históricos

Para carregar qualquer lista de pendências de arquivos de dados que existiam no estágio externo antes que as notificações SQS fossem configuradas, consulte Carregamento de dados históricos.

Etapa 6: excluir arquivos preparados

Exclua os arquivos preparados depois de carregar os dados com sucesso e não precisar mais dos arquivos. Para obter instruções, consulte Eliminação de arquivos preparados depois que o Snowpipe carrega os dados.

Opção 3: Configuração do Amazon EventBridge para automatizar o Snowpipe

Semelhante à opção 2, você também pode configurar o Amazon EventBridge para automatizar o Snowpipe.

Etapa 1: Criar um tópico Amazon SNS

Siga Pré-requisito: Criar um tópico e assinatura do Amazon SNS e assinatura (neste tópico).

Etapa 2: Criar uma regra EventBridge para assinar buckets S3 e enviar notificações para o tópico SNS

Etapa 3: Configuração do Amazon SNS para automatizar o Snowpipe usando notificações SQS

Siga Opção 2: Configuração do Amazon SNS para automatizar o Snowpipe usando notificações SQS (neste tópico)

Saída SYSTEM$PIPE_STATUS

A função SYSTEM$PIPE_STATUS recupera uma representação JSON do status atual de um canal.

Para canais com AUTO_INGEST definido como TRUE, a função retorna um objeto JSON contendo os seguintes pares nome/valor (se aplicável ao status atual do canal):

{«executionState»:»<valor>»,»oldestFileTimestamp»:<valor>,»pendingFileCount»:<valor>,»notificationChannelName»:»<valor>»,»numOutstandingMessagesOnChannel»:<valor>,»lastReceivedMessageTimestamp»:»<valor>»,»lastForwardedMessageTimestamp»:»<valor>»,»error»:<valor>,»fault»:<valor>}

Para descrições dos valores de saída, consulte o tópico de referência para a função SQL.