Option 1 : Chargement des données à l’aide de l’API REST de Snowpipe

Ce chapitre décrit comment appeler les points de terminaison REST publics pour charger les données et récupérer les rapports d’historique de chargement. Les instructions supposent que vous avez suivi les instructions d’installation de la section Préparation du chargement des données à l’aide de l’API REST Snowpipe.

Dans ce chapitre :

Chargement des données

Le chargement s’effectue en deux étapes :

Étape 1

Préparez vos fichiers de données :

  • Zone de préparation interne : utilisez la commande PUT pour préparer vos fichiers.

  • Zone de préparation externe : utilisez les outils du client fournis par le fournisseur de Cloud pour copier vos fichiers vers l’emplacement de la zone de préparation (Amazon S3, Google Cloud Storage ou Microsoft Azure).

Étape 2

Soumettez une requête au point de terminaison insertFiles REST pour charger les fichiers de données mis en zone de préparation.

Pour plus de praticité, des exemples de programmes Java et Python illustrant comment soumettre un point de terminaison REST sont fournis dans ce chapitre.

Exemple de programme pour le SDK Java

import net.snowflake.ingest.SimpleIngestManager;
import net.snowflake.ingest.connection.HistoryRangeResponse;
import net.snowflake.ingest.connection.HistoryResponse;
import javax.crypto.spec.PBEKeySpec;
import javax.crypto.EncryptedPrivateKeyInfo;
import javax.crypto.SecretKeyFactory;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.io.File;
import java.io.FileInputStream;
import java.io.DataInputStream;
import java.security.KeyFactory;
import java.security.KeyPair;
import java.security.PrivateKey;
import java.security.PublicKey;
import java.security.spec.PKCS8EncodedKeySpec;
import java.security.spec.X509EncodedKeySpec;
import java.time.Instant;
import java.util.Base64;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

public class SDKTest
{
  private static final String PUBLIC_KEY_FILE = "/<path>/public_key.der";
  private static final String PRIVATE_KEY_FILE = "/<path>/private_key.der";

  public static class PrivateKeyReader
  {
    public static PrivateKey get(String filename)
            throws Exception
    {
      File f = new File(filename);
      FileInputStream fis = new FileInputStream(f);
      DataInputStream dis = new DataInputStream(fis);
      byte[] keyBytes = new byte[(int) f.length()];
      dis.readFully(keyBytes);
      dis.close();

      String encrypted = new String(keyBytes);
      String passphrase = System.getenv("PRIVATE_KEY_PASSPHRASE");
      encrypted = encrypted.replace("-----BEGIN ENCRYPTED PRIVATE KEY-----", "");
      encrypted = encrypted.replace("-----END ENCRYPTED PRIVATE KEY-----", "");
      EncryptedPrivateKeyInfo pkInfo = new EncryptedPrivateKeyInfo(Base64.getMimeDecoder().decode(encrypted));
      PBEKeySpec keySpec = new PBEKeySpec(passphrase.toCharArray());
      SecretKeyFactory pbeKeyFactory = SecretKeyFactory.getInstance(pkInfo.getAlgName());
      PKCS8EncodedKeySpec encodedKeySpec = pkInfo.getKeySpec(pbeKeyFactory.generateSecret(keySpec));
      KeyFactory keyFactory = KeyFactory.getInstance("RSA");
      PrivateKey encryptedPrivateKey = keyFactory.generatePrivate(encodedKeySpec);
      return encryptedPrivateKey;
    }
  }

  private static HistoryResponse waitForFilesHistory(SimpleIngestManager manager,
                                                     Set<String> files)
          throws Exception
  {
    ExecutorService service = Executors.newSingleThreadExecutor();

    class GetHistory implements
            Callable<HistoryResponse>
    {
      private Set<String> filesWatchList;
      GetHistory(Set<String> files)
      {
        this.filesWatchList = files;
      }
      String beginMark = null;

      public HistoryResponse call()
              throws Exception
      {
        HistoryResponse filesHistory = null;
        while (true)
        {
          Thread.sleep(500);
          HistoryResponse response = manager.getHistory(null, null, beginMark);
          if (response.getNextBeginMark() != null)
          {
            beginMark = response.getNextBeginMark();
          }
          if (response != null && response.files != null)
          {
            for (HistoryResponse.FileEntry entry : response.files)
            {
              //if we have a complete file that we've
              // loaded with the same name..
              String filename = entry.getPath();
              if (entry.getPath() != null && entry.isComplete() &&
                      filesWatchList.contains(filename))
              {
                if (filesHistory == null)
                {
                  filesHistory = new HistoryResponse();
                  filesHistory.setPipe(response.getPipe());
                }
                filesHistory.files.add(entry);
                filesWatchList.remove(filename);
                //we can return true!
                if (filesWatchList.isEmpty()) {
                  return filesHistory;
                }
              }
            }
          }
        }
      }
    }

    GetHistory historyCaller = new GetHistory(files);
    //fork off waiting for a load to the service
    Future<HistoryResponse> result = service.submit(historyCaller);

    HistoryResponse response = result.get(2, TimeUnit.MINUTES);
    return response;
  }

  public static void main(String[] args)
  {
    final String account = "<account_name>";
    final String host = "<account_name>.<region_id>.snowflakecomputing.com";
    final String user = "<user_login_name>";
    final String pipe = "<db_name>.<schema_name>.<pipe_name>";
    try
    {
      final long oneHourMillis = 1000 * 3600L;
      String startTime = Instant
              .ofEpochMilli(System.currentTimeMillis() - 4 * oneHourMillis).toString();
      final PrivateKey privateKey = PrivateKeyReader.get(PRIVATE_KEY_FILE);
      SimpleIngestManager manager = new SimpleIngestManager(account, user, pipe, privateKey, "https", host, 443);
      Set<String> files = new TreeSet<>();
      files.add("<path>/<filename>");
      files.add("<path>/<filemame>");
      manager.ingestFiles(manager.wrapFilepaths(files), null);
      HistoryResponse history = waitForFilesHistory(manager, files);
      System.out.println("Received history response: " + history.toString());
      String endTime = Instant
              .ofEpochMilli(System.currentTimeMillis()).toString();

      HistoryRangeResponse historyRangeResponse =
              manager.getHistoryRange(null,
                                      startTime,
                                      endTime);
      System.out.println("Received history range response: " +
                                 historyRangeResponse.toString());

    }
    catch (Exception e)
    {
      e.printStackTrace();
    }

  }
}

Spécifiez la phrase secrète pour déchiffrer le fichier de clé privée à l’aide de la variable d’environnement PRIVATE_KEY_PASSPHRASE :

  • Linux ou macOS :

    export PRIVATE_KEY_PASSPHRASE='<passphrase>'
    
  • Windows :

    set PRIVATE_KEY_PASSPHRASE='<passphrase>'
    

De plus, avant d’exécuter le programme d’exemple Java, vous devez le modifier comme suit :

  • Mettez à jour les paramètres de sécurité :

    PUBLIC_KEY_FILE = « /<path>/public_key.der »` , . PRIVATE_KEY_FILE = "/<chemin>/public_key.der

    Spécifiez les chemins locaux des fichiers de clés publiques et privées que vous avez créés dans Utilisation de l’authentification par paire de clés (dans Préparation du chargement des données à l’aide de l’API REST Snowpipe).

  • Mettez à jour les paramètres de session :

    account = "<nom_compte>"

    Précisez le nom de votre compte (fourni par Snowflake).

    host = "<nom_compte>.<id_région>.snowflakecomputing.com"

    Spécifiez vos informations d’hôte sous la forme d’une URL. Notez que le format de l’URL est différent selon la région de l’emplacement de votre compte :

    US Ouest

    <nom_compte>.snowflakecomputing.com

    Autres régions

    <nom_compte>.<id_région>.snowflakecomputing.com

    <nom_compte> est le nom de votre compte (fourni par Snowflake) et <id_région> est :

    Région

    ID de région

    Remarques

    Amazon Web Services (AWS)

    US Ouest (Oregon)

    us-west-2

    Requis uniquement lors de la configuration de AWS PrivateLink pour les comptes situés dans la région US Ouest.

    US Est (Ohio)

    us-east-2.aws

    US Est (Virginie du Nord)

    us-east-1

    US Est (Gouvernement commercial - Virginie du Nord)

    us-east-1-gov.aws

    Disponible uniquement pour les comptes sur Business Critical (ou supérieur) ; ne se trouve pas dans AWS GovCloud (US), qui est un Cloud dédié distinct pas encore pris en charge par Snowflake.

    Canada (Centre)

    ca-central-1.aws

    EU (Irlande)

    eu-west-1

    EU (Francfort)

    eu-central-1

    Asie-Pacifique (Tokyo)

    ap-northeast-1.aws

    Asie Pacifique (Mumbai)

    ap-south-1.aws

    Asie-Pacifique (Singapour)

    ap-southeast-1

    Asie-Pacifique (Sydney)

    ap-southeast-2

    Google Cloud Platform (GCP)

    US Central1 (Iowa)

    us-central1.gcp

    Europe Ouest2 (Londres)

    europe-west2.gcp

    Europe Ouest4 (Pays-Bas)

    europe-west4.gcp

    Microsoft Azure

    Ouest US 2 (Washington)

    west-us-2.azure

    Est US 2 (Virginie)

    east-us-2.azure

    US Gov Virginia

    us-gov-virginia.azure

    Disponible uniquement pour les comptes sur Business Critical (ou version supérieure).

    Canada Central (Toronto)

    canada-central.azure

    Europe de l’Ouest (Pays-Bas)

    west-europe.azure

    Suisse Nord (Zurich)

    switzerland-north.azure

    Asie du Sud-Est (Singapour)

    southeast-asia.azure

    Australie Est (Nouvelle-Galles du Sud)

    australia-east.azure

    user = "<nom_connexion_utilisateur>"

    Précisez votre nom d’utilisateur Snowflake.

    pipe = "<nom_bd>.<nom_schéma>.<nom_canal>"

    Spécifiez le nom complet du canal à utiliser pour charger les données.

  • Indiquez le chemin de vos fichiers à charger dans la liste des objets de fichiers :

    files.add("<chemin>/<nom_fichier>")

    Le chemin que vous spécifiez doit être relatif à la zone de préparation dans laquelle les fichiers sont situés. Incluez le nom complet de chaque fichier, y compris son extension. Par exemple, un fichier CSV compressé en gzip peut porter l’extension suivante .csv.gz.

Exemple de programme pour le SDK Python

from logging import getLogger
from snowflake.ingest import SimpleIngestManager
from snowflake.ingest import StagedFile
from snowflake.ingest.utils.uris import DEFAULT_SCHEME
from datetime import timedelta
from requests import HTTPError
from cryptography.hazmat.primitives import serialization
from cryptography.hazmat.primitives.serialization import load_pem_private_key
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives.serialization import Encoding
from cryptography.hazmat.primitives.serialization import PrivateFormat
from cryptography.hazmat.primitives.serialization import NoEncryption
import time
import datetime
import os
import logging

logging.basicConfig(
        filename='/tmp/ingest.log',
        level=logging.DEBUG)
logger = getLogger(__name__)

with open("/<private_key_path>/rsa_key.p8", 'rb') as pem_in:
  pemlines = pem_in.read()
  private_key_obj = load_pem_private_key(pemlines,
  os.environ['PRIVATE_KEY_PASSPHRASE'].encode(),
  default_backend())

private_key_text = private_key_obj.private_bytes(
  Encoding.PEM, PrivateFormat.PKCS8, NoEncryption()).decode('utf-8')
# Assume the public key has been registered in Snowflake:
# private key in PEM format

# List of files in the stage specified in the pipe definition
file_list=['<path>/<filename>', '<path>/<filename>']
ingest_manager = SimpleIngestManager(account='<account_name>',
                                     host='<account_name>.<region_id>.snowflakecomputing.com',
                                     user='<user_login_name>',
                                     pipe='<db_name>.<schema_name>.<pipe_name>',
                                     private_key=private_key_text)
# List of files, but wrapped into a class
staged_file_list = []
for file_name in file_list:
    staged_file_list.append(StagedFile(file_name, None))

try:
    resp = ingest_manager.ingest_files(staged_file_list)
except HTTPError as e:
    # HTTP error, may need to retry
    logger.error(e)
    exit(1)

# This means Snowflake has received file and will start loading
assert(resp['responseCode'] == 'SUCCESS')

# Needs to wait for a while to get result in history
while True:
    history_resp = ingest_manager.get_history()

    if len(history_resp['files']) > 0:
        print('Ingest Report:\n')
        print(history_resp)
        break
    else:
        # wait for 20 seconds
        time.sleep(20)

    hour = timedelta(hours=1)
    date = datetime.datetime.utcnow() - hour
    history_range_resp = ingest_manager.get_history_range(date.isoformat() + 'Z')

    print('\nHistory scan report: \n')
    print(history_range_resp)

Spécifiez la phrase secrète pour déchiffrer le fichier de clé privée à l’aide de la variable d’environnement PRIVATE_KEY_PASSPHRASE :

  • Linux ou macOS :

    export PRIVATE_KEY_PASSPHRASE='<passphrase>'
    
  • Windows :

    set PRIVATE_KEY_PASSPHRASE='<passphrase>'
    

De plus, avant d’exécuter le programme d’exemple Python, vous devez le modifier comme suit :

  • Mettez à jour le paramètre de sécurité :

    <chemin_clé_privée>

    Spécifiez le chemin d’accès local au fichier de clé privée que vous avez créé dans Utilisation de l’authentification par paire de clés (dans Préparation du chargement des données à l’aide de l’API REST Snowpipe).

  • Mettez à jour les paramètres de session :

    account='<nom_compte>'

    Précisez le nom de votre compte (fourni par Snowflake).

    host='<nom_compte>.<id_région>.snowflakecomputing.com'

    Spécifiez vos informations d’hôte sous la forme d’une URL. Le format du domaine de l’URL est différent selon la région de l’emplacement de votre compte :

    US Ouest

    <nom_compte>.snowflakecomputing.com

    Autres régions

    <nom_compte>.<id_région>.snowflakecomputing.com

    <nom_compte> est le nom de votre compte (fourni par Snowflake) et <region_id> est :

    Région

    ID de région

    Remarques

    Amazon Web Services (AWS)

    US Ouest (Oregon)

    us-west-2

    Requis uniquement lors de la configuration de AWS PrivateLink pour les comptes situés dans la région US Ouest.

    US Est (Ohio)

    us-east-2.aws

    US Est (Virginie du Nord)

    us-east-1

    US Est (Gouvernement commercial - Virginie du Nord)

    us-east-1-gov.aws

    Disponible uniquement pour les comptes sur Business Critical (ou supérieur) ; ne se trouve pas dans AWS GovCloud (US), qui est un Cloud dédié distinct pas encore pris en charge par Snowflake.

    Canada (Centre)

    ca-central-1.aws

    EU (Irlande)

    eu-west-1

    EU (Francfort)

    eu-central-1

    Asie-Pacifique (Tokyo)

    ap-northeast-1.aws

    Asie Pacifique (Mumbai)

    ap-south-1.aws

    Asie-Pacifique (Singapour)

    ap-southeast-1

    Asie-Pacifique (Sydney)

    ap-southeast-2

    Google Cloud Platform (GCP)

    US Central1 (Iowa)

    us-central1.gcp

    Europe Ouest2 (Londres)

    europe-west2.gcp

    Europe Ouest4 (Pays-Bas)

    europe-west4.gcp

    Microsoft Azure

    Ouest US 2 (Washington)

    west-us-2.azure

    Est US 2 (Virginie)

    east-us-2.azure

    US Gov Virginia

    us-gov-virginia.azure

    Disponible uniquement pour les comptes sur Business Critical (ou version supérieure).

    Canada Central (Toronto)

    canada-central.azure

    Europe de l’Ouest (Pays-Bas)

    west-europe.azure

    Suisse Nord (Zurich)

    switzerland-north.azure

    Asie du Sud-Est (Singapour)

    southeast-asia.azure

    Australie Est (Nouvelle-Galles du Sud)

    australia-east.azure

    user='<user_login_name>'

    Précisez votre nom d’utilisateur Snowflake.

    pipe='<db_name>.<schema_name>.<pipe_name>'

    Spécifiez le nom complet du canal à utiliser pour charger les données.

  • Indiquez le chemin de vos fichiers à importer dans la liste des objets de fichiers :

    file_list=['<path>/<filename>', '<path>/<filename>']

    Le chemin que vous spécifiez doit être relatif à la zone de préparation dans laquelle les fichiers sont situés. Incluez le nom complet de chaque fichier, y compris son extension. Par exemple, un fichier CSV compressé en gzip peut porter l’extension suivante .csv.gz.

Affichage de l’historique de chargement

Snowflake fournit des points de terminaison REST et une fonction de table Schéma d’information pour visualiser votre historique de chargement :

Notez que l’interrogation de la fonction de table du schéma d’information ou de la vue Utilisation du compte, contrairement à l’appel des points de terminaison REST, nécessite un entrepôt virtuel en cours d’exécution.