Snowpipe Streaming REST API 시작하기: cURL 및 JWT자습서¶
참고
향상된 성능과 시작 경험의 이점을 누리려면 REST API를 통해 Snowpipe Streaming SDK를 시작하는 것이 좋습니다.
이 가이드에서는 Snowpipe Streaming REST API 및 SnowSQL</developer-guide/sql-api/authenticating>`로 생성된 :doc:`JSON 웹 토큰(JWT)을 사용하여 Snowflake로 데이터를 스트리밍하는 방법을 보여줍니다.
전제 조건¶
시작하기 전에 다음 항목이 있는지 확인하세요.
Snowflake 사용자 및 오브젝트:
키 페어 인증을 위해 구성된 Snowflake 사용자. 다음 SQL 명령을 사용하여 공개 키를 등록합니다.
ALTER USER MY_USER SET RSA_PUBLIC_KEY='<your-public-key>';
A Snowflake database, schema, and a target table for streaming ingestion. You can create them by using the following SQL commands and replacing placeholders like MY_DATABASE, MY_SCHEMA, MY_TABLE with the names that you want:
-- Create Database and Schema
CREATE OR REPLACE DATABASE MY_DATABASE;
CREATE OR REPLACE SCHEMA MY_SCHEMA;
-- Create Target Table
CREATE OR REPLACE TABLE MY_TABLE (
id NUMBER,
c1 NUMBER,
ts STRING
);
ACCOUNT_IDENTIFIER:
We suggest that you use Format 1 for the ACCOUNT_IDENTIFIER, which uses the account name within your organization; for example, myorg-account123. For more information on the format, see 계정 식별자.
Installed tools:
curl: HTTP 요청을 위한 용도.jq: JSON 응답 구분 분석 용도.SnowSQL: 명령을 실행하기 위한 Snowflake의 명령줄 클라이언트.
Generated JWT:
Generate your JWT by using SnowSQL:
snowsql --private-key-path rsa_key.p8 --generate-jwt \
-a <ACCOUNT_IDENTIFIER> \
-u MY_USER
조심
JWT를 안전하게 저장하세요. 로그나 스크립트에 노출되지 않도록 하세요.
단계별 지침¶
데이터를 Snowflake로 스트리밍하려면 다음 단계를 완료합니다.
1단계: 환경 변수 설정하기¶
Set up the necessary environment variables for your Snowflake account and the streaming operation. Note that the PIPE variable targets the default streaming pipe associated with your table.
# Paste the JWT token obtained from SnowSQL
export JWT_TOKEN="PASTE_YOUR_JWT_TOKEN_HERE"
# Configure your Snowflake account and resources:
export ACCOUNT="<ACCOUNT_IDENTIFIER>" # For example, ab12345
export USER="MY_USER"
export DB="MY_DATABASE"
export SCHEMA="MY_SCHEMA"
export PIPE="MY_TABLE-STREAMING"
export CHANNEL="MY_CHANNEL"
# Replace ACCOUNT with your Account URL Host to form the control plane host:
export CONTROL_HOST="${ACCOUNT}.snowflakecomputing.com"
2단계: 수집 호스트 검색¶
중요
Snowflake 계정 이름에 밑줄이 포함된 경우(예: MY_ACCOUNT), 알려진 문제로 인해 수집 서비스를 호출할 때 내부 오류가 발생할 수 있습니다.
범위 지정 토큰을 생성하기 전에 INGEST_HOST에서 모든 밑줄을 대시로 바꿔야 합니다. 이 변환된 형식(대시 포함)은 범위 지정 토큰 자체의 생성을 포함하여 모든 후속 REST API 호출에 사용해야 합니다.
예를 들어, 반환된 호스트 이름이 :code:`my_account.region.ingest.snowflakecomputing.com`인 경우 모든 후속 REST API 호출에서 :code:`my-account.region.ingest.snowflakecomputing.com`으로 변경해야 합니다.
수집 호스트는 스트리밍 데이터의 엔드포인트입니다. JWT를 사용하여 수집 호스트를 검색합니다.
export INGEST_HOST=$(curl -sS -X GET \
-H "Authorization: Bearer $JWT_TOKEN" \
-H "X-Snowflake-Authorization-Token-Type: KEYPAIR_JWT" \
"https://${CONTROL_HOST}/v2/streaming/hostname")
echo "Ingest Host: $INGEST_HOST"
수집 호스트에서 작업을 승인하기 위해 범위 지정 토큰을 가져옵니다.
export SCOPED_TOKEN=$(curl -sS -X POST "https://$CONTROL_HOST/oauth/token" \
-H 'Content-Type: application/x-www-form-urlencoded' \
-H "Authorization: Bearer $JWT_TOKEN" \
-d "grant_type=urn:ietf:params:oauth:grant-type:jwt-bearer&scope=${INGEST_HOST}")
echo "Scoped Token obtained for ingest host"
3단계: 채널 열기¶
스트리밍 채널을 열어 데이터 수집을 시작합니다.
curl -sS -X PUT \
-H "Authorization: Bearer $SCOPED_TOKEN" \
-H "Content-Type: application/json" \
"https://${INGEST_HOST}/v2/streaming/databases/$DB/schemas/$SCHEMA/pipes/$PIPE/channels/$CHANNEL" \
-d '{}' | tee open_resp.json | jq .
4단계: 데이터 행 추가¶
열린 채널에 단일 데이터 행을 추가합니다.
4.1 연속 및 오프셋 토큰 추출하기¶
이러한 토큰은 스트리밍 세션의 상태를 유지하는 데 매우 중요합니다.
export CONT_TOKEN=$(jq -r '.next_continuation_token' open_resp.json)
export OFFSET_TOKEN=$(jq -r '.channel_status.last_committed_offset_token' open_resp.json)
export NEW_OFFSET=$((OFFSET_TOKEN + 1))
4.2 샘플 행 생성하기¶
샘플 데이터 행을 NDJSON 형식으로 생성합니다.
export NOW_TS=$(date -u +"%Y-%m-%dT%H:%M:%SZ")
cat <<EOF > rows.ndjson
{
"id": 1,
"c1": $RANDOM,
"ts": "$NOW_TS"
}
EOF
4.3 행 추가하기¶
샘플 행을 스트리밍 채널로 보냅니다.
curl -sS -X POST \
-H "Authorization: Bearer $SCOPED_TOKEN" \
-H "Content-Type: application/x-ndjson" \
"https://${INGEST_HOST}/v2/streaming/data/databases/$DB/schemas/$SCHEMA/pipes/$PIPE/channels/$CHANNEL/rows?continuationToken=$CONT_TOKEN&offsetToken=$NEW_OFFSET" \
--data-binary @rows.ndjson | jq .
중요
After each append operation, you must update the
continuationTokenfor the next append call. The response from the append rows call contains anext_continuation_tokenfield that you should use to make your updates.추가 작업의 성공은 데이터가 서비스에서 수신되었다는 확인일 뿐이며, 테이블에 유지된다는 의미는 아닙니다. 다음 단계를 수행하여 다음 배치로 이동하거나 쿼리하기 전에 지속성을 확인합니다.
4.4 :code:`getChannelStatus`를 사용하여 데이터 지속성 및 커밋된 오프셋 확인¶
애플리케이션 안정성을 보장하려면 이 중요한 단계를 완료해야 합니다. :code:`committedOffset`이 진행될 때까지 데이터가 지속된다는 보장은 없습니다. 방금 추가한 행이 성공적으로 지속되는지 확인하려면 :code:`getChannelStatus`를 사용합니다.
스트리밍 채널의 현재 상태를 확인합니다.
curl -sS -X POST \
-H "Authorization: Bearer $SCOPED_TOKEN" \
-H "Content-Type: application/json" \
"https://${INGEST_HOST}/v2/streaming/databases/$DB/schemas/$SCHEMA/pipes/$PIPE:bulk-channel-status" \
-d "{\"channel_names\": [\"$CHANNEL\"]}" | jq ".channel_statuses.\"$CHANNEL\""
유효성 검사 확인
응답에서 반환된 :code:`committedOffset`이 방금 추가한 행의 오프셋보다 크거나 같은지 확인해야 합니다. :code:`committedOffset`이 진행된 후에야 데이터를 안전하게 사용할 수 있는지 확인할 수 있습니다.
4.5 지속된 데이터에 대해 테이블 쿼리하기¶
:ref:`이전 단계(4.4)<verify-data-persistence>`에서 :code:`committedOffset`이 진행되었음을 확인한 경우 이를 쿼리하여 데이터가 Snowflake 테이블에 수집되었는지 확인할 수 있습니다.
Snowflake에서 다음 SQL 쿼리를 실행합니다.
SELECT * FROM MY_DATABASE.MY_SCHEMA.MY_TABLE WHERE id = 1;
(Optional) Step 5: Clean up¶
임시 파일을 제거하고 환경 변수를 설정 해제합니다.
rm -f rows.ndjson open_resp.json
unset JWT_TOKEN SCOPED_TOKEN ACCOUNT USER DB SCHEMA PIPE CHANNEL CONTROL_HOST INGEST_HOST CONT_TOKEN OFFSET_TOKEN NEW_OFFSET NOW_TS
문제 해결하기¶
HTTP401(승인되지 않음): JWT토큰이 유효하고 만료되지 않았는지 확인합니다. 필요한 경우 다시 생성합니다.
HTTP404(찾을 수 없음): 데이터베이스, 스키마, 파이프 및 채널 이름의 철자가 올바른지, Snowflake 계정에 존재하는지 다시 확인합니다.
수집 호스트 없음: 제어 플레인 호스트 URL이 올바르고 액세스할 수 있는지 확인합니다.