Snowpipe ストリーミング API REST エンドポイント

注釈

プライマリおよびデフォルトの選択肢として :code:`snowpipe-streaming`SDK の使用をお勧めします。RESTAPI は高スループットのシナリオに最適化されていません。

以下のリクエストヘッダーは、Snowpipe ストリーミング REST API のすべてのエンドポイントに適用されます。

ヘッダー

説明

Authorization

認証トークン

X-Snowflake-Authorization-Token-Type (オプション)

JWT/OAuth

注釈

1回のリクエストペイロードの最大許容サイズは16 MB です。データが大きい場合は、複数のリクエストに分割する必要があります。

ホスト名の取得

Get Hostname は Snowpipe ストリーミング REST API とのやりとりに使用するホスト名を返します。各アカウントは固有のホスト名を持っています。

GET /v2/streaming/hostname

応答:

フィールド

説明

hostname

string

アカウントのホスト名。

スコープトークンを交換

Exchange Scoped Token は、Snowpipe Streaming API関連サービスにのみアクセスするために使用できるセキュリティトークンを返します。これにより、顧客のセキュリティ保護が実現します。

POST /oauth/token

要求:

属性

必須

コンポーネント

説明

content_type

有り

ヘッダー

"application/x-www-form-urlencoded"

grant_type

有り

payload

"urn:ietf:params:oauth:grant-type:jwt-bearer"

scope

有り

payload

アカウントのホスト名

応答:

フィールド

説明

token

string

スコープトークン。

オープンチャンネル

Open Channel オペレーションは、パイプまたはテーブルに対して新しいチャンネルを作成またはオープンします。チャンネルが既に存在する場合、Snowflakeはそのチャンネルのクライアントシーケンサをバンプし、最後にコミットされたオフセットトークンを返します。

PUT /v2/streaming/databases/{databaseName}/schemas/{schemaName}/pipes/{pipeName}/channels/{channelName}

要求:

属性

必須

コンポーネント

説明

databaseName

有り

URI

大文字と小文字を区別しないデータベース名

schemaName

有り

URI

大文字と小文字を区別しないスキーマ名

pipeName

有り

URI

大文字と小文字を区別しないパイプ名

channelName

有り

URI

大文字と小文字を区別しない、作成または再開するチャンネル名。

offset_token

無し

payload

チャンネルをオープンする際にオフセットトークンをセットするための文字列。

requestId

無し

クエリパラメーター

システムを通してリクエストを追跡するために使用される UUID

応答:

フィールド

説明

next_continuation_token

String

後続の追加行リクエストで使用する必要がある API管理のトークン。トークンは一連の呼び出しをリンクし、連続した順番のデータストリームを確保し、1回限りの配信のためにセッション状態を維持します。

チャネルステータス

オブジェクト

チャネルに関する次の詳細情報を持つネストされたオブジェクト。

  • database_name(文字列):パイプが配置されているデータベースの名前

  • schema_name(文字列):パイプが配置されているスキーマの名前

  • pipe_name(文字列):使用されている特定のパイプの名前。

  • channel_name (文字列):ストリーミングチャネルの名前。

  • channel_status_code(文字列):チャネルの現在のステータスを示すコード。例:「ACTIVE」。

  • last_committed_offset_token (文字列):最後に正常にコミットされたオフセットを表すトークン。

  • created_on_ms(ロング):チャネルが作成された時点のタイムスタンプ(ミリ秒単位)。

  • rows_inserted (Int):正常に挿入された行の合計数。

  • rows_parsed (Int):解析された行の合計数。

  • rows_error_count (Int):エラーが発生した行の合計数。

  • last_error_offset_upper_bound(文字列):最後にエラーが発生したオフセットの上限を示すトークン。

  • last_error_message(文字列):最後に発生したエラーのメッセージ。

  • last_error_timestamp(ロング):最後のエラーのタイムスタンプ(ミリ秒単位)。

  • snowflake_avg_processing_latency_ms (Int):Snowflakeの平均処理レイテンシ(ミリ秒単位)。

行の追加

Append Rows の演算子は、指定されたチャンネルにバッチで行を挿入します。

POST /v2/streaming/data/databases/{databaseName}/schemas/{schemaName}/pipes/{pipeName}/channels/{channelName}/rows

要求:

属性

必須

コンポーネント

説明

databaseName

有り

URI

大文字と小文字を区別しないデータベース名

schemaName

有り

URI

大文字と小文字を区別しないスキーマ名

pipeName

有り

URI

大文字と小文字を区別しないパイプ

channelName

有り

URI

大文字と小文字を区別しないチャンネル名

continuationToken

有り

クエリパラメーター

Snowflakeからの継続トークン、クライアントと行シーケンサーの両方をカプセル化

offsetToken

無し

クエリパラメーター

バッチごとにオフセットトークンをセットするために使用される文字列。

rows

有り

payload

NDJSON 形式で取り込まれる実際のデータペイロード。

requestId

無し

クエリパラメーター

UUID システムを通してリクエストを追跡するために使用されます。

注釈

JSON ペイロード内の NDJSON テキストは厳密に RFC 8259 標準に準拠する必要があります。それぞれ JSON テキストの後には改行文字 \n`(:code:`0x0A)が続く必要があります。改行文字の前にキャリッジリターン \r`(:code:`0x0D)を挿入することもできます。

応答:

フィールド

説明

next_continuation_token

string

クライアントシーケンサーと行シーケンサーの両方をカプセル化したSnowflakeの次の継続トークンです。次のバッチを挿入するために使用する必要があります。

ドロップチャンネル

Drop Channel オペレーションは、メタデータとともにサーバー側にチャンネルをドロップします。

DELETE /v2/streaming/databases/{databaseName}/schemas/{schemaName}/pipes/{pipeName}/channels/{channelName}

要求:

属性

必須

コンポーネント

説明

databaseName

有り

URI

大文字と小文字を区別しないデータベース名

schemaName

有り

URI

大文字と小文字を区別しないスキーマ名

pipeOrTableName

有り

URI

大文字と小文字を区別しないパイプ名またはテーブル名

channelName

有り

URI

大文字と小文字を区別しないチャンネル名

requestId

無し

クエリパラメーター

システムを通してリクエストを追跡するために使用される UUID

応答:

この操作は、HTTP ステータスコード以外に特定の成功した応答がないペイロードを返します。

チャンネルステータスの一括取得

Bulk Get Channel Status オペレーションは、特定のクライアント・シーケンサーのチャンネルのステータスを返します。

POST /v2/streaming/databases/{databaseName}/schemas/{schemaName}/pipes/{pipeName}:bulk-channel-status

要求:

属性

必須

コンポーネント

説明

databaseName

有り

URI

大文字と小文字を区別しないデータベース名

schemaName

有り

URI

大文字と小文字を区別しないスキーマ名

pipeName

有り

URI

大文字と小文字を区別しないパイプ名

channel_names

有り

ペイロード

顧客がステータスを取得する文字列チャネル名の配列。名前では大文字と小文字が区別されます(例: {"channel_names":["channel1", "channel2"]})。

応答:

{
  "channel_statuses": {
    "channel1": {
      "channel_status_code": "String",
      "last_committed_offset_token": "String",
      "database_name": "String",
      "schema_name": "String",
      "pipe_name": "String",
      "channel_name": "String",
      "rows_inserted": "int",
      "rows_parsed": "int",
      "rows_errors": "int",
      "last_error_offset_upper_bound": "String",
      "last_error_message": "String",
      "last_error_timestamp": "timestamp_utc",
      "snowflake_avg_processing_latency_ms": "int"
    },
    "channel2": {
      "comment": "same structure as channel1"
    }
    "comment": "potentially other channels"
  }
}
Copy

注釈

リクエストされたチャネルがサービスで見つからない場合、応答ペイロードには channel_statuses オブジェクト内のそのチャネルのエントリがありません。

各チャネルの channel_statuses フィールドの説明:

フィールド

説明

channel_status_code

String

チャンネルのステータスを示します。

last_committed_offset_token

String

最新のコミット オフセット トークン

database_name

String

チャンネルが属するデータベース名。

schema_name

String

チャンネルが属するスキーマ名。

pipe_name

String

チャンネルが属するパイプ名。

channel_name

String

チャンネル名。

rows_inserted

int

このチャンネルに挿入されたすべての行の数。

rows_parsed

int

このチャンネルに挿入されるとは限らないが、パースされたすべての行の数。

rows_errors

int

このチャンネルへの挿入時にエラーが発生し、拒否された行の数。

last_error_offset_upper_bound

String

エラーに最後に対応する挿入行セットの最新のオフセットトークンの上限。最後にエラーが発生した行セットの実オフセット・トークンは、このオフセット・トークンか、またはチャンネル取り込み順で厳密にその前のオフセット・トークンのいずれかです。

last_error_message

String

そのチャンネルの最新のエラーコードに対応する人間が読めるメッセージで、機密性の高い顧客データは編集されています。

last_error_timestamp

timestamp_utc

最後にエラーが発生したタイムスタンプ。

snowflake_avg_processing_latency_ms

int

このチャンネルの平均e2e処理時間。

エラー応答構造

すべての JSON からのエラー応答に対して、次の APIs ペイロード形状が表示されます。

{
   "error_code": "",
   "message": ""
}
Copy