옵션 1: Snowpipe REST API를 사용하여 데이터 로드하기¶
이 항목에서는 데이터를 로드하고 로드 내역 보고서를 검색하기 위해 공용 REST 엔드포인트를 호출하는 방법에 대해 설명합니다. 이 지침에서는 Snowpipe REST API를 사용하여 데이터 로드 준비하기 의 설정 지침을 완료한 것으로 가정합니다.
이 항목의 내용:
데이터 로딩하기¶
로드는 다음의 두 단계로 수행됩니다.
- 1단계:
데이터 파일을 스테이징합니다.
내부 스테이지: PUT 명령을 사용하여 파일을 스테이징합니다.
외부 스테이지: 클라우드 공급자가 제공하는 클라이언트 도구를 사용하여 파일을 스테이지 위치(Amazon S3, Google Cloud Storage 또는 Microsoft Azure)에 복사합니다.
- 2단계:
스테이징된 데이터 파일을 로드하려면 insertFiles REST 엔드포인트에 요청을 제출합니다.
편의를 위해 이 항목에서는 REST 엔드포인트의 제출 방법을 보여주는 샘플 Java 및 Python 프로그램을 제공합니다.
Java SDK용 샘플 프로그램¶
import net.snowflake.ingest.SimpleIngestManager;
import net.snowflake.ingest.connection.HistoryRangeResponse;
import net.snowflake.ingest.connection.HistoryResponse;
import org.bouncycastle.asn1.pkcs.PrivateKeyInfo;
import org.bouncycastle.jce.provider.BouncyCastleProvider;
import org.bouncycastle.openssl.PEMParser;
import org.bouncycastle.openssl.jcajce.JcaPEMKeyConverter;
import org.bouncycastle.openssl.jcajce.JceOpenSSLPKCS8DecryptorProviderBuilder;
import org.bouncycastle.operator.InputDecryptorProvider;
import org.bouncycastle.operator.OperatorCreationException;
import org.bouncycastle.pkcs.PKCS8EncryptedPrivateKeyInfo;
import org.bouncycastle.pkcs.PKCSException;
import java.io.FileReader;
import java.io.IOException;
import java.nio.file.Paths;
import java.security.PrivateKey;
import java.security.Security;
import java.time.Instant;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
public class SDKTest
{
// Path to the private key file that you generated earlier.
private static final String PRIVATE_KEY_FILE = "/<path>/rsa_key.p8";
public static class PrivateKeyReader
{
// If you generated an encrypted private key, implement this method to return
// the passphrase for decrypting your private key.
private static String getPrivateKeyPassphrase() {
return "<private_key_passphrase>";
}
public static PrivateKey get(String filename)
throws Exception
{
PrivateKeyInfo privateKeyInfo = null;
Security.addProvider(new BouncyCastleProvider());
// Read an object from the private key file.
PEMParser pemParser = new PEMParser(new FileReader(Paths.get(filename).toFile()));
Object pemObject = pemParser.readObject();
if (pemObject instanceof PKCS8EncryptedPrivateKeyInfo) {
// Handle the case where the private key is encrypted.
PKCS8EncryptedPrivateKeyInfo encryptedPrivateKeyInfo = (PKCS8EncryptedPrivateKeyInfo) pemObject;
String passphrase = getPrivateKeyPassphrase();
InputDecryptorProvider pkcs8Prov = new JceOpenSSLPKCS8DecryptorProviderBuilder().build(passphrase.toCharArray());
privateKeyInfo = encryptedPrivateKeyInfo.decryptPrivateKeyInfo(pkcs8Prov);
} else if (pemObject instanceof PrivateKeyInfo) {
// Handle the case where the private key is unencrypted.
privateKeyInfo = (PrivateKeyInfo) pemObject;
}
pemParser.close();
JcaPEMKeyConverter converter = new JcaPEMKeyConverter().setProvider(BouncyCastleProvider.PROVIDER_NAME);
return converter.getPrivateKey(privateKeyInfo);
}
}
private static HistoryResponse waitForFilesHistory(SimpleIngestManager manager,
Set<String> files)
throws Exception
{
ExecutorService service = Executors.newSingleThreadExecutor();
class GetHistory implements
Callable<HistoryResponse>
{
private Set<String> filesWatchList;
GetHistory(Set<String> files)
{
this.filesWatchList = files;
}
String beginMark = null;
public HistoryResponse call()
throws Exception
{
HistoryResponse filesHistory = null;
while (true)
{
Thread.sleep(500);
HistoryResponse response = manager.getHistory(null, null, beginMark);
if (response.getNextBeginMark() != null)
{
beginMark = response.getNextBeginMark();
}
if (response != null && response.files != null)
{
for (HistoryResponse.FileEntry entry : response.files)
{
//if we have a complete file that we've
// loaded with the same name..
String filename = entry.getPath();
if (entry.getPath() != null && entry.isComplete() &&
filesWatchList.contains(filename))
{
if (filesHistory == null)
{
filesHistory = new HistoryResponse();
filesHistory.setPipe(response.getPipe());
}
filesHistory.files.add(entry);
filesWatchList.remove(filename);
//we can return true!
if (filesWatchList.isEmpty()) {
return filesHistory;
}
}
}
}
}
}
}
GetHistory historyCaller = new GetHistory(files);
//fork off waiting for a load to the service
Future<HistoryResponse> result = service.submit(historyCaller);
HistoryResponse response = result.get(2, TimeUnit.MINUTES);
return response;
}
public static void main(String[] args)
{
final String host = "<account_identifier>.snowflakecomputing.com";
final String user = "<user_login_name>";
final String pipe = "<db_name>.<schema_name>.<pipe_name>";
try
{
final long oneHourMillis = 1000 * 3600L;
String startTime = Instant
.ofEpochMilli(System.currentTimeMillis() - 4 * oneHourMillis).toString();
final PrivateKey privateKey = PrivateKeyReader.get(PRIVATE_KEY_FILE);
SimpleIngestManager manager = new SimpleIngestManager(host.split("\.")[0], user, pipe, privateKey, "https", host, 443);
List<StagedFileWrapper> files = new ArrayList<>();
// Add the paths and sizes the files that you want to load.
// Use paths that are relative to the stage where the files are located
// (the stage that is specified in the pipe definition)..
files.add(new StagedFileWrapper("<path>/<filename>", <file_size_in_bytes> /* file size is optional but recommended, pass null when it is not available */));
files.add(new StagedFileWrapper("<path>/<filename>", <file_size_in_bytes> /* file size is optional but recommended, pass null when it is not available */));
...
manager.ingestFiles(files, null);
HistoryResponse history = waitForFilesHistory(manager, files);
System.out.println("Received history response: " + history.toString());
String endTime = Instant
.ofEpochMilli(System.currentTimeMillis()).toString();
HistoryRangeResponse historyRangeResponse =
manager.getHistoryRange(null,
startTime,
endTime);
System.out.println("Received history range response: " +
historyRangeResponse.toString());
}
catch (Exception e)
{
e.printStackTrace();
}
}
}
이 예에서는 Bouncy Castle Crypto APIs 를 사용합니다. 이 예시를 컴파일 및 실행하려면 클래스 경로에 다음 JAR 파일을 포함해야 합니다.
공급자 JAR 파일(
bcprov-jdkversions.jar
)PKIX / CMS / EAC / PKCS / OCSP / TSP / OPENSSL JAR 파일(
bcpkix-jdkversions.jar
)
여기서 versions
은 JAR 파일이 지원하는 JDK의 버전을 지정합니다.
샘플 코드를 컴파일하기 전, 다음 자리 표시자 값을 바꿉니다.
PRIVATE_KEY_FILE = "/<경로>/rsa_key.p8"
키 페어 인증 및 키 순환 사용하기 (Snowpipe REST API를 사용하여 데이터 로드 준비하기)에서 생성한 개인 키 파일의 로컬 경로를 지정합니다.
getPrivateKeyPassphrase()
의return "<개인_키_암호 구문>"
암호화된 키가 생성되면,
getPrivateKeyPassphrase()
메서드를 구현하여 해당 키의 암호를 해독하기 위한 암호 구문을 반환합니다.host = "<계정_식별자>.snowflakecomputing.com"
호스트 정보를 URL 형식으로 지정합니다.
계정 식별자의 선호 형식은 다음과 같습니다.
organization_name-account_name
Snowflake 조직 및 계정의 이름입니다. 자세한 내용은 형식 1(기본 설정): 조직의 계정 이름 섹션을 참조하십시오.
아니면, 필요한 경우 계정이 호스팅되는 리전 및 클라우드 플랫폼 과 함께 계정 로케이터 를 지정합니다. 자세한 내용은 형식 2(레거시): 리전의 계정 로케이터 섹션을 참조하십시오.
user = "<사용자_로그인_이름>"
Snowflake 로그인 이름을 지정합니다.
pipe = "<db_이름>.<스키마_이름>.<파이프_이름>"
데이터를 로그하기 위해 사용할 파이프의 정규화된 이름을 지정합니다.
files.add("<경로>/<파일 이름>", <파일_크기_(바이트)>)
파일 오브젝트 목록에서 로드할 파일의 경로를 지정합니다.
선택적으로, Snowpipe가 데이터를 로딩하는 데 필요한 작업을 계산할 때 지연을 방지하기 위해 각 파일의 크기를 바이트 단위로 지정합니다.
지정하는 경로는 파일이 위치한 스테이지에 대해 상대적 이어야 합니다. 파일 확장자를 포함하여 각 파일의 전체 이름을 포함합니다. 예를 들어, gzip으로 압축된 CSV 파일은 확장자가
.csv.gz
일 수 있습니다.
Python SDK용 샘플 프로그램¶
from logging import getLogger
from snowflake.ingest import SimpleIngestManager
from snowflake.ingest import StagedFile
from snowflake.ingest.utils.uris import DEFAULT_SCHEME
from datetime import timedelta
from requests import HTTPError
from cryptography.hazmat.primitives import serialization
from cryptography.hazmat.primitives.serialization import load_pem_private_key
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives.serialization import Encoding
from cryptography.hazmat.primitives.serialization import PrivateFormat
from cryptography.hazmat.primitives.serialization import NoEncryption
import time
import datetime
import os
import logging
logging.basicConfig(
filename='/tmp/ingest.log',
level=logging.DEBUG)
logger = getLogger(__name__)
# If you generated an encrypted private key, implement this method to return
# the passphrase for decrypting your private key.
def get_private_key_passphrase():
return '<private_key_passphrase>'
with open("/<private_key_path>/rsa_key.p8", 'rb') as pem_in:
pemlines = pem_in.read()
private_key_obj = load_pem_private_key(pemlines,
get_private_key_passphrase().encode(),
default_backend())
private_key_text = private_key_obj.private_bytes(
Encoding.PEM, PrivateFormat.PKCS8, NoEncryption()).decode('utf-8')
# Assume the public key has been registered in Snowflake:
# private key in PEM format
ingest_manager = SimpleIngestManager(account='<account_identifier>',
host='<account_identifier>.snowflakecomputing.com',
user='<user_login_name>',
pipe='<db_name>.<schema_name>.<pipe_name>',
private_key=private_key_text)
# List of files, but wrapped into a class
staged_file_list = [
StagedFile('<path>/<filename>', <file_size_in_bytes>), # file size is optional but recommended, pass None if not available
StagedFile('<path>/<filename>', <file_size_in_bytes>), # file size is optional but recommended, pass None if not available
...
]
try:
resp = ingest_manager.ingest_files(staged_file_list)
except HTTPError as e:
# HTTP error, may need to retry
logger.error(e)
exit(1)
# This means Snowflake has received file and will start loading
assert(resp['responseCode'] == 'SUCCESS')
# Needs to wait for a while to get result in history
while True:
history_resp = ingest_manager.get_history()
if len(history_resp['files']) > 0:
print('Ingest Report:\n')
print(history_resp)
break
else:
# wait for 20 seconds
time.sleep(20)
hour = timedelta(hours=1)
date = datetime.datetime.utcnow() - hour
history_range_resp = ingest_manager.get_history_range(date.isoformat() + 'Z')
print('\nHistory scan report: \n')
print(history_range_resp)
샘플 코드를 실행하기 전, 다음 자리 표시자 값을 바꿉니다.
<개인_키_경로>
키 페어 인증 및 키 순환 사용하기 (Snowpipe REST API를 사용하여 데이터 로드 준비하기)에서 생성한 개인 키 파일의 로컬 경로를 지정합니다.
get_private_key_passphrase()
의return "<개인_키_암호 구문>"
암호화된 키가 생성되면,
get_private_key_passphrase()
함수를 구현하여 해당 키의 암호를 해독하기 위한 암호 구문을 반환합니다.account='<계정_식별자>'
계정(Snowflake에서 제공)의 고유 식별자를 지정합니다.
host
설명을 참조하십시오.host='<계정_식별자>.snowflakecomputing.com'
Snowflake 계정의 고유 호스트 이름을 지정합니다.
계정 식별자의 선호 형식은 다음과 같습니다.
organization_name-account_name
Snowflake 조직 및 계정의 이름입니다. 자세한 내용은 형식 1(기본 설정): 조직의 계정 이름 섹션을 참조하십시오.
아니면, 필요한 경우 계정이 호스팅되는 리전 및 클라우드 플랫폼 과 함께 계정 로케이터 를 지정합니다. 자세한 내용은 형식 2(레거시): 리전의 계정 로케이터 섹션을 참조하십시오.
user='<user_login_name>'
Snowflake 로그인 이름을 지정합니다.
pipe='<db_name>.<schema_name>.<pipe_name>'
데이터를 로그하기 위해 사용할 파이프의 정규화된 이름을 지정합니다.
file_list=['<path>/<filename>', '<path>/<filename>']
|staged_file_list=[StagedFile('<path>/<filename>', <file_size_in_bytes>), StagedFile('<path>/<filename>', <file_size_in_bytes>)]
파일 오브젝트 목록에서 로드할 파일의 경로를 지정합니다.
지정하는 경로는 파일이 위치한 스테이지에 대해 상대적 이어야 합니다. 파일 확장자를 포함하여 각 파일의 전체 이름을 포함합니다. 예를 들어, gzip으로 압축된 CSV 파일은 확장자가
.csv.gz
일 수 있습니다.선택적으로, Snowpipe가 데이터를 로딩하는 데 필요한 작업을 계산할 때 지연을 방지하기 위해 각 파일의 크기를 바이트 단위로 지정합니다.
로드 내역 보기¶
Snowflake는 로드 내역을 살펴보기 위한 REST 엔드포인트 및 Snowflake Information Schema 테이블 함수를 제공합니다.
REST 엔드포인트:
Information Schema 테이블 함수:
Account Usage 뷰:
REST 엔드포인트 호출에서와 다르게, Information Schema 테이블 함수 또는 Account Usage 뷰를 쿼리하려면 실행 중인 웨어하우스가 필요합니다.
스테이징된 파일 삭제하기¶
데이터를 로드하고 더 이상 파일이 필요하지 않으면 스테이징된 파일을 삭제합니다. 자세한 지침은 Snowpipe가 데이터를 로드한 후 스테이징된 파일 삭제하기 섹션을 참조하십시오.