API REST Snowpipe

Você interage com um canal fazendo chamadas para pontos de extremidade REST. Este tópico descreve a API REST Snowpipe para definir a lista de arquivos a serem ingeridos e buscar relatórios do histórico de carregamento.

O Snowflake também fornece APIs Java e Python que simplificam o trabalho com a API REST Snowpipe.

Neste tópico:

Ingestão de arquivo de dados

A API Snowpipe fornece um ponto de extremidade REST para definir a lista de arquivos a serem ingeridos.

Ponto de extremidade: insertFiles

Informa o Snowflake sobre os arquivos a serem ingeridos em uma tabela. Uma resposta bem-sucedida a partir desse ponto de extremidade significa que o Snowflake registrou a lista de arquivos a serem adicionados à tabela. Isso não significa necessariamente que os arquivos tenham sido ingeridos. Para obter mais detalhes, consulte os códigos de resposta abaixo.

Na maioria dos casos, o Snowflake insere novos dados na tabela de destino dentro de poucos minutos.

Método: POST

POST URL:

https://{account}.snowflakecomputing.com/v1/data/pipes/{pipeName}/insertFiles?requestId={requestId}

Parâmetros de URL:

  • account (obrigatório): identificador de conta para sua conta Snowflake.

  • pipeName (obrigatório): nome do canal totalmente qualificado e que distingue maiúsculas e minúsculas. Por exemplo, myDatabase.mySchema.myPipe.

  • requestId (opcional): cadeia de caracteres usada para rastrear solicitações no sistema. Recomendamos que você forneça uma cadeia de caracteres aleatória em cada solicitação, por exemplo, um UUID. Isso deve ser anexado ao URL da seguinte forma: ?requestId=<your_uuid>.

Cabeçalhos de solicitação

  • Content-Type::

    • text/plain: para uma lista de texto simples de caminhos e nomes de arquivos, um por linha. O parâmetro size não é permitido nesse formato.

    • application/json: para um objeto JSON que contém uma lista de arquivos com informações de tamanho opcionais.

  • Authorization: BEARER <jwt_token>

Corpo da solicitação (para o Content-Type application/json)

O corpo da solicitação deve ser um objeto JSON com uma única chave chamada “files”. O valor associado a essa chave é uma matriz de objetos JSON, em que cada objeto representa um arquivo a ser ingerido.

{
  "files":[
    {
      "path":"filePath/file1.csv",
      "size":100
    },
    {
      "path":"filePath/file2.csv",
      "size":100
    }
   ]
}
Copy

Cada elemento da matriz “files” é um objeto JSON com os seguintes atributos:

  • path (obrigatório): o caminho e o nome de arquivo do arquivo preparado. Se você seguir nossas práticas recomendadas ao particionar seus dados no estágio usando caminhos lógicos e granulares, os valores de caminho na carga útil incluirão os caminhos completos para os arquivos preparados.

  • size (opcional, mas recomendado para melhorar o desempenho): o tamanho do arquivo em bytes.

Corpo da solicitação (para o Content-Type text/plain)

O corpo da solicitação deve ser uma lista de texto simples de caminhos e nomes de arquivos, com uma entrada por linha.

filePath/file_a.csv
another/path/file_b.json
yet/another/file_c.txt
Copy

Nota

A postagem pode conter, no máximo, 5.000 arquivos. Cada caminho de arquivo dado deve ser <= 1024 bytes longos quando serializado como UTF-8.

Corpo da resposta

Códigos de resposta:

  • 200 — Sucesso. Arquivos adicionados à fila de arquivos a serem ingeridos.

  • 400 — Falha. Solicitação inválida devido a um formato inválido ou limite excedido.

  • 404 — Falha. pipeName não reconhecido.

    Esse código de erro também pode ser retornado se a função utilizada ao chamar o ponto de extremidade não tiver privilégios suficientes. Para obter mais informações, consulte Concessão de privilégios de acesso.

  • 429 — Falha. O limite de taxa de solicitação foi excedido.

  • 500 — Falha. Ocorreu um erro interno.

Carga útil de resposta:

Com uma solicitação API bem-sucedida (ou seja, código 200), a carga útil de resposta contém os elementos requestId e status no formato JSON. Se ocorrer um erro, a carga útil de resposta pode conter detalhes sobre o erro.

{
  "requestId": "your_request_uuid",
  "status": "success"
}
Copy

Se a instrução COPY INTO <tabela> na definição do canal incluir a opção de cópia PATTERN, o atributo unmatchedPatternFiles lista todos os arquivos enviados no cabeçalho que não correspondem à expressão regular e que, portanto, foram ignorados.

{
  "requestId": "your_request_uuid",
  "status": "success",
  "unmatchedPatternFiles": ["some_file.txt", "another_file.dat"]
}
Copy

Relatórios do histórico de carregamento

A API Snowpipe fornece pontos de extremidade REST para a obtenção de relatórios de carregamento.

Ponto de extremidade: insertReport

Recupera um relatório de arquivos enviados via insertFiles, cujo conteúdo foi recentemente ingerido em uma tabela. Note que, para arquivos grandes, isso pode ser apenas parte do arquivo.

Observe as seguintes limitações desse ponto de extremidade:

  • Os 10.000 eventos mais recentes são mantidos.

  • Os eventos são retidos por um período máximo de 10 minutos.

Um evento ocorre quando os dados de um arquivo enviado via insertFiles foram enviados para a tabela e estão disponíveis para consultas. O ponto de extremidade insertReport pode ser pensado como comando tail do UNIX. Ao chamar esse comando repetidamente, é possível ver o histórico completo dos eventos em um canal ao longo do tempo. Note que o comando deve ser chamado com frequência suficiente para não perder os eventos. A frequência depende da taxa de envio dos arquivos para insertFiles.

Método: GET

GET URL:

https://<identificador_de_conta>.snowflakecomputing.com/v1/data/pipes/<pipeName>/insertReport?requestId=<requestId>&beginMark=<beginMark>

Parâmetros de URL:

  • account_identifier (obrigatório): seu identificador de conta Snowflake exclusivo. O formato preferido é organization_name-account_name. Para formatos alternativos (localizador de contas com região e plataforma de nuvem), consulte Formato 1 (preferido): Nome da conta em sua organização.

  • pipeName (obrigatório): o nome totalmente qualificado e sensível a maiúsculas e minúsculas do Snowpipe. Por exemplo, myDatabase.mySchema.myPipe.

  • requestId (opcional): uma cadeia de caracteres que você pode fornecer para rastrear essa solicitação específica no sistema da Snowflake. O uso de uma cadeia de caracteres aleatória como UUID é altamente recomendado para facilitar a depuração e o monitoramento. Acrescente isso ao URL da seguinte forma: ?requestId=<your_uuid>.

  • beginMark (opcional): um valor de marcador retornado no campo nextBeginMark de uma resposta insertReport anterior. A inclusão desse marcador ajuda a otimizar as chamadas subsequentes, reduzindo potencialmente o número de eventos duplicados retornados. Observação: embora beginMark seja uma dica para evitar duplicatas, ainda pode ocorrer uma repetição ocasional de eventos. Se beginMark não for especificado, o relatório mostrará o histórico de ingestão dos últimos 10 minutos. Acrescente isso ao URL da seguinte forma: ?beginMark=<previous_nextBeginMark>.

Cabeçalhos de solicitação:

  • Aceitar: especifica o formato de resposta desejado. Os valores aceitos são text/plain ou application/json.

  • Autorização: seu token de autenticação Snowflake. Use o formato BEARER <jwt_token>.

Corpo da solicitação:

Esse ponto de extremidade não aceita um corpo de solicitação para solicitações GET. Os parâmetros necessários são fornecidos no URL e nos cabeçalhos.

Corpo da resposta:

Códigos de resposta:

  • 200 — Sucesso. Relatório retornado.

  • 400 — Falha. Solicitação inválida devido a um formato inválido ou limite excedido.

  • 404 — Falha. pipeName não reconhecido.

    Esse código de erro também pode ser retornado se a função utilizada ao chamar o ponto de extremidade não tiver privilégios suficientes. Para obter mais informações, consulte Concessão de privilégios de acesso.

  • 429 — Falha. O limite de taxa de solicitação foi excedido.

  • 500 — Falha. Ocorreu um erro interno.

Carga útil de resposta:

Uma resposta de sucesso (200) contém informações sobre arquivos que foram adicionados recentemente à tabela. Observe que esse relatório pode representar apenas uma parte de um grande arquivo.

Por exemplo:

{
  "pipe": "TESTDB.TESTSCHEMA.pipe2",
  "completeResult": true,
  "nextBeginMark": "1_39",
  "files": [
    {
      "path": "data2859002086815673867.csv",
      "stageLocation": "s3://mybucket/",
      "fileSize": 57,
      "timeReceived": "2017-06-21T04:47:41.453Z",
      "lastInsertTime": "2017-06-21T04:48:28.575Z",
      "rowsInserted": 1,
      "rowsParsed": 1,
      "errorsSeen": 0,
      "errorLimit": 1,
      "complete": true,
      "status": "LOADED"
    }
  ]
}
Copy

Campos de resposta:

Campo

Tipo

Descrição

pipe

Cadeia de caracteres

O nome totalmente qualificado do canal.

completeResult

Booliano

false se um evento foi perdido entre o evento fornecido beginMark e o primeiro evento neste histórico de relatórios. Caso contrário, true.

nextBeginMark

Cadeia de caracteres

beginMark para usar na próxima solicitação para evitar ver registros duplicados. Observe que esse valor é uma dica. Duplicatas ainda podem ocorrer ocasionalmente.

files

Matriz

Uma matriz de objetos JSON, um objeto para cada arquivo que faz parte da resposta do histórico.

path

Cadeia de caracteres

O caminho do arquivo relativo à localização do estágio.

stageLocation

Cadeia de caracteres

Ou a ID do estágio (estágio interno) ou o bucket S3 (estágio externo) definido no canal.

fileSize

Long

Tamanho do arquivo, em bytes.

timeReceived

Cadeia de caracteres

Hora em que esse arquivo foi recebido para processamento. O formato é ISO-8601 no fuso horário do UTC.

lastInsertTime

Cadeia de caracteres

Hora em que os dados desse arquivo foram inseridos pela última vez na tabela. O formato é ISO-8601 no fuso horário do UTC.

rowsInserted

Long

Número de linhas inseridas na tabela de destino a partir do arquivo.

rowsParsed

Long

Número de linhas analisadas a partir do arquivo. Linhas com erros podem ser ignoradas.

errorsSeen

Inteiro

Número de erros encontrados no arquivo

errorLimit

Inteiro

Número de erros permitidos no arquivo antes de ser considerado falha (com base na opção de cópia ON_ERROR).

firstError [1]

Cadeia de caracteres

Mensagem de erro referente ao primeiro erro encontrado nesse arquivo.

firstErrorLineNum [1]

Long

Número da linha do primeiro erro.

firstErrorCharacterPos [1]

Long

Posição do caractere do primeiro erro.

firstErrorColumnName [1]

Cadeia de caracteres

Nome da coluna onde ocorreu o primeiro erro.

systemError [1]

Cadeia de caracteres

Erro geral que descreve o motivo pelo qual o arquivo não foi processado.

complete

Booliano

Indica se o arquivo foi completamente processado com sucesso.

status

Cadeia de caracteres

Status de carregamento do arquivo:

  • LOAD_IN_PROGRESS: Parte do arquivo foi carregada na tabela, mas o processo de carregamento ainda não foi concluído.

  • LOADED: O arquivo inteiro foi carregado na tabela.

  • LOAD_FAILED: O carregamento do arquivo falhou.

  • PARTIALLY_LOADED: Algumas linhas deste arquivo foram carregadas com sucesso, mas outras não foram carregadas devido a erros. O processamento deste arquivo está concluído.

[1] Os valores só são fornecidos para esses campos quando os arquivos incluem erros.

Ponto de extremidade: loadHistoryScan

Busca um relatório sobre arquivos ingeridos, cujo conteúdo foi adicionado à tabela. Note que, para arquivos grandes, isso pode ser apenas parte do arquivo. Esse ponto de extremidade difere de insertReport na medida em que vê o histórico entre dois pontos no tempo. Há um máximo de 10.000 itens retornados, mas várias chamadas podem ser emitidas para cobrir o intervalo desejado.

Importante

Esse ponto de extremidade é limitado para evitar chamadas excessivas. Para ajudar a evitar exceder o limite de taxa (código de erro 429), recomendamos contar mais fortemente com insertReport do que com loadHistoryScan. Ao chamar loadHistoryScan, especifique o menor intervalo que inclui um conjunto de carregamentos de dados. Por exemplo, ler os últimos 10 minutos de histórico a cada 8 minutos funcionaria bem. Tentar ler as últimas 24 horas do histórico a cada minuto resultará em 429 erros indicando que um limite de taxa foi atingido. Os limites de taxa são projetados para permitir que cada registro de histórico seja lido várias vezes.

Para uma exibição mais abrangente, sem esses limites, o Snowflake oferece uma função de tabela do Information Schema, COPY_HISTORY, que retorna o histórico de carregamento de um canal ou tabela.

Método: GET

GET URL:

https://{account}.snowflakecomputing.com/v1/data/pipes/{pipeName}/loadHistoryScan?startTimeInclusive=<startTime>&endTimeExclusive=<endTime>&requestId=<requestId>

Parâmetros de URL:

  • account (obrigatório): seu identificador de conta Snowflake exclusivo.

  • pipeName (obrigatório): o nome totalmente qualificado e sensível a maiúsculas e minúsculas do Snowpipe. Exemplo: myDatabase.mySchema.myPipe.

  • startTimeInclusive (obrigatório): o início do intervalo de tempo para recuperar os dados do histórico de carga, especificado como um carimbo de data/hora no formato ISO-8601 (por exemplo, 2023-10-26T10:00:00Z). Esse carimbo de data/hora marca o limite inferior inclusivo da consulta.

  • endTimeExclusive (opcional): o fim do intervalo de tempo para recuperar dados do histórico de carga, especificado como um carimbo de data/hora no formato ISO-8601 (por exemplo, 2023-10-26T10:15:00Z). Esse carimbo de data/hora marca o limite superior exclusivo da consulta. Se esse parâmetro for omitido, o carimbo de data/hora do servidor atual (CURRENT_TIMESTAMP()) será usado como o fim do intervalo de tempo.

  • requestId (opcional): uma cadeia de caracteres que você pode fornecer para rastrear essa solicitação específica no sistema da Snowflake. Recomendamos o uso de uma cadeia de caracteres aleatória, como UUID, para facilitar a depuração e o monitoramento. Acrescente isso ao URL da seguinte forma: ?requestId=<your_uuid>.

Cabeçalhos de solicitação:

  • Accept: especifica o formato de resposta desejado. Os valores aceitos são text/plain ou application/json.

  • Authorization: seu token de autenticação Snowflake. Use o formato BEARER <jwt_token>.

Corpo da solicitação:

Esse ponto de extremidade não aceita um corpo de solicitação para solicitações GET. Todos os parâmetros necessários são fornecidos no URL e nos cabeçalhos.

Corpo da resposta:

Códigos de resposta:

  • 200 — Sucesso. Os resultados do escaneamento do histórico de carregamento são retornados.

  • 400 — Falha. Solicitação inválida devido a um formato inválido ou limite excedido.

  • 404 — Falha. pipeName não reconhecido.

  • 429 — Falha. O limite de taxa de solicitação foi excedido.

  • 500 — Falha. Ocorreu um erro interno.

Carga útil de resposta:

Uma resposta de sucesso (200) contém informações sobre arquivos que foram adicionados recentemente à tabela. Observe que esse relatório pode representar apenas uma parte de um grande arquivo.

Por exemplo:

{
  "pipe": "TESTDB.TESTSCHEMA.pipe2",
  "completeResult": true,
  "startTimeInclusive": "2017-08-25T18:42:31.081Z",
  "endTimeExclusive":"2017-08-25T22:43:45.552Z",
  "rangeStartTime":"2017-08-25T22:43:45.383Z",
  "rangeEndTime":"2017-08-25T22:43:45.383Z",
  "files": [
    {
      "path": "data2859002086815673867.csv",
      "stageLocation": "s3://mystage/",
      "fileSize": 57,
      "timeReceived": "2017-08-25T22:43:45.383Z",
      "lastInsertTime": "2017-08-25T22:43:45.383Z",
      "rowsInserted": 1,
      "rowsParsed": 1,
      "errorsSeen": 0,
      "errorLimit": 1,
      "complete": true,
      "status": "LOADED"
    }
  ]
}
Copy

Campos de resposta:

Campo

Tipo

Descrição

pipe

Cadeia de caracteres

Nome totalmente qualificado do canal.

completeResult

Booliano

false se o relatório estiver incompleto (ou seja, o número de entradas no intervalo especificado exceder o limite de 10.000 entradas). Se false, o usuário pode especifique o valor atual rangeEndTime como o valor startTimeInclusive para a próxima solicitação para prosseguir para o próximo conjunto de entradas.

startTimeInclusive

Cadeia de caracteres

Carimbo de data/hora inicial (no formato ISO 8601) fornecido na solicitação.

endTimeExclusive

Cadeia de caracteres

Carimbo de data/hora final (no formato ISO 8601) fornecido na solicitação.

rangeStartTime

Cadeia de caracteres

Carimbo de data/hora (no formato ISO 8601) da entrada mais antiga dos arquivos incluídos na resposta.

rangeEndTime

Cadeia de caracteres

Carimbo de data/hora (no formato ISO 8601) da entrada mais recente dos arquivos incluídos na resposta.

files

Matriz

Uma matriz de objetos JSON, um objeto para cada arquivo que faz parte da resposta do histórico. Dentro da matriz, os campos de resposta são os mesmos que os retornados na resposta insertReport.