Option 1: Laden von Daten mit der Snowpipe REST-API

Unter diesem Thema wird beschrieben, wie Sie die öffentlichen REST-Endpunkte aufrufen, um Daten zu laden und Berichte über den Ladeverlauf abzurufen. Die Anweisungen gehen davon aus, dass Sie die Setup-Anweisungen unter Vorbereiten des Ladens von Daten über die Snowpipe REST-API abgeschlossen haben.

Unter diesem Thema:

Laden von Daten

Das Laden erfolgt in zwei Schritten:

Schritt 1:

Stellen Sie die Datendateien im Stagingbereich bereit:

  • Interner Stagingbereich: Verwenden Sie den Befehl PUT, um Ihre Dateien im Stagingbereich bereitzustellen.

  • Externer Stagingbereich: Verwenden Sie die Clienttools des Cloudanbieters, um Ihre Dateien in den Stagingbereich (Amazon S3, Google Cloud Storage oder Microsoft Azure) zu kopieren.

Schritt 2:

Senden Sie eine Anfrage an den REST-Endpunkt insertFile, um die Staging-Datendateien zu laden.

Unter diesem Thema finden Sie Beispiele für Java- und Python-Programme, die veranschaulichen, wie ein REST-Endpunkt übermittelt wird.

Beispielprogramm für das Java-SDK

import net.snowflake.ingest.SimpleIngestManager;
import net.snowflake.ingest.connection.HistoryRangeResponse;
import net.snowflake.ingest.connection.HistoryResponse;
import javax.crypto.spec.PBEKeySpec;
import javax.crypto.EncryptedPrivateKeyInfo;
import javax.crypto.SecretKeyFactory;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.io.File;
import java.io.FileInputStream;
import java.io.DataInputStream;
import java.security.KeyFactory;
import java.security.KeyPair;
import java.security.PrivateKey;
import java.security.PublicKey;
import java.security.spec.PKCS8EncodedKeySpec;
import java.security.spec.X509EncodedKeySpec;
import java.time.Instant;
import java.util.Base64;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

public class SDKTest
{
  private static final String PUBLIC_KEY_FILE = "/<path>/public_key.der";
  private static final String PRIVATE_KEY_FILE = "/<path>/private_key.der";

  public static class PrivateKeyReader
  {
    public static PrivateKey get(String filename)
            throws Exception
    {
      File f = new File(filename);
      FileInputStream fis = new FileInputStream(f);
      DataInputStream dis = new DataInputStream(fis);
      byte[] keyBytes = new byte[(int) f.length()];
      dis.readFully(keyBytes);
      dis.close();

      String encrypted = new String(keyBytes);
      String passphrase = System.getenv("PRIVATE_KEY_PASSPHRASE");
      encrypted = encrypted.replace("-----BEGIN ENCRYPTED PRIVATE KEY-----", "");
      encrypted = encrypted.replace("-----END ENCRYPTED PRIVATE KEY-----", "");
      EncryptedPrivateKeyInfo pkInfo = new EncryptedPrivateKeyInfo(Base64.getMimeDecoder().decode(encrypted));
      PBEKeySpec keySpec = new PBEKeySpec(passphrase.toCharArray());
      SecretKeyFactory pbeKeyFactory = SecretKeyFactory.getInstance(pkInfo.getAlgName());
      PKCS8EncodedKeySpec encodedKeySpec = pkInfo.getKeySpec(pbeKeyFactory.generateSecret(keySpec));
      KeyFactory keyFactory = KeyFactory.getInstance("RSA");
      PrivateKey encryptedPrivateKey = keyFactory.generatePrivate(encodedKeySpec);
      return encryptedPrivateKey;
    }
  }

  private static HistoryResponse waitForFilesHistory(SimpleIngestManager manager,
                                                     Set<String> files)
          throws Exception
  {
    ExecutorService service = Executors.newSingleThreadExecutor();

    class GetHistory implements
            Callable<HistoryResponse>
    {
      private Set<String> filesWatchList;
      GetHistory(Set<String> files)
      {
        this.filesWatchList = files;
      }
      String beginMark = null;

      public HistoryResponse call()
              throws Exception
      {
        HistoryResponse filesHistory = null;
        while (true)
        {
          Thread.sleep(500);
          HistoryResponse response = manager.getHistory(null, null, beginMark);
          if (response.getNextBeginMark() != null)
          {
            beginMark = response.getNextBeginMark();
          }
          if (response != null && response.files != null)
          {
            for (HistoryResponse.FileEntry entry : response.files)
            {
              //if we have a complete file that we've
              // loaded with the same name..
              String filename = entry.getPath();
              if (entry.getPath() != null && entry.isComplete() &&
                      filesWatchList.contains(filename))
              {
                if (filesHistory == null)
                {
                  filesHistory = new HistoryResponse();
                  filesHistory.setPipe(response.getPipe());
                }
                filesHistory.files.add(entry);
                filesWatchList.remove(filename);
                //we can return true!
                if (filesWatchList.isEmpty()) {
                  return filesHistory;
                }
              }
            }
          }
        }
      }
    }

    GetHistory historyCaller = new GetHistory(files);
    //fork off waiting for a load to the service
    Future<HistoryResponse> result = service.submit(historyCaller);

    HistoryResponse response = result.get(2, TimeUnit.MINUTES);
    return response;
  }

  public static void main(String[] args)
  {
    final String account = "<account_name>";
    final String host = "<account_name>.<region_id>.snowflakecomputing.com";
    final String user = "<user_login_name>";
    final String pipe = "<db_name>.<schema_name>.<pipe_name>";
    try
    {
      final long oneHourMillis = 1000 * 3600L;
      String startTime = Instant
              .ofEpochMilli(System.currentTimeMillis() - 4 * oneHourMillis).toString();
      final PrivateKey privateKey = PrivateKeyReader.get(PRIVATE_KEY_FILE);
      SimpleIngestManager manager = new SimpleIngestManager(account, user, pipe, privateKey, "https", host, 443);
      Set<String> files = new TreeSet<>();
      files.add("<path>/<filename>");
      files.add("<path>/<filemame>");
      manager.ingestFiles(manager.wrapFilepaths(files), null);
      HistoryResponse history = waitForFilesHistory(manager, files);
      System.out.println("Received history response: " + history.toString());
      String endTime = Instant
              .ofEpochMilli(System.currentTimeMillis()).toString();

      HistoryRangeResponse historyRangeResponse =
              manager.getHistoryRange(null,
                                      startTime,
                                      endTime);
      System.out.println("Received history range response: " +
                                 historyRangeResponse.toString());

    }
    catch (Exception e)
    {
      e.printStackTrace();
    }

  }
}

Geben Sie die Passphrase für die Entschlüsselung der Datei des privaten Schlüssels mit der Umgebungsvariablen PRIVATE_KEY_PASSPHRASE an:

  • Linux oder macOS:

    export PRIVATE_KEY_PASSPHRASE='<passphrase>'
    
  • Windows:

    set PRIVATE_KEY_PASSPHRASE='<passphrase>'
    

Außerdem müssen Sie das Java-Beispielprogramms vor der Ausführung wie folgt ändern:

  • Aktualisieren Sie die Sicherheitsparameter:

    PUBLIC_KEY_FILE = „/<Pfad>/public_key.der“, :newline:.` PRIVATE_KEY_FILE = "/<Pfad>/public_key.der

    Geben Sie die lokalen Pfade zu den Dateien der öffentlichen und privaten Schlüssel an, die Sie in Verwenden der Schlüsselpaar-Authentifizierung (unter Vorbereiten des Ladens von Daten über die Snowpipe REST-API) erstellt haben.

  • Aktualisieren Sie die Sitzungsparameter:

    account = "<Kontoname>"

    Geben Sie den Namen Ihres Kontos an (bereitgestellt von Snowflake).

    host = "<Kontoname>.<Regions-ID>.snowflakecomputing.com"

    Geben Sie Ihre Hostinformationen in Form einer URL an. Beachten Sie, dass das Format der URL je nach Region, in der sich Ihr Konto befindet, unterschiedlich ist:

    US West

    <Kontoname>.snowflakecomputing.com

    Andere Regionen

    <Kontoname>.<Regions-ID>.snowflakecomputing.com

    Dabei ist <Kontoname> der Name Ihres Kontos (wird von Snowflake bereitgestellt), und <Regions-ID> ist:

    Region

    Regions-ID

    Anmerkungen

    Amazon Web Services (AWS)

    US West (Oregon)

    us-west-2

    Nur erforderlich, wenn Sie AWS PrivateLink für Konten in US West konfigurieren.

    US East (Ohio)

    us-east-2.aws

    US East (N. Virginia)

    us-east-1

    US East (Commercial Gov - N. Virginia)

    us-east-1-gov.aws

    Nur für Business Critical-Konten (oder höher) verfügbar. Befindet sich nicht in AWS GovCloud (US), einer separaten, dedizierten Cloud, die von Snowflake noch nicht unterstützt wird.

    Canada (Central)

    ca-central-1.aws

    EU (Irland)

    eu-west-1

    EU (Frankfurt)

    eu-central-1

    Asia Pacific (Tokio)

    ap-northeast-1.aws

    Asia Pacific (Mumbai)

    ap-south-1.aws

    Asia Pacific (Singapur)

    ap-southeast-1

    Asia Pacific (Sydney)

    ap-southeast-2

    Google Cloud Platform (GCP)

    US Central1 (Iowa)

    us-central1.gcp

    Europe West2 (London)

    europe-west2.gcp

    Europe West4 (Niederlande)

    europe-west4.gcp

    Microsoft Azure

    West US 2 (Washington)

    west-us-2.azure

    East US 2 (Virginia)

    east-us-2.azure

    US Gov Virginia

    us-gov-virginia.azure

    Nur für Business Critical-Konten (oder höher) verfügbar.

    Canada Central (Toronto)

    canada-central.azure

    West Europe (Niederlande)

    west-europe.azure

    Switzerland North (Zürich)

    switzerland-north.azure

    Southeast Asia (Singapur)

    southeast-asia.azure

    Australia East (New South Wales)

    australia-east.azure

    user = "<Benutzeranmeldename>"

    Geben Sie Ihren Snowflake-Anmeldenamen an.

    pipe = "<Datenbankname>.<Schemaname>.<Pipename>"

    Geben Sie den vollqualifizierten Namen der Pipe an, die zum Laden der Daten verwendet werden soll.

  • Geben Sie in der Liste der Dateiobjekte den Pfad zu den zu ladenden Dateien an:

    files.add("<Pfad>/<Dateiname>")

    Der von Ihnen angegebene Pfad muss relativ zu dem Stagingbereich sein, in dem sich die Dateien befinden. Geben Sie den vollständigen Namen für jede Datei an, einschließlich der Dateiendung. Eine CSV-Datei, die gzip-komprimiert ist, könnte beispielsweise die Erweiterung .csv.gz haben.

Beispielprogramm für das Python-SDK

from logging import getLogger
from snowflake.ingest import SimpleIngestManager
from snowflake.ingest import StagedFile
from snowflake.ingest.utils.uris import DEFAULT_SCHEME
from datetime import timedelta
from requests import HTTPError
from cryptography.hazmat.primitives import serialization
from cryptography.hazmat.primitives.serialization import load_pem_private_key
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives.serialization import Encoding
from cryptography.hazmat.primitives.serialization import PrivateFormat
from cryptography.hazmat.primitives.serialization import NoEncryption
import time
import datetime
import os
import logging

logging.basicConfig(
        filename='/tmp/ingest.log',
        level=logging.DEBUG)
logger = getLogger(__name__)

with open("/<private_key_path>/rsa_key.p8", 'rb') as pem_in:
  pemlines = pem_in.read()
  private_key_obj = load_pem_private_key(pemlines,
  os.environ['PRIVATE_KEY_PASSPHRASE'].encode(),
  default_backend())

private_key_text = private_key_obj.private_bytes(
  Encoding.PEM, PrivateFormat.PKCS8, NoEncryption()).decode('utf-8')
# Assume the public key has been registered in Snowflake:
# private key in PEM format

# List of files in the stage specified in the pipe definition
file_list=['<path>/<filename>', '<path>/<filename>']
ingest_manager = SimpleIngestManager(account='<account_name>',
                                     host='<account_name>.<region_id>.snowflakecomputing.com',
                                     user='<user_login_name>',
                                     pipe='<db_name>.<schema_name>.<pipe_name>',
                                     private_key=private_key_text)
# List of files, but wrapped into a class
staged_file_list = []
for file_name in file_list:
    staged_file_list.append(StagedFile(file_name, None))

try:
    resp = ingest_manager.ingest_files(staged_file_list)
except HTTPError as e:
    # HTTP error, may need to retry
    logger.error(e)
    exit(1)

# This means Snowflake has received file and will start loading
assert(resp['responseCode'] == 'SUCCESS')

# Needs to wait for a while to get result in history
while True:
    history_resp = ingest_manager.get_history()

    if len(history_resp['files']) > 0:
        print('Ingest Report:\n')
        print(history_resp)
        break
    else:
        # wait for 20 seconds
        time.sleep(20)

    hour = timedelta(hours=1)
    date = datetime.datetime.utcnow() - hour
    history_range_resp = ingest_manager.get_history_range(date.isoformat() + 'Z')

    print('\nHistory scan report: \n')
    print(history_range_resp)

Geben Sie die Passphrase für die Entschlüsselung der Datei des privaten Schlüssels mit der Umgebungsvariablen PRIVATE_KEY_PASSPHRASE an:

  • Linux oder macOS:

    export PRIVATE_KEY_PASSPHRASE='<passphrase>'
    
  • Windows:

    set PRIVATE_KEY_PASSPHRASE='<passphrase>'
    

Außerdem müssen Sie das Java-Beispielprogramms vor der Ausführung wie folgt ändern:

  • Aktualisieren Sie den Sicherheitsparameter:

    <Pfad_zu_privatem_Schlüssel>

    Geben Sie den lokalen Pfad zu der privaten Schlüsseldatei an, die Sie in Verwenden der Schlüsselpaar-Authentifizierung (unter Vorbereiten des Ladens von Daten über die Snowpipe REST-API) erstellt haben.

  • Aktualisieren Sie die Sitzungsparameter:

    account='<Kontoname>'

    Geben Sie den Namen Ihres Kontos an (bereitgestellt von Snowflake).

    host='<Kontoname>.<Regions-ID>.snowflakecomputing.com'

    Geben Sie Ihre Hostinformationen in Form einer URL an. Das Format der URL-Domäne ist unterschiedlich, je nach Region, in der sich Ihr Konto befindet:

    US West

    <Kontoname>.snowflakecomputing.com

    Andere Regionen

    <Kontoname>.<Regions-ID>.snowflakecomputing.com

    Dabei ist <Kontoname> der Name Ihres Kontos (wird von Snowflake bereitgestellt), und <region_id> ist:

    Region

    Regions-ID

    Anmerkungen

    Amazon Web Services (AWS)

    US West (Oregon)

    us-west-2

    Nur erforderlich, wenn Sie AWS PrivateLink für Konten in US West konfigurieren.

    US East (Ohio)

    us-east-2.aws

    US East (N. Virginia)

    us-east-1

    US East (Commercial Gov - N. Virginia)

    us-east-1-gov.aws

    Nur für Business Critical-Konten (oder höher) verfügbar. Befindet sich nicht in AWS GovCloud (US), einer separaten, dedizierten Cloud, die von Snowflake noch nicht unterstützt wird.

    Canada (Central)

    ca-central-1.aws

    EU (Irland)

    eu-west-1

    EU (Frankfurt)

    eu-central-1

    Asia Pacific (Tokio)

    ap-northeast-1.aws

    Asia Pacific (Mumbai)

    ap-south-1.aws

    Asia Pacific (Singapur)

    ap-southeast-1

    Asia Pacific (Sydney)

    ap-southeast-2

    Google Cloud Platform (GCP)

    US Central1 (Iowa)

    us-central1.gcp

    Europe West2 (London)

    europe-west2.gcp

    Europe West4 (Niederlande)

    europe-west4.gcp

    Microsoft Azure

    West US 2 (Washington)

    west-us-2.azure

    East US 2 (Virginia)

    east-us-2.azure

    US Gov Virginia

    us-gov-virginia.azure

    Nur für Business Critical-Konten (oder höher) verfügbar.

    Canada Central (Toronto)

    canada-central.azure

    West Europe (Niederlande)

    west-europe.azure

    Switzerland North (Zürich)

    switzerland-north.azure

    Southeast Asia (Singapur)

    southeast-asia.azure

    Australia East (New South Wales)

    australia-east.azure

    user='<user_login_name>'

    Geben Sie Ihren Snowflake-Anmeldenamen an.

    pipe='<db_name>.<schema_name>.<pipe_name>'

    Geben Sie den vollqualifizierten Namen der Pipe an, die zum Laden der Daten verwendet werden soll.

  • Geben Sie in der Liste der Dateiobjekte den Pfad zu Ihren zu importierenden Dateien an:

    file_list=['<path>/<filename>', '<path>/<filename>']

    Der von Ihnen angegebene Pfad muss relativ zu dem Stagingbereich sein, in dem sich die Dateien befinden. Geben Sie den vollständigen Namen für jede Datei an, einschließlich der Dateiendung. Eine CSV-Datei, die gzip-komprimiert ist, könnte beispielsweise die Erweiterung .csv.gz haben.

Anzeigen des Ladeverlaufs

Snowflake bietet REST-Endpunkte und eine Tabellenfunktion Information Schema zum Anzeigen des Ladeverlaufs:

Beachten Sie, dass die Abfrage der Information Schema-Tabellenfunktion oder der Account Usage-Ansicht im Gegensatz zum Aufruf der REST-Endpunkte ein aktives Warehouse erfordert.