Snowpipe REST API¶
REST エンドポイントを呼び出すことにより、パイプと対話します。このトピックでは、インジェストするファイルのリストを定義し、ロード履歴のレポートを取得するためのSnowpipe REST API について説明します。
Snowflakeは、Snowpipe REST APIでの作業を簡素化するJavaおよびPython APIs も提供します。
このトピックの内容:
データファイルの取り込み¶
Snowpipe API は、取り込むファイルのリストを定義するための REST エンドポイントを提供します。
エンドポイント: insertFiles
¶
テーブルにインジェストするファイルについてSnowflakeに通知します。このエンドポイントからの正常な応答は、Snowflakeがテーブルに追加するファイルのリストを記録したことを意味します。必ずしもファイルがインジェストされたことを意味するわけではありません。詳細については、以下の応答コードをご参照ください。
ほとんどの場合、Snowflakeは数分以内に新しいデータをターゲットテーブルに挿入します。
メソッド: POST
POST URL:
https://{account}.snowflakecomputing.com/v1/data/pipes/{pipeName}/insertFiles?requestId={requestId}
URL パラメーター:
account
(必須): Snowflakeアカウントのアカウント識別子。pipeName
(必須): 大文字と小文字を区別する完全修飾パイプ名。例えば、myDatabase.mySchema.myPipe
です。requestId
(オプション): システムを通してリクエストを追跡するために使用される文字列。各リクエストでランダムな文字列をプロバイダーすることをお勧めします。例えば、 UUID。これは、以下のように URL に追加されるべきです。?requestId=<your_uuid>
リクエストヘッダー
Content-Type:
:text/plain
: ファイルパスとファイル名のプレーンテキストリスト。この形式では size パラメーターは使用できません。application/json
: オプションのサイズ情報を持つファイルのリストを含む JSON オブジェクトの場合。
Authorization
:BEARER <jwt_token>
リクエスト本文 (application/json Content-Type の場合)
リクエスト本文は、"files"という一つのキーを持つ JSON オブジェクトでなければなりません。このキーに関連付けられた値は、 JSON オブジェクトの配列で、各オブジェクトは取り込まれるファイルを表します。
{
"files":[
{
"path":"filePath/file1.csv",
"size":100
},
{
"path":"filePath/file2.csv",
"size":100
}
]
}
"files"配列の各要素は、以下の属性を持つ JSON オブジェクトです。
path
(必須): ステージングされたファイルのパスとファイル名。論理的で粒度の細かいパスを使用してステージでデータを分割することにより、当社の推奨するベストプラクティスに従った場合、ペイロードのパス値にはステージングされたファイルへの完全なパスが含まれます。size
(オプションですが、パフォーマンスを向上させるために推奨します): バイト単位のファイルサイズ。
リクエスト本文 (text/plain Content-Type の場合)
リクエスト本文はファイルパスとファイル名のプレーンテキストのリストでなければなりません。
filePath/file_a.csv
another/path/file_b.json
yet/another/file_c.txt
注釈
投稿には最大5000個のファイルを含めることができます。指定された各ファイルパスは、 UTF-8としてシリアル化される場合、<= 1024バイトの長さである必要があります。
回答本文
応答コード:
200 --- 成功。インジェストするファイルのキューにファイルが追加されます。
400 --- 失敗。無効な形式、または制限を超えたため、リクエストは無効です。
404 --- 失敗。
pipeName
は認識されません。このエラーコードは、エンドポイントの呼び出し時に使用されるロールに十分な権限がない場合にも返される可能性があります。詳細については、 アクセス権限の付与 をご参照ください。
429 --- 失敗。リクエストのレート制限を超えました。
500 --- 失敗。内部エラーが発生しました。
応答ペイロード:
API リクエストが成功すると(つまり、コード200)、応答ペイロードには
requestId
およびstatus
要素が JSON 形式で含まれます。エラーが発生した場合は、応答ペイロードにエラーの詳細が含まれる場合があります。{ "requestId": "your_request_uuid", "status": "success" }パイプ定義の COPY INTO <テーブル> ステートメントに PATTERN コピーオプションが含まれている場合、
unmatchedPatternFiles
属性には、ヘッダー内で送信され、正規表現と一致 しない ためにスキップされたファイルが一覧表示されます。{ "requestId": "your_request_uuid", "status": "success", "unmatchedPatternFiles": ["some_file.txt", "another_file.dat"] }
ロード履歴レポート¶
Snowpipe API は、ロードレポートをフェッチするための REST エンドポイントを提供します。
エンドポイント: insertReport
¶
内容が最近テーブルにインジェストされた insertFiles
を介して送信されたファイルのレポートを取得します。大きなファイルの場合、レポートはファイルの一部のみの場合があります。
このエンドポイントには次の制限があります。
10、000の最新のイベントが保持されます。
イベントは最大10分間保持されます。
insertFiles
を介して送信されたファイルのデータがテーブルにコミットされ、クエリで使用できるようになると、イベントが発生します。 insertReport
エンドポイントは UNIX コマンドテールのように考えることができます。このコマンドを繰り返し呼び出すことにより、時間の経過とともにパイプ上のイベントの完全な履歴を見ることができます。このコマンドは、イベントを見逃さないように頻繁に呼び出す必要があります。その頻度は、ファイルが insertFiles
に送信されるレートにより異なります。
メソッド: GET
GET URL:
https://<アカウント識別子>.snowflakecomputing.com/v1/data/pipes/<パイプ名>/insertReport?requestId=<リクエストID>&beginMark=<開始マーク>
URL パラメーター:
account_identifier
(必須): Snowflakeアカウント識別子です。望ましい形式はorganization_name-account_name
です。別の形式(リージョンとクラウドプラットフォームによるアカウント検索)については、 形式1(推奨): 組織内のアカウント名 をご覧ください。pipeName
(必須): 大文字と小文字を区別する、Snowpipeの完全修飾名。例えば、myDatabase.mySchema.myPipe
です。requestId
(オプション):Snowflakeのシステムを通してこの特定のリクエストを追跡するために提供できる文字列です。UUID のようなランダムな文字列を使用することは、デバッグや監視を容易にするために強く推奨されます。これを URL に追加:?requestId=<your_uuid>
。beginMark
(オプション): 以前のinsertReport
レスポンスのnextBeginMark
フィールドに返されたマーカー値。このマーカーを含めることで、重複して返されるイベントの数を減らせる可能性があり、その後の呼び出しの最適化に役立ちます。注:beginMark
は重複を避けるためのヒントとして意図されていますが、それでも時折イベントの繰り返しが発生することがあります。beginMark
が指定されていない場合、レポートは過去10分間の摂取履歴を表示します。これを URL に追加:?beginMark=<previous_nextBeginMark>
。
リクエストヘッダー:
Accept: 希望する応答形式を指定します。使用可能な値は
text/plain
またはapplication/json
です。認証コード: Snowflake認証トークン。BEARER <jwt_token> の形式を使用してください。
リクエスト本文:
このエンドポイントは GET リクエストのリクエスト本文を受け付けません。必要なパラメーターは URL とヘッダーにプロバイダーとして用意されています。
応答本文:
応答コード:
200 --- 成功。レポートが返されました。
400 --- 失敗。無効な形式、または制限を超えたため、リクエストは無効です。
404 --- 失敗。
pipeName
は認識されません。このエラーコードは、エンドポイントの呼び出し時に使用されるロールに十分な権限がない場合にも返される可能性があります。詳細については、 アクセス権限の付与 をご参照ください。
429 --- 失敗。リクエストのレート制限を超えました。
500 --- 失敗。内部エラーが発生しました。
応答ペイロード:
成功の応答(200)には、最近テーブルに追加されたファイルに関する情報が含まれています。このレポートは、大きなファイルの一部のみを表す場合があることに注意します。
例:
{ "pipe": "TESTDB.TESTSCHEMA.pipe2", "completeResult": true, "nextBeginMark": "1_39", "files": [ { "path": "data2859002086815673867.csv", "stageLocation": "s3://mybucket/", "fileSize": 57, "timeReceived": "2017-06-21T04:47:41.453Z", "lastInsertTime": "2017-06-21T04:48:28.575Z", "rowsInserted": 1, "rowsParsed": 1, "errorsSeen": 0, "errorLimit": 1, "complete": true, "status": "LOADED" } ] }応答フィールド:
フィールド
型
説明
pipe
文字列
パイプの完全修飾名。
completeResult
ブール値
提供された
beginMark
とこのレポート履歴にある最初のイベントの間にイベントが欠落した場合は、false
。それ以外の場合は、true
。
nextBeginMark
文字列
重複レコードの表示を避けるために、
beginMark
を次のリクエストで使用。この値はヒントです。それでも重複が発生する場合があります。
files
配列
JSON オブジェクトの配列。履歴応答の一部であるファイルごとに1つのオブジェクト。
path
文字列
ステージの場所に関連するファイルパス。
stageLocation
文字列
パイプで定義されたステージ ID (内部ステージ)またはS3バケット(外部ステージ)のいずれか。
fileSize
ロング
バイト単位のファイルサイズ。
timeReceived
文字列
処理のためにこのファイルが受信された時刻。形式は UTC タイムゾーンの ISO-8601。
lastInsertTime
文字列
このファイルのデータが最後にテーブルに挿入された時刻。形式は UTC タイムゾーンの ISO-8601。
rowsInserted
ロング
ファイルからターゲットテーブルに挿入された行の数。
rowsParsed
ロング
ファイルから解析された行の数。エラーのある行はスキップされる場合があります。
errorsSeen
整数
ファイルに見られるエラーの数
errorLimit
整数
失敗( ON_ERROR コピーオプションに基づく)と見なされるまでにファイルで許可されるエラーの数。
firstError
[1]文字列
このファイルで最初に発生したエラーのエラーメッセージ。
firstErrorLineNum
[1]ロング
最初のエラーの行番号。
firstErrorCharacterPos
[1]ロング
最初のエラーの文字位置。
firstErrorColumnName
[1]文字列
最初のエラーが発生した列名。
systemError
[1]文字列
ファイルが処理されなかった理由を説明する一般エラー。
complete
ブール値
ファイルが完全に正常に処理されたかどうかを示します。
status
文字列
ファイルのロードステータス。
LOAD_IN_PROGRESS :ファイルの一部がテーブルにロードされましたが、ロードプロセスはまだ完了していません。
LOADED :ファイル全体がテーブルにロードされました。
LOAD_FAILED :ファイルのロードに失敗しました。
PARTIALLY_LOADED :このファイルの一部の行は正常にロードされましたが、その他の行はエラーのためロードされませんでした。このファイルの処理は完了しました。
[1] 値は、ファイルにエラーが含まれる場合、これらのフィールドにのみ提供されます。
エンドポイント: loadHistoryScan
¶
内容がテーブルに追加されたインジェスト済みファイルに関するレポートをフェッチします。大きなファイルの場合、レポートはファイルの一部のみの場合があります。このエンドポイントは、2つの時点間の履歴を表示するという点で insertReport
と異なります。最大10,000個のアイテムが返されますが、複数の呼び出しをパブリッシュして、目的の時間範囲をカバーできます。
重要
このエンドポイントは、過剰な呼び出しを避けるためにレート制限されています。レート制限(エラーコード429)を超えないようにするため、 loadHistoryScan
よりも insertReport
に強く依存することをお勧めします。 loadHistoryScan
を呼び出すとき、データロードのセットを含む最も狭い時間範囲を指定します。たとえば、履歴の最後の10分を8分ごとに読むとうまくいきます。毎分過去24時間の履歴を読み取ろうとすると、レート制限に達したことを示す429エラーが発生します。レート制限は、各履歴レコードを数回読み取ることができるように設計されています。
これらの制限のない、より包括的なビューのために、Snowflakeは、パイプまたはテーブルのロード履歴を返すInformation Schemaテーブル関数 COPY_HISTORY を提供します。
メソッド: GET
GET URL:
https://{account}.snowflakecomputing.com/v1/data/pipes/{pipeName}/loadHistoryScan?startTimeInclusive=<開始時刻>&endTimeExclusive=<終了時刻>&requestId=<リクエストID>
URL パラメーター:
account
(必須): Snowflakeアカウント識別子です。pipeName
(必須): 大文字と小文字を区別する、Snowpipeの完全修飾名。例:myDatabase.mySchema.myPipe
。startTimeInclusive
(必須): ISO-8601形式のタイムスタンプで指定します(例:2023-10-26T10:00:00Z)。このタイムスタンプは、クエリの包括的な下限を示します。endTimeExclusive
(オプション): ISO-8601形式のタイムスタンプで指定します(例:2023-10-26T10:15:00Z)。このタイムスタンプはクエリの排他的上限を示します。このパラメーターが省略された場合、現在のサーバーのタイムスタンプ (CURRENT_TIMESTAMP()) が時間範囲の終了として使用されます。requestId
(オプション):Snowflakeのシステムを通してこの特定のリクエストを追跡するために提供できる文字列です。デバッグや監視を容易にするため、 UUID のようなランダムな文字列を使用することをお勧めします。これを URL に追加:?requestId=<your_uuid>
。
リクエストヘッダー:
Accept
: 希望する応答形式を指定します。使用可能な値はtext/plain
またはapplication/json
です。Authorization
: Snowflake認証トークン。BEARER <jwt_token>
の形式を使用してください。
リクエスト本文:
このエンドポイントは GET
リクエストのリクエスト本文を受け付けません。必要なパラメーターはすべて、 URL とヘッダーにプロバイダーとして用意されています。
応答本文:
応答コード:
200 --- 成功。ロード履歴のスキャン結果が返されます。
400 --- 失敗。無効な形式、または制限を超えたため、リクエストは無効です。
404 --- 失敗。
pipeName
は認識されません。429 --- 失敗。リクエストのレート制限を超えました。
500 --- 失敗。内部エラーが発生しました。
応答ペイロード:
成功の応答(200)には、最近テーブルに追加されたファイルに関する情報が含まれています。このレポートは、大きなファイルの一部のみを表す場合があることに注意します。
例:
{ "pipe": "TESTDB.TESTSCHEMA.pipe2", "completeResult": true, "startTimeInclusive": "2017-08-25T18:42:31.081Z", "endTimeExclusive":"2017-08-25T22:43:45.552Z", "rangeStartTime":"2017-08-25T22:43:45.383Z", "rangeEndTime":"2017-08-25T22:43:45.383Z", "files": [ { "path": "data2859002086815673867.csv", "stageLocation": "s3://mystage/", "fileSize": 57, "timeReceived": "2017-08-25T22:43:45.383Z", "lastInsertTime": "2017-08-25T22:43:45.383Z", "rowsInserted": 1, "rowsParsed": 1, "errorsSeen": 0, "errorLimit": 1, "complete": true, "status": "LOADED" } ] }応答フィールド:
フィールド
型
説明
pipe
文字列
パイプの完全修飾名。
completeResult
ブール値
レポートが不完全な場合(つまり、指定された時間範囲内のエントリの数が10,000エントリの制限を超える場合)は、
false
。false
の場合、ユーザーは、エントリの次のセットに進むための次のリクエストに対するstartTimeInclusive
値として、現在のrangeEndTime
値を指定できます。
startTimeInclusive
文字列
リクエストで提供される開始タイムスタンプ( ISO-8601形式)。
endTimeExclusive
文字列
リクエストで提供される終了タイムスタンプ( ISO-8601形式)。
rangeStartTime
文字列
応答に含まれるファイルの最も古いエントリのタイムスタンプ( ISO-8601形式)。
rangeEndTime
文字列
応答に含まれるファイルの最新エントリのタイムスタンプ( ISO-8601形式)。
files
配列
JSON オブジェクトの配列。履歴応答の一部であるファイルごとに1つのオブジェクト。配列内の応答フィールドは、
insertReport
応答で返されるものと同じです。