Snowpipe Streaming REST API 시작하기: cURL 및 JWT자습서

참고

향상된 성능과 시작 경험의 이점을 누리려면 REST API를 통해 Snowpipe Streaming SDK를 시작하는 것이 좋습니다.

이 가이드에서는 Snowpipe Streaming REST API 및 SnowSQL</developer-guide/sql-api/authenticating>`로 생성된 :doc:`JSON 웹 토큰(JWT)을 사용하여 Snowflake로 데이터를 스트리밍하는 방법을 보여줍니다.

전제 조건

시작하기 전에 다음 항목이 있는지 확인하세요.

Snowflake 사용자 및 오브젝트:

키 페어 인증을 위해 구성된 Snowflake 사용자. 다음 SQL 명령을 사용하여 공개 키를 등록합니다.

ALTER USER MY_USER SET RSA_PUBLIC_KEY='<your-public-key>';
Copy

스트리밍 수집을 위한 Snowflake 데이터베이스, 스키마 및 PIPE 오브젝트. 다음 SQL 명령을 사용하여 생성할 수 있습니다(MY_DATABASE, MY_SCHEMA, MY_PIPE, ``MY_TABLE``과 같은 자리 표시자를 원하는 이름으로 대체).

-- Create Database and Schema
CREATE OR REPLACE DATABASE MY_DATABASE;
CREATE OR REPLACE SCHEMA MY_SCHEMA;

-- Create Target Table
CREATE OR REPLACE TABLE MY_TABLE (
    data VARIANT,
    c1 NUMBER,
    c2 STRING
);

-- Create PIPE Object for Streaming Ingestion
CREATE OR REPLACE PIPE MY_PIPE
AS COPY INTO MY_TABLE FROM (SELECT $1, $1:c1, $1:ts FROM TABLE(DATA_SOURCE(TYPE => 'STREAMING')));
Copy

ACCOUNT_IDENTIFIER:

ACCOUNT_IDENTIFIER에는 형식 1을 사용하는 것을 권장합니다. 그러면 조직 내의 계정 이름이 사용됩니다(예: myorg-account123). 형식에 대한 자세한 내용은 계정 식별자 섹션을 참조하세요.

설치된 도구:

  • curl: HTTP 요청을 위한 용도.

  • jq: JSON 응답 구분 분석 용도.

  • SnowSQL: 명령을 실행하기 위한 Snowflake의 명령줄 클라이언트.

생성된 JWT:

SnowSQL을 사용하여 JWT를 생성합니다.

snowsql --private-key-path rsa_key.p8 --generate-jwt \
  -a <ACCOUNT_IDENTIFIER> \
  -u MY_USER
Copy

조심

JWT를 안전하게 저장하세요. 로그나 스크립트에 노출되지 않도록 하세요.

단계별 지침

데이터를 Snowflake로 스트리밍하려면 다음 단계를 완료합니다.

1단계: 환경 변수 설정하기

Snowflake 계정 및 스트리밍 작업에 필요한 환경 변수를 설정합니다.

# Paste the JWT token obtained from SnowSQL
export JWT_TOKEN="PASTE_YOUR_JWT_TOKEN_HERE"

# Configure your Snowflake account and resources:
export ACCOUNT="<ACCOUNT_IDENTIFIER>" # For example, ab12345
export USER="MY_USER"
export DB="MY_DATABASE"
export SCHEMA="MY_SCHEMA"
export PIPE="MY_PIPE"
export CHANNEL="MY_CHANNEL"

# Replace ACCOUNT with your Account URL Host to form the control plane host:
export CONTROL_HOST="${ACCOUNT}.snowflakecomputing.com"
Copy

2단계: 수집 호스트 검색

중요

Snowflake 계정 이름에 밑줄이 포함된 경우(예: MY_ACCOUNT), 알려진 문제로 인해 수집 서비스를 호출할 때 내부 오류가 발생할 수 있습니다.

범위 지정 토큰을 생성하기 전에 INGEST_HOST에서 모든 밑줄을 대시로 바꿔야 합니다. 이 변환된 형식(대시 포함)은 범위 지정 토큰 자체의 생성을 포함하여 모든 후속 REST API 호출에 사용해야 합니다.

예를 들어, 반환된 호스트 이름이 :code:`my_account.region.ingest.snowflakecomputing.com`인 경우 모든 후속 REST API 호출에서 :code:`my-account.region.ingest.snowflakecomputing.com`으로 변경해야 합니다.

수집 호스트는 스트리밍 데이터의 엔드포인트입니다. JWT를 사용하여 수집 호스트를 검색합니다.

export INGEST_HOST=$(curl -sS -X GET \
  -H "Authorization: Bearer $JWT_TOKEN" \
  -H "X-Snowflake-Authorization-Token-Type: KEYPAIR_JWT" \
  "https://${CONTROL_HOST}/v2/streaming/hostname")

echo "Ingest Host: $INGEST_HOST"
Copy

수집 호스트에서 작업을 승인하기 위해 범위 지정 토큰을 가져옵니다.

export SCOPED_TOKEN=$(curl -sS -X POST "https://$CONTROL_HOST/oauth/token" \
  -H 'Content-Type: application/x-www-form-urlencoded' \
  -H "Authorization: Bearer $JWT_TOKEN" \
  -d "grant_type=urn:ietf:params:oauth:grant-type:jwt-bearer&scope=${INGEST_HOST}")

echo "Scoped Token obtained for ingest host"
Copy

3단계: 채널 열기

스트리밍 채널을 열어 데이터 수집을 시작합니다.

curl -sS -X PUT \
  -H "Authorization: Bearer $SCOPED_TOKEN" \
  -H "Content-Type: application/json" \
  "https://${INGEST_HOST}/v2/streaming/databases/$DB/schemas/$SCHEMA/pipes/$PIPE/channels/$CHANNEL" \
  -d '{}' | tee open_resp.json | jq .
Copy

4단계: 데이터 행 추가

열린 채널에 단일 데이터 행을 추가합니다.

4.1 연속 및 오프셋 토큰 추출하기

이러한 토큰은 스트리밍 세션의 상태를 유지하는 데 매우 중요합니다.

export CONT_TOKEN=$(jq -r '.next_continuation_token' open_resp.json)
export OFFSET_TOKEN=$(jq -r '.channel_status.last_committed_offset_token' open_resp.json)
export NEW_OFFSET=$((OFFSET_TOKEN + 1))
Copy

4.2 샘플 행 생성하기

샘플 데이터 행을 NDJSON 형식으로 생성합니다.

export NOW_TS=$(date -u +"%Y-%m-%dT%H:%M:%SZ")

cat <<EOF > rows.ndjson
{
  "id": 1,
  "c1": $RANDOM,
  "ts": "$NOW_TS"
}
EOF
Copy

4.3 행 추가하기

샘플 행을 스트리밍 채널로 보냅니다.

curl -sS -X POST \
  -H "Authorization: Bearer $SCOPED_TOKEN" \
  -H "Content-Type: application/x-ndjson" \
  "https://${INGEST_HOST}/v2/streaming/data/databases/$DB/schemas/$SCHEMA/pipes/$PIPE/channels/$CHANNEL/rows?continuationToken=$CONT_TOKEN&offsetToken=$NEW_OFFSET" \
  --data-binary @rows.ndjson | jq .
Copy

중요

  • After each append operation, you must update the continuationToken for the next append call. The response from the append rows call contains a next_continuation_token field that you should use to make your updates.

  • The success of the append operation confirms only that the data was received by the service, not that it is persisted to the table. Take the next step to verify persistence before querying or moving to the next batch.

4.4 Verify data persistence and committed offset by using getChannelStatus

Complete this critical step to ensure application reliability. Data isn’t guaranteed to be persistent until the committedOffset has advanced. To confirm that the rows that you just appended are successfully persisted, use getChannelStatus.

스트리밍 채널의 현재 상태를 확인합니다.

curl -sS -X POST \
  -H "Authorization: Bearer $SCOPED_TOKEN" \
  -H "Content-Type: application/json" \
  "https://${INGEST_HOST}/v2/streaming/databases/$DB/schemas/$SCHEMA/pipes/$PIPE:bulk-channel-status" \
  -d "{\"channel_names\": [\"$CHANNEL\"]}" | jq ".channel_statuses.\"$CHANNEL\""
Copy

Verification check

You must ensure that the committedOffset returned in the response is greater than or equal to the offset of the rows you just appended. Only after the committedOffset advances can you be certain that the data is safely available in the table.

4.5 Query the table for persisted data

After you confirm that the committedOffset has advanced in the previous step (4.4), you can query to confirm that the data is ingested into your Snowflake table.

Snowflake에서 다음 SQL 쿼리를 실행합니다.

SELECT * FROM MY_DATABASE.MY_SCHEMA.MY_TABLE WHERE data:id::NUMBER = 1;
Copy

(Optional) Step 5: Clean up

임시 파일을 제거하고 환경 변수를 설정 해제합니다.

rm -f rows.ndjson open_resp.json
unset JWT_TOKEN SCOPED_TOKEN ACCOUNT USER DB SCHEMA PIPE CHANNEL CONTROL_HOST INGEST_HOST CONT_TOKEN OFFSET_TOKEN NEW_OFFSET NOW_TS
Copy

문제 해결하기

  • HTTP401(승인되지 않음): JWT토큰이 유효하고 만료되지 않았는지 확인합니다. 필요한 경우 다시 생성합니다.

  • HTTP404(찾을 수 없음): 데이터베이스, 스키마, 파이프 및 채널 이름의 철자가 올바른지, Snowflake 계정에 존재하는지 다시 확인합니다.

  • 수집 호스트 없음: 제어 플레인 호스트 URL이 올바르고 액세스할 수 있는지 확인합니다.