CREATE PIPE

Cria um novo canal no sistema para definir a instrução COPY INTO <tabela> usada pelo Snowpipe para carregar dados em tabelas a partir de uma fila de ingestão.

Consulte também:

ALTER PIPE, DROP PIPE , SHOW PIPES , DESCRIBE PIPE

Sintaxe

CREATE [ OR REPLACE ] PIPE [ IF NOT EXISTS ] <name>
  [ AUTO_INGEST = [ TRUE | FALSE ] ]
  [ ERROR_INTEGRATION = <integration_name> ]
  [ AWS_SNS_TOPIC = '<string>' ]
  [ INTEGRATION = '<string>' ]
  [ COMMENT = '<string_literal>' ]
  AS <copy_statement>
Copy

Parâmetros obrigatórios

name

Identificador para o canal; deve ser único para o esquema no qual o canal é criado.

O identificador deve começar com um caractere alfabético e não pode conter espaços ou caracteres especiais a menos que toda a cadeia de caracteres do identificador esteja entre aspas duplas (por exemplo, "My object"). Os identificadores delimitados por aspas duplas também diferenciam letras maiúsculas de minúsculas.

Para obter mais detalhes, consulte Requisitos para identificadores.

copy_statement

Instrução COPY INTO <tabela> usada para carregar dados de arquivos em fila em uma tabela do Snowflake. Esta instrução serve como texto/definição para o canal e é exibida na saída SHOW PIPES.

Nota

Atualmente não recomendamos o uso das seguintes funções no copy_statement para Snowpipe:

  • CURRENT_DATE

  • CURRENT_TIME

  • CURRENT_TIMESTAMP

  • GETDATE

  • LOCALTIME

  • LOCALTIMESTAMP

  • SYSDATE

  • SYSTIMESTAMP

É um problema conhecido que valores de tempo inseridos usando essas funções podem estar algumas horas antes dos valores LOAD_TIME retornados pela função COPY_HISTORY ou pela exibição COPY_HISTORY.

Recomenda-se consultar METADATA$START_SCAN_TIME em vez disso, o que fornece uma representação mais precisa do carregamento de registros.

Parâmetros opcionais

AUTO_INGEST = TRUE | FALSE

Especifica se deve carregar automaticamente arquivos de dados do estágio externo especificado e do caminho opcional quando notificações de eventos são recebidas de um serviço de mensagens configurado:

  • TRUE permite o carregamento automático de dados.

    O Snowpipe oferece suporte ao carregamento a partir de estágios externos (Amazon S3, Google Cloud Storage ou Microsoft Azure).

  • FALSE desativa o carregamento automático de dados. Você deve fazer chamadas para os pontos de extremidade do REST API do Snowpipe para carregar arquivos de dados.

    O Snowpipe oferece suporte ao carregamento a partir de estágios internos (ou seja, estágios nomeados do Snowflake ou estágios de tabela, mas não estágios de usuário) ou estágios externos (Amazon S3, Google Cloud Storage ou Microsoft Azure).

ERROR_INTEGRATION = 'integration_name'

Obrigatório apenas ao configurar o Snowpipe para enviar notificações de push para um serviço de mensagens em nuvem.

Especifica o nome da integração da notificação utilizada para se comunicar com o serviço de mensagens. Para obter mais informações, consulte Notificações de erro do Snowpipe.

AWS_SNS_TOPIC = 'string'

Obrigatório somente ao configurar AUTO_INGEST para os estágios do Amazon S3 usando SNS.

Especifica o Amazon Resource Name (ARN) para o tópico SNS do seu bucket S3. A instrução CREATE PIPE inscreve a fila do Amazon Simple Queue Service (SQS) no tópico SNS especificado. O canal copia arquivos para a fila de ingestão acionada por notificações de eventos pelo tópico SNS. Para obter mais informações, consulte Automação do Snowpipe para Amazon S3.

INTEGRATION = 'string'

Obrigatório somente ao configurar AUTO_INGEST para os estágios do Google Cloud Storage ou Microsoft Azure.

Especifica a integração de notificação existente utilizada para acessar a fila de armazenamento. Para obter mais informações, consulte:

O nome da integração deve ser digitado em letras maiúsculas.

COMMENT = 'string_literal'

Especifica um comentário para o canal.

Padrão: sem valor

Notas de uso

  • Este comando SQL requer as seguintes permissões mínimas:

    Privilégio

    Objeto

    Notas

    CREATE PIPE

    Esquema

    USAGE

    Estágio na definição do canal

    Somente estágios externos

    READ

    Estágio na definição do canal

    Somente estágios internos

    SELECT, INSERT

    Tabela na definição do canal

    Operações SQL sobre objetos de esquema também exigem o privilégio USAGE no banco de dados e esquema que contém o objeto.

  • Todas as opções de cópia COPY INTO <tabela> são suportadas, exceto para o seguinte:

    • FILES = ( 'file_name1' [ , 'file_name2', ... ] )

    • ON_ERROR = ABORT_STATEMENT

    • SIZE_LIMIT = num

    • PURGE = TRUE | FALSE (ou seja, purga automática durante o carregamento)

    • FORCE = TRUE | FALSE

      Observe que você pode remover manualmente arquivos de um estágio interno (ou seja, Snowflake) (após terem sido carregados) usando o comando REMOVE.

    • RETURN_FAILED_ONLY = TRUE | FALSE

    • VALIDATION_MODE = RETURN_n_ROWS | RETURN_ERRORS | RETURN_ALL_ERRORS

  • A opção de cópia PATTERN = 'regex_pattern' filtra o conjunto de arquivos a serem carregados usando uma expressão regular. A correspondência de padrões se comporta da seguinte forma, dependendo do valor do parâmetro AUTO_INGEST:

    • AUTO_INGEST = TRUE: a expressão regular filtra a lista de arquivos no estágio e o caminho opcional (ou seja, local de armazenamento em nuvem) na instrução COPY INTO <tabela>.

    • :AUTO_INGEST = FALSE: a expressão regular filtra a lista de arquivos enviados em chamadas para o ponto de extremidade do REST API insertFiles do Snowpipe.

    Observe que o Snowpipe reduz qualquer segmento do caminho na definição do estágio a partir do local de armazenamento e aplica a expressão regular a quaisquer segmentos de caminho e nomes de arquivo restantes. Para ver a definição do estágio, execute o comando DESCRIBE STAGE para o estágio. A propriedade do URL consiste no nome do bucket ou contêiner e zero ou mais segmentos de caminho. Por exemplo, se o local FROM em uma instrução COPY INTO <tabela> for @s/path1/path2/ e o valor do URL para o estágio @s for s3://mybucket/path1/, o Snowpipe reduzirá o /path1/ do local de armazenamento na cláusula FROM e aplicará a expressão regular a path2/ mais os nomes dos arquivos no caminho.

    Importante

    O Snowflake recomenda que você habilite a filtragem de eventos da nuvem para o Snowpipe para reduzir custos, ruído de eventos e latência. Use a opção PATTERN somente quando o recurso de filtragem de eventos de seu provedor de nuvem não for suficiente. Para obter mais informações sobre a configuração da filtragem de eventos para cada provedor de nuvem, consulte as seguintes páginas:

  • É possível usar uma consulta como origem para a instrução COPY para reordenação de colunas, omissão de colunas e conversões (ou seja, transformação de dados durante um carregamento). Para exemplos de uso, consulte Transformação de dados durante um carregamento. Note que somente instruções simples SELECT são suportadas. A filtragem usando uma cláusula WHERE não é suportada.

  • As definições de canal não são dinâmicas (ou seja, um canal não é atualizado automaticamente se o estágio ou tabela subjacente mudar, como renomear ou descartar o estágio/a tabela). Em vez disso, você deve criar um novo canal e enviar este nome de canal em futuras chamadas de REST API do Snowpipe.

  • Em relação aos metadados:

    Atenção

    Os clientes devem garantir que nenhum dado pessoal (exceto para um objeto do usuário), dados sensíveis, dados controlados por exportação ou outros dados regulamentados sejam inseridos como metadados ao usar o serviço Snowflake. Para obter mais informações, consulte Campos de metadados no Snowflake.

  • Instruções CREATE OR REPLACE <object> são atômicas. Ou seja, quando um objeto é substituído, o objeto antigo é excluído e o novo objeto é criado em uma única transação.

Importante

Se você recriar um canal (usando a sintaxe CREATE OR REPLACE PIPE), consulte Recriação de canais para considerações relacionadas e práticas recomendadas.

Exemplos

Criar um canal no esquema atual que carrega todos os dados dos arquivos preparados no estágio mystage em mytable:

create pipe mypipe as copy into mytable from @mystage;
Copy

O mesmo que o exemplo anterior, mas com uma transformação de dados. Somente carregar os dados da quarta e quinta colunas nos arquivos preparados, em ordem inversa:

create pipe mypipe2 as copy into mytable(C1, C2) from (select $5, $4 from @mystage);
Copy

Criar um canal no esquema atual para carregamento automático de dados usando notificações de eventos recebidas de um serviço de mensagens:

Amazon S3

create pipe mypipe_s3
  auto_ingest = true
  aws_sns_topic = 'arn:aws:sns:us-west-2:001234567890:s3_mybucket'
  as
  copy into snowpipe_db.public.mytable
  from @snowpipe_db.public.mystage
  file_format = (type = 'JSON');
Copy

Google Cloud Storage

create pipe mypipe_gcs
  auto_ingest = true
  integration = 'MYINT'
  as
  copy into snowpipe_db.public.mytable
  from @snowpipe_db.public.mystage
  file_format = (type = 'JSON');
Copy

Microsoft Azure

create pipe mypipe_azure
  auto_ingest = true
  integration = 'MYINT'
  as
  copy into snowpipe_db.public.mytable
  from @snowpipe_db.public.mystage
  file_format = (type = 'JSON');
Copy