API REST Snowpipe

Vous interagissez avec un canal en passant des appels vers des points de terminaison REST. Ce chapitre décrit l’API REST Snowpipe pour définir la liste des fichiers à intégrer et récupérer les rapports de l’historique de chargement.

Snowflake fournit également des APIs Java et Python qui simplifient le travail avec l’API REST Snowpipe.

Dans ce chapitre :

Intégration d’un fichier de données

L’API de Snowpipe fournit un point de terminaison REST pour définir la liste des fichiers à intégrer.

Point de terminaison : insertFiles

Informe Snowflake sur les fichiers à intégrer dans une table. Une réponse réussie de ce point de terminaison signifie que Snowflake a enregistré la liste des fichiers à ajouter à la table. Cela ne signifie pas nécessairement que les fichiers ont été intégrés. Pour plus de détails, voir les codes de réponse ci-dessous.

Dans la plupart des cas, Snowflake insère de nouvelles données dans la table cible en quelques minutes.

Méthode : POST

POST URL:

https://{account}.snowflakecomputing.com/v1/data/pipes/{pipeName}/insertFiles?requestId={requestId}

URL Paramètres :

  • account (Obligatoire) : identificateur de votre compte Snowflake.

  • pipeName (Obligatoire) : nom du canal entièrement qualifié, sensible à la casse. Par exemple, myDatabase.mySchema.myPipe.

  • requestId (facultatif) : chaîne utilisée pour suivre les requêtes dans le système. Nous vous recommandons de fournir une chaîne aléatoire à chaque requête, par exemple UUID. Elle doit être ajoutée à l’adresse URL comme suit : ?requestId=<your_uuid>.

En-têtes de requête

  • Content-Type: :

    • text/plain : pour une liste en texte brut de chemins et de noms de fichiers, un par ligne. Le paramètre de taille n’est pas autorisé dans ce format.

    • application/json : pour un objet JSON contenant une liste de fichiers avec des informations facultatives sur leur taille.

  • Authorization : BEARER <jwt_token>

Corps de la requête (pour application/json Content-Type)

Le corps de la requête doit être un objet JSON avec une seule clé nommée « files ». La valeur associée à cette clé est un tableau d’objets JSON, où chaque objet représente un fichier à ingérer.

{
  "files":[
    {
      "path":"filePath/file1.csv",
      "size":100
    },
    {
      "path":"filePath/file2.csv",
      "size":100
    }
   ]
}
Copy

Chaque élément du tableau « files » est un objet JSON doté des attributs suivants :

  • path (obligatoire) : le chemin et le nom du fichier en zone de préparation. Si vous suivez nos bonnes pratiques recommandées en partitionnant vos données de zone de préparation en utilisant des chemins logiques et granulaires, les valeurs des chemins dans la charge utile incluent les chemins complets vers les fichiers en zone de préparation.

  • size (facultatif, mais recommandé pour de meilleures performances) : la taille du fichier en octets.

Corps de la requête (pour le type de contenu text/plain)

Le corps de la requête doit être une liste en texte brut de chemins et de noms de fichiers, avec une entrée par ligne.

filePath/file_a.csv
another/path/file_b.json
yet/another/file_c.txt
Copy

Note

Le message peut contenir au maximum 5 000 fichiers. Chaque chemin de fichier donné doit être <= 1 024 octets de long lorsqu’il est sérialisé en tant qu’UTF-8.

Corps de réponse

Codes de réponse :

  • 200 — Succès. Fichiers ajoutés à la file d’attente des fichiers en vue d’une intégration.

  • 400 — Échec. Requête non valide en raison d’un format non valide, ou limite dépassée.

  • 404 — Échec. pipeName non reconnu.

    Ce code d’erreur peut également être renvoyé si le rôle utilisé lors de l’appel du point de terminaison ne dispose pas de privilèges suffisants. Pour plus d’informations, voir Octroi de privilèges d’accès.

  • 429 — Échec. La limite du taux de demande est dépassée.

  • 500 — Échec. Une erreur interne s’est produite.

Charge utile de réponse :

Avec une requête API réussie (c’est-à-dire code 200), la charge utile de la réponse contient les éléments requestId et status au format JSON. Si une erreur se produit, la charge utile de la réponse peut contenir des détails sur l’erreur.

{
  "requestId": "your_request_uuid",
  "status": "success"
}
Copy

Si l’instruction COPY INTO <table> dans la définition de canal inclut l’option de copie PATTERN, l’attribut unmatchedPatternFiles répertorie tous les fichiers soumis dans l’en-tête qui ne correspondaient pas à l’expression régulière et qui ont donc été ignorés.

{
  "requestId": "your_request_uuid",
  "status": "success",
  "unmatchedPatternFiles": ["some_file.txt", "another_file.dat"]
}
Copy

Rapports d’historique de chargement

L’API Snowpipe fournit des points de terminaison REST pour récupérer les rapports de chargement.

Point de terminaison : insertReport

Récupère un rapport de fichiers soumis via insertFiles dont le contenu a été récemment intégré à une table. Notez que pour les fichiers volumineux, cela peut ne faire que partie du fichier.

Notez les limitations suivantes pour ce point de terminaison :

  • Les 10 000 événements les plus récents sont conservés.

  • Les événements sont conservés pendant un maximum de 10 minutes.

Un événement se produit lorsque les données d’un fichier soumis via insertFiles ont été validées dans la table et sont disponibles pour les requêtes. Le point de terminaison insertReport peut être considéré comme la fin de la commande UNIX. En appelant cette commande à plusieurs reprises, il est possible de voir l’historique complet des événements d’un canal au fil du temps. Notez que la commande doit être appelée assez souvent pour ne pas rater d’événements. La fréquence dépend de celle à laquelle les fichiers de taux sont envoyés à insertFiles.

Méthode : GET

GET URL:

https://<account_identifier>.snowflakecomputing.com/v1/data/pipes/<pipeName>/insertReport?requestId=<requestId>&beginMark=<beginMark>

URL Paramètres :

  • account_identifier (obligatoire) : l’identificateur unique de votre compte Snowflake. Le format préféré est organization_name-account_name. Pour les formats alternatifs (emplacement des comptes avec région et plateforme Cloud), voir Format 1 (recommandé) : nom du compte dans votre organisation.

  • pipeName (obligatoire) : le nom entièrement qualifié, sensible à la casse, du Snowpipe. Par exemple, myDatabase.mySchema.myPipe.

  • requestId (facultatif) : une chaîne que vous pouvez fournir pour suivre cette requête spécifique dans le système de Snowflake. L’utilisation d’une chaîne aléatoire telle que UUID est fortement recommandée pour faciliter le débogage et la surveillance. Ajoutez ceci à URL comme suit : ?requestId=<your_uuid>.

  • beginMark (facultatif) : valeur du marqueur renvoyée dans le champ nextBeginMark d’une réponse précédente à insertReport. L’inclusion de ce marqueur permet d’optimiser les appels ultérieurs en réduisant potentiellement le nombre d’événements renvoyés en double. Note : bien que le site beginMark ait pour but d’éviter les doublons, il se peut que des événements se répètent occasionnellement. Si beginMark n’est pas spécifié, le rapport affichera l’historique d’ingestion des 10 dernières minutes. Ajoutez ceci à URL comme suit : ?beginMark=<previous_nextBeginMark>.

En-têtes de requête :

  • Accepter : spécifie le format de réponse souhaité. Les valeurs acceptées sont text/plain ou application/json.

  • Autorisation : votre jeton d’authentification Snowflake. Utilisez le format BEARER <jwt_token>.

Corps de la requête :

Ce point de terminaison n’accepte pas de corps de requête pour les requêtes GET. Les paramètres nécessaires sont fournis dans les URL et les en-têtes.

Corps de réponse :

Codes de réponse :

  • 200 — Succès. Rapport retourné.

  • 400 — Échec. Requête non valide en raison d’un format non valide, ou limite dépassée.

  • 404 — Échec. pipeName non reconnu.

    Ce code d’erreur peut également être renvoyé si le rôle utilisé lors de l’appel du point de terminaison ne dispose pas de privilèges suffisants. Pour plus d’informations, voir Octroi de privilèges d’accès.

  • 429 — Échec. La limite du taux de demande est dépassée.

  • 500 — Échec. Une erreur interne s’est produite.

Charge utile de réponse :

Une réponse de succès (200) contient des informations sur les fichiers qui ont récemment été ajoutés à la table. Notez que ce rapport peut ne représenter qu’une partie d’un fichier volumineux.

Par exemple :

{
  "pipe": "TESTDB.TESTSCHEMA.pipe2",
  "completeResult": true,
  "nextBeginMark": "1_39",
  "files": [
    {
      "path": "data2859002086815673867.csv",
      "stageLocation": "s3://mybucket/",
      "fileSize": 57,
      "timeReceived": "2017-06-21T04:47:41.453Z",
      "lastInsertTime": "2017-06-21T04:48:28.575Z",
      "rowsInserted": 1,
      "rowsParsed": 1,
      "errorsSeen": 0,
      "errorLimit": 1,
      "complete": true,
      "status": "LOADED"
    }
  ]
}
Copy

Champs de réponse :

Champ

Type

Description

pipe

Chaîne

Nom complet du canal.

completeResult

Booléen

false si un événement a été omis entre le beginMark fourni et le premier événement de l’historique de ce rapport. Sinon, true.

nextBeginMark

Chaîne

beginMark à utiliser lors de la prochaine demande pour éviter de voir des enregistrements en double. Notez que cette valeur est un indice. Des doublons peuvent encore se produire à l’occasion.

files

Tableau

Un tableau d’objets JSON, un objet pour chaque fichier faisant partie de la réponse historique.

path

Chaîne

Chemin du fichier par rapport à l’emplacement de la zone de préparation.

stageLocation

Chaîne

Soit l’ID de zone de préparation (zone de préparation interne) soit le compartiment S3 (zone de préparation externe) défini dans le canal.

fileSize

Long

Taille du fichier, en octets.

timeReceived

Chaîne

Heure à laquelle ce fichier a été reçu pour traitement. Le format est ISO-8601 dans le fuseau horaire UTC.

lastInsertTime

Chaîne

Heure à laquelle les données de ce fichier ont été insérées pour la dernière fois dans la table. Le format est ISO-8601 dans le fuseau horaire UTC.

rowsInserted

Long

Nombre de lignes insérées dans le tableau cible à partir du fichier.

rowsParsed

Long

Nombre de lignes analysées à partir du fichier. Les lignes comportant des erreurs peuvent être ignorées.

errorsSeen

Entier

Nombre d’erreurs vues dans le fichier

errorLimit

Entier

Nombre d’erreurs autorisées dans le fichier avant qu’un échec soit considéré (selon l’option de copie ON_ERROR).

firstError [1]

Chaîne

Message d’erreur pour la première erreur rencontrée dans ce fichier.

firstErrorLineNum [1]

Long

Numéro de ligne de la première erreur.

firstErrorCharacterPos [1]

Long

Position du caractère de la première erreur.

firstErrorColumnName [1]

Chaîne

Nom de la colonne où la première erreur s’est produite.

systemError [1]

Chaîne

Erreur générale décrivant pourquoi le fichier n’a pas été traité.

complete

Booléen

Indique si le fichier a été entièrement traité avec succès.

status

Chaîne

Statut de chargement du fichier :

  • LOAD_IN_PROGRESS : une partie du fichier a été chargée dans la table, mais le processus de chargement n’est pas encore terminé.

  • LOADED : le fichier entier a été chargé dans la table.

  • LOAD_FAILED : le chargement du fichier a échoué.

  • PARTIALLY_LOADED : certaines lignes de ce fichier ont été chargées avec succès, mais d’autres n’ont pas été chargées à cause d’erreurs. Le traitement de ce fichier est terminé.

[1] Les valeurs ne sont fournies pour ces champs que lorsque les fichiers contiennent des erreurs.

Point de terminaison : loadHistoryScan

Récupère un rapport sur les fichiers intégrés dont le contenu a été ajouté à la table. Notez que pour les fichiers volumineux, cela peut ne faire que partie du fichier. Ce point de terminaison diffère de insertReport en ce qu’il affiche l’historique entre deux points dans le temps. Il existe un maximum de 10 000 éléments retournés, mais plusieurs appels peuvent être émis pour couvrir la période désirée.

Important

Notez que ce point de terminaison présente des limites de débit afin d’éviter tout appel excessif. Pour éviter de dépasser la limite de débit (code d’erreur 429), nous vous recommandons de vous appuyer davantage sur insertReport plutôt que sur loadHistoryScan. Lorsque vous appelez loadHistoryScan, spécifiez la plage de temps la plus serrée comprenant un ensemble de charges de données. Par exemple, lire les 10 dernières minutes de l’historique toutes les 8 minutes fonctionnerait bien. Essayer de lire les dernières 24 heures de l’historique chaque minute entraînera 429 erreurs indiquant qu’une limite de débit a été atteinte. Les limites de débit sont conçues pour permettre à chaque enregistrement historique d’être lu plusieurs fois.

Pour une vue complète, sans ces limites, Snowflake fournit une fonction de table Information Schema, COPY_HISTORY, qui retourne l’historique de chargement d’un canal ou d’une table.

Méthode : GET

GET URL:

https://{account}.snowflakecomputing.com/v1/data/pipes/{pipeName}/loadHistoryScan?startTimeInclusive=<heureDébut>&endTimeExclusive=<heureFin>&requestId=<idRequête>

URL Paramètres :

  • account (obligatoire) : l’identificateur unique de votre compte Snowflake.

  • pipeName (obligatoire) : le nom entièrement qualifié, sensible à la casse, du Snowpipe. Exemple : myDatabase.mySchema.myPipe.

  • startTimeInclusive (obligatoire) : le début de l’intervalle de temps pour récupérer les données de l’historique de chargement, spécifié sous forme d’horodatage au format ISO-8601 (par exemple, 2023-10-26T10:00:00Z). Cet horodatage marque la borne inférieure inclusive de la requête.

  • endTimeExclusive (facultatif) : la fin de la plage de temps pour récupérer les données de l’historique de chargement, spécifiée sous forme d’horodatage au format ISO-8601 (par exemple, 2023-10-26T10:15:00Z). Cet horodatage marque la limite supérieure exclusive de la requête. Si ce paramètre est omis, l’horodatage actuel du serveur (CURRENT_TIMESTAMP()) sera utilisé comme fin de l’intervalle de temps.

  • requestId (facultatif) : une chaîne que vous pouvez fournir pour suivre cette requête spécifique dans le système de Snowflake. Nous vous recommandons d’utiliser une chaîne aléatoire comme UUID pour faciliter le débogage et la surveillance. Ajoutez ceci à URL comme suit : ?requestId=<your_uuid>.

En-têtes de requête :

  • Accept : spécifie le format de réponse souhaité. Les valeurs acceptées sont text/plain ou application/json.

  • Authorization : votre jeton d’authentification Snowflake. Utilisez le format BEARER <jwt_token>.

Corps de la requête :

Ce point de terminaison n’accepte pas de corps de requête pour les requêtes GET. Tous les paramètres nécessaires sont fournis dans les URL et les en-têtes.

Corps de réponse :

Codes de réponse :

  • 200 — Succès. Les résultats d’analyse de l’historique de chargement sont retournés.

  • 400 — Échec. Requête non valide en raison d’un format non valide, ou limite dépassée.

  • 404 — Échec. pipeName non reconnu.

  • 429 — Échec. La limite du taux de demande est dépassée.

  • 500 — Échec. Une erreur interne s’est produite.

Charge utile de réponse :

Une réponse de succès (200) contient des informations sur les fichiers qui ont récemment été ajoutés à la table. Notez que ce rapport peut ne représenter qu’une partie d’un fichier volumineux.

Par exemple :

{
  "pipe": "TESTDB.TESTSCHEMA.pipe2",
  "completeResult": true,
  "startTimeInclusive": "2017-08-25T18:42:31.081Z",
  "endTimeExclusive":"2017-08-25T22:43:45.552Z",
  "rangeStartTime":"2017-08-25T22:43:45.383Z",
  "rangeEndTime":"2017-08-25T22:43:45.383Z",
  "files": [
    {
      "path": "data2859002086815673867.csv",
      "stageLocation": "s3://mystage/",
      "fileSize": 57,
      "timeReceived": "2017-08-25T22:43:45.383Z",
      "lastInsertTime": "2017-08-25T22:43:45.383Z",
      "rowsInserted": 1,
      "rowsParsed": 1,
      "errorsSeen": 0,
      "errorLimit": 1,
      "complete": true,
      "status": "LOADED"
    }
  ]
}
Copy

Champs de réponse :

Champ

Type

Description

pipe

Chaîne

Nom complet du canal.

completeResult

Booléen

false si le rapport est incomplet (c.-à-d. si le nombre d’entrées dans l’intervalle de temps spécifié dépasse la limite de 10 000 entrées). Si false, l’utilisateur peut spécifier la valeur rangeEndTime courante comme valeur startTimeInclusive pour la prochaine requête pour passer à l’ensemble suivant d’entrées.

startTimeInclusive

Chaîne

Horodatage de démarrage (au format ISO-8601) fourni dans la demande.

endTimeExclusive

Chaîne

Horodatage de fin (au format ISO-8601) fourni dans la demande.

rangeStartTime

Chaîne

Horodatage (au format ISO-8601) de l’entrée la plus ancienne des fichiers inclus dans la réponse.

rangeEndTime

Chaîne

Horodatage (au format ISO-8601) de l’entrée la plus récente des fichiers inclus dans la réponse.

files

Tableau

Un tableau d’objets JSON, un objet pour chaque fichier faisant partie de la réponse historique. Dans le tableau, les champs de réponse sont les mêmes que ceux retournés dans la réponse insertReport.