Option 1: Loading Data Using the Snowpipe REST API

This topic describes how to call the public REST endpoints to load data and retrieve load history reports. The instructions assume you have completed the setup instructions in Preparing to Load Data Using the Snowpipe REST API.

Loading Data

Loading takes place in two steps:

Step 1

Stage your data files:

  • Internal stage: Use the PUT command to stage your files.

  • External stage: Use the client tools provided by the cloud provider to copy your files to the stage location (Amazon S3, Google Cloud Storage, or Microsoft Azure).

Step 2

Submit a request to the insertFiles REST endpoint to load the staged data files.
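
For Step 1 with an internal stage, you can run the PUT command from any Snowflake client. The following is a minimal sketch (not part of the SDK samples) that uses the Python connector, snowflake-connector-python; the connection parameters, stage name, and file path are placeholders you must replace with your own values. Step 2 is what the SDK samples below demonstrate.

# Sketch: stage a local data file to the internal stage referenced by your pipe.
# All angle-bracket values are placeholders for your own account, user,
# authentication settings, stage, and file.
import snowflake.connector

conn = snowflake.connector.connect(
    account='<account_name>',
    user='<user_login_name>',
    password='<password>',   # or configure key pair / SSO authentication
    database='<db_name>',
    schema='<schema_name>'
)
try:
    # PUT uploads (and by default gzip-compresses) the file into the stage
    conn.cursor().execute("PUT file:///<local_path>/<filename> @<stage_name>/<path>/")
finally:
    conn.close()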

For your convenience, sample Java and Python programs that illustrate how to submit requests to the REST endpoints are provided in this topic.

Sample Program for the Java SDK

import net.snowflake.ingest.SimpleIngestManager;
import net.snowflake.ingest.connection.HistoryRangeResponse;
import net.snowflake.ingest.connection.HistoryResponse;
import javax.crypto.spec.PBEKeySpec;
import javax.crypto.EncryptedPrivateKeyInfo;
import javax.crypto.SecretKeyFactory;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.io.File;
import java.io.FileInputStream;
import java.io.DataInputStream;
import java.security.KeyFactory;
import java.security.KeyPair;
import java.security.PrivateKey;
import java.security.PublicKey;
import java.security.spec.PKCS8EncodedKeySpec;
import java.security.spec.X509EncodedKeySpec;
import java.time.Instant;
import java.util.Base64;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

public class SDKTest
{
  private static final String PUBLIC_KEY_FILE = "/<path>/public_key.der";
  private static final String PRIVATE_KEY_FILE = "/<path>/private_key.der";

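  // Reads the encrypted private key file, strips the PEM header and footer,
  // and decrypts the key using the passphrase supplied in the
  // PRIVATE_KEY_PASSPHRASE environment variable.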
  public static class PrivateKeyReader
  {
    public static PrivateKey get(String filename)
            throws Exception
    {
      File f = new File(filename);
      FileInputStream fis = new FileInputStream(f);
      DataInputStream dis = new DataInputStream(fis);
      byte[] keyBytes = new byte[(int) f.length()];
      dis.readFully(keyBytes);
      dis.close();

      String encrypted = new String(keyBytes);
      String passphrase = System.getenv("PRIVATE_KEY_PASSPHRASE");
      encrypted = encrypted.replace("-----BEGIN ENCRYPTED PRIVATE KEY-----", "");
      encrypted = encrypted.replace("-----END ENCRYPTED PRIVATE KEY-----", "");
      EncryptedPrivateKeyInfo pkInfo = new EncryptedPrivateKeyInfo(Base64.getMimeDecoder().decode(encrypted));
      PBEKeySpec keySpec = new PBEKeySpec(passphrase.toCharArray());
      SecretKeyFactory pbeKeyFactory = SecretKeyFactory.getInstance(pkInfo.getAlgName());
      PKCS8EncodedKeySpec encodedKeySpec = pkInfo.getKeySpec(pbeKeyFactory.generateSecret(keySpec));
      KeyFactory keyFactory = KeyFactory.getInstance("RSA");
      PrivateKey encryptedPrivateKey = keyFactory.generatePrivate(encodedKeySpec);
      return encryptedPrivateKey;
    }
  }

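  // Polls the load history every 500 ms until every submitted file is reported
  // as complete, returning the matching history entries. main() applies a
  // 2-minute timeout to this wait.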
  private static HistoryResponse waitForFilesHistory(SimpleIngestManager manager,
                                                     Set<String> files)
          throws Exception
  {
    ExecutorService service = Executors.newSingleThreadExecutor();

    class GetHistory implements
            Callable<HistoryResponse>
    {
      private Set<String> filesWatchList;
      GetHistory(Set<String> files)
      {
        this.filesWatchList = files;
      }
      String beginMark = null;

      public HistoryResponse call()
              throws Exception
      {
        HistoryResponse filesHistory = null;
        while (true)
        {
          Thread.sleep(500);
          HistoryResponse response = manager.getHistory(null, null, beginMark);
          if (response != null && response.getNextBeginMark() != null)
          {
            beginMark = response.getNextBeginMark();
          }
          if (response != null && response.files != null)
          {
            for (HistoryResponse.FileEntry entry : response.files)
            {
              //if we have a complete file that we've
              // loaded with the same name..
              String filename = entry.getPath();
              if (entry.getPath() != null && entry.isComplete() &&
                      filesWatchList.contains(filename))
              {
                if (filesHistory == null)
                {
                  filesHistory = new HistoryResponse();
                  filesHistory.setPipe(response.getPipe());
                }
                filesHistory.files.add(entry);
                filesWatchList.remove(filename);
                //we can return true!
                if (filesWatchList.isEmpty()) {
                  return filesHistory;
                }
              }
            }
          }
        }
      }
    }

    GetHistory historyCaller = new GetHistory(files);
    //fork off waiting for a load to the service
    Future<HistoryResponse> result = service.submit(historyCaller);

    HistoryResponse response = result.get(2, TimeUnit.MINUTES);
    return response;
  }

  public static void main(String[] args)
  {
    final String account = "<account_name>";
    final String host = "<account_name>.<region_id>.snowflakecomputing.com";
    final String user = "<user_login_name>";
    final String pipe = "<db_name>.<schema_name>.<pipe_name>";
    try
    {
      final long oneHourMillis = 1000 * 3600L;
      String startTime = Instant
              .ofEpochMilli(System.currentTimeMillis() - 4 * oneHourMillis).toString();
      final PrivateKey privateKey = PrivateKeyReader.get(PRIVATE_KEY_FILE);
      SimpleIngestManager manager = new SimpleIngestManager(account, user, pipe, privateKey, "https", host, 443);
      Set<String> files = new TreeSet<>();
      files.add("<path>/<filename>");
      files.add("<path>/<filemame>");
      manager.ingestFiles(manager.wrapFilepaths(files), null);
      HistoryResponse history = waitForFilesHistory(manager, files);
      System.out.println("Received history response: " + history.toString());
      String endTime = Instant
              .ofEpochMilli(System.currentTimeMillis()).toString();

      HistoryRangeResponse historyRangeResponse =
              manager.getHistoryRange(null,
                                      startTime,
                                      endTime);
      System.out.println("Received history range response: " +
                                 historyRangeResponse.toString());

    }
    catch (Exception e)
    {
      e.printStackTrace();
    }

  }
}

Specify the passphrase for decrypting the private key file using the PRIVATE_KEY_PASSPHRASE environment variable:

  • Linux or macOS:

    export PRIVATE_KEY_PASSPHRASE='<passphrase>'
    
  • Windows:

    set PRIVATE_KEY_PASSPHRASE=<passphrase>
    

In addition, before executing the sample Java program, you must modify it as follows:

  • Update the security parameters:

    PUBLIC_KEY_FILE = "/<path>/public_key.der"
    PRIVATE_KEY_FILE = "/<path>/private_key.der"

    Specify the local paths to the public and private key files you created in Using Key Pair Authentication (in Preparing to Load Data Using the Snowpipe REST API).

  • Update the session parameters:

    account = "<account_name>"

    Specify the name of your account (provided by Snowflake).

    host = "<account_name>.<region_id>.snowflakecomputing.com"

    Specify your host information in the form of a URL. Note that the format of the URL is different depending on the region where your account is located:

    US West

    <account_name>.snowflakecomputing.com

    Other regions

    <account_name>.<region_id>.snowflakecomputing.com

    Where <account_name> is the name of your account (provided by Snowflake) and <region_id> is:

    Amazon Web Services (AWS)

      US West (Oregon): us-west-2 (required only when configuring AWS PrivateLink for accounts in US West)
      US East (Ohio): us-east-2.aws
      US East (N. Virginia): us-east-1
      US East (Commercial Gov - N. Virginia): us-east-1-gov.aws (available only for accounts on Business Critical or higher; not located in AWS GovCloud (US), which is a separate, dedicated cloud not yet supported by Snowflake)
      Canada (Central): ca-central-1.aws
      EU (Ireland): eu-west-1
      EU (Frankfurt): eu-central-1
      Asia Pacific (Tokyo): ap-northeast-1.aws
      Asia Pacific (Mumbai): ap-south-1.aws
      Asia Pacific (Singapore): ap-southeast-1
      Asia Pacific (Sydney): ap-southeast-2

    Google Cloud Platform (GCP)

      US Central1 (Iowa): us-central1.gcp
      Europe West2 (London): europe-west2.gcp
      Europe West4 (Netherlands): europe-west4.gcp

    Microsoft Azure

      West US 2 (Washington): west-us-2.azure
      East US 2 (Virginia): east-us-2.azure
      US Gov Virginia: us-gov-virginia.azure (available only for accounts on Business Critical or higher)
      Canada Central (Toronto): canada-central.azure
      West Europe (Netherlands): west-europe.azure
      Switzerland North (Zurich): switzerland-north.azure
      Southeast Asia (Singapore): southeast-asia.azure
      Australia East (New South Wales): australia-east.azure

    user = "<user_login_name>"

    Specify your Snowflake login name.

    pipe = "<db_name>.<schema_name>.<pipe_name>"

    Specify the fully-qualified name of the pipe to use to load the data.

  • Specify the path to your files to load in the file objects list:

    files.add("<path>/<filename>")

    The path you specify must be relative to the stage where the files are located. Include the complete name for each file, including the file extension. For example, a CSV file that is gzip-compressed might have the extension .csv.gz.
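
If you are not sure what the stage-relative paths look like, one way to confirm them is to list the files already in the stage referenced by the pipe. The following is a minimal sketch using the Python connector (snowflake-connector-python); the connection parameters and stage name are placeholders, not values from the sample program above.

# Sketch: list the files currently in the stage referenced by your pipe so you
# can confirm the stage-relative paths to submit for loading.
# All angle-bracket values are placeholders.
import snowflake.connector

conn = snowflake.connector.connect(
    account='<account_name>',
    user='<user_login_name>',
    password='<password>'
)
try:
    for row in conn.cursor().execute("LIST @<db_name>.<schema_name>.<stage_name>"):
        print(row[0])   # file names/paths currently in the stage
finally:
    conn.close()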

Sample Program for the Python SDK

from logging import getLogger
from snowflake.ingest import SimpleIngestManager
from snowflake.ingest import StagedFile
from snowflake.ingest.utils.uris import DEFAULT_SCHEME
from datetime import timedelta
from requests import HTTPError
from cryptography.hazmat.primitives import serialization
from cryptography.hazmat.primitives.serialization import load_pem_private_key
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives.serialization import Encoding
from cryptography.hazmat.primitives.serialization import PrivateFormat
from cryptography.hazmat.primitives.serialization import NoEncryption
import time
import datetime
import os
import logging

logging.basicConfig(
        filename='/tmp/ingest.log',
        level=logging.DEBUG)
logger = getLogger(__name__)

with open("/<private_key_path>/rsa_key.p8", 'rb') as pem_in:
  pemlines = pem_in.read()
  private_key_obj = load_pem_private_key(pemlines,
  os.environ['PRIVATE_KEY_PASSPHRASE'].encode(),
  default_backend())

private_key_text = private_key_obj.private_bytes(
  Encoding.PEM, PrivateFormat.PKCS8, NoEncryption()).decode('utf-8')
# Assume the public key has been registered in Snowflake:
# private key in PEM format

# List of files in the stage specified in the pipe definition
file_list=['<path>/<filename>', '<path>/<filename>']
ingest_manager = SimpleIngestManager(account='<account_name>',
                                     host='<account_name>.<region_id>.snowflakecomputing.com',
                                     user='<user_login_name>',
                                     pipe='<db_name>.<schema_name>.<pipe_name>',
                                     private_key=private_key_text)
# List of files, but wrapped into a class
staged_file_list = []
for file_name in file_list:
    staged_file_list.append(StagedFile(file_name, None))

try:
    resp = ingest_manager.ingest_files(staged_file_list)
except HTTPError as e:
    # HTTP error, may need to retry
    logger.error(e)
    exit(1)

# A response code of SUCCESS means Snowflake has received the files and will start loading them
assert(resp['responseCode'] == 'SUCCESS')

# Loaded files can take a while to appear in the load history; poll until they do
while True:
    history_resp = ingest_manager.get_history()

    if len(history_resp['files']) > 0:
        print('Ingest Report:\n')
        print(history_resp)
        break
    else:
        # wait for 20 seconds
        time.sleep(20)

# After the files are reported, scan the load history for the past hour
hour = timedelta(hours=1)
date = datetime.datetime.utcnow() - hour
history_range_resp = ingest_manager.get_history_range(date.isoformat() + 'Z')

print('\nHistory scan report: \n')
print(history_range_resp)

Specify the passphrase for decrypting the private key file using the PRIVATE_KEY_PASSPHRASE environment variable:

  • Linux or macOS:

    export PRIVATE_KEY_PASSPHRASE='<passphrase>'
    
  • Windows:

    set PRIVATE_KEY_PASSPHRASE=<passphrase>
    

In addition, before executing the sample Python program, you must modify it as follows:

  • Update the security parameter:

    <private_key_path>

    Specify the local path to the private key file you created in Using Key Pair Authentication (in Preparing to Load Data Using the Snowpipe REST API). A quick way to cross-check this key against the public key registered for your user is sketched after this list.

  • Update the session parameters:

    account='<account_name>'

    Specify the name of your account (provided by Snowflake).

    host='<account_name>.<region_id>.snowflakecomputing.com'

    Specify your host information in the form of a URL. Note that the format of the URL domain is different depending on the region where your account is located:

    US West

    <account_name>.snowflakecomputing.com

    Other regions

    <account_name>.<region_id>.snowflakecomputing.com

    Where <account_name> is the name of your account (provided by Snowflake) and <region_id> is:

    Amazon Web Services (AWS)

      US West (Oregon): us-west-2 (required only when configuring AWS PrivateLink for accounts in US West)
      US East (Ohio): us-east-2.aws
      US East (N. Virginia): us-east-1
      US East (Commercial Gov - N. Virginia): us-east-1-gov.aws (available only for accounts on Business Critical or higher; not located in AWS GovCloud (US), which is a separate, dedicated cloud not yet supported by Snowflake)
      Canada (Central): ca-central-1.aws
      EU (Ireland): eu-west-1
      EU (Frankfurt): eu-central-1
      Asia Pacific (Tokyo): ap-northeast-1.aws
      Asia Pacific (Mumbai): ap-south-1.aws
      Asia Pacific (Singapore): ap-southeast-1
      Asia Pacific (Sydney): ap-southeast-2

    Google Cloud Platform (GCP)

      US Central1 (Iowa): us-central1.gcp
      Europe West2 (London): europe-west2.gcp
      Europe West4 (Netherlands): europe-west4.gcp

    Microsoft Azure

      West US 2 (Washington): west-us-2.azure
      East US 2 (Virginia): east-us-2.azure
      US Gov Virginia: us-gov-virginia.azure (available only for accounts on Business Critical or higher)
      Canada Central (Toronto): canada-central.azure
      West Europe (Netherlands): west-europe.azure
      Switzerland North (Zurich): switzerland-north.azure
      Southeast Asia (Singapore): southeast-asia.azure
      Australia East (New South Wales): australia-east.azure

    user='<user_login_name>'

    Specify your Snowflake login name.

    pipe='<db_name>.<schema_name>.<pipe_name>'

    Specify the fully-qualified name of the pipe to use to load the data.

  • Specify the path to your files to import in the file objects list:

    file_list=['<path>/<filename>', '<path>/<filename>']

    The path you specify must be relative to the stage where the files are located. Include the complete name for each file, including the file extension. For example, a CSV file that is gzip-compressed might have the extension .csv.gz.
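
As a quick check on the security parameter, you can verify that the local private key corresponds to the public key registered for your Snowflake user. The following is a minimal sketch, not part of the SDK; it assumes the same key path and PRIVATE_KEY_PASSPHRASE environment variable as the sample above, and that the printed fingerprint can be compared with the RSA_PUBLIC_KEY_FP value reported by DESC USER <user_login_name>.

# Sketch: derive the public key from the local private key and print its
# SHA-256 fingerprint for comparison with RSA_PUBLIC_KEY_FP (DESC USER).
import base64
import hashlib
import os

from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import serialization
from cryptography.hazmat.primitives.serialization import load_pem_private_key

with open("/<private_key_path>/rsa_key.p8", 'rb') as pem_in:
    private_key_obj = load_pem_private_key(
        pem_in.read(),
        os.environ['PRIVATE_KEY_PASSPHRASE'].encode(),
        default_backend())

public_key_der = private_key_obj.public_key().public_bytes(
    serialization.Encoding.DER,
    serialization.PublicFormat.SubjectPublicKeyInfo)

print('SHA256:' + base64.b64encode(hashlib.sha256(public_key_der).digest()).decode())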

Viewing the Load History

Snowflake provides REST endpoints and an Information Schema table function for viewing your load history.

Note that querying the Information Schema table function or the corresponding Account Usage view, unlike calling the REST endpoints, requires a running warehouse.
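
For example, the COPY_HISTORY Information Schema table function can be queried from any Snowflake client while a warehouse is running. The following is a minimal sketch using the Python connector; the connection parameters, warehouse, and target table name are placeholders, not values taken from the samples above.

# Sketch: query the COPY_HISTORY table function for files loaded into the
# pipe's target table during the last hour. Requires a running warehouse;
# all angle-bracket values are placeholders.
import snowflake.connector

conn = snowflake.connector.connect(
    account='<account_name>',
    user='<user_login_name>',
    password='<password>',
    warehouse='<warehouse_name>',
    database='<db_name>',
    schema='<schema_name>'
)
try:
    rows = conn.cursor().execute("""
        SELECT file_name, status, row_count, first_error_message
        FROM TABLE(information_schema.copy_history(
            table_name => '<table_name>',
            start_time => DATEADD(hour, -1, CURRENT_TIMESTAMP())))
    """).fetchall()
    for row in rows:
        print(row)
finally:
    conn.close()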