Option 1: Laden von Daten mit der Snowpipe REST-API

Unter diesem Thema wird beschrieben, wie Sie die öffentlichen REST-Endpunkte aufrufen, um Daten zu laden und Berichte über den Ladeverlauf abzurufen. Die Anweisungen gehen davon aus, dass Sie die Setup-Anweisungen unter Vorbereiten des Ladens von Daten über die Snowpipe REST-API abgeschlossen haben.

Unter diesem Thema:

Laden von Daten

Das Laden erfolgt in zwei Schritten:

Schritt 1:

Stellen Sie die Datendateien im Stagingbereich bereit:

  • Interner Stagingbereich: Verwenden Sie den Befehl PUT, um Ihre Dateien im Stagingbereich bereitzustellen.

  • Externer Stagingbereich: Verwenden Sie die Clienttools des Cloudanbieters, um Ihre Dateien in den Stagingbereich (Amazon S3, Google Cloud Storage oder Microsoft Azure) zu kopieren.

Schritt 2:

Senden Sie eine Anfrage an den REST-Endpunkt insertFile, um die Staging-Datendateien zu laden.

Unter diesem Thema finden Sie Beispiele für Java- und Python-Programme, die veranschaulichen, wie ein REST-Endpunkt übermittelt wird.

Beispielprogramm für das Java-SDK

import net.snowflake.ingest.SimpleIngestManager;
import net.snowflake.ingest.connection.HistoryRangeResponse;
import net.snowflake.ingest.connection.HistoryResponse;
import org.bouncycastle.asn1.pkcs.PrivateKeyInfo;
import org.bouncycastle.jce.provider.BouncyCastleProvider;
import org.bouncycastle.openssl.PEMParser;
import org.bouncycastle.openssl.jcajce.JcaPEMKeyConverter;
import org.bouncycastle.openssl.jcajce.JceOpenSSLPKCS8DecryptorProviderBuilder;
import org.bouncycastle.operator.InputDecryptorProvider;
import org.bouncycastle.operator.OperatorCreationException;
import org.bouncycastle.pkcs.PKCS8EncryptedPrivateKeyInfo;
import org.bouncycastle.pkcs.PKCSException;
import java.io.FileReader;
import java.io.IOException;
import java.nio.file.Paths;
import java.security.PrivateKey;
import java.security.Security;
import java.time.Instant;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

public class SDKTest
{
  // Path to the private key file that you generated earlier.
  private static final String PRIVATE_KEY_FILE = "/<path>/rsa_key.p8";

  public static class PrivateKeyReader
  {
    // If you generated an encrypted private key, implement this method to return
    // the passphrase for decrypting your private key.
    private static String getPrivateKeyPassphrase() {
      return "<private_key_passphrase>";
    }

    public static PrivateKey get(String filename)
            throws Exception
    {
      PrivateKeyInfo privateKeyInfo = null;
      Security.addProvider(new BouncyCastleProvider());
      // Read an object from the private key file.
      PEMParser pemParser = new PEMParser(new FileReader(Paths.get(filename).toFile()));
      Object pemObject = pemParser.readObject();
      if (pemObject instanceof PKCS8EncryptedPrivateKeyInfo) {
        // Handle the case where the private key is encrypted.
        PKCS8EncryptedPrivateKeyInfo encryptedPrivateKeyInfo = (PKCS8EncryptedPrivateKeyInfo) pemObject;
        String passphrase = getPrivateKeyPassphrase();
        InputDecryptorProvider pkcs8Prov = new JceOpenSSLPKCS8DecryptorProviderBuilder().build(passphrase.toCharArray());
        privateKeyInfo = encryptedPrivateKeyInfo.decryptPrivateKeyInfo(pkcs8Prov);
      } else if (pemObject instanceof PrivateKeyInfo) {
        // Handle the case where the private key is unencrypted.
        privateKeyInfo = (PrivateKeyInfo) pemObject;
      }
      pemParser.close();
      JcaPEMKeyConverter converter = new JcaPEMKeyConverter().setProvider(BouncyCastleProvider.PROVIDER_NAME);
      return converter.getPrivateKey(privateKeyInfo);
    }
  }

  private static HistoryResponse waitForFilesHistory(SimpleIngestManager manager,
                                                     Set<String> files)
          throws Exception
  {
    ExecutorService service = Executors.newSingleThreadExecutor();

    class GetHistory implements
            Callable<HistoryResponse>
    {
      private Set<String> filesWatchList;
      GetHistory(Set<String> files)
      {
        this.filesWatchList = files;
      }
      String beginMark = null;

      public HistoryResponse call()
              throws Exception
      {
        HistoryResponse filesHistory = null;
        while (true)
        {
          Thread.sleep(500);
          HistoryResponse response = manager.getHistory(null, null, beginMark);
          if (response.getNextBeginMark() != null)
          {
            beginMark = response.getNextBeginMark();
          }
          if (response != null && response.files != null)
          {
            for (HistoryResponse.FileEntry entry : response.files)
            {
              //if we have a complete file that we've
              // loaded with the same name..
              String filename = entry.getPath();
              if (entry.getPath() != null && entry.isComplete() &&
                      filesWatchList.contains(filename))
              {
                if (filesHistory == null)
                {
                  filesHistory = new HistoryResponse();
                  filesHistory.setPipe(response.getPipe());
                }
                filesHistory.files.add(entry);
                filesWatchList.remove(filename);
                //we can return true!
                if (filesWatchList.isEmpty()) {
                  return filesHistory;
                }
              }
            }
          }
        }
      }
    }

    GetHistory historyCaller = new GetHistory(files);
    //fork off waiting for a load to the service
    Future<HistoryResponse> result = service.submit(historyCaller);

    HistoryResponse response = result.get(2, TimeUnit.MINUTES);
    return response;
  }

  public static void main(String[] args)
  {
    final String host = "<account_identifier>.snowflakecomputing.com";
    final String user = "<user_login_name>";
    final String pipe = "<db_name>.<schema_name>.<pipe_name>";
    try
    {
      final long oneHourMillis = 1000 * 3600L;
      String startTime = Instant
              .ofEpochMilli(System.currentTimeMillis() - 4 * oneHourMillis).toString();
      final PrivateKey privateKey = PrivateKeyReader.get(PRIVATE_KEY_FILE);
      SimpleIngestManager manager = new SimpleIngestManager(host.split("\.")[0], user, pipe, privateKey, "https", host, 443);
      List<StagedFileWrapper> files = new ArrayList<>();
      // Add the paths and sizes the files that you want to load.
      // Use paths that are relative to the stage where the files are located
      // (the stage that is specified in the pipe definition)..
      files.add(new StagedFileWrapper("<path>/<filename>", <file_size_in_bytes> /* file size is optional but recommended, pass null when it is not available */));
      files.add(new StagedFileWrapper("<path>/<filename>", <file_size_in_bytes> /* file size is optional but recommended, pass null when it is not available */));
      ...
      manager.ingestFiles(files, null);
      HistoryResponse history = waitForFilesHistory(manager, files);
      System.out.println("Received history response: " + history.toString());
      String endTime = Instant
              .ofEpochMilli(System.currentTimeMillis()).toString();

      HistoryRangeResponse historyRangeResponse =
              manager.getHistoryRange(null,
                                      startTime,
                                      endTime);
      System.out.println("Received history range response: " +
                                 historyRangeResponse.toString());

    }
    catch (Exception e)
    {
      e.printStackTrace();
    }

  }
}
Copy

In diesem Beispiel werden die Bouncy Castle Crypto-APIs verwendet. Um dieses Beispiel kompilieren und ausführen zu können, müssen Sie Ihrem Klassenpfad die folgenden JAR-Dateien hinzufügen:

  • die Anbieter-JAR-Datei (bcprov-jdkversions.jar)

  • die PKIX/CMS/EAC/PKCS/OCSP/TSP/OPENSSL-JAR-Datei (bcpkix-jdkversions.jar)

wobei versions die Versionen der JDK angibt, die von der JAR-Datei unterstützt werden.

Bevor Sie den Beispielcode kompilieren, ersetzen Sie die folgenden Platzhalterwerte:

PRIVATE_KEY_FILE = "/<Pfad>/rsa_key.p8"

Geben Sie den lokalen Pfad zu der privaten Schlüsseldatei an, die Sie in Verwenden von Schlüsselpaar-Authentifizierung und Schlüsselrotation (unter Vorbereiten des Ladens von Daten über die Snowpipe REST-API) erstellt haben.

return "<Passphrase_des_privaten_Schlüssels >" in getPrivateKeyPassphrase()

Wenn Sie einen verschlüsselten Schlüssel generiert haben, implementieren Sie die getPrivateKeyPassphrase()-Methode, um die Passphrase zum Entschlüsseln dieses Schlüssels zurückzugeben.

host = "<Kontobezeichner>.snowflakecomputing.com"

Geben Sie Ihre Hostinformationen in Form einer URL an.

Das bevorzugte Format für den Kontobezeichner ist wie folgt:

organization_name-account_name

Namen Ihrer Snowflake-Organisation und Ihres Snowflake-Kontos. Weitere Details dazu finden Sie unter Format 1 (bevorzugt): Kontoname in Ihrer Organisation.

Geben Sie alternativ Ihren Konto-Locator an, ggf. zusammen mit der Region und der Cloudplattform, auf der das Konto gehostet wird. Weitere Details dazu finden Sie unter Format 2 (älter): Konto-Locator in einer Region.

user = "<Benutzeranmeldename>"

Geben Sie Ihren Snowflake-Anmeldenamen an.

pipe = "<Datenbankname>.<Schemaname>.<Pipename>"

Geben Sie den vollqualifizierten Namen der Pipe an, die zum Laden der Daten verwendet werden soll.

files.add("<Pfad>/<Dateiname>", <Dateigröße_in_Byte>)

Geben Sie in der Liste der Dateiobjekte den Pfad zu den zu ladenden Dateien an:

Geben Sie optional die Größe jeder Datei in Byte an, um Verzögerungen bei der Berechnung der zum Laden der Daten erforderlichen Operationen durch Snowpipe zu vermeiden.

Der von Ihnen angegebene Pfad muss relativ zu dem Stagingbereich sein, in dem sich die Dateien befinden. Geben Sie den vollständigen Namen für jede Datei an, einschließlich der Dateiendung. Eine CSV-Datei, die gzip-komprimiert ist, könnte beispielsweise die Erweiterung .csv.gz haben.

Beispielprogramm für das Python-SDK

from logging import getLogger
from snowflake.ingest import SimpleIngestManager
from snowflake.ingest import StagedFile
from snowflake.ingest.utils.uris import DEFAULT_SCHEME
from datetime import timedelta
from requests import HTTPError
from cryptography.hazmat.primitives import serialization
from cryptography.hazmat.primitives.serialization import load_pem_private_key
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives.serialization import Encoding
from cryptography.hazmat.primitives.serialization import PrivateFormat
from cryptography.hazmat.primitives.serialization import NoEncryption
import time
import datetime
import os
import logging

logging.basicConfig(
        filename='/tmp/ingest.log',
        level=logging.DEBUG)
logger = getLogger(__name__)

# If you generated an encrypted private key, implement this method to return
# the passphrase for decrypting your private key.
def get_private_key_passphrase():
  return '<private_key_passphrase>'

with open("/<private_key_path>/rsa_key.p8", 'rb') as pem_in:
  pemlines = pem_in.read()
  private_key_obj = load_pem_private_key(pemlines,
  get_private_key_passphrase().encode(),
  default_backend())

private_key_text = private_key_obj.private_bytes(
  Encoding.PEM, PrivateFormat.PKCS8, NoEncryption()).decode('utf-8')
# Assume the public key has been registered in Snowflake:
# private key in PEM format

ingest_manager = SimpleIngestManager(account='<account_identifier>',
                                     host='<account_identifier>.snowflakecomputing.com',
                                     user='<user_login_name>',
                                     pipe='<db_name>.<schema_name>.<pipe_name>',
                                     private_key=private_key_text)
# List of files, but wrapped into a class
staged_file_list = [
  StagedFile('<path>/<filename>', <file_size_in_bytes>),  # file size is optional but recommended, pass None if not available
  StagedFile('<path>/<filename>', <file_size_in_bytes>),  # file size is optional but recommended, pass None if not available
  ...
  ]

try:
    resp = ingest_manager.ingest_files(staged_file_list)
except HTTPError as e:
    # HTTP error, may need to retry
    logger.error(e)
    exit(1)

# This means Snowflake has received file and will start loading
assert(resp['responseCode'] == 'SUCCESS')

# Needs to wait for a while to get result in history
while True:
    history_resp = ingest_manager.get_history()

    if len(history_resp['files']) > 0:
        print('Ingest Report:\n')
        print(history_resp)
        break
    else:
        # wait for 20 seconds
        time.sleep(20)

    hour = timedelta(hours=1)
    date = datetime.datetime.utcnow() - hour
    history_range_resp = ingest_manager.get_history_range(date.isoformat() + 'Z')

    print('\nHistory scan report: \n')
    print(history_range_resp)
Copy

Bevor Sie den Beispielcode ausführen, ersetzen Sie die folgenden Platzhalterwerte:

<Pfad_zu_privatem_Schlüssel>

Geben Sie den lokalen Pfad zu der privaten Schlüsseldatei an, die Sie in Verwenden von Schlüsselpaar-Authentifizierung und Schlüsselrotation (unter Vorbereiten des Ladens von Daten über die Snowpipe REST-API) erstellt haben.

return "<Passphrase_des_privaten_Schlüssels >" in get_private_key_passphrase()

Wenn Sie einen verschlüsselten Schlüssel generiert haben, implementieren Sie die Funktion get_private_key_passphrase(), um die Passphrase zum Entschlüsseln dieses Schlüssels zurückzugeben.

account='<Kontobezeichner>'

Geben Sie den eindeutigen Bezeichner für Ihr Konto an (wird von Snowflake bereitgestellt). Siehe auch die Beschreibung von host.

host='<Kontobezeichner>.snowflakecomputing.com'

Geben Sie den eindeutigen Hostnamen für Ihr Snowflake-Konto an.

Das bevorzugte Format für den Kontobezeichner ist wie folgt:

organization_name-account_name

Namen Ihrer Snowflake-Organisation und Ihres Snowflake-Kontos. Weitere Details dazu finden Sie unter Format 1 (bevorzugt): Kontoname in Ihrer Organisation.

Geben Sie alternativ Ihren Konto-Locator an, ggf. zusammen mit der Region und der Cloudplattform, auf der das Konto gehostet wird. Weitere Details dazu finden Sie unter Format 2 (älter): Konto-Locator in einer Region.

user='<user_login_name>'

Geben Sie Ihren Snowflake-Anmeldenamen an.

pipe='<db_name>.<schema_name>.<pipe_name>'

Geben Sie den vollqualifizierten Namen der Pipe an, die zum Laden der Daten verwendet werden soll.

file_list=['<path>/<filename>', '<path>/<filename>'] | staged_file_list=[StagedFile('<path>/<filename>', <file_size_in_bytes>), StagedFile('<path>/<filename>', <file_size_in_bytes>)]

Geben Sie in der Liste der Dateiobjekte den Pfad zu den zu ladenden Dateien an:

Der von Ihnen angegebene Pfad muss relativ zu dem Stagingbereich sein, in dem sich die Dateien befinden. Geben Sie den vollständigen Namen für jede Datei an, einschließlich der Dateiendung. Eine CSV-Datei, die gzip-komprimiert ist, könnte beispielsweise die Erweiterung .csv.gz haben.

Geben Sie optional die Größe jeder Datei in Byte an, um Verzögerungen bei der Berechnung der zum Laden der Daten erforderlichen Operationen durch Snowpipe zu vermeiden.

Anzeigen des Ladeverlaufs

Snowflake bietet REST-Endpunkte und eine Tabellenfunktion Snowflake Information Schema zum Anzeigen des Ladeverlaufs:

Beachten Sie, dass die Abfrage der Information Schema-Tabellenfunktion oder der Account Usage-Ansicht im Gegensatz zum Aufruf der REST-Endpunkte ein aktives Warehouse erfordert.

Löschen von Stagingdateien

Löschen Sie die Stagingdateien, nachdem Sie die Daten erfolgreich geladen haben und die Dateien nicht mehr benötigen. Eine Anleitung dazu finden Sie unter Löschen von Stagingdateien, nachdem Snowpipe die Daten geladen hat.