옵션 1: Snowpipe REST API를 사용하여 데이터 로드하기

이 항목에서는 데이터를 로드하고 로드 내역 보고서를 검색하기 위해 공용 REST 엔드포인트를 호출하는 방법에 대해 설명합니다. 이 지침에서는 Snowpipe REST API를 사용하여 데이터 로드 준비하기 의 설정 지침을 완료한 것으로 가정합니다.

이 항목의 내용:

데이터 로드하기

로드는 다음의 두 단계로 수행됩니다.

1단계

데이터 파일을 스테이징합니다.

  • 내부 스테이지: PUT 명령을 사용하여 파일을 스테이징합니다.

  • 외부 스테이지: 클라우드 공급자가 제공하는 클라이언트 도구를 사용하여 파일을 스테이지 위치(Amazon S3, Google Cloud Storage 또는 Microsoft Azure)에 복사합니다.

2단계

스테이징된 데이터 파일을 로드하려면 insertFiles REST 엔드포인트에 요청을 제출합니다.

편의를 위해 이 항목에서는 REST 엔드포인트의 제출 방법을 보여주는 샘플 Java 및 Python 프로그램을 제공합니다.

Java SDK용 샘플 프로그램

import net.snowflake.ingest.SimpleIngestManager;
import net.snowflake.ingest.connection.HistoryRangeResponse;
import net.snowflake.ingest.connection.HistoryResponse;
import org.bouncycastle.asn1.pkcs.PrivateKeyInfo;
import org.bouncycastle.jce.provider.BouncyCastleProvider;
import org.bouncycastle.openssl.PEMParser;
import org.bouncycastle.openssl.jcajce.JcaPEMKeyConverter;
import org.bouncycastle.openssl.jcajce.JceOpenSSLPKCS8DecryptorProviderBuilder;
import org.bouncycastle.operator.InputDecryptorProvider;
import org.bouncycastle.operator.OperatorCreationException;
import org.bouncycastle.pkcs.PKCS8EncryptedPrivateKeyInfo;
import org.bouncycastle.pkcs.PKCSException;
import java.io.FileReader;
import java.io.IOException;
import java.nio.file.Paths;
import java.security.PrivateKey;
import java.security.Security;
import java.time.Instant;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

public class SDKTest
{
  // Path to the private key file that you generated earlier.
  private static final String PRIVATE_KEY_FILE = "/<path>/rsa_key.p8";

  public static class PrivateKeyReader
  {
    // If you generated an encrypted private key, implement this method to return
    // the passphrase for decrypting your private key.
    private static String getPrivateKeyPassphrase() {
      return "<private_key_passphrase>";
    }

    public static PrivateKey get(String filename)
            throws Exception
    {
      PrivateKeyInfo privateKeyInfo = null;
      Security.addProvider(new BouncyCastleProvider());
      // Read an object from the private key file.
      PEMParser pemParser = new PEMParser(new FileReader(Paths.get(filename).toFile()));
      Object pemObject = pemParser.readObject();
      if (pemObject instanceof PKCS8EncryptedPrivateKeyInfo) {
        // Handle the case where the private key is encrypted.
        PKCS8EncryptedPrivateKeyInfo encryptedPrivateKeyInfo = (PKCS8EncryptedPrivateKeyInfo) pemObject;
        String passphrase = getPrivateKeyPassphrase();
        InputDecryptorProvider pkcs8Prov = new JceOpenSSLPKCS8DecryptorProviderBuilder().build(passphrase.toCharArray());
        privateKeyInfo = encryptedPrivateKeyInfo.decryptPrivateKeyInfo(pkcs8Prov);
      } else if (pemObject instanceof PrivateKeyInfo) {
        // Handle the case where the private key is unencrypted.
        privateKeyInfo = (PrivateKeyInfo) pemObject;
      }
      pemParser.close();
      JcaPEMKeyConverter converter = new JcaPEMKeyConverter().setProvider(BouncyCastleProvider.PROVIDER_NAME);
      return converter.getPrivateKey(privateKeyInfo);
    }
  }

  private static HistoryResponse waitForFilesHistory(SimpleIngestManager manager,
                                                     Set<String> files)
          throws Exception
  {
    ExecutorService service = Executors.newSingleThreadExecutor();

    class GetHistory implements
            Callable<HistoryResponse>
    {
      private Set<String> filesWatchList;
      GetHistory(Set<String> files)
      {
        this.filesWatchList = files;
      }
      String beginMark = null;

      public HistoryResponse call()
              throws Exception
      {
        HistoryResponse filesHistory = null;
        while (true)
        {
          Thread.sleep(500);
          HistoryResponse response = manager.getHistory(null, null, beginMark);
          if (response.getNextBeginMark() != null)
          {
            beginMark = response.getNextBeginMark();
          }
          if (response != null && response.files != null)
          {
            for (HistoryResponse.FileEntry entry : response.files)
            {
              //if we have a complete file that we've
              // loaded with the same name..
              String filename = entry.getPath();
              if (entry.getPath() != null && entry.isComplete() &&
                      filesWatchList.contains(filename))
              {
                if (filesHistory == null)
                {
                  filesHistory = new HistoryResponse();
                  filesHistory.setPipe(response.getPipe());
                }
                filesHistory.files.add(entry);
                filesWatchList.remove(filename);
                //we can return true!
                if (filesWatchList.isEmpty()) {
                  return filesHistory;
                }
              }
            }
          }
        }
      }
    }

    GetHistory historyCaller = new GetHistory(files);
    //fork off waiting for a load to the service
    Future<HistoryResponse> result = service.submit(historyCaller);

    HistoryResponse response = result.get(2, TimeUnit.MINUTES);
    return response;
  }

  public static void main(String[] args)
  {
    final String host = "<account_identifier>.snowflakecomputing.com";
    final String user = "<user_login_name>";
    final String pipe = "<db_name>.<schema_name>.<pipe_name>";
    try
    {
      final long oneHourMillis = 1000 * 3600L;
      String startTime = Instant
              .ofEpochMilli(System.currentTimeMillis() - 4 * oneHourMillis).toString();
      final PrivateKey privateKey = PrivateKeyReader.get(PRIVATE_KEY_FILE);
      SimpleIngestManager manager = new SimpleIngestManager(host.split("\.")[0], user, pipe, privateKey, "https", host, 443);
      List<StagedFileWrapper> files = new ArrayList<>();
      // Add the paths and sizes the files that you want to load.
      // Use paths that are relative to the stage where the files are located
      // (the stage that is specified in the pipe definition)..
      files.add(new StagedFileWrapper("<path>/<filename>", <file_size_in_bytes> /* file size is optional but recommended, pass null when it is not available */));
      files.add(new StagedFileWrapper("<path>/<filename>", <file_size_in_bytes> /* file size is optional but recommended, pass null when it is not available */));
      ...
      manager.ingestFiles(files, null);
      HistoryResponse history = waitForFilesHistory(manager, files);
      System.out.println("Received history response: " + history.toString());
      String endTime = Instant
              .ofEpochMilli(System.currentTimeMillis()).toString();

      HistoryRangeResponse historyRangeResponse =
              manager.getHistoryRange(null,
                                      startTime,
                                      endTime);
      System.out.println("Received history range response: " +
                                 historyRangeResponse.toString());

    }
    catch (Exception e)
    {
      e.printStackTrace();
    }

  }
}
Copy

이 예에서는 Bouncy Castle Crypto APIs 를 사용합니다. 이 예시를 컴파일 및 실행하려면 클래스 경로에 다음 JAR 파일을 포함해야 합니다.

  • 공급자 JAR 파일(bcprov-jdkversions.jar)

  • PKIX / CMS / EAC / PKCS / OCSP / TSP / OPENSSL JAR 파일(bcpkix-jdkversions.jar)

여기서 versions 은 JAR 파일이 지원하는 JDK의 버전을 지정합니다.

샘플 코드를 컴파일하기 전, 다음 자리 표시자 값을 바꿉니다.

PRIVATE_KEY_FILE = "/<경로>/rsa_key.p8"

키 페어 인증 및 키 순환 사용하기 (Snowpipe REST API를 사용하여 데이터 로드 준비하기)에서 생성한 개인 키 파일의 로컬 경로를 지정합니다.

getPrivateKeyPassphrase()return "<개인_키_암호 구문>"

암호화된 키가 생성되면, getPrivateKeyPassphrase() 메서드를 구현하여 해당 키의 암호를 해독하기 위한 암호 구문을 반환합니다.

host = "<계정_식별자>.snowflakecomputing.com"

호스트 정보를 URL 형식으로 지정합니다.

계정 식별자의 선호 형식은 다음과 같습니다.

organization_name-account_name

Snowflake 조직 및 계정의 이름입니다. 자세한 내용은 형식 1(기본 설정): 조직의 계정 이름 섹션을 참조하십시오.

아니면, 필요한 경우 계정이 호스팅되는 리전클라우드 플랫폼 과 함께 계정 로케이터 를 지정합니다. 자세한 내용은 형식 2(레거시): 리전의 계정 로케이터 섹션을 참조하십시오.

user = "<사용자_로그인_이름>"

Snowflake 로그인 이름을 지정합니다.

pipe = "<db_이름>.<스키마_이름>.<파이프_이름>"

데이터를 로그하기 위해 사용할 파이프의 정규화된 이름을 지정합니다.

files.add("<경로>/<파일 이름>", <파일_크기_(바이트)>)

파일 오브젝트 목록에서 로드할 파일의 경로를 지정합니다.

선택적으로, Snowpipe가 데이터를 로딩하는 데 필요한 작업을 계산할 때 지연을 방지하기 위해 각 파일의 크기를 바이트 단위로 지정합니다.

지정하는 경로는 파일이 위치한 스테이지에 대해 상대적 이어야 합니다. 파일 확장자를 포함하여 각 파일의 전체 이름을 포함합니다. 예를 들어, gzip으로 압축된 CSV 파일은 확장자가 .csv.gz 일 수 있습니다.

Python SDK용 샘플 프로그램

from logging import getLogger
from snowflake.ingest import SimpleIngestManager
from snowflake.ingest import StagedFile
from snowflake.ingest.utils.uris import DEFAULT_SCHEME
from datetime import timedelta
from requests import HTTPError
from cryptography.hazmat.primitives import serialization
from cryptography.hazmat.primitives.serialization import load_pem_private_key
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives.serialization import Encoding
from cryptography.hazmat.primitives.serialization import PrivateFormat
from cryptography.hazmat.primitives.serialization import NoEncryption
import time
import datetime
import os
import logging

logging.basicConfig(
        filename='/tmp/ingest.log',
        level=logging.DEBUG)
logger = getLogger(__name__)

# If you generated an encrypted private key, implement this method to return
# the passphrase for decrypting your private key.
def get_private_key_passphrase():
  return '<private_key_passphrase>'

with open("/<private_key_path>/rsa_key.p8", 'rb') as pem_in:
  pemlines = pem_in.read()
  private_key_obj = load_pem_private_key(pemlines,
  get_private_key_passphrase().encode(),
  default_backend())

private_key_text = private_key_obj.private_bytes(
  Encoding.PEM, PrivateFormat.PKCS8, NoEncryption()).decode('utf-8')
# Assume the public key has been registered in Snowflake:
# private key in PEM format

ingest_manager = SimpleIngestManager(account='<account_identifier>',
                                     host='<account_identifier>.snowflakecomputing.com',
                                     user='<user_login_name>',
                                     pipe='<db_name>.<schema_name>.<pipe_name>',
                                     private_key=private_key_text)
# List of files, but wrapped into a class
staged_file_list = [
  StagedFile('<path>/<filename>', <file_size_in_bytes>),  # file size is optional but recommended, pass None if not available
  StagedFile('<path>/<filename>', <file_size_in_bytes>),  # file size is optional but recommended, pass None if not available
  ...
  ]

try:
    resp = ingest_manager.ingest_files(staged_file_list)
except HTTPError as e:
    # HTTP error, may need to retry
    logger.error(e)
    exit(1)

# This means Snowflake has received file and will start loading
assert(resp['responseCode'] == 'SUCCESS')

# Needs to wait for a while to get result in history
while True:
    history_resp = ingest_manager.get_history()

    if len(history_resp['files']) > 0:
        print('Ingest Report:\n')
        print(history_resp)
        break
    else:
        # wait for 20 seconds
        time.sleep(20)

    hour = timedelta(hours=1)
    date = datetime.datetime.utcnow() - hour
    history_range_resp = ingest_manager.get_history_range(date.isoformat() + 'Z')

    print('\nHistory scan report: \n')
    print(history_range_resp)
Copy

샘플 코드를 실행하기 전, 다음 자리 표시자 값을 바꿉니다.

<개인_키_경로>

키 페어 인증 및 키 순환 사용하기 (Snowpipe REST API를 사용하여 데이터 로드 준비하기)에서 생성한 개인 키 파일의 로컬 경로를 지정합니다.

get_private_key_passphrase()return "<개인_키_암호 구문>"

암호화된 키가 생성되면, get_private_key_passphrase() 함수를 구현하여 해당 키의 암호를 해독하기 위한 암호 구문을 반환합니다.

account='<계정_식별자>'

계정(Snowflake에서 제공)의 고유 식별자를 지정합니다. host 설명을 참조하십시오.

host='<계정_식별자>.snowflakecomputing.com'

Snowflake 계정의 고유 호스트 이름을 지정합니다.

계정 식별자의 선호 형식은 다음과 같습니다.

organization_name-account_name

Snowflake 조직 및 계정의 이름입니다. 자세한 내용은 형식 1(기본 설정): 조직의 계정 이름 섹션을 참조하십시오.

아니면, 필요한 경우 계정이 호스팅되는 리전클라우드 플랫폼 과 함께 계정 로케이터 를 지정합니다. 자세한 내용은 형식 2(레거시): 리전의 계정 로케이터 섹션을 참조하십시오.

user='<user_login_name>'

Snowflake 로그인 이름을 지정합니다.

pipe='<db_name>.<schema_name>.<pipe_name>'

데이터를 로그하기 위해 사용할 파이프의 정규화된 이름을 지정합니다.

file_list=['<path>/<filename>', '<path>/<filename>'] | staged_file_list=[StagedFile('<path>/<filename>', <file_size_in_bytes>), StagedFile('<path>/<filename>', <file_size_in_bytes>)]

파일 오브젝트 목록에서 로드할 파일의 경로를 지정합니다.

지정하는 경로는 파일이 위치한 스테이지에 대해 상대적 이어야 합니다. 파일 확장자를 포함하여 각 파일의 전체 이름을 포함합니다. 예를 들어, gzip으로 압축된 CSV 파일은 확장자가 .csv.gz 일 수 있습니다.

선택적으로, Snowpipe가 데이터를 로딩하는 데 필요한 작업을 계산할 때 지연을 방지하기 위해 각 파일의 크기를 바이트 단위로 지정합니다.

로드 내역 보기

Snowflake는 로드 내역을 살펴보기 위한 REST 엔드포인트Snowflake Information Schema 테이블 함수를 제공합니다.

REST 엔드포인트 호출에서와 다르게, Information Schema 테이블 함수 또는 Account Usage 뷰를 쿼리하려면 실행 중인 웨어하우스가 필요합니다.

스테이징된 파일 삭제하기

데이터를 로드하고 더 이상 파일이 필요하지 않으면 스테이징된 파일을 삭제합니다. 자세한 지침은 Snowpipe가 데이터를 로드한 후 스테이징된 파일 삭제하기 섹션을 참조하십시오.