Snowpipe Streaming API REST 엔드포인트

참고

향상된 성능과 시작 경험의 이점을 누리려면 REST API를 통해 Snowpipe Streaming SDK를 시작하는 것이 좋습니다.

Snowpipe Streaming REST API는 경량 워크로드용으로 설계되었으며 Snowpipe Streaming SDK를 사용하지 않고 외부 애플리케이션과 통합할 수 있는 유연한 방법을 제공합니다.

다음 다이어그램은 데이터가 클라이언트에서 Snowflake 서버로 흐르는 방식에 대한 시각적 개요를 제공하며, 프로세스의 각 키 API 엔드포인트를 자세히 설명합니다.

Snowpipe Streaming REST API 개요

헤더 요청

다음 요청 헤더는 Snowpipe Streaming REST API 의 모든 엔드포인트에 적용됩니다.

헤더

설명

Authorization

인증 토큰

X-Snowflake-Authorization-Token-Type (선택 사항)

JWT/OAuth

참고

단일 요청 페이로드에 허용되는 최대 크기는 16MB입니다. 데이터가 더 큰 경우 여러 요청으로 분할해야 합니다.

호스트 이름 가져오기

Get Hostname 은 Snowpipe Streaming REST API 와 상호 작용하는 데 사용되는 호스트 이름을 반환합니다. 각 계정에는 고유한 호스트 이름이 있습니다.

GET /v2/streaming/hostname

응답:

{
  "hostname": "string"
}
Copy

응답 필드에 대한 설명:

필드

타입

설명

호스트 이름

String

계정의 호스트 이름입니다.

Exchange 범위 지정 토큰

:code:`Exchange Scoped Token`은 Snowpipe Streaming API 관련 서비스에만 액세스하는 데 사용할 수 있는 보안 토큰을 반환합니다. 이를 통해 고객에게 보안 보호 기능을 제공합니다.

POST /oauth/token

요청:

속성

필수

구성 요소

설명

content_type

헤더

“application/x-www-form-urlencoded”

grant_type

페이로드

urn:ietf:params:oauth:grant-type:jwt-bearer

범위

페이로드

계정의 호스트 이름입니다.

응답:

{
  "token": "string"
}
Copy

응답 필드에 대한 설명:

필드

타입

설명

토큰

String

범위 지정 토큰입니다.

Open Channel

Open Channel 작업은 파이프 또는 테이블에 대해 새 채널을 만들거나 엽니다. 채널이 이미 존재하는 경우, Snowflake는 채널의 클라이언트 시퀀서를 범프하고 마지막으로 커밋된 오프셋 토큰을 반환합니다.

PUT /v2/streaming/databases/{databaseName}/schemas/{schemaName}/pipes/{pipeName}/channels/{channelName}

요청:

속성

필수

구성 요소

설명

databaseName

URI

데이터베이스 이름이며, 대소문자를 구분하지 않습니다.

schemaName

URI

스키마 이름이며, 대소문자를 구분하지 않습니다.

pipeName

URI

파이프 이름이며, 대소문자를 구분하지 않습니다.

channelName

URI

생성하거나 다시 여는 채널의 이름이며, 대소문자를 구분하지 않습니다.

offset_token

아니요

페이로드

채널을 열 때 오프셋 토큰을 설정하는 데 사용되는 문자열입니다.

requestId

아니요

쿼리 매개 변수

UUID(범용 고유 식별자)는 시스템을 통해 요청을 추적하는 데 사용됩니다.

응답:

{
  "next_continuation_token": "string",
  "channel_status": {
    "database_name": "string",
    "schema_name": "string",
    "pipe_name": "string",
    "channel_name": "string",
    "channel_status_code": "string",
    "last_committed_offset_token": "string",
    "created_on_ms": "long",
    "rows_inserted": "int",
    "rows_parsed": "int",
    "rows_error_count": "int",
    "last_error_offset_upper_bound": "string",
    "last_error_message": "string",
    "last_error_timestamp": "timestamp_utc",
    "snowflake_avg_processing_latency_ms": "int"
  }
}
Copy

응답 필드에 대한 설명:

필드

타입

설명

next_continuation_token

String

후속 행 추가 요청에서 사용해야 하는 API 관리 토큰입니다. 토큰은 일련의 호출을 연결하여 연속적이고 순서대로 정렬된 데이터 스트림을 유지하고 정확히 한 번만 전달하는 세션 상태를 유지합니다.

channel_status

오브젝트

채널에 대한 다음과 같은 세부 정보가 포함된 중첩된 오브젝트입니다.

  • database_name(String): 파이프가 있는 데이터베이스의 이름입니다.

  • schema_name(String): 파이프가 있는 스키마의 이름입니다.

  • pipe_name(String): 사용 중인 특정 파이프의 이름입니다.

  • channel_name(String): 스트리밍 채널의 이름입니다.

  • channel_status_code(String): 채널의 현재 상태를 나타내는 코드입니다. 예: ‘ACTIVE’.

  • last_committed_offset_token(String): 마지막으로 커밋된 오프셋을 나타내는 토큰입니다.

  • created_on_ms(Long): 채널이 생성된 타임스탬프(밀리초)입니다.

  • rows_inserted(Int): 성공적으로 삽입된 총 행 수입니다.

  • rows_parsed(Int): 구문 분석된 총 행 수입니다.

  • rows_error_count(Int): 오류가 발생한 총 행 수입니다.

  • last_error_offset_upper_bound(String): 마지막 오류가 발생한 오프셋의 상한을 나타내는 토큰입니다.

  • last_error_message(string): 마지막으로 발생한 오류의 메시지입니다.

  • last_error_timestamp(긴): 마지막 오류의 타임스탬프(밀리초)입니다.

  • snowflake_avg_processing_latency_ms(Int): Snowflake의 평균 처리 대기 시간(밀리초)입니다.

행 추가

Append Rows 작업은 지정된 채널에 행 배치를 삽입합니다.

POST /v2/streaming/data/databases/{databaseName}/schemas/{schemaName}/pipes/{pipeName}/channels/{channelName}/rows

요청:

속성

필수

구성 요소

설명

databaseName

URI

데이터베이스 이름이며, 대소문자를 구분하지 않습니다.

schemaName

URI

스키마 이름이며, 대소문자를 구분하지 않습니다.

pipeName

URI

파이프 이름이며, 대소문자를 구분하지 않습니다.

channelName

URI

채널 이름이며, 대소문자를 구분하지 않습니다.

연속 토큰

쿼리 매개 변수

Snowflake의 연속 토큰이며, 클라이언트 및 행 시퀀스를 모두 캡슐화합니다.

offsetToken

아니요

쿼리 매개 변수

배치별 오프셋 토큰을 설정하는 데 사용되는 문자열입니다.

rows

페이로드

NDJSON 형식으로 수집할 실제 데이터 페이로드입니다. 이 속성에 허용되는 최대 크기는 4MB입니다.

requestId

아니요

쿼리 매개 변수

시스템을 통해 요청을 추적하는 데 사용되는 UUID.

참고

NDJSON 페이로드 내의 JSON 텍스트는 RFC 8259 표준을 엄격히 준수해야 합니다. 각 JSON 텍스트 다음에는 줄 바꿈 문자 \n`(:code:`0x0A)이 와야 합니다. 줄바꿈 문자 앞에 캐리지 리턴 \r`(:code:`0x0D)을 삽입할 수도 있습니다.

응답:

{
  "next_continuation_token": "string"
}
Copy

응답 필드에 대한 설명:

필드

타입

설명

next_continuation_token

문자열

클라이언트와 행 시퀀스를 모두 캡슐화하는 Snowflake의 다음 연속 토큰입니다. 다음 배치를 삽입하는 데 사용해야 합니다.

채널 제거

Drop Channel 작업은 메타데이터와 함께 서버 측에 채널을 드롭합니다.

DELETE /v2/streaming/databases/{databaseName}/schemas/{schemaName}/pipes/{pipeName}/channels/{channelName}

요청:

속성

필수

구성 요소

설명

databaseName

URI

데이터베이스 이름, 대/소문자 구분 안 함

schemaName

URI

스키마 이름, 대/소문자 구분 안 함

pipeOrTableName

URI

파이프 또는 테이블 이름, 대/소문자 구분 안 함

channelName

URI

채널 이름, 대/소문자 구분 안 함

requestId

아니요

쿼리 매개 변수

시스템을 통해 요청을 추적하는 데 사용되는 UUID

응답:

이 작업은 HTTP 상태 코드 이외의 특정 성공 응답이 없는 페이로드를 반환합니다.

채널 상태 일괄 가져오기

Bulk Get Channel Status 작업은 특정 클라이언트 시퀀서에 대한 채널의 상태를 반환합니다.

POST /v2/streaming/databases/{databaseName}/schemas/{schemaName}/pipes/{pipeName}:bulk-channel-status

요청:

속성

필수

구성 요소

설명

databaseName

URI

데이터베이스 이름, 대/소문자 구분 안 함

schemaName

URI

스키마 이름, 대/소문자 구분 안 함

pipeName

URI

파이프 이름, 대/소문자 구분 안 함

channel_names

페이로드

고객이 상태를 확인하려는 채널 이름 문자열 배열입니다. 이름은 대소문자를 구분합니다. 예: {"channel_names":["channel1", "channel2"]}.

응답:

{
  "channel_statuses": {
    "channel1": {
      "channel_status_code": "String",
      "last_committed_offset_token": "String",
      "database_name": "String",
      "schema_name": "String",
      "pipe_name": "String",
      "channel_name": "String",
      "rows_inserted": "int",
      "rows_parsed": "int",
      "rows_errors": "int",
      "last_error_offset_upper_bound": "String",
      "last_error_message": "String",
      "last_error_timestamp": "timestamp_utc",
      "snowflake_avg_processing_latency_ms": "int"
    },
    "channel2": {
      "comment": "same structure as channel1"
    }
    "comment": "potentially other channels"
  }
}
Copy

참고

서비스에서 요청된 채널을 찾을 수 없는 경우 응답 페이로드에는 channel_statuses 오브젝트 내에 해당 채널에 대한 항목이 없습니다.

각 채널에 대한 channel_statuses 필드의 설명입니다.

필드

타입

설명

channel_status_code

String

채널의 상태를 나타냅니다.

last_committed_offset_token

String

최근 커밋된 오프셋 토큰입니다.

데이터베이스_이름

String

채널이 속한 데이터베이스의 이름입니다.

스키마_이름

String

채널이 속한 스키마의 이름입니다.

파이프_이름

String

채널이 속한 파이프의 이름입니다.

channel_name

String

채널의 이름입니다.

rows_inserted

int

이 채널에 삽입된 모든 행의 개수입니다.

rows_parsed

int

구문 분석되었지만 반드시 이 채널에 삽입되지는 않은 모든 행의 개수입니다.

rows_errors

int

이 채널에 삽입할 때 오류가 발생하여 거부된 모든 행의 개수입니다.

last_error_offset_upper_bound

String

수집 오류의 상한입니다. 오류는 이 커밋된 오프셋 토큰 위치 또는 그 앞에 위치합니다.

last_error_message

String

해당 채널의 최신 오류 코드에 해당하는 사람이 읽을 수 있는 메시지로, 민감한 고객 데이터가 삭제되어 있습니다.

last_error_timestamp

timestamp_utc

마지막 오류가 발생한 시간의 타임스탬프입니다.

Snowflake_avg_processing_latency_ms

int

이 채널의 평균 엔드투엔드 처리 시간입니다.

오류 응답 구조

Snowpipe Streaming REST APIs는 오류 응답을 위한 JSON 페이로드를 반환합니다. 이 구조는 자동화된 오류 처리와 인적 분석 모두에 대해 실행 가능한 정보를 제공합니다.

응답 페이로드의 구조는 다음과 같습니다.

{
  "code": "...",
  "message": "..."
}
Copy

응답 필드

필드

타입

설명

코드

String

안정적인 프로그래밍 오류 코드입니다. 이 값은 자동화된 오류 처리 및 로깅에 사용할 수 있습니다. 예를 들어, 애플리케이션의 논리는 미리 정의된 동작을 트리거하는 특정 코드를 확인할 수 있습니다.

메시지

String

오류를 설명하는 사람이 읽을 수 있는 메시지입니다. 이 메시지는 변경될 수 있으며 자동 구문 분석에 사용해서는 안 됩니다.

다음 예는 수신할 수 있는 오류 응답을 보여줍니다.

{
  "code": "STALE_CONTINUATION_TOKEN_SEQUENCER",
  "message": "Channel sequencer in the continuation token is stale. Please reopen the channel"
}
Copy

이 예는 부실 채널 시퀀서와 함께 연속 토큰을 사용하려는 시도에 대한 응답을 보여줍니다. 코드는 오류에 대해 명확하고 컴퓨터가 읽을 수 있는 식별자를 제공하며, 메시지는 사용자에게 유용한 설명적인 텍스트를 제공합니다.