CREATE PIPE¶
Cria um novo canal no sistema para definir a instrução COPY INTO <tabela> usada pelo Snowpipe para carregar dados em tabelas a partir de uma fila de ingestão.
- Consulte também:
Sintaxe¶
CREATE [ OR REPLACE ] PIPE [ IF NOT EXISTS ] <name>
[ AUTO_INGEST = [ TRUE | FALSE ] ]
[ ERROR_INTEGRATION = <integration_name> ]
[ AWS_SNS_TOPIC = '<string>' ]
[ INTEGRATION = '<string>' ]
[ COMMENT = '<string_literal>' ]
AS <copy_statement>
Parâmetros obrigatórios¶
name
Identificador para o canal; deve ser único para o esquema no qual o canal é criado.
O identificador deve começar com um caractere alfabético e não pode conter espaços ou caracteres especiais a menos que toda a cadeia de caracteres do identificador esteja entre aspas duplas (por exemplo,
"My object"
). Os identificadores delimitados por aspas duplas também diferenciam letras maiúsculas de minúsculas.Para obter mais detalhes, consulte Requisitos para identificadores.
copy_statement
Instrução COPY INTO <tabela> usada para carregar dados de arquivos em fila em uma tabela do Snowflake. Esta instrução serve como texto/definição para o canal e é exibida na saída SHOW PIPES.
Nota
Atualmente não recomendamos o uso das seguintes funções no copy_statement
para Snowpipe:
CURRENT_DATE
CURRENT_TIME
CURRENT_TIMESTAMP
GETDATE
LOCALTIME
LOCALTIMESTAMP
SYSDATE
SYSTIMESTAMP
É um problema conhecido que valores de tempo inseridos usando essas funções podem estar algumas horas antes dos valores LOAD_TIME retornados pela função COPY_HISTORY ou pela exibição COPY_HISTORY.
Recomenda-se consultar METADATA$START_SCAN_TIME em vez disso, o que fornece uma representação mais precisa do carregamento de registros.
Parâmetros opcionais¶
AUTO_INGEST = TRUE | FALSE
Especifica se deve carregar automaticamente arquivos de dados do estágio externo especificado e do caminho opcional quando notificações de eventos são recebidas de um serviço de mensagens configurado:
TRUE
permite o carregamento automático de dados.O Snowpipe oferece suporte ao carregamento a partir de estágios externos (Amazon S3, Google Cloud Storage ou Microsoft Azure).
FALSE
desativa o carregamento automático de dados. Você deve fazer chamadas para os pontos de extremidade do REST API do Snowpipe para carregar arquivos de dados.O Snowpipe oferece suporte ao carregamento a partir de estágios internos (ou seja, estágios nomeados do Snowflake ou estágios de tabela, mas não estágios de usuário) ou estágios externos (Amazon S3, Google Cloud Storage ou Microsoft Azure).
ERROR_INTEGRATION = 'integration_name'
Obrigatório apenas ao configurar o Snowpipe para enviar notificações de push para um serviço de mensagens em nuvem.
Especifica o nome da integração da notificação utilizada para se comunicar com o serviço de mensagens. Para obter mais informações, consulte Notificações de erro do Snowpipe.
AWS_SNS_TOPIC = 'string'
Obrigatório somente ao configurar AUTO_INGEST para os estágios do Amazon S3 usando SNS.
Especifica o Amazon Resource Name (ARN) para o tópico SNS do seu bucket S3. A instrução CREATE PIPE inscreve a fila do Amazon Simple Queue Service (SQS) no tópico SNS especificado. O canal copia arquivos para a fila de ingestão acionada por notificações de eventos pelo tópico SNS. Para obter mais informações, consulte Automação do Snowpipe para Amazon S3.
INTEGRATION = 'string'
Obrigatório somente ao configurar AUTO_INGEST para os estágios do Google Cloud Storage ou Microsoft Azure.
Especifica a integração de notificação existente utilizada para acessar a fila de armazenamento. Para obter mais informações, consulte:
O nome da integração deve ser digitado em letras maiúsculas.
COMMENT = 'string_literal'
Especifica um comentário para o canal.
Padrão: sem valor
Notas de uso¶
Este comando SQL requer as seguintes permissões mínimas:
Privilégio
Objeto
Notas
CREATE PIPE
Esquema
USAGE
Estágio na definição do canal
Somente estágios externos
READ
Estágio na definição do canal
Somente estágios internos
SELECT, INSERT
Tabela na definição do canal
Operações SQL sobre objetos de esquema também exigem o privilégio USAGE no banco de dados e esquema que contém o objeto.
Todas as opções de cópia COPY INTO <tabela> são suportadas, exceto para o seguinte:
FILES = ( 'file_name1' [ , 'file_name2', ... ] )
ON_ERROR = ABORT_STATEMENT
SIZE_LIMIT = num
PURGE = TRUE | FALSE
(ou seja, purga automática durante o carregamento)FORCE = TRUE | FALSE
Observe que você pode remover manualmente arquivos de um estágio interno (ou seja, Snowflake) (após terem sido carregados) usando o comando REMOVE.
RETURN_FAILED_ONLY = TRUE | FALSE
VALIDATION_MODE = RETURN_n_ROWS | RETURN_ERRORS | RETURN_ALL_ERRORS
A opção de cópia
PATTERN = 'regex_pattern'
filtra o conjunto de arquivos a serem carregados usando uma expressão regular. A correspondência de padrões se comporta da seguinte forma, dependendo do valor do parâmetro AUTO_INGEST:AUTO_INGEST = TRUE
: a expressão regular filtra a lista de arquivos no estágio e o caminho opcional (ou seja, local de armazenamento em nuvem) na instrução COPY INTO <tabela>.:AUTO_INGEST = FALSE
: a expressão regular filtra a lista de arquivos enviados em chamadas para o ponto de extremidade do REST APIinsertFiles
do Snowpipe.
Observe que o Snowpipe reduz qualquer segmento do caminho na definição do estágio a partir do local de armazenamento e aplica a expressão regular a quaisquer segmentos de caminho e nomes de arquivo restantes. Para ver a definição do estágio, execute o comando DESCRIBE STAGE para o estágio. A propriedade do URL consiste no nome do bucket ou contêiner e zero ou mais segmentos de caminho. Por exemplo, se o local FROM em uma instrução COPY INTO <tabela> for
@s/path1/path2/
e o valor do URL para o estágio@s
fors3://mybucket/path1/
, o Snowpipe reduzirá o/path1/
do local de armazenamento na cláusula FROM e aplicará a expressão regular apath2/
mais os nomes dos arquivos no caminho.Importante
O Snowflake recomenda que você habilite a filtragem de eventos da nuvem para o Snowpipe para reduzir custos, ruído de eventos e latência. Use a opção PATTERN somente quando o recurso de filtragem de eventos de seu provedor de nuvem não for suficiente. Para obter mais informações sobre a configuração da filtragem de eventos para cada provedor de nuvem, consulte as seguintes páginas:
Amazon S3: Configuração de notificações de eventos usando filtragem de nomes de chave de objeto
Microsoft Azure Event Grid: Understand event filtering for Event Grid subscriptions
Google Cloud Pub/Sub: Filtrar mensagens
É possível usar uma consulta como origem para a instrução COPY para reordenação de colunas, omissão de colunas e conversões (ou seja, transformação de dados durante um carregamento). Para exemplos de uso, consulte Transformação de dados durante um carregamento. Note que somente instruções simples SELECT são suportadas. A filtragem usando uma cláusula WHERE não é suportada.
As definições de canal não são dinâmicas (ou seja, um canal não é atualizado automaticamente se o estágio ou tabela subjacente mudar, como renomear ou descartar o estágio/a tabela). Em vez disso, você deve criar um novo canal e enviar este nome de canal em futuras chamadas de REST API do Snowpipe.
Em relação aos metadados:
Atenção
Os clientes devem garantir que nenhum dado pessoal (exceto para um objeto do usuário), dados sensíveis, dados controlados por exportação ou outros dados regulamentados sejam inseridos como metadados ao usar o serviço Snowflake. Para obter mais informações, consulte Campos de metadados no Snowflake.
Instruções CREATE OR REPLACE <object> são atômicas. Ou seja, quando um objeto é substituído, o objeto antigo é excluído e o novo objeto é criado em uma única transação.
Importante
Se você recriar um canal (usando a sintaxe CREATE OR REPLACE PIPE), consulte Recriação de canais para considerações relacionadas e práticas recomendadas.
Exemplos¶
Criar um canal no esquema atual que carrega todos os dados dos arquivos preparados no estágio mystage
em mytable
:
create pipe mypipe as copy into mytable from @mystage;
O mesmo que o exemplo anterior, mas com uma transformação de dados. Somente carregar os dados da quarta e quinta colunas nos arquivos preparados, em ordem inversa:
create pipe mypipe2 as copy into mytable(C1, C2) from (select $5, $4 from @mystage);
Criar um canal no esquema atual para carregamento automático de dados usando notificações de eventos recebidas de um serviço de mensagens:
Amazon S3
create pipe mypipe_s3 auto_ingest = true aws_sns_topic = 'arn:aws:sns:us-west-2:001234567890:s3_mybucket' as copy into snowpipe_db.public.mytable from @snowpipe_db.public.mystage file_format = (type = 'JSON');Google Cloud Storage
create pipe mypipe_gcs auto_ingest = true integration = 'MYINT' as copy into snowpipe_db.public.mytable from @snowpipe_db.public.mystage file_format = (type = 'JSON');Microsoft Azure
create pipe mypipe_azure auto_ingest = true integration = 'MYINT' as copy into snowpipe_db.public.mytable from @snowpipe_db.public.mystage file_format = (type = 'JSON');