Snowpipe ストリーミング API REST エンドポイント

注釈

RESTAPI でSnowpipe Streaming SDK を始めて、パフォーマンスと使用開始のエクスペリエンスの向上を実感することをお勧めします。

Snowpipe Streaming REST API は軽量なワークロード用に設計されており、Snowpipe Streaming SDK を使用せずに外部アプリケーションと統合する柔軟な方法を提供します。

次の図は、クライアントからSnowflakeサーバーにデータがどのように流れるかを視覚的に表したもので、プロセスにおける各キー API エンドポイントの詳細を示しています。

Snowpipe Streaming REST API の概要

リクエストヘッダー

以下のリクエストヘッダーは、Snowpipe ストリーミング REST API のすべてのエンドポイントに適用されます。

ヘッダー

説明

Authorization

認証トークン

X-Snowflake-Authorization-Token-Type (オプション)

JWT/OAuth

注釈

1回のリクエストペイロードの最大許容サイズは16 MB です。データが大きい場合は、複数のリクエストに分割する必要があります。

ホスト名の取得

Get Hostname は Snowpipe ストリーミング REST API とのやりとりに使用するホスト名を返します。各アカウントは固有のホスト名を持っています。

GET /v2/streaming/hostname

応答:

{
  "hostname": "string"
}
Copy

応答フィールドの説明:

フィールド

説明

ホスト名

String

アカウントのホスト名。

スコープトークンを交換

Exchange Scoped Token は、Snowpipe Streaming API関連サービスにのみアクセスするために使用できるセキュリティトークンを返します。これにより、顧客のセキュリティ保護が実現します。

POST /oauth/token

要求:

属性

必須

コンポーネント

説明

content_type

有り

ヘッダー

"application/x-www-form-urlencoded"

grant_type

有り

ペイロード

"urn:ietf:params:oauth:grant-type:jwt-bearer"

scope

有り

ペイロード

アカウントのホスト名。

応答:

{
  "token": "string"
}
Copy

応答フィールドの説明:

フィールド

説明

トークン

String

スコープトークン。

オープンチャンネル

Open Channel オペレーションは、パイプまたはテーブルに対して新しいチャンネルを作成またはオープンします。チャンネルが既に存在する場合、Snowflakeはそのチャンネルのクライアントシーケンサをバンプし、最後にコミットされたオフセットトークンを返します。

PUT /v2/streaming/databases/{databaseName}/schemas/{schemaName}/pipes/{pipeName}/channels/{channelName}

要求:

属性

必須

コンポーネント

説明

databaseName

有り

URI

データベース名。大文字と小文字は区別されません。

schemaName

有り

URI

スキーマ名。大文字と小文字は区別されません。

pipeName

有り

URI

パイプ名。大文字と小文字は区別されません。

channelName

有り

URI

作成または再度開くチャネルの名前。大文字と小文字は区別されません。

offset_token

無し

ペイロード

チャンネルをオープンする際にオフセットトークンをセットするための文字列。

requestId

無し

クエリパラメーター

汎用一意識別子(UUID)。システムを通じてリクエストを追跡するために使用されます。

応答:

{
  "next_continuation_token": "string",
  "channel_status": {
    "database_name": "string",
    "schema_name": "string",
    "pipe_name": "string",
    "channel_name": "string",
    "channel_status_code": "string",
    "last_committed_offset_token": "string",
    "created_on_ms": "long",
    "rows_inserted": "int",
    "rows_parsed": "int",
    "rows_error_count": "int",
    "last_error_offset_upper_bound": "string",
    "last_error_message": "string",
    "last_error_timestamp": "timestamp_utc",
    "snowflake_avg_processing_latency_ms": "int"
  }
}
Copy

応答フィールドの説明:

フィールド

説明

next_continuation_token

String

後続の追加行リクエストで使用する必要がある API管理のトークン。トークンは一連の呼び出しをリンクし、連続した順番のデータストリームを確保し、1回限りの配信のためにセッション状態を維持します。

チャネルステータス

オブジェクト

チャネルに関する次の詳細情報を持つネストされたオブジェクト。

  • database_name(文字列):パイプが配置されているデータベースの名前。

  • schema_name(文字列):パイプが配置されているスキーマの名前。

  • pipe_name(文字列):使用されている特定のパイプの名前。

  • channel_name (文字列):ストリーミングチャネルの名前。

  • channel_status_code(文字列):チャネルの現在のステータスを示すコード。例:「ACTIVE」。

  • last_committed_offset_token (文字列):最後に正常にコミットされたオフセットを表すトークン。

  • created_on_ms(ロング):チャネルが作成された時点のタイムスタンプ(ミリ秒単位)。

  • rows_inserted (Int):正常に挿入された行の合計数。

  • rows_parsed (Int):解析された行の合計数。

  • rows_error_count (Int):エラーが発生した行の合計数。

  • last_error_offset_upper_bound(文字列):最後にエラーが発生したオフセットの上限を示すトークン。

  • last_error_message(文字列):最後に発生したエラーのメッセージ。

  • last_error_timestamp(ロング):最後のエラーのタイムスタンプ(ミリ秒単位)。

  • snowflake_avg_processing_latency_ms (Int):Snowflakeの平均処理レイテンシ(ミリ秒単位)。

行の追加

Append Rows の演算子は、指定されたチャンネルにバッチで行を挿入します。

POST /v2/streaming/data/databases/{databaseName}/schemas/{schemaName}/pipes/{pipeName}/channels/{channelName}/rows

要求:

属性

必須

コンポーネント

説明

databaseName

有り

URI

データベース名。大文字と小文字は区別されません。

schemaName

有り

URI

スキーマ名。大文字と小文字は区別されません。

pipeName

有り

URI

パイプ。大文字と小文字は区別されません。

channelName

有り

URI

チャネル名。大文字と小文字は区別されません。

continuationToken

有り

クエリパラメーター

Snowflakeからの継続トークンで、クライアントと行シーケンスの両方をカプセル化します。

offsetToken

無し

クエリパラメーター

バッチごとにオフセットトークンをセットするために使用される文字列。

rows

有り

ペイロード

NDJSON 形式で取り込まれる実際のデータペイロード。この属性の最大許容サイズは4 MB です。

requestId

無し

クエリパラメーター

UUID システムを通してリクエストを追跡するために使用されます。

注釈

JSON ペイロード内の NDJSON テキストは厳密に RFC 8259 標準に準拠する必要があります。それぞれ JSON テキストの後には改行文字 \n`(:code:`0x0A)が続く必要があります。改行文字の前にキャリッジリターン \r`(:code:`0x0D)を挿入することもできます。

応答:

{
  "next_continuation_token": "string"
}
Copy

応答フィールドの説明:

フィールド

説明

next_continuation_token

string

クライアントシーケンサーと行シーケンサーの両方をカプセル化したSnowflakeの次の継続トークンです。次のバッチを挿入するために使用する必要があります。

ドロップチャンネル

Drop Channel オペレーションは、メタデータとともにサーバー側にチャンネルをドロップします。

DELETE /v2/streaming/databases/{databaseName}/schemas/{schemaName}/pipes/{pipeName}/channels/{channelName}

要求:

属性

必須

コンポーネント

説明

databaseName

有り

URI

大文字と小文字を区別しないデータベース名

schemaName

有り

URI

大文字と小文字を区別しないスキーマ名

pipeOrTableName

有り

URI

大文字と小文字を区別しないパイプ名またはテーブル名

channelName

有り

URI

大文字と小文字を区別しないチャンネル名

requestId

無し

クエリパラメーター

システムを通してリクエストを追跡するために使用される UUID

応答:

この操作は、HTTP ステータスコード以外に特定の成功した応答がないペイロードを返します。

チャンネルステータスの一括取得

Bulk Get Channel Status オペレーションは、特定のクライアント・シーケンサーのチャンネルのステータスを返します。

POST /v2/streaming/databases/{databaseName}/schemas/{schemaName}/pipes/{pipeName}:bulk-channel-status

要求:

属性

必須

コンポーネント

説明

databaseName

有り

URI

大文字と小文字を区別しないデータベース名

schemaName

有り

URI

大文字と小文字を区別しないスキーマ名

pipeName

有り

URI

大文字と小文字を区別しないパイプ名

channel_names

有り

ペイロード

顧客がステータスを取得する文字列チャネル名の配列。名前では大文字と小文字が区別されます(例: {"channel_names":["channel1", "channel2"]})。

応答:

{
  "channel_statuses": {
    "channel1": {
      "channel_status_code": "String",
      "last_committed_offset_token": "String",
      "database_name": "String",
      "schema_name": "String",
      "pipe_name": "String",
      "channel_name": "String",
      "rows_inserted": "int",
      "rows_parsed": "int",
      "rows_errors": "int",
      "last_error_offset_upper_bound": "String",
      "last_error_message": "String",
      "last_error_timestamp": "timestamp_utc",
      "snowflake_avg_processing_latency_ms": "int"
    },
    "channel2": {
      "comment": "same structure as channel1"
    }
    "comment": "potentially other channels"
  }
}
Copy

注釈

リクエストされたチャネルがサービスで見つからない場合、応答ペイロードには channel_statuses オブジェクト内のそのチャネルのエントリがありません。

各チャネルの channel_statuses フィールドの説明:

フィールド

説明

channel_status_code

String

チャネルのステータスを示します。

last_committed_offset_token

String

最後にコミットされたオフセットトークン。

database_name

String

チャンネルが属するデータベース名。

schema_name

String

チャンネルが属するスキーマ名。

pipe_name

String

チャンネルが属するパイプ名。

channel_name

String

チャンネル名。

rows_inserted

int

このチャンネルに挿入されたすべての行の数。

rows_parsed

int

このチャンネルに挿入されるとは限らないが、パースされたすべての行の数。

rows_errors

int

このチャネルに挿入したときにエラーが発生したため、拒否されたすべての行の数。

last_error_offset_upper_bound

String

インジェスチョンエラーの上限。エラーは、このコミットされたオフセットトークンに、またはその前に配置されます。

last_error_message

String

そのチャンネルの最新のエラーコードに対応する人間が読めるメッセージで、機密性の高い顧客データは編集されています。

last_error_timestamp

timestamp_utc

最後にエラーが発生した時間のタイムスタンプ。

snowflake_avg_processing_latency_ms

int

このチャネルの平均エンドツーエンド処理時間。

エラー応答構造

Snowpipe Streaming RESTAPIs はエラー応答の JSON ペイロードを返します。この構造は、自動エラー処理と人間による分析の両方のために実用的な情報を提供します。

応答ペイロードは次の構造を持ちます。

{
  "code": "...",
  "message": "..."
}
Copy

応答フィールド

フィールド

説明

コード

String

安定したプログラムのエラーコード。この値は、自動エラー処理とログに使用できます。たとえば、アプリケーションのロジックは、特定のコードをチェックして、事前定義されたアクションをトリガーすることができます。

メッセージ

String

エラーを説明する判読可能なメッセージ。このメッセージは変更される可能性があるため、自動解析には使用しないでください。

次の例は、受け取る可能性のあるエラー応答を示しています。

{
  "code": "STALE_CONTINUATION_TOKEN_SEQUENCER",
  "message": "Channel sequencer in the continuation token is stale. Please reopen the channel"
}
Copy

この例は、古いチャネルシーケンサーで継続トークンを使用しようとした場合の応答を示しています。コードはエラーの明確で機械判読可能な識別子を提供し、メッセージはユーザーにとって役立つ説明テキストを提供します。