Snowpipe Streaming REST API 시작하기: cURL 및 JWT자습서

참고

향상된 성능과 시작 경험의 이점을 누리려면 REST API를 통해 Snowpipe Streaming SDK를 시작하는 것이 좋습니다.

이 가이드에서는 Snowpipe Streaming REST API 및 SnowSQL</developer-guide/sql-api/authenticating>`로 생성된 :doc:`JSON 웹 토큰(JWT)을 사용하여 Snowflake로 데이터를 스트리밍하는 방법을 보여줍니다.

전제 조건

시작하기 전에 다음 항목이 있는지 확인하세요.

Snowflake 사용자 및 오브젝트:

키 페어 인증을 위해 구성된 Snowflake 사용자. 다음 SQL 명령을 사용하여 공개 키를 등록합니다.

ALTER USER MY_USER SET RSA_PUBLIC_KEY='<your-public-key>';
Copy

A Snowflake database, schema, and a target table for streaming ingestion. You can create them by using the following SQL commands and replacing placeholders like MY_DATABASE, MY_SCHEMA, MY_TABLE with the names that you want:

-- Create Database and Schema
CREATE OR REPLACE DATABASE MY_DATABASE;
CREATE OR REPLACE SCHEMA MY_SCHEMA;

-- Create Target Table
CREATE OR REPLACE TABLE MY_TABLE (
    id NUMBER,
    c1 NUMBER,
    ts STRING
);
Copy

ACCOUNT_IDENTIFIER:

We suggest that you use Format 1 for the ACCOUNT_IDENTIFIER, which uses the account name within your organization; for example, myorg-account123. For more information on the format, see 계정 식별자.

Installed tools:

  • curl: HTTP 요청을 위한 용도.

  • jq: JSON 응답 구분 분석 용도.

  • SnowSQL: 명령을 실행하기 위한 Snowflake의 명령줄 클라이언트.

Generated JWT:

Generate your JWT by using SnowSQL:

snowsql --private-key-path rsa_key.p8 --generate-jwt \
  -a <ACCOUNT_IDENTIFIER> \
  -u MY_USER
Copy

조심

JWT를 안전하게 저장하세요. 로그나 스크립트에 노출되지 않도록 하세요.

단계별 지침

데이터를 Snowflake로 스트리밍하려면 다음 단계를 완료합니다.

1단계: 환경 변수 설정하기

Set up the necessary environment variables for your Snowflake account and the streaming operation. Note that the PIPE variable targets the default streaming pipe associated with your table.

# Paste the JWT token obtained from SnowSQL
export JWT_TOKEN="PASTE_YOUR_JWT_TOKEN_HERE"

# Configure your Snowflake account and resources:
export ACCOUNT="<ACCOUNT_IDENTIFIER>" # For example, ab12345
export USER="MY_USER"
export DB="MY_DATABASE"
export SCHEMA="MY_SCHEMA"
export PIPE="MY_TABLE-STREAMING"
export CHANNEL="MY_CHANNEL"

# Replace ACCOUNT with your Account URL Host to form the control plane host:
export CONTROL_HOST="${ACCOUNT}.snowflakecomputing.com"
Copy

2단계: 수집 호스트 검색

중요

Snowflake 계정 이름에 밑줄이 포함된 경우(예: MY_ACCOUNT), 알려진 문제로 인해 수집 서비스를 호출할 때 내부 오류가 발생할 수 있습니다.

범위 지정 토큰을 생성하기 전에 INGEST_HOST에서 모든 밑줄을 대시로 바꿔야 합니다. 이 변환된 형식(대시 포함)은 범위 지정 토큰 자체의 생성을 포함하여 모든 후속 REST API 호출에 사용해야 합니다.

예를 들어, 반환된 호스트 이름이 :code:`my_account.region.ingest.snowflakecomputing.com`인 경우 모든 후속 REST API 호출에서 :code:`my-account.region.ingest.snowflakecomputing.com`으로 변경해야 합니다.

수집 호스트는 스트리밍 데이터의 엔드포인트입니다. JWT를 사용하여 수집 호스트를 검색합니다.

export INGEST_HOST=$(curl -sS -X GET \
  -H "Authorization: Bearer $JWT_TOKEN" \
  -H "X-Snowflake-Authorization-Token-Type: KEYPAIR_JWT" \
  "https://${CONTROL_HOST}/v2/streaming/hostname")

echo "Ingest Host: $INGEST_HOST"
Copy

수집 호스트에서 작업을 승인하기 위해 범위 지정 토큰을 가져옵니다.

export SCOPED_TOKEN=$(curl -sS -X POST "https://$CONTROL_HOST/oauth/token" \
  -H 'Content-Type: application/x-www-form-urlencoded' \
  -H "Authorization: Bearer $JWT_TOKEN" \
  -d "grant_type=urn:ietf:params:oauth:grant-type:jwt-bearer&scope=${INGEST_HOST}")

echo "Scoped Token obtained for ingest host"
Copy

3단계: 채널 열기

스트리밍 채널을 열어 데이터 수집을 시작합니다.

curl -sS -X PUT \
  -H "Authorization: Bearer $SCOPED_TOKEN" \
  -H "Content-Type: application/json" \
  "https://${INGEST_HOST}/v2/streaming/databases/$DB/schemas/$SCHEMA/pipes/$PIPE/channels/$CHANNEL" \
  -d '{}' | tee open_resp.json | jq .
Copy

4단계: 데이터 행 추가

열린 채널에 단일 데이터 행을 추가합니다.

4.1 연속 및 오프셋 토큰 추출하기

이러한 토큰은 스트리밍 세션의 상태를 유지하는 데 매우 중요합니다.

export CONT_TOKEN=$(jq -r '.next_continuation_token' open_resp.json)
export OFFSET_TOKEN=$(jq -r '.channel_status.last_committed_offset_token' open_resp.json)
export NEW_OFFSET=$((OFFSET_TOKEN + 1))
Copy

4.2 샘플 행 생성하기

샘플 데이터 행을 NDJSON 형식으로 생성합니다.

export NOW_TS=$(date -u +"%Y-%m-%dT%H:%M:%SZ")

cat <<EOF > rows.ndjson
{
  "id": 1,
  "c1": $RANDOM,
  "ts": "$NOW_TS"
}
EOF
Copy

4.3 행 추가하기

샘플 행을 스트리밍 채널로 보냅니다.

curl -sS -X POST \
  -H "Authorization: Bearer $SCOPED_TOKEN" \
  -H "Content-Type: application/x-ndjson" \
  "https://${INGEST_HOST}/v2/streaming/data/databases/$DB/schemas/$SCHEMA/pipes/$PIPE/channels/$CHANNEL/rows?continuationToken=$CONT_TOKEN&offsetToken=$NEW_OFFSET" \
  --data-binary @rows.ndjson | jq .
Copy

중요

  • After each append operation, you must update the continuationToken for the next append call. The response from the append rows call contains a next_continuation_token field that you should use to make your updates.

  • 추가 작업의 성공은 데이터가 서비스에서 수신되었다는 확인일 뿐이며, 테이블에 유지된다는 의미는 아닙니다. 다음 단계를 수행하여 다음 배치로 이동하거나 쿼리하기 전에 지속성을 확인합니다.

4.4 :code:`getChannelStatus`를 사용하여 데이터 지속성 및 커밋된 오프셋 확인

애플리케이션 안정성을 보장하려면 이 중요한 단계를 완료해야 합니다. :code:`committedOffset`이 진행될 때까지 데이터가 지속된다는 보장은 없습니다. 방금 추가한 행이 성공적으로 지속되는지 확인하려면 :code:`getChannelStatus`를 사용합니다.

스트리밍 채널의 현재 상태를 확인합니다.

curl -sS -X POST \
  -H "Authorization: Bearer $SCOPED_TOKEN" \
  -H "Content-Type: application/json" \
  "https://${INGEST_HOST}/v2/streaming/databases/$DB/schemas/$SCHEMA/pipes/$PIPE:bulk-channel-status" \
  -d "{\"channel_names\": [\"$CHANNEL\"]}" | jq ".channel_statuses.\"$CHANNEL\""
Copy

유효성 검사 확인

응답에서 반환된 :code:`committedOffset`이 방금 추가한 행의 오프셋보다 크거나 같은지 확인해야 합니다. :code:`committedOffset`이 진행된 후에야 데이터를 안전하게 사용할 수 있는지 확인할 수 있습니다.

4.5 지속된 데이터에 대해 테이블 쿼리하기

:ref:`이전 단계(4.4)<verify-data-persistence>`에서 :code:`committedOffset`이 진행되었음을 확인한 경우 이를 쿼리하여 데이터가 Snowflake 테이블에 수집되었는지 확인할 수 있습니다.

Snowflake에서 다음 SQL 쿼리를 실행합니다.

SELECT * FROM MY_DATABASE.MY_SCHEMA.MY_TABLE WHERE id = 1;
Copy

(Optional) Step 5: Clean up

임시 파일을 제거하고 환경 변수를 설정 해제합니다.

rm -f rows.ndjson open_resp.json
unset JWT_TOKEN SCOPED_TOKEN ACCOUNT USER DB SCHEMA PIPE CHANNEL CONTROL_HOST INGEST_HOST CONT_TOKEN OFFSET_TOKEN NEW_OFFSET NOW_TS
Copy

문제 해결하기

  • HTTP401(승인되지 않음): JWT토큰이 유효하고 만료되지 않았는지 확인합니다. 필요한 경우 다시 생성합니다.

  • HTTP404(찾을 수 없음): 데이터베이스, 스키마, 파이프 및 채널 이름의 철자가 올바른지, Snowflake 계정에 존재하는지 다시 확인합니다.

  • 수집 호스트 없음: 제어 플레인 호스트 URL이 올바르고 액세스할 수 있는지 확인합니다.