API REST Snowpipe¶
Você interage com um canal fazendo chamadas para pontos de extremidade REST. Este tópico descreve a API REST Snowpipe para definir a lista de arquivos a serem ingeridos e buscar relatórios do histórico de carregamento.
O Snowflake também fornece APIs Java e Python que simplificam o trabalho com a API REST Snowpipe.
Neste tópico:
Ingestão de arquivo de dados¶
A API Snowpipe fornece um ponto de extremidade REST para definir a lista de arquivos a serem ingeridos.
Ponto de extremidade: insertFiles¶
Informa o Snowflake sobre os arquivos a serem ingeridos em uma tabela. Uma resposta bem-sucedida a partir desse ponto de extremidade significa que o Snowflake registrou a lista de arquivos a serem adicionados à tabela. Isso não significa necessariamente que os arquivos tenham sido ingeridos. Para obter mais detalhes, consulte os códigos de resposta abaixo.
Na maioria dos casos, o Snowflake insere novos dados na tabela de destino dentro de poucos minutos.
Método: POST
POST URL:
https://{account}.snowflakecomputing.com/v1/data/pipes/{pipeName}/insertFiles?requestId={requestId}
Parâmetros de URL:
account(obrigatório): identificador de conta para sua conta Snowflake.pipeName(obrigatório): nome do canal totalmente qualificado e que distingue maiúsculas e minúsculas. Por exemplo,myDatabase.mySchema.myPipe.requestId(opcional): cadeia de caracteres usada para rastrear solicitações no sistema. Recomendamos que você forneça uma cadeia de caracteres aleatória em cada solicitação, por exemplo, um UUID. Isso deve ser anexado ao URL da seguinte forma:?requestId=<your_uuid>.
Cabeçalhos de solicitação
Content-Type::text/plain: para uma lista de texto simples de caminhos e nomes de arquivos, um por linha. O parâmetro size não é permitido nesse formato.application/json: para um objeto JSON que contém uma lista de arquivos com informações de tamanho opcionais.
Authorization:BEARER <jwt_token>
Corpo da solicitação (para o Content-Type application/json)
O corpo da solicitação deve ser um objeto JSON com uma única chave chamada “files”. O valor associado a essa chave é uma matriz de objetos JSON, em que cada objeto representa um arquivo a ser ingerido.
{
"files":[
{
"path":"filePath/file1.csv",
"size":100
},
{
"path":"filePath/file2.csv",
"size":100
}
]
}
Cada elemento da matriz “files” é um objeto JSON com os seguintes atributos:
path(obrigatório): o caminho e o nome de arquivo do arquivo preparado. Se você seguir nossas práticas recomendadas ao particionar seus dados no estágio usando caminhos lógicos e granulares, os valores de caminho na carga útil incluirão os caminhos completos para os arquivos preparados.size(opcional, mas recomendado para melhorar o desempenho): o tamanho do arquivo em bytes.
Corpo da solicitação (para o Content-Type text/plain)
O corpo da solicitação deve ser uma lista de texto simples de caminhos e nomes de arquivos, com uma entrada por linha.
filePath/file_a.csv
another/path/file_b.json
yet/another/file_c.txt
Nota
A postagem pode conter, no máximo, 5.000 arquivos. Cada caminho de arquivo dado deve ser <= 1024 bytes longos quando serializado como UTF-8.
Corpo da resposta
Códigos de resposta:
200 — Sucesso. Arquivos adicionados à fila de arquivos a serem ingeridos.
400 — Falha. Solicitação inválida devido a um formato inválido ou limite excedido.
404 — Falha.
pipeNamenão reconhecido.Esse código de erro também pode ser retornado se a função utilizada ao chamar o ponto de extremidade não tiver privilégios suficientes. Para obter mais informações, consulte Concessão de privilégios de acesso.
429 — Falha. O limite de taxa de solicitação foi excedido.
500 — Falha. Ocorreu um erro interno.
Carga útil de resposta:
Com uma solicitação API bem-sucedida (ou seja, código 200), a carga útil de resposta contém os elementos
requestIdestatusno formato JSON. Se ocorrer um erro, a carga útil de resposta pode conter detalhes sobre o erro.{ "requestId": "your_request_uuid", "status": "success" }Se a instrução COPY INTO <tabela> na definição do canal incluir a opção de cópia PATTERN, o atributo
unmatchedPatternFileslista todos os arquivos enviados no cabeçalho que não correspondem à expressão regular e que, portanto, foram ignorados.{ "requestId": "your_request_uuid", "status": "success", "unmatchedPatternFiles": ["some_file.txt", "another_file.dat"] }
Relatórios do histórico de carregamento¶
A API Snowpipe fornece pontos de extremidade REST para a obtenção de relatórios de carregamento.
Ponto de extremidade: insertReport¶
Recupera um relatório de arquivos enviados via insertFiles, cujo conteúdo foi recentemente ingerido em uma tabela. Note que, para arquivos grandes, isso pode ser apenas parte do arquivo.
Observe as seguintes limitações desse ponto de extremidade:
Os 10.000 eventos mais recentes são mantidos.
Os eventos são retidos por um período máximo de 10 minutos.
Um evento ocorre quando os dados de um arquivo enviado via insertFiles foram enviados para a tabela e estão disponíveis para consultas. O ponto de extremidade insertReport pode ser pensado como comando tail do UNIX. Ao chamar esse comando repetidamente, é possível ver o histórico completo dos eventos em um canal ao longo do tempo. Note que o comando deve ser chamado com frequência suficiente para não perder os eventos. A frequência depende da taxa de envio dos arquivos para insertFiles.
Método: GET
GET URL:
https://<identificador_de_conta>.snowflakecomputing.com/v1/data/pipes/<pipeName>/insertReport?requestId=<requestId>&beginMark=<beginMark>
Parâmetros de URL:
account_identifier(obrigatório): seu identificador de conta Snowflake exclusivo. O formato preferido éorganization_name-account_name. Para formatos alternativos (localizador de contas com região e plataforma de nuvem), consulte Formato 1 (preferido): Nome da conta em sua organização.pipeName(obrigatório): o nome totalmente qualificado e sensível a maiúsculas e minúsculas do Snowpipe. Por exemplo,myDatabase.mySchema.myPipe.requestId(opcional): uma cadeia de caracteres que você pode fornecer para rastrear essa solicitação específica no sistema da Snowflake. O uso de uma cadeia de caracteres aleatória como UUID é altamente recomendado para facilitar a depuração e o monitoramento. Acrescente isso ao URL da seguinte forma:?requestId=<your_uuid>.beginMark(opcional): um valor de marcador retornado no camponextBeginMarkde uma respostainsertReportanterior. A inclusão desse marcador ajuda a otimizar as chamadas subsequentes, reduzindo potencialmente o número de eventos duplicados retornados. Observação: emborabeginMarkseja uma dica para evitar duplicatas, ainda pode ocorrer uma repetição ocasional de eventos. SebeginMarknão for especificado, o relatório mostrará o histórico de ingestão dos últimos 10 minutos. Acrescente isso ao URL da seguinte forma:?beginMark=<previous_nextBeginMark>.
Cabeçalhos de solicitação:
Aceitar: especifica o formato de resposta desejado. Os valores aceitos são
text/plainouapplication/json.Autorização: seu token de autenticação Snowflake. Use o formato BEARER <jwt_token>.
Corpo da solicitação:
Esse ponto de extremidade não aceita um corpo de solicitação para solicitações GET. Os parâmetros necessários são fornecidos no URL e nos cabeçalhos.
Corpo da resposta:
Códigos de resposta:
200 — Sucesso. Relatório retornado.
400 — Falha. Solicitação inválida devido a um formato inválido ou limite excedido.
404 — Falha.
pipeNamenão reconhecido.Esse código de erro também pode ser retornado se a função utilizada ao chamar o ponto de extremidade não tiver privilégios suficientes. Para obter mais informações, consulte Concessão de privilégios de acesso.
429 — Falha. O limite de taxa de solicitação foi excedido.
500 — Falha. Ocorreu um erro interno.
Carga útil de resposta:
Uma resposta de sucesso (200) contém informações sobre arquivos que foram adicionados recentemente à tabela. Observe que esse relatório pode representar apenas uma parte de um grande arquivo.
Por exemplo:
{ "pipe": "TESTDB.TESTSCHEMA.pipe2", "completeResult": true, "nextBeginMark": "1_39", "files": [ { "path": "data2859002086815673867.csv", "stageLocation": "s3://mybucket/", "fileSize": 57, "timeReceived": "2017-06-21T04:47:41.453Z", "lastInsertTime": "2017-06-21T04:48:28.575Z", "rowsInserted": 1, "rowsParsed": 1, "errorsSeen": 0, "errorLimit": 1, "complete": true, "status": "LOADED" } ] }Campos de resposta:
Campo
Tipo
Descrição
pipeCadeia de caracteres
O nome totalmente qualificado do canal.
completeResultBooliano
falsese um evento foi perdido entre o evento fornecidobeginMarke o primeiro evento neste histórico de relatórios. Caso contrário,true.
nextBeginMarkCadeia de caracteres
beginMarkpara usar na próxima solicitação para evitar ver registros duplicados. Observe que esse valor é uma dica. Duplicatas ainda podem ocorrer ocasionalmente.
filesMatriz
Uma matriz de objetos JSON, um objeto para cada arquivo que faz parte da resposta do histórico.
pathCadeia de caracteres
O caminho do arquivo relativo à localização do estágio.
stageLocationCadeia de caracteres
Ou a ID do estágio (estágio interno) ou o bucket S3 (estágio externo) definido no canal.
fileSizeLong
Tamanho do arquivo, em bytes.
timeReceivedCadeia de caracteres
Hora em que esse arquivo foi recebido para processamento. O formato é ISO-8601 no fuso horário do UTC.
lastInsertTimeCadeia de caracteres
Hora em que os dados desse arquivo foram inseridos pela última vez na tabela. O formato é ISO-8601 no fuso horário do UTC.
rowsInsertedLong
Número de linhas inseridas na tabela de destino a partir do arquivo.
rowsParsedLong
Número de linhas analisadas a partir do arquivo. Linhas com erros podem ser ignoradas.
errorsSeenInteiro
Número de erros encontrados no arquivo
errorLimitInteiro
Número de erros permitidos no arquivo antes de ser considerado falha (com base na opção de cópia ON_ERROR).
firstError[1]Cadeia de caracteres
Mensagem de erro referente ao primeiro erro encontrado nesse arquivo.
firstErrorLineNum[1]Long
Número da linha do primeiro erro.
firstErrorCharacterPos[1]Long
Posição do caractere do primeiro erro.
firstErrorColumnName[1]Cadeia de caracteres
Nome da coluna onde ocorreu o primeiro erro.
systemError[1]Cadeia de caracteres
Erro geral que descreve o motivo pelo qual o arquivo não foi processado.
completeBooliano
Indica se o arquivo foi completamente processado com sucesso.
statusCadeia de caracteres
Status de carregamento do arquivo:
LOAD_IN_PROGRESS: Parte do arquivo foi carregada na tabela, mas o processo de carregamento ainda não foi concluído.
LOADED: O arquivo inteiro foi carregado na tabela.
LOAD_FAILED: O carregamento do arquivo falhou.
PARTIALLY_LOADED: Algumas linhas deste arquivo foram carregadas com sucesso, mas outras não foram carregadas devido a erros. O processamento deste arquivo está concluído.
[1] Os valores só são fornecidos para esses campos quando os arquivos incluem erros.
Ponto de extremidade: loadHistoryScan¶
Busca um relatório sobre arquivos ingeridos, cujo conteúdo foi adicionado à tabela. Note que, para arquivos grandes, isso pode ser apenas parte do arquivo. Esse ponto de extremidade difere de insertReport na medida em que vê o histórico entre dois pontos no tempo. Há um máximo de 10.000 itens retornados, mas várias chamadas podem ser emitidas para cobrir o intervalo desejado.
Importante
Esse ponto de extremidade é limitado para evitar chamadas excessivas. Para ajudar a evitar exceder o limite de taxa (código de erro 429), recomendamos contar mais fortemente com insertReport do que com loadHistoryScan. Ao chamar loadHistoryScan, especifique o menor intervalo que inclui um conjunto de carregamentos de dados. Por exemplo, ler os últimos 10 minutos de histórico a cada 8 minutos funcionaria bem. Tentar ler as últimas 24 horas do histórico a cada minuto resultará em 429 erros indicando que um limite de taxa foi atingido. Os limites de taxa são projetados para permitir que cada registro de histórico seja lido várias vezes.
Para uma exibição mais abrangente, sem esses limites, o Snowflake oferece uma função de tabela do Information Schema, COPY_HISTORY, que retorna o histórico de carregamento de um canal ou tabela.
Método: GET
GET URL:
https://{account}.snowflakecomputing.com/v1/data/pipes/{pipeName}/loadHistoryScan?startTimeInclusive=<startTime>&endTimeExclusive=<endTime>&requestId=<requestId>
Parâmetros de URL:
account(obrigatório): seu identificador de conta Snowflake exclusivo.pipeName(obrigatório): o nome totalmente qualificado e sensível a maiúsculas e minúsculas do Snowpipe. Exemplo:myDatabase.mySchema.myPipe.startTimeInclusive(obrigatório): o início do intervalo de tempo para recuperar os dados do histórico de carga, especificado como um carimbo de data/hora no formato ISO-8601 (por exemplo, 2023-10-26T10:00:00Z). Esse carimbo de data/hora marca o limite inferior inclusivo da consulta.endTimeExclusive(opcional): o fim do intervalo de tempo para recuperar dados do histórico de carga, especificado como um carimbo de data/hora no formato ISO-8601 (por exemplo, 2023-10-26T10:15:00Z). Esse carimbo de data/hora marca o limite superior exclusivo da consulta. Se esse parâmetro for omitido, o carimbo de data/hora do servidor atual (CURRENT_TIMESTAMP()) será usado como o fim do intervalo de tempo.requestId(opcional): uma cadeia de caracteres que você pode fornecer para rastrear essa solicitação específica no sistema da Snowflake. Recomendamos o uso de uma cadeia de caracteres aleatória, como UUID, para facilitar a depuração e o monitoramento. Acrescente isso ao URL da seguinte forma:?requestId=<your_uuid>.
Cabeçalhos de solicitação:
Accept: especifica o formato de resposta desejado. Os valores aceitos sãotext/plainouapplication/json.Authorization: seu token de autenticação Snowflake. Use o formatoBEARER <jwt_token>.
Corpo da solicitação:
Esse ponto de extremidade não aceita um corpo de solicitação para solicitações GET. Todos os parâmetros necessários são fornecidos no URL e nos cabeçalhos.
Corpo da resposta:
Códigos de resposta:
200 — Sucesso. Os resultados do escaneamento do histórico de carregamento são retornados.
400 — Falha. Solicitação inválida devido a um formato inválido ou limite excedido.
404 — Falha.
pipeNamenão reconhecido.429 — Falha. O limite de taxa de solicitação foi excedido.
500 — Falha. Ocorreu um erro interno.
Carga útil de resposta:
Uma resposta de sucesso (200) contém informações sobre arquivos que foram adicionados recentemente à tabela. Observe que esse relatório pode representar apenas uma parte de um grande arquivo.
Por exemplo:
{ "pipe": "TESTDB.TESTSCHEMA.pipe2", "completeResult": true, "startTimeInclusive": "2017-08-25T18:42:31.081Z", "endTimeExclusive":"2017-08-25T22:43:45.552Z", "rangeStartTime":"2017-08-25T22:43:45.383Z", "rangeEndTime":"2017-08-25T22:43:45.383Z", "files": [ { "path": "data2859002086815673867.csv", "stageLocation": "s3://mystage/", "fileSize": 57, "timeReceived": "2017-08-25T22:43:45.383Z", "lastInsertTime": "2017-08-25T22:43:45.383Z", "rowsInserted": 1, "rowsParsed": 1, "errorsSeen": 0, "errorLimit": 1, "complete": true, "status": "LOADED" } ] }Campos de resposta:
Campo
Tipo
Descrição
pipeCadeia de caracteres
Nome totalmente qualificado do canal.
completeResultBooliano
falsese o relatório estiver incompleto (ou seja, o número de entradas no intervalo especificado exceder o limite de 10.000 entradas). Sefalse, o usuário pode especifique o valor atualrangeEndTimecomo o valorstartTimeInclusivepara a próxima solicitação para prosseguir para o próximo conjunto de entradas.
startTimeInclusiveCadeia de caracteres
Carimbo de data/hora inicial (no formato ISO 8601) fornecido na solicitação.
endTimeExclusiveCadeia de caracteres
Carimbo de data/hora final (no formato ISO 8601) fornecido na solicitação.
rangeStartTimeCadeia de caracteres
Carimbo de data/hora (no formato ISO 8601) da entrada mais antiga dos arquivos incluídos na resposta.
rangeEndTimeCadeia de caracteres
Carimbo de data/hora (no formato ISO 8601) da entrada mais recente dos arquivos incluídos na resposta.
filesMatriz
Uma matriz de objetos JSON, um objeto para cada arquivo que faz parte da resposta do histórico. Dentro da matriz, os campos de resposta são os mesmos que os retornados na resposta
insertReport.