Snowpipe REST API

REST エンドポイントを呼び出すことにより、パイプと対話します。このトピックでは、インジェストするファイルのリストを定義し、ロード履歴のレポートを取得するためのSnowpipe REST API について説明します。

Snowflakeは、Snowpipe REST APIでの作業を簡素化するJavaおよびPython APIs も提供します。

このトピックの内容:

データファイルの取り込み

Snowpipe API は、取り込むファイルのリストを定義するための REST エンドポイントを提供します。

エンドポイント: insertFiles

テーブルにインジェストするファイルについてSnowflakeに通知します。このエンドポイントからの正常な応答は、Snowflakeがテーブルに追加するファイルのリストを記録したことを意味します。必ずしもファイルがインジェストされたことを意味するわけではありません。詳細については、以下の応答コードをご参照ください。

ほとんどの場合、Snowflakeは数分以内に新しいデータをターゲットテーブルに挿入します。

メソッド: POST

POST URL:

https://{account}.snowflakecomputing.com/v1/data/pipes/{pipeName}/insertFiles?requestId={requestId}

URL パラメーター:

  • account (必須): Snowflakeアカウントのアカウント識別子。

  • pipeName (必須): 大文字と小文字を区別する完全修飾パイプ名。例えば、 myDatabase.mySchema.myPipe です。

  • requestId (オプション): システムを通してリクエストを追跡するために使用される文字列。各リクエストでランダムな文字列をプロバイダーすることをお勧めします。例えば、 UUID。これは、以下のように URL に追加されるべきです。 ?requestId=<your_uuid>

リクエストヘッダー

  • Content-Type::

    • text/plain: ファイルパスとファイル名のプレーンテキストリスト。この形式では size パラメーターは使用できません。

    • application/json: オプションのサイズ情報を持つファイルのリストを含む JSON オブジェクトの場合。

  • Authorization: BEARER <jwt_token>

リクエスト本文 (application/json Content-Type の場合)

リクエスト本文は、"files"という一つのキーを持つ JSON オブジェクトでなければなりません。このキーに関連付けられた値は、 JSON オブジェクトの配列で、各オブジェクトは取り込まれるファイルを表します。

{
  "files":[
    {
      "path":"filePath/file1.csv",
      "size":100
    },
    {
      "path":"filePath/file2.csv",
      "size":100
    }
   ]
}
Copy

"files"配列の各要素は、以下の属性を持つ JSON オブジェクトです。

  • path (必須): ステージングされたファイルのパスとファイル名。論理的で粒度の細かいパスを使用してステージでデータを分割することにより、当社の推奨するベストプラクティスに従った場合、ペイロードのパス値にはステージングされたファイルへの完全なパスが含まれます。

  • size (オプションですが、パフォーマンスを向上させるために推奨します): バイト単位のファイルサイズ。

リクエスト本文 (text/plain Content-Type の場合)

リクエスト本文はファイルパスとファイル名のプレーンテキストのリストでなければなりません。

filePath/file_a.csv
another/path/file_b.json
yet/another/file_c.txt
Copy

注釈

投稿には最大5000個のファイルを含めることができます。指定された各ファイルパスは、 UTF-8としてシリアル化される場合、<= 1024バイトの長さである必要があります。

回答本文

応答コード:

  • 200 --- 成功。インジェストするファイルのキューにファイルが追加されます。

  • 400 --- 失敗。無効な形式、または制限を超えたため、リクエストは無効です。

  • 404 --- 失敗。 pipeName は認識されません。

    このエラーコードは、エンドポイントの呼び出し時に使用されるロールに十分な権限がない場合にも返される可能性があります。詳細については、 アクセス権限の付与 をご参照ください。

  • 429 --- 失敗。リクエストのレート制限を超えました。

  • 500 --- 失敗。内部エラーが発生しました。

応答ペイロード:

API リクエストが成功すると(つまり、コード200)、応答ペイロードには requestId および status 要素が JSON 形式で含まれます。エラーが発生した場合は、応答ペイロードにエラーの詳細が含まれる場合があります。

{
  "requestId": "your_request_uuid",
  "status": "success"
}
Copy

パイプ定義の COPY INTO <テーブル> ステートメントに PATTERN コピーオプションが含まれている場合、 unmatchedPatternFiles 属性には、ヘッダー内で送信され、正規表現と一致 しない ためにスキップされたファイルが一覧表示されます。

{
  "requestId": "your_request_uuid",
  "status": "success",
  "unmatchedPatternFiles": ["some_file.txt", "another_file.dat"]
}
Copy

ロード履歴レポート

Snowpipe API は、ロードレポートをフェッチするための REST エンドポイントを提供します。

エンドポイント: insertReport

内容が最近テーブルにインジェストされた insertFiles を介して送信されたファイルのレポートを取得します。大きなファイルの場合、レポートはファイルの一部のみの場合があります。

このエンドポイントには次の制限があります。

  • 10、000の最新のイベントが保持されます。

  • イベントは最大10分間保持されます。

insertFiles を介して送信されたファイルのデータがテーブルにコミットされ、クエリで使用できるようになると、イベントが発生します。 insertReport エンドポイントは UNIX コマンドテールのように考えることができます。このコマンドを繰り返し呼び出すことにより、時間の経過とともにパイプ上のイベントの完全な履歴を見ることができます。このコマンドは、イベントを見逃さないように頻繁に呼び出す必要があります。その頻度は、ファイルが insertFiles に送信されるレートにより異なります。

メソッド: GET

GET URL:

https://<アカウント識別子>.snowflakecomputing.com/v1/data/pipes/<パイプ名>/insertReport?requestId=<リクエストID>&beginMark=<開始マーク>

URL パラメーター:

  • account_identifier (必須): Snowflakeアカウント識別子です。望ましい形式は organization_name-account_name です。別の形式(リージョンとクラウドプラットフォームによるアカウント検索)については、 形式1(推奨): 組織内のアカウント名 をご覧ください。

  • pipeName (必須): 大文字と小文字を区別する、Snowpipeの完全修飾名。例えば、 myDatabase.mySchema.myPipe です。

  • requestId (オプション):Snowflakeのシステムを通してこの特定のリクエストを追跡するために提供できる文字列です。UUID のようなランダムな文字列を使用することは、デバッグや監視を容易にするために強く推奨されます。これを URL に追加: ?requestId=<your_uuid>

  • beginMark (オプション): 以前の insertReport レスポンスの nextBeginMark フィールドに返されたマーカー値。このマーカーを含めることで、重複して返されるイベントの数を減らせる可能性があり、その後の呼び出しの最適化に役立ちます。注: beginMark は重複を避けるためのヒントとして意図されていますが、それでも時折イベントの繰り返しが発生することがあります。 beginMark が指定されていない場合、レポートは過去10分間の摂取履歴を表示します。これを URL に追加: ?beginMark=<previous_nextBeginMark>

リクエストヘッダー:

  • Accept: 希望する応答形式を指定します。使用可能な値は text/plain または application/json です。

  • 認証コード: Snowflake認証トークン。BEARER <jwt_token> の形式を使用してください。

リクエスト本文:

このエンドポイントは GET リクエストのリクエスト本文を受け付けません。必要なパラメーターは URL とヘッダーにプロバイダーとして用意されています。

応答本文:

応答コード:

  • 200 --- 成功。レポートが返されました。

  • 400 --- 失敗。無効な形式、または制限を超えたため、リクエストは無効です。

  • 404 --- 失敗。 pipeName は認識されません。

    このエラーコードは、エンドポイントの呼び出し時に使用されるロールに十分な権限がない場合にも返される可能性があります。詳細については、 アクセス権限の付与 をご参照ください。

  • 429 --- 失敗。リクエストのレート制限を超えました。

  • 500 --- 失敗。内部エラーが発生しました。

応答ペイロード:

成功の応答(200)には、最近テーブルに追加されたファイルに関する情報が含まれています。このレポートは、大きなファイルの一部のみを表す場合があることに注意します。

例:

{
  "pipe": "TESTDB.TESTSCHEMA.pipe2",
  "completeResult": true,
  "nextBeginMark": "1_39",
  "files": [
    {
      "path": "data2859002086815673867.csv",
      "stageLocation": "s3://mybucket/",
      "fileSize": 57,
      "timeReceived": "2017-06-21T04:47:41.453Z",
      "lastInsertTime": "2017-06-21T04:48:28.575Z",
      "rowsInserted": 1,
      "rowsParsed": 1,
      "errorsSeen": 0,
      "errorLimit": 1,
      "complete": true,
      "status": "LOADED"
    }
  ]
}
Copy

応答フィールド:

フィールド

説明

pipe

文字列

パイプの完全修飾名。

completeResult

ブール値

提供された beginMark とこのレポート履歴にある最初のイベントの間にイベントが欠落した場合は、 false。それ以外の場合は、 true

nextBeginMark

文字列

重複レコードの表示を避けるために、 beginMark を次のリクエストで使用。この値はヒントです。それでも重複が発生する場合があります。

files

配列

JSON オブジェクトの配列。履歴応答の一部であるファイルごとに1つのオブジェクト。

path

文字列

ステージの場所に関連するファイルパス。

stageLocation

文字列

パイプで定義されたステージ ID (内部ステージ)またはS3バケット(外部ステージ)のいずれか。

fileSize

ロング

バイト単位のファイルサイズ。

timeReceived

文字列

処理のためにこのファイルが受信された時刻。形式は UTC タイムゾーンの ISO-8601。

lastInsertTime

文字列

このファイルのデータが最後にテーブルに挿入された時刻。形式は UTC タイムゾーンの ISO-8601。

rowsInserted

ロング

ファイルからターゲットテーブルに挿入された行の数。

rowsParsed

ロング

ファイルから解析された行の数。エラーのある行はスキップされる場合があります。

errorsSeen

整数

ファイルに見られるエラーの数

errorLimit

整数

失敗( ON_ERROR コピーオプションに基づく)と見なされるまでにファイルで許可されるエラーの数。

firstError [1]

文字列

このファイルで最初に発生したエラーのエラーメッセージ。

firstErrorLineNum [1]

ロング

最初のエラーの行番号。

firstErrorCharacterPos [1]

ロング

最初のエラーの文字位置。

firstErrorColumnName [1]

文字列

最初のエラーが発生した列名。

systemError [1]

文字列

ファイルが処理されなかった理由を説明する一般エラー。

complete

ブール値

ファイルが完全に正常に処理されたかどうかを示します。

status

文字列

ファイルのロードステータス。

  • LOAD_IN_PROGRESS :ファイルの一部がテーブルにロードされましたが、ロードプロセスはまだ完了していません。

  • LOADED :ファイル全体がテーブルにロードされました。

  • LOAD_FAILED :ファイルのロードに失敗しました。

  • PARTIALLY_LOADED :このファイルの一部の行は正常にロードされましたが、その他の行はエラーのためロードされませんでした。このファイルの処理は完了しました。

[1] 値は、ファイルにエラーが含まれる場合、これらのフィールドにのみ提供されます。

エンドポイント: loadHistoryScan

内容がテーブルに追加されたインジェスト済みファイルに関するレポートをフェッチします。大きなファイルの場合、レポートはファイルの一部のみの場合があります。このエンドポイントは、2つの時点間の履歴を表示するという点で insertReport と異なります。最大10,000個のアイテムが返されますが、複数の呼び出しをパブリッシュして、目的の時間範囲をカバーできます。

重要

このエンドポイントは、過剰な呼び出しを避けるためにレート制限されています。レート制限(エラーコード429)を超えないようにするため、 loadHistoryScan よりも insertReport に強く依存することをお勧めします。 loadHistoryScan を呼び出すとき、データロードのセットを含む最も狭い時間範囲を指定します。たとえば、履歴の最後の10分を8分ごとに読むとうまくいきます。毎分過去24時間の履歴を読み取ろうとすると、レート制限に達したことを示す429エラーが発生します。レート制限は、各履歴レコードを数回読み取ることができるように設計されています。

これらの制限のない、より包括的なビューのために、Snowflakeは、パイプまたはテーブルのロード履歴を返すInformation Schemaテーブル関数 COPY_HISTORY を提供します。

メソッド: GET

GET URL:

https://{account}.snowflakecomputing.com/v1/data/pipes/{pipeName}/loadHistoryScan?startTimeInclusive=<開始時刻>&endTimeExclusive=<終了時刻>&requestId=<リクエストID>

URL パラメーター:

  • account (必須): Snowflakeアカウント識別子です。

  • pipeName (必須): 大文字と小文字を区別する、Snowpipeの完全修飾名。例: myDatabase.mySchema.myPipe

  • startTimeInclusive (必須): ISO-8601形式のタイムスタンプで指定します(例:2023-10-26T10:00:00Z)。このタイムスタンプは、クエリの包括的な下限を示します。

  • endTimeExclusive (オプション): ISO-8601形式のタイムスタンプで指定します(例:2023-10-26T10:15:00Z)。このタイムスタンプはクエリの排他的上限を示します。このパラメーターが省略された場合、現在のサーバーのタイムスタンプ (CURRENT_TIMESTAMP()) が時間範囲の終了として使用されます。

  • requestId (オプション):Snowflakeのシステムを通してこの特定のリクエストを追跡するために提供できる文字列です。デバッグや監視を容易にするため、 UUID のようなランダムな文字列を使用することをお勧めします。これを URL に追加: ?requestId=<your_uuid>

リクエストヘッダー:

  • Accept: 希望する応答形式を指定します。使用可能な値は text/plain または application/json です。

  • Authorization: Snowflake認証トークン。 BEARER <jwt_token> の形式を使用してください。

リクエスト本文:

このエンドポイントは GET リクエストのリクエスト本文を受け付けません。必要なパラメーターはすべて、 URL とヘッダーにプロバイダーとして用意されています。

応答本文:

応答コード:

  • 200 --- 成功。ロード履歴のスキャン結果が返されます。

  • 400 --- 失敗。無効な形式、または制限を超えたため、リクエストは無効です。

  • 404 --- 失敗。 pipeName は認識されません。

  • 429 --- 失敗。リクエストのレート制限を超えました。

  • 500 --- 失敗。内部エラーが発生しました。

応答ペイロード:

成功の応答(200)には、最近テーブルに追加されたファイルに関する情報が含まれています。このレポートは、大きなファイルの一部のみを表す場合があることに注意します。

例:

{
  "pipe": "TESTDB.TESTSCHEMA.pipe2",
  "completeResult": true,
  "startTimeInclusive": "2017-08-25T18:42:31.081Z",
  "endTimeExclusive":"2017-08-25T22:43:45.552Z",
  "rangeStartTime":"2017-08-25T22:43:45.383Z",
  "rangeEndTime":"2017-08-25T22:43:45.383Z",
  "files": [
    {
      "path": "data2859002086815673867.csv",
      "stageLocation": "s3://mystage/",
      "fileSize": 57,
      "timeReceived": "2017-08-25T22:43:45.383Z",
      "lastInsertTime": "2017-08-25T22:43:45.383Z",
      "rowsInserted": 1,
      "rowsParsed": 1,
      "errorsSeen": 0,
      "errorLimit": 1,
      "complete": true,
      "status": "LOADED"
    }
  ]
}
Copy

応答フィールド:

フィールド

説明

pipe

文字列

パイプの完全修飾名。

completeResult

ブール値

レポートが不完全な場合(つまり、指定された時間範囲内のエントリの数が10,000エントリの制限を超える場合)は、 falsefalse の場合、ユーザーは、エントリの次のセットに進むための次のリクエストに対する startTimeInclusive 値として、現在の rangeEndTime 値を指定できます。

startTimeInclusive

文字列

リクエストで提供される開始タイムスタンプ( ISO-8601形式)。

endTimeExclusive

文字列

リクエストで提供される終了タイムスタンプ( ISO-8601形式)。

rangeStartTime

文字列

応答に含まれるファイルの最も古いエントリのタイムスタンプ( ISO-8601形式)。

rangeEndTime

文字列

応答に含まれるファイルの最新エントリのタイムスタンプ( ISO-8601形式)。

files

配列

JSON オブジェクトの配列。履歴応答の一部であるファイルごとに1つのオブジェクト。配列内の応答フィールドは、 insertReport 応答で返されるものと同じです。