オプション1:Snowpipe REST API を使用したデータのロード

このトピックでは、パブリック REST エンドポイントを呼び出してデータをロードし、ロード履歴レポートを取得する方法について説明します。手順は、 Snowpipe REST APIを使用したデータロードの準備 の設定手順を完了していることを前提としています。

このトピックの内容:

データのロード

2つのステップでロードします。

ステップ1

データファイルをステージングします。

  • 内部ステージ: PUT コマンドを使用して、ファイルをステージングします。

  • 外部ステージ:クラウドプロバイダーが提供するクライアントツールを使用して、ファイルをステージの場所(Amazon S3、Google Cloud Storage、またはMicrosoft Azure)にコピーします。

ステップ2

insertFiles RESTエンドポイントにリクエストを送信して、ステージングされたデータファイルをロードします。

便宜上、このトピックでは、 REST エンドポイントを送信する方法を示すサンプルのJavaおよびPythonプログラムを提供しています。

Java SDK のサンプルプログラム

import net.snowflake.ingest.SimpleIngestManager;
import net.snowflake.ingest.connection.HistoryRangeResponse;
import net.snowflake.ingest.connection.HistoryResponse;
import javax.crypto.spec.PBEKeySpec;
import javax.crypto.EncryptedPrivateKeyInfo;
import javax.crypto.SecretKeyFactory;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.io.File;
import java.io.FileInputStream;
import java.io.DataInputStream;
import java.security.KeyFactory;
import java.security.KeyPair;
import java.security.PrivateKey;
import java.security.PublicKey;
import java.security.spec.PKCS8EncodedKeySpec;
import java.security.spec.X509EncodedKeySpec;
import java.time.Instant;
import java.util.Base64;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

public class SDKTest
{
  private static final String PUBLIC_KEY_FILE = "/<path>/public_key.der";
  private static final String PRIVATE_KEY_FILE = "/<path>/private_key.der";

  public static class PrivateKeyReader
  {
    public static PrivateKey get(String filename)
            throws Exception
    {
      File f = new File(filename);
      FileInputStream fis = new FileInputStream(f);
      DataInputStream dis = new DataInputStream(fis);
      byte[] keyBytes = new byte[(int) f.length()];
      dis.readFully(keyBytes);
      dis.close();

      String encrypted = new String(keyBytes);
      String passphrase = System.getenv("PRIVATE_KEY_PASSPHRASE");
      encrypted = encrypted.replace("-----BEGIN ENCRYPTED PRIVATE KEY-----", "");
      encrypted = encrypted.replace("-----END ENCRYPTED PRIVATE KEY-----", "");
      EncryptedPrivateKeyInfo pkInfo = new EncryptedPrivateKeyInfo(Base64.getMimeDecoder().decode(encrypted));
      PBEKeySpec keySpec = new PBEKeySpec(passphrase.toCharArray());
      SecretKeyFactory pbeKeyFactory = SecretKeyFactory.getInstance(pkInfo.getAlgName());
      PKCS8EncodedKeySpec encodedKeySpec = pkInfo.getKeySpec(pbeKeyFactory.generateSecret(keySpec));
      KeyFactory keyFactory = KeyFactory.getInstance("RSA");
      PrivateKey encryptedPrivateKey = keyFactory.generatePrivate(encodedKeySpec);
      return encryptedPrivateKey;
    }
  }

  private static HistoryResponse waitForFilesHistory(SimpleIngestManager manager,
                                                     Set<String> files)
          throws Exception
  {
    ExecutorService service = Executors.newSingleThreadExecutor();

    class GetHistory implements
            Callable<HistoryResponse>
    {
      private Set<String> filesWatchList;
      GetHistory(Set<String> files)
      {
        this.filesWatchList = files;
      }
      String beginMark = null;

      public HistoryResponse call()
              throws Exception
      {
        HistoryResponse filesHistory = null;
        while (true)
        {
          Thread.sleep(500);
          HistoryResponse response = manager.getHistory(null, null, beginMark);
          if (response.getNextBeginMark() != null)
          {
            beginMark = response.getNextBeginMark();
          }
          if (response != null && response.files != null)
          {
            for (HistoryResponse.FileEntry entry : response.files)
            {
              //if we have a complete file that we've
              // loaded with the same name..
              String filename = entry.getPath();
              if (entry.getPath() != null && entry.isComplete() &&
                      filesWatchList.contains(filename))
              {
                if (filesHistory == null)
                {
                  filesHistory = new HistoryResponse();
                  filesHistory.setPipe(response.getPipe());
                }
                filesHistory.files.add(entry);
                filesWatchList.remove(filename);
                //we can return true!
                if (filesWatchList.isEmpty()) {
                  return filesHistory;
                }
              }
            }
          }
        }
      }
    }

    GetHistory historyCaller = new GetHistory(files);
    //fork off waiting for a load to the service
    Future<HistoryResponse> result = service.submit(historyCaller);

    HistoryResponse response = result.get(2, TimeUnit.MINUTES);
    return response;
  }

  public static void main(String[] args)
  {
    final String account = "<account_name>";
    final String host = "<account_name>.<region_id>.snowflakecomputing.com";
    final String user = "<user_login_name>";
    final String pipe = "<db_name>.<schema_name>.<pipe_name>";
    try
    {
      final long oneHourMillis = 1000 * 3600L;
      String startTime = Instant
              .ofEpochMilli(System.currentTimeMillis() - 4 * oneHourMillis).toString();
      final PrivateKey privateKey = PrivateKeyReader.get(PRIVATE_KEY_FILE);
      SimpleIngestManager manager = new SimpleIngestManager(account, user, pipe, privateKey, "https", host, 443);
      Set<String> files = new TreeSet<>();
      files.add("<path>/<filename>");
      files.add("<path>/<filemame>");
      manager.ingestFiles(manager.wrapFilepaths(files), null);
      HistoryResponse history = waitForFilesHistory(manager, files);
      System.out.println("Received history response: " + history.toString());
      String endTime = Instant
              .ofEpochMilli(System.currentTimeMillis()).toString();

      HistoryRangeResponse historyRangeResponse =
              manager.getHistoryRange(null,
                                      startTime,
                                      endTime);
      System.out.println("Received history range response: " +
                                 historyRangeResponse.toString());

    }
    catch (Exception e)
    {
      e.printStackTrace();
    }

  }
}

PRIVATE_KEY_PASSPHRASE 環境変数を使用して、秘密キーファイルを復号化するためのパスフレーズを指定します。

  • Linuxまたは macOS:

    export PRIVATE_KEY_PASSPHRASE='<passphrase>'
    
  • Windows:

    set PRIVATE_KEY_PASSPHRASE='<passphrase>'
    

さらに、サンプルJavaプログラムを実行する 前に 、次のように変更する必要があります。

  • セキュリティパラメーターを更新します。

    PUBLIC_KEY_FILE = "/<パス>/public_key.der"` 、 . PRIVATE_KEY_FILE = "/<パス>/public_key.der

    キーペア認証の使用Snowpipe REST APIを使用したデータロードの準備 内)で作成した公開キーファイルと秘密キーファイルへのローカルパスを指定します。

  • セッションパラメーターを更新します。

    account = "<アカウント名>"

    アカウントの名前を指定します(Snowflakeが提供)。

    host = "<アカウント名>.<地域ID>.snowflakecomputing.com"

    ホスト情報を URL ドメインの形式で指定します。 URL の形式は、アカウントが置かれている 地域 によって異なります。

    US 西部

    <アカウント名>.snowflakecomputing.com

    その他の地域

    <アカウント名>.<地域ID>.snowflakecomputing.com

    <アカウント名> はアカウントの名前(Snowflakeが提供)で、 <地域ID> は次のとおりです。

    地域

    地域 ID

    メモ

    Amazon Web Services(AWS)

    US 西部(オレゴン)

    us-west-2

    US 西部のアカウントに AWS PrivateLink を設定する場合にのみ必要です。

    US 東部(オハイオ)

    us-east-2.aws

    US 東部(バージニア北部)

    us-east-1

    US 東部(商業組織、バージニア政府北部)

    us-east-1-gov.aws

    Business Critical(またはそれ以上)のアカウントでのみ使用できます。Snowflakeがまだサポートしていない独立した専用クラウドである、 AWS GovCloud (US)には位置していません。

    カナダ(中部)

    ca-central-1.aws

    EU (アイルランド)

    eu-west-1

    EU (フランクフルト)

    eu-central-1

    アジア太平洋(東京)

    ap-northeast-1.aws

    アジア太平洋(ムンバイ)

    ap-south-1.aws

    アジア太平洋(シンガポール)

    ap-southeast-1

    アジア太平洋(シドニー)

    ap-southeast-2

    Google Cloud Platform(GCP)

    US 中央部1(アイオワ)

    us-central1.gcp

    ヨーロッパ西部2(ロンドン)

    europe-west2.gcp

    ヨーロッパ西部4(オランダ)

    europe-west4.gcp

    Microsoft Azure

    西 US 2(ワシントン)

    west-us-2.azure

    東 US 2(バージニア)

    east-us-2.azure

    US 政府バージニア

    us-gov-virginia.azure

    Business Critical(またはそれ以上)のアカウントでのみ利用可能です。

    カナダ中央部(トロント)

    canada-central.azure

    西ヨーロッパ(オランダ)

    west-europe.azure

    スイス北部(チューリッヒ)

    switzerland-north.azure

    東南アジア(シンガポール)

    southeast-asia.azure

    オーストラリア東部(ニューサウスウェールズ)

    australia-east.azure

    user = "<ユーザーログイン名>"

    Snowflakeログイン名を指定します。

    pipe = "<データベース名>.<スキーマ名>.<パイプ名>"

    データのロードに使用するパイプの完全修飾名を指定します。

  • ファイルオブジェクトリストでロードするファイルへのパスを指定します。

    files.add("<パス>/<ファイル名>")

    指定するパスは、ファイルが配置されているステージを 基準 にする必要があります。ファイル拡張子を含む各ファイルの完全な名前を含めます。たとえば、gzip圧縮された CSV ファイルには、拡張子 .csv.gz が付いている場合があります。

Python SDK のサンプルプログラム

from logging import getLogger
from snowflake.ingest import SimpleIngestManager
from snowflake.ingest import StagedFile
from snowflake.ingest.utils.uris import DEFAULT_SCHEME
from datetime import timedelta
from requests import HTTPError
from cryptography.hazmat.primitives import serialization
from cryptography.hazmat.primitives.serialization import load_pem_private_key
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives.serialization import Encoding
from cryptography.hazmat.primitives.serialization import PrivateFormat
from cryptography.hazmat.primitives.serialization import NoEncryption
import time
import datetime
import os
import logging

logging.basicConfig(
        filename='/tmp/ingest.log',
        level=logging.DEBUG)
logger = getLogger(__name__)

with open("/<private_key_path>/rsa_key.p8", 'rb') as pem_in:
  pemlines = pem_in.read()
  private_key_obj = load_pem_private_key(pemlines,
  os.environ['PRIVATE_KEY_PASSPHRASE'].encode(),
  default_backend())

private_key_text = private_key_obj.private_bytes(
  Encoding.PEM, PrivateFormat.PKCS8, NoEncryption()).decode('utf-8')
# Assume the public key has been registered in Snowflake:
# private key in PEM format

# List of files in the stage specified in the pipe definition
file_list=['<path>/<filename>', '<path>/<filename>']
ingest_manager = SimpleIngestManager(account='<account_name>',
                                     host='<account_name>.<region_id>.snowflakecomputing.com',
                                     user='<user_login_name>',
                                     pipe='<db_name>.<schema_name>.<pipe_name>',
                                     private_key=private_key_text)
# List of files, but wrapped into a class
staged_file_list = []
for file_name in file_list:
    staged_file_list.append(StagedFile(file_name, None))

try:
    resp = ingest_manager.ingest_files(staged_file_list)
except HTTPError as e:
    # HTTP error, may need to retry
    logger.error(e)
    exit(1)

# This means Snowflake has received file and will start loading
assert(resp['responseCode'] == 'SUCCESS')

# Needs to wait for a while to get result in history
while True:
    history_resp = ingest_manager.get_history()

    if len(history_resp['files']) > 0:
        print('Ingest Report:\n')
        print(history_resp)
        break
    else:
        # wait for 20 seconds
        time.sleep(20)

    hour = timedelta(hours=1)
    date = datetime.datetime.utcnow() - hour
    history_range_resp = ingest_manager.get_history_range(date.isoformat() + 'Z')

    print('\nHistory scan report: \n')
    print(history_range_resp)

PRIVATE_KEY_PASSPHRASE 環境変数を使用して、秘密キーファイルを復号化するためのパスフレーズを指定します。

  • Linuxまたは macOS:

    export PRIVATE_KEY_PASSPHRASE='<passphrase>'
    
  • Windows:

    set PRIVATE_KEY_PASSPHRASE='<passphrase>'
    

さらに、サンプルPythonプログラムを実行する 前に 、次のように変更する必要があります。

  • セキュリティパラメーターを更新します。

    <秘密キーパス>

    キーペア認証の使用Snowpipe REST APIを使用したデータロードの準備 内)で作成した秘密キーファイルへのローカルパスを指定します。

  • セッションパラメーターを更新します。

    account='<アカウント名>'

    アカウントの名前を指定します(Snowflakeが提供)。

    host='<アカウント名>.<地域ID>.snowflakecomputing.com'

    ホスト情報を URL ドメインの形式で指定します。 URL ドメインの形式は、アカウントが置かれている 地域 によって異なります。

    US 西部

    <アカウント名>.snowflakecomputing.com

    その他の地域

    <アカウント名>.<地域ID>.snowflakecomputing.com

    <アカウント名> はアカウントの名前(Snowflakeが提供)で、 <region_id> は次のとおりです。

    地域

    地域 ID

    メモ

    Amazon Web Services(AWS)

    US 西部(オレゴン)

    us-west-2

    US 西部のアカウントに AWS PrivateLink を設定する場合にのみ必要です。

    US 東部(オハイオ)

    us-east-2.aws

    US 東部(バージニア北部)

    us-east-1

    US 東部(商業組織、バージニア政府北部)

    us-east-1-gov.aws

    Business Critical(またはそれ以上)のアカウントでのみ使用できます。Snowflakeがまだサポートしていない独立した専用クラウドである、 AWS GovCloud (US)には位置していません。

    カナダ(中部)

    ca-central-1.aws

    EU (アイルランド)

    eu-west-1

    EU (フランクフルト)

    eu-central-1

    アジア太平洋(東京)

    ap-northeast-1.aws

    アジア太平洋(ムンバイ)

    ap-south-1.aws

    アジア太平洋(シンガポール)

    ap-southeast-1

    アジア太平洋(シドニー)

    ap-southeast-2

    Google Cloud Platform(GCP)

    US 中央部1(アイオワ)

    us-central1.gcp

    ヨーロッパ西部2(ロンドン)

    europe-west2.gcp

    ヨーロッパ西部4(オランダ)

    europe-west4.gcp

    Microsoft Azure

    西 US 2(ワシントン)

    west-us-2.azure

    東 US 2(バージニア)

    east-us-2.azure

    US 政府バージニア

    us-gov-virginia.azure

    Business Critical(またはそれ以上)のアカウントでのみ利用可能です。

    カナダ中央部(トロント)

    canada-central.azure

    西ヨーロッパ(オランダ)

    west-europe.azure

    スイス北部(チューリッヒ)

    switzerland-north.azure

    東南アジア(シンガポール)

    southeast-asia.azure

    オーストラリア東部(ニューサウスウェールズ)

    australia-east.azure

    user='<user_login_name>'

    Snowflakeログイン名を指定します。

    pipe='<db_name>.<schema_name>.<pipe_name>'

    データのロードに使用するパイプの完全修飾名を指定します。

  • ファイルオブジェクトリストでインポートするファイルへのパスを指定します。

    file_list=['<path>/<filename>', '<path>/<filename>']

    指定するパスは、ファイルが配置されているステージを 基準 にする必要があります。ファイル拡張子を含む各ファイルの完全な名前を含めます。たとえば、gzip圧縮された CSV ファイルには、拡張子 .csv.gz が付いている場合があります。

ロード履歴を表示する

Snowflakeは、ロード履歴を表示するための REST エンドポイント情報スキーマ テーブル関数を提供します。

REST エンドポイントの呼び出しとは異なり、情報スキーマテーブル関数またはアカウント使用状況ビューのいずれかをクエリするには、実行中のウェアハウスが必要です。