Snowpipe Streaming REST API 시작하기: cURL 및 JWT자습서¶
참고
snowpipe-streaming
SDK를 주요 선택 항목 및 기본 선택 항목으로 사용하는 것이 좋습니다. REST API는 처리량이 많은 시나리오에 최적화되어 있지 않습니다.
이 가이드에서는 Snowpipe Streaming REST API 및 SnowSQL</developer-guide/sql-api/authenticating>`로 생성된 :doc:`JSON 웹 토큰(JWT)을 사용하여 Snowflake로 데이터를 스트리밍하는 방법을 보여줍니다.
전제 조건¶
시작하기 전에 다음 항목이 있는지 확인하세요.
Snowflake 사용자 및 오브젝트:
키 페어 인증을 위해 구성된 Snowflake 사용자. 다음 SQL 명령을 사용하여 공개 키를 등록합니다.
ALTER USER MY_USER SET RSA_PUBLIC_KEY='<your-public-key>';
스트리밍 수집을 위한 Snowflake 데이터베이스, 스키마 및 PIPE 오브젝트. 다음 SQL 명령을 사용하여 생성할 수 있습니다(MY_DATABASE
, MY_SCHEMA
, MY_PIPE
, ``MY_TABLE``과 같은 자리 표시자를 원하는 이름으로 대체).
-- Create Database and Schema
CREATE OR REPLACE DATABASE MY_DATABASE;
CREATE OR REPLACE SCHEMA MY_SCHEMA;
-- Create Target Table
CREATE OR REPLACE TABLE MY_TABLE (
data VARIANT,
c1 NUMBER,
c2 STRING
);
-- Create PIPE Object for Streaming Ingestion
CREATE OR REPLACE PIPE MY_PIPE
AS COPY INTO MY_TABLE FROM (SELECT $1, $1:c1, $1:ts FROM TABLE(DATA_SOURCE(TYPE => 'STREAMING')));
설치된 도구:
curl
: HTTP 요청을 위한 용도.jq
: JSON 응답 구분 분석 용도.SnowSQL
: 명령을 실행하기 위한 Snowflake의 명령줄 클라이언트.
생성된 JWT: SnowSQL을 사용하여 JWT를 생성합니다.
snowsql --private-key-path rsa_key.p8 --generate-jwt \
-a <ACCOUNT_LOCATOR> \
-u MY_USER
조심
JWT를 안전하게 저장하세요. 로그나 스크립트에 노출되지 않도록 하세요.
단계별 지침¶
데이터를 Snowflake로 스트리밍하려면 다음 단계를 완료합니다.
1단계: 환경 변수 설정하기¶
Snowflake 계정 및 스트리밍 작업에 필요한 환경 변수를 설정합니다.
# Paste the JWT token obtained from SnowSQL
export JWT_TOKEN="PASTE_YOUR_JWT_TOKEN_HERE"
# Configure your Snowflake account and resources:
export ACCOUNT="<ACCOUNT_LOCATOR>" # For example, ab12345
export USER="MY_USER"
export DB="MY_DATABASE"
export SCHEMA="MY_SCHEMA"
export PIPE="MY_PIPE"
export CHANNEL="MY_CHANNEL"
# Replace ACCOUNT with your Account URL Host to form the control plane host:
export CONTROL_HOST="${ACCOUNT}.snowflakecomputing.com"
2단계: 수집 호스트 검색¶
수집 호스트는 스트리밍 데이터의 엔드포인트입니다. JWT를 사용하여 수집 호스트를 검색합니다.
export INGEST_HOST=$(curl -sS -X GET \
-H "Authorization: Bearer $JWT_TOKEN" \
-H "X-Snowflake-Authorization-Token-Type: KEYPAIR_JWT" \
"https://${CONTROL_HOST}/v2/streaming/hostname")
echo "Ingest Host: $INGEST_HOST"
수집 호스트에서 작업을 승인하기 위해 범위 지정 토큰을 가져옵니다.
export SCOPED_TOKEN=$(curl -sS -X POST "https://$CONTROL_HOST/oauth/token" \
-H 'Content-Type: application/x-www-form-urlencoded' \
-H "Authorization: Bearer $JWT_TOKEN" \
-d "grant_type=urn:ietf:params:oauth:grant-type:jwt-bearer&scope=${INGEST_HOST}")
echo "Scoped Token obtained for ingest host"
3단계: 채널 열기¶
스트리밍 채널을 열어 데이터 수집을 시작합니다.
curl -sS -X PUT \
-H "Authorization: Bearer $SCOPED_TOKEN" \
-H "Content-Type: application/json" \
"https://${INGEST_HOST}/v2/streaming/databases/$DB/schemas/$SCHEMA/pipes/$PIPE/channels/$CHANNEL" \
-d '{}' | tee open_resp.json | jq .
4단계: 데이터 행 추가¶
열린 채널에 단일 데이터 행을 추가합니다.
4.1 연속 및 오프셋 토큰 추출하기¶
이러한 토큰은 스트리밍 세션의 상태를 유지하는 데 매우 중요합니다.
export CONT_TOKEN=$(jq -r '.next_continuation_token' open_resp.json)
export OFFSET_TOKEN=$(jq -r '.channel_status.last_committed_offset_token' open_resp.json)
export NEW_OFFSET=$((OFFSET_TOKEN + 1))
4.2 샘플 행 생성하기¶
샘플 데이터 행을 NDJSON 형식으로 생성합니다.
export NOW_TS=$(date -u +"%Y-%m-%dT%H:%M:%SZ")
cat <<EOF > rows.ndjson
{
"id": 1,
"c1": $RANDOM,
"ts": "$NOW_TS"
}
EOF
4.3 행 추가하기¶
샘플 행을 스트리밍 채널로 보냅니다.
curl -sS -X POST \
-H "Authorization: Bearer $SCOPED_TOKEN" \
-H "Content-Type: application/x-ndjson" \
"https://${INGEST_HOST}/v2/streaming/data/databases/$DB/schemas/$SCHEMA/pipes/$PIPE/channels/$CHANNEL/rows?continuationToken=$CONT_TOKEN&offsetToken=$NEW_OFFSET" \
--data-binary @rows.ndjson | jq .
중요
각 추가 작업 후에는 다음 추가 호출을 위해 연속 토큰을 업데이트해야 합니다. 추가된 행 호출에 대한 응답에는 업데이트를 수행하는 데 사용해야 하는 next_continuation_token
필드가 포함되어 있습니다.
4.4 데이터 수집 확인¶
데이터가 Snowflake 테이블에 수집되었는지 확인합니다.
Snowflake에서 다음 SQL 쿼리를 실행합니다.
SELECT * FROM MY_DATABASE.MY_SCHEMA.MY_TABLE WHERE id = 1;
5단계: 채널 상태 가져오기¶
스트리밍 채널의 현재 상태를 확인합니다.
curl -sS -X POST \
-H "Authorization: Bearer $SCOPED_TOKEN" \
-H "Content-Type: application/json" \
"https://${INGEST_HOST}/v2/streaming/databases/$DB/schemas/$SCHEMA/pipes/$PIPE:bulk-channel-status" \
-d "{\"channel_names\": [\"$CHANNEL\"]}" | jq ".channel_statuses.\"$CHANNEL\""
(선택 사항) 6단계: 정리¶
임시 파일을 제거하고 환경 변수를 설정 해제합니다.
rm -f rows.ndjson open_resp.json
unset JWT_TOKEN SCOPED_TOKEN ACCOUNT USER DB SCHEMA PIPE CHANNEL CONTROL_HOST INGEST_HOST CONT_TOKEN OFFSET_TOKEN NEW_OFFSET NOW_TS
문제 해결하기¶
HTTP401(승인되지 않음): JWT토큰이 유효하고 만료되지 않았는지 확인합니다. 필요한 경우 다시 생성합니다.
HTTP404(찾을 수 없음): 데이터베이스, 스키마, 파이프 및 채널 이름의 철자가 올바른지, Snowflake 계정에 존재하는지 다시 확인합니다.
수집 호스트 없음: 제어 플레인 호스트 URL이 올바르고 액세스할 수 있는지 확인합니다.