Snowpipe Streaming REST API 시작하기: cURL 및 JWT자습서

참고

snowpipe-streaming SDK를 주요 선택 항목 및 기본 선택 항목으로 사용하는 것이 좋습니다. REST API는 처리량이 많은 시나리오에 최적화되어 있지 않습니다.

이 가이드에서는 Snowpipe Streaming REST API 및 SnowSQL</developer-guide/sql-api/authenticating>`로 생성된 :doc:`JSON 웹 토큰(JWT)을 사용하여 Snowflake로 데이터를 스트리밍하는 방법을 보여줍니다.

전제 조건

시작하기 전에 다음 항목이 있는지 확인하세요.

Snowflake 사용자 및 오브젝트:

키 페어 인증을 위해 구성된 Snowflake 사용자. 다음 SQL 명령을 사용하여 공개 키를 등록합니다.

ALTER USER MY_USER SET RSA_PUBLIC_KEY='<your-public-key>';
Copy

스트리밍 수집을 위한 Snowflake 데이터베이스, 스키마 및 PIPE 오브젝트. 다음 SQL 명령을 사용하여 생성할 수 있습니다(MY_DATABASE, MY_SCHEMA, MY_PIPE, ``MY_TABLE``과 같은 자리 표시자를 원하는 이름으로 대체).

-- Create Database and Schema
CREATE OR REPLACE DATABASE MY_DATABASE;
CREATE OR REPLACE SCHEMA MY_SCHEMA;

-- Create Target Table
CREATE OR REPLACE TABLE MY_TABLE (
    data VARIANT,
    c1 NUMBER,
    c2 STRING
);

-- Create PIPE Object for Streaming Ingestion
CREATE OR REPLACE PIPE MY_PIPE
AS COPY INTO MY_TABLE FROM (SELECT $1, $1:c1, $1:ts FROM TABLE(DATA_SOURCE(TYPE => 'STREAMING')));
Copy

설치된 도구:

  • curl: HTTP 요청을 위한 용도.

  • jq: JSON 응답 구분 분석 용도.

  • SnowSQL: 명령을 실행하기 위한 Snowflake의 명령줄 클라이언트.

생성된 JWT: SnowSQL을 사용하여 JWT를 생성합니다.

snowsql --private-key-path rsa_key.p8 --generate-jwt \
  -a <ACCOUNT_LOCATOR> \
  -u MY_USER
Copy

조심

JWT를 안전하게 저장하세요. 로그나 스크립트에 노출되지 않도록 하세요.

단계별 지침

데이터를 Snowflake로 스트리밍하려면 다음 단계를 완료합니다.

1단계: 환경 변수 설정하기

Snowflake 계정 및 스트리밍 작업에 필요한 환경 변수를 설정합니다.

# Paste the JWT token obtained from SnowSQL
export JWT_TOKEN="PASTE_YOUR_JWT_TOKEN_HERE"

# Configure your Snowflake account and resources:
export ACCOUNT="<ACCOUNT_LOCATOR>" # For example, ab12345
export USER="MY_USER"
export DB="MY_DATABASE"
export SCHEMA="MY_SCHEMA"
export PIPE="MY_PIPE"
export CHANNEL="MY_CHANNEL"

# Replace ACCOUNT with your Account URL Host to form the control plane host:
export CONTROL_HOST="${ACCOUNT}.snowflakecomputing.com"
Copy

2단계: 수집 호스트 검색

수집 호스트는 스트리밍 데이터의 엔드포인트입니다. JWT를 사용하여 수집 호스트를 검색합니다.

export INGEST_HOST=$(curl -sS -X GET \
  -H "Authorization: Bearer $JWT_TOKEN" \
  -H "X-Snowflake-Authorization-Token-Type: KEYPAIR_JWT" \
  "https://${CONTROL_HOST}/v2/streaming/hostname")

echo "Ingest Host: $INGEST_HOST"
Copy

수집 호스트에서 작업을 승인하기 위해 범위 지정 토큰을 가져옵니다.

export SCOPED_TOKEN=$(curl -sS -X POST "https://$CONTROL_HOST/oauth/token" \
  -H 'Content-Type: application/x-www-form-urlencoded' \
  -H "Authorization: Bearer $JWT_TOKEN" \
  -d "grant_type=urn:ietf:params:oauth:grant-type:jwt-bearer&scope=${INGEST_HOST}")

echo "Scoped Token obtained for ingest host"
Copy

3단계: 채널 열기

스트리밍 채널을 열어 데이터 수집을 시작합니다.

curl -sS -X PUT \
  -H "Authorization: Bearer $SCOPED_TOKEN" \
  -H "Content-Type: application/json" \
  "https://${INGEST_HOST}/v2/streaming/databases/$DB/schemas/$SCHEMA/pipes/$PIPE/channels/$CHANNEL" \
  -d '{}' | tee open_resp.json | jq .
Copy

4단계: 데이터 행 추가

열린 채널에 단일 데이터 행을 추가합니다.

4.1 연속 및 오프셋 토큰 추출하기

이러한 토큰은 스트리밍 세션의 상태를 유지하는 데 매우 중요합니다.

export CONT_TOKEN=$(jq -r '.next_continuation_token' open_resp.json)
export OFFSET_TOKEN=$(jq -r '.channel_status.last_committed_offset_token' open_resp.json)
export NEW_OFFSET=$((OFFSET_TOKEN + 1))
Copy

4.2 샘플 행 생성하기

샘플 데이터 행을 NDJSON 형식으로 생성합니다.

export NOW_TS=$(date -u +"%Y-%m-%dT%H:%M:%SZ")

cat <<EOF > rows.ndjson
{
  "id": 1,
  "c1": $RANDOM,
  "ts": "$NOW_TS"
}
EOF
Copy

4.3 행 추가하기

샘플 행을 스트리밍 채널로 보냅니다.

curl -sS -X POST \
  -H "Authorization: Bearer $SCOPED_TOKEN" \
  -H "Content-Type: application/x-ndjson" \
  "https://${INGEST_HOST}/v2/streaming/data/databases/$DB/schemas/$SCHEMA/pipes/$PIPE/channels/$CHANNEL/rows?continuationToken=$CONT_TOKEN&offsetToken=$NEW_OFFSET" \
  --data-binary @rows.ndjson | jq .
Copy

중요

각 추가 작업 후에는 다음 추가 호출을 위해 연속 토큰을 업데이트해야 합니다. 추가된 행 호출에 대한 응답에는 업데이트를 수행하는 데 사용해야 하는 next_continuation_token 필드가 포함되어 있습니다.

4.4 데이터 수집 확인

데이터가 Snowflake 테이블에 수집되었는지 확인합니다.

Snowflake에서 다음 SQL 쿼리를 실행합니다.

SELECT * FROM MY_DATABASE.MY_SCHEMA.MY_TABLE WHERE id = 1;
Copy

5단계: 채널 상태 가져오기

스트리밍 채널의 현재 상태를 확인합니다.

curl -sS -X POST \
  -H "Authorization: Bearer $SCOPED_TOKEN" \
  -H "Content-Type: application/json" \
  "https://${INGEST_HOST}/v2/streaming/databases/$DB/schemas/$SCHEMA/pipes/$PIPE:bulk-channel-status" \
  -d "{\"channel_names\": [\"$CHANNEL\"]}" | jq ".channel_statuses.\"$CHANNEL\""
Copy

(선택 사항) 6단계: 정리

임시 파일을 제거하고 환경 변수를 설정 해제합니다.

rm -f rows.ndjson open_resp.json
unset JWT_TOKEN SCOPED_TOKEN ACCOUNT USER DB SCHEMA PIPE CHANNEL CONTROL_HOST INGEST_HOST CONT_TOKEN OFFSET_TOKEN NEW_OFFSET NOW_TS
Copy

문제 해결하기

  • HTTP401(승인되지 않음): JWT토큰이 유효하고 만료되지 않았는지 확인합니다. 필요한 경우 다시 생성합니다.

  • HTTP404(찾을 수 없음): 데이터베이스, 스키마, 파이프 및 채널 이름의 철자가 올바른지, Snowflake 계정에 존재하는지 다시 확인합니다.

  • 수집 호스트 없음: 제어 플레인 호스트 URL이 올바르고 액세스할 수 있는지 확인합니다.