Option 1 : Chargement des données à l’aide de l’API REST de Snowpipe

Ce chapitre décrit comment appeler les points de terminaison REST publics pour charger les données et récupérer les rapports d’historique de chargement. Les instructions supposent que vous avez suivi les instructions d’installation de la section Préparation du chargement des données à l’aide de l’API REST Snowpipe.

Dans ce chapitre :

Chargement des données

Le chargement s’effectue en deux étapes :

Étape 1

Préparez vos fichiers de données :

  • Zone de préparation interne : utilisez la commande PUT pour préparer vos fichiers.

  • Zone de préparation externe : utilisez les outils du client fournis par le fournisseur de Cloud pour copier vos fichiers vers l’emplacement de la zone de préparation (Amazon S3, Google Cloud Storage ou Microsoft Azure).

Étape 2

Soumettez une requête au point de terminaison insertFiles REST pour charger les fichiers de données mis en zone de préparation.

Pour plus de praticité, des exemples de programmes Java et Python illustrant comment soumettre un point de terminaison REST sont fournis dans ce chapitre.

Exemple de programme pour le SDK Java

import net.snowflake.ingest.SimpleIngestManager;
import net.snowflake.ingest.connection.HistoryRangeResponse;
import net.snowflake.ingest.connection.HistoryResponse;
import org.bouncycastle.asn1.pkcs.PrivateKeyInfo;
import org.bouncycastle.jce.provider.BouncyCastleProvider;
import org.bouncycastle.openssl.PEMParser;
import org.bouncycastle.openssl.jcajce.JcaPEMKeyConverter;
import org.bouncycastle.openssl.jcajce.JceOpenSSLPKCS8DecryptorProviderBuilder;
import org.bouncycastle.operator.InputDecryptorProvider;
import org.bouncycastle.operator.OperatorCreationException;
import org.bouncycastle.pkcs.PKCS8EncryptedPrivateKeyInfo;
import org.bouncycastle.pkcs.PKCSException;
import java.io.FileReader;
import java.io.IOException;
import java.nio.file.Paths;
import java.security.PrivateKey;
import java.security.Security;
import java.time.Instant;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

public class SDKTest
{
  // Path to the private key file that you generated earlier.
  private static final String PRIVATE_KEY_FILE = "/<path>/rsa_key.p8";

  public static class PrivateKeyReader
  {
    // If you generated an encrypted private key, implement this method to return
    // the passphrase for decrypting your private key.
    private static String getPrivateKeyPassphrase() {
      return "<private_key_passphrase>";
    }

    public static PrivateKey get(String filename)
            throws Exception
    {
      PrivateKeyInfo privateKeyInfo = null;
      Security.addProvider(new BouncyCastleProvider());
      // Read an object from the private key file.
      PEMParser pemParser = new PEMParser(new FileReader(Paths.get(filename).toFile()));
      Object pemObject = pemParser.readObject();
      if (pemObject instanceof PKCS8EncryptedPrivateKeyInfo) {
        // Handle the case where the private key is encrypted.
        PKCS8EncryptedPrivateKeyInfo encryptedPrivateKeyInfo = (PKCS8EncryptedPrivateKeyInfo) pemObject;
        String passphrase = getPrivateKeyPassphrase();
        InputDecryptorProvider pkcs8Prov = new JceOpenSSLPKCS8DecryptorProviderBuilder().build(passphrase.toCharArray());
        privateKeyInfo = encryptedPrivateKeyInfo.decryptPrivateKeyInfo(pkcs8Prov);
      } else if (pemObject instanceof PrivateKeyInfo) {
        // Handle the case where the private key is unencrypted.
        privateKeyInfo = (PrivateKeyInfo) pemObject;
      }
      pemParser.close();
      JcaPEMKeyConverter converter = new JcaPEMKeyConverter().setProvider(BouncyCastleProvider.PROVIDER_NAME);
      return converter.getPrivateKey(privateKeyInfo);
    }
  }

  private static HistoryResponse waitForFilesHistory(SimpleIngestManager manager,
                                                     Set<String> files)
          throws Exception
  {
    ExecutorService service = Executors.newSingleThreadExecutor();

    class GetHistory implements
            Callable<HistoryResponse>
    {
      private Set<String> filesWatchList;
      GetHistory(Set<String> files)
      {
        this.filesWatchList = files;
      }
      String beginMark = null;

      public HistoryResponse call()
              throws Exception
      {
        HistoryResponse filesHistory = null;
        while (true)
        {
          Thread.sleep(500);
          HistoryResponse response = manager.getHistory(null, null, beginMark);
          if (response.getNextBeginMark() != null)
          {
            beginMark = response.getNextBeginMark();
          }
          if (response != null && response.files != null)
          {
            for (HistoryResponse.FileEntry entry : response.files)
            {
              //if we have a complete file that we've
              // loaded with the same name..
              String filename = entry.getPath();
              if (entry.getPath() != null && entry.isComplete() &&
                      filesWatchList.contains(filename))
              {
                if (filesHistory == null)
                {
                  filesHistory = new HistoryResponse();
                  filesHistory.setPipe(response.getPipe());
                }
                filesHistory.files.add(entry);
                filesWatchList.remove(filename);
                //we can return true!
                if (filesWatchList.isEmpty()) {
                  return filesHistory;
                }
              }
            }
          }
        }
      }
    }

    GetHistory historyCaller = new GetHistory(files);
    //fork off waiting for a load to the service
    Future<HistoryResponse> result = service.submit(historyCaller);

    HistoryResponse response = result.get(2, TimeUnit.MINUTES);
    return response;
  }

  public static void main(String[] args)
  {
    final String host = "<account_identifier>.snowflakecomputing.com";
    final String user = "<user_login_name>";
    final String pipe = "<db_name>.<schema_name>.<pipe_name>";
    try
    {
      final long oneHourMillis = 1000 * 3600L;
      String startTime = Instant
              .ofEpochMilli(System.currentTimeMillis() - 4 * oneHourMillis).toString();
      final PrivateKey privateKey = PrivateKeyReader.get(PRIVATE_KEY_FILE);
      SimpleIngestManager manager = new SimpleIngestManager(host.split("\.")[0], user, pipe, privateKey, "https", host, 443);
      List<StagedFileWrapper> files = new ArrayList<>();
      // Add the paths and sizes the files that you want to load.
      // Use paths that are relative to the stage where the files are located
      // (the stage that is specified in the pipe definition)..
      files.add(new StagedFileWrapper("<path>/<filename>", <file_size_in_bytes> /* file size is optional but recommended, pass null when it is not available */));
      files.add(new StagedFileWrapper("<path>/<filename>", <file_size_in_bytes> /* file size is optional but recommended, pass null when it is not available */));
      ...
      manager.ingestFiles(files, null);
      HistoryResponse history = waitForFilesHistory(manager, files);
      System.out.println("Received history response: " + history.toString());
      String endTime = Instant
              .ofEpochMilli(System.currentTimeMillis()).toString();

      HistoryRangeResponse historyRangeResponse =
              manager.getHistoryRange(null,
                                      startTime,
                                      endTime);
      System.out.println("Received history range response: " +
                                 historyRangeResponse.toString());

    }
    catch (Exception e)
    {
      e.printStackTrace();
    }

  }
}
Copy

Cet exemple utilise les Crypto APIs de Bouncy Castle. Afin de compiler et d’exécuter cet exemple, vous devez inclure les fichiers JAR suivants dans votre classpath :

  • le fichier JAR du fournisseur (bcprov-jdkversions.jar)

  • le fichier PKIX / CMS / EAC / PKCS / OCSP / TSP / OPENSSL JAR (bcpkix-jdkversions.jar)

versions indique les versions du JDK que le fichier JAR prend en charge.

Avant de compiler l’exemple de code, remplacez les caractères de remplacement suivants :

PRIVATE_KEY_FILE = "/<chemin>/rsa_key.p8"

Spécifiez le chemin local au fichier de clé privée que vous avez créé dans Utilisation de l’authentification par paires de clés et rotation des clés (dans Préparation du chargement des données à l’aide de l’API REST Snowpipe).

return "<phrasesecrète_clé_privée>" dans getPrivateKeyPassphrase()

Si vous avez généré une clé chiffrée, implémentez la méthode getPrivateKeyPassphrase() pour renvoyer la phrase secrète afin de déchiffrer cette clé.

host = "<identificateur_de_compte>.snowflakecomputing.com"

Spécifiez vos informations d’hôte sous la forme d’une URL.

Le format préféré de l’identificateur du compte est le suivant :

organization_name-account_name

Noms de votre organisation et de votre compte Snowflake. Pour plus de détails, voir Format 1 (recommandé) : nom du compte dans votre organisation..

Vous pouvez également indiquer votre localisateur de compte, ainsi que la région et la plate-forme Cloud où le compte est hébergé, si nécessaire. Pour plus de détails, voir Format 2 (existant) : localisateur de compte dans une région.

user = "<nom_connexion_utilisateur>"

Précisez votre nom d’utilisateur Snowflake.

pipe = "<nom_bd>.<nom_schéma>.<nom_canal>"

Spécifiez le nom complet du canal à utiliser pour charger les données.

files.add("<path>/<filename>", <file_size_in_bytes>)

Indiquez le chemin de vos fichiers à charger dans la liste des objets de fichiers.

Spécifiez éventuellement la taille de chaque fichier, en octets, afin d’éviter tout retard lorsque Snowpipe calcule les opérations nécessaires au chargement des données.

Le chemin que vous spécifiez doit être relatif à la zone de préparation dans laquelle les fichiers sont situés. Incluez le nom complet de chaque fichier, y compris son extension. Par exemple, un fichier CSV compressé en gzip peut porter l’extension suivante .csv.gz.

Exemple de programme pour le SDK Python

from logging import getLogger
from snowflake.ingest import SimpleIngestManager
from snowflake.ingest import StagedFile
from snowflake.ingest.utils.uris import DEFAULT_SCHEME
from datetime import timedelta
from requests import HTTPError
from cryptography.hazmat.primitives import serialization
from cryptography.hazmat.primitives.serialization import load_pem_private_key
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives.serialization import Encoding
from cryptography.hazmat.primitives.serialization import PrivateFormat
from cryptography.hazmat.primitives.serialization import NoEncryption
import time
import datetime
import os
import logging

logging.basicConfig(
        filename='/tmp/ingest.log',
        level=logging.DEBUG)
logger = getLogger(__name__)

# If you generated an encrypted private key, implement this method to return
# the passphrase for decrypting your private key.
def get_private_key_passphrase():
  return '<private_key_passphrase>'

with open("/<private_key_path>/rsa_key.p8", 'rb') as pem_in:
  pemlines = pem_in.read()
  private_key_obj = load_pem_private_key(pemlines,
  get_private_key_passphrase().encode(),
  default_backend())

private_key_text = private_key_obj.private_bytes(
  Encoding.PEM, PrivateFormat.PKCS8, NoEncryption()).decode('utf-8')
# Assume the public key has been registered in Snowflake:
# private key in PEM format

ingest_manager = SimpleIngestManager(account='<account_identifier>',
                                     host='<account_identifier>.snowflakecomputing.com',
                                     user='<user_login_name>',
                                     pipe='<db_name>.<schema_name>.<pipe_name>',
                                     private_key=private_key_text)
# List of files, but wrapped into a class
staged_file_list = [
  StagedFile('<path>/<filename>', <file_size_in_bytes>),  # file size is optional but recommended, pass None if not available
  StagedFile('<path>/<filename>', <file_size_in_bytes>),  # file size is optional but recommended, pass None if not available
  ...
  ]

try:
    resp = ingest_manager.ingest_files(staged_file_list)
except HTTPError as e:
    # HTTP error, may need to retry
    logger.error(e)
    exit(1)

# This means Snowflake has received file and will start loading
assert(resp['responseCode'] == 'SUCCESS')

# Needs to wait for a while to get result in history
while True:
    history_resp = ingest_manager.get_history()

    if len(history_resp['files']) > 0:
        print('Ingest Report:\n')
        print(history_resp)
        break
    else:
        # wait for 20 seconds
        time.sleep(20)

    hour = timedelta(hours=1)
    date = datetime.datetime.utcnow() - hour
    history_range_resp = ingest_manager.get_history_range(date.isoformat() + 'Z')

    print('\nHistory scan report: \n')
    print(history_range_resp)
Copy

Avant d’exécuter l’exemple de code, remplacez les caractères de remplacement suivants :

<chemin_clé_privée>

Spécifiez le chemin local au fichier de clé privée que vous avez créé dans Utilisation de l’authentification par paires de clés et rotation des clés (dans Préparation du chargement des données à l’aide de l’API REST Snowpipe).

return "<phrasesecrète_clé_privée>" dans get_private_key_passphrase()

Si vous avez généré une clé chiffrée, implémentez la fonction get_private_key_passphrase() pour renvoyer la phrase secrète afin de déchiffrer cette clé.

account='<identificateur_de_compte>'

Indiquez l’identificateur unique de votre compte (fourni par Snowflake). Voir la description de host.

host='<identificateur_de_compte>.snowflakecomputing.com'

Indiquez le nom d’hôte unique de votre compte Snowflake.

Le format préféré de l’identificateur du compte est le suivant :

organization_name-account_name

Noms de votre organisation et de votre compte Snowflake. Pour plus de détails, voir Format 1 (recommandé) : nom du compte dans votre organisation..

Vous pouvez également indiquer votre localisateur de compte, ainsi que la région et la plate-forme Cloud où le compte est hébergé, si nécessaire. Pour plus de détails, voir Format 2 (existant) : localisateur de compte dans une région.

user='<user_login_name>'

Précisez votre nom d’utilisateur Snowflake.

pipe='<db_name>.<schema_name>.<pipe_name>'

Spécifiez le nom complet du canal à utiliser pour charger les données.

file_list=['<path>/<filename>', '<path>/<filename>'] | staged_file_list=[StagedFile('<path>/<filename>', <file_size_in_bytes>), StagedFile('<path>/<filename>', <file_size_in_bytes>)]

Indiquez le chemin de vos fichiers à charger dans la liste des objets de fichiers.

Le chemin que vous spécifiez doit être relatif à la zone de préparation dans laquelle les fichiers sont situés. Incluez le nom complet de chaque fichier, y compris son extension. Par exemple, un fichier CSV compressé en gzip peut porter l’extension suivante .csv.gz.

Spécifiez éventuellement la taille de chaque fichier, en octets, afin d’éviter tout retard lorsque Snowpipe calcule les opérations nécessaires au chargement des données.

Affichage de l’historique de chargement

Snowflake fournit des points de terminaison REST et une fonction de table Schéma d’information de Snowflake pour visualiser votre historique de chargement :

Notez que l’interrogation de la fonction de table Information Schema ou de la vue Account Usage, contrairement à l’appel des points de terminaison REST, nécessite un entrepôt en cours d’exécution.

Suppression des fichiers en zone de préparation

Supprimez les fichiers en zone de préparation après avoir chargé avec succès les données et lorsque vous n’avez plus besoin des fichiers. Pour obtenir des instructions, voir Suppression des fichiers en zone de préparation après le chargement des données par Snowpipe.