Snowpipe REST API¶
REST エンドポイントを呼び出すことにより、パイプと対話します。このトピックでは、インジェストするファイルのリストを定義し、ロード履歴のレポートを取得するためのSnowpipe REST API について説明します。
Snowflakeは、Snowpipe REST APIでの作業を簡素化するJavaおよびPython APIs も提供します。
このトピックの内容:
データファイルの取り込み¶
Snowpipe API は、取り込むファイルのリストを定義するための REST エンドポイントを提供します。
エンドポイント: insertFiles
¶
テーブルにインジェストするファイルについてSnowflakeに通知します。このエンドポイントからの正常な応答は、Snowflakeがテーブルに追加するファイルのリストを記録したことを意味します。必ずしもファイルがインジェストされたことを意味するわけではありません。詳細については、以下の応答コードをご参照ください。
ほとんどの場合、Snowflakeは数分以内に新しいデータをターゲットテーブルに挿入します。
- method
POST
- post url
https://{account}.snowflakecomputing.com/v1/data/pipes/{pipeName}/insertFiles?requestId={requestId}
- post body
次の属性を持つ JSON オブジェクト。
属性
必須
説明
account
はい
Snowflakeアカウントの アカウント識別子。
pipeName
はい
大文字と小文字が区別される完全修飾パイプ名。例:
myDatabase.mySchema.myPipe
。requestId
いいえ
システムを通じてリクエストを追跡するために使用される文字列。UUID などのリクエストごとに、ランダムな文字列を指定することをお勧めします。
- content-type
text/plain
application/json
- header fields
承諾:
text/plain
またはapplication/json
認証: BEARER <JWTトークン>
text/plain
の場合、コンテンツはパスおよびファイル名のリスト(1行に1つずつ)です。size
パラメーターは許可されていません。application/json
の場合、コンテンツはパス、ファイル名、およびファイルサイズのリストです(オプションですが、パフォーマンスを向上させるために推奨されます)。ペイロードの例は次のとおりです。{ "files":[ { "path":"filePath/file1.csv", "size":100 }, { "path":"filePath/file2.csv", "size":100 } ] }
論理的で詳細なパスを使用し、ステージでデータをパーティション分割することにより、推奨されるベストプラクティスに従う場合には、ペイロードのパス値にステージングされたファイルへの完全なパスが含まれます。
注釈
投稿には最大5000個のファイルを含めることができます。
指定された各ファイルパスは、 UTF-8としてシリアル化される場合、<= 1024バイトの長さである必要があります。
- response body
応答コード:
200 --- 成功。インジェストするファイルのキューにファイルが追加されます。
400 --- 失敗。無効な形式、または制限を超えたため、リクエストは無効です。
404 --- 失敗。
pipeName
は認識されません。このエラーコードは、エンドポイントの呼び出し時に使用されるロールに十分な権限がない場合にも返される可能性があります。詳細については、 アクセス権限の付与 をご参照ください。
429 --- 失敗。リクエストのレート制限を超えました。
500 --- 失敗。内部エラーが発生しました。
応答ペイロード:
API リクエストが成功すると(つまり、コード200)、応答ペイロードには
requestId
およびstatus
要素が JSON 形式で含まれます。エラーが発生した場合は、応答ペイロードにエラーの詳細が含まれる場合があります。パイプ定義の COPY INTO <テーブル> ステートメントに PATTERN コピーオプションが含まれている場合、
unmatchedPatternFiles
属性には、ヘッダー内で送信され、正規表現と一致 しない ためにスキップされたファイルが一覧表示されます。
ロード履歴レポート¶
Snowpipe API は、ロードレポートをフェッチするための REST エンドポイントを提供します。
エンドポイント: insertReport
¶
内容が最近テーブルにインジェストされた insertFiles
を介して送信されたファイルのレポートを取得します。大きなファイルの場合、レポートはファイルの一部のみの場合があります。
このエンドポイントには次の制限があります。
10、000の最新のイベントが保持されます。
イベントは最大10分間保持されます。
insertFiles
を介して送信されたファイルのデータがテーブルにコミットされ、クエリで使用できるようになると、イベントが発生します。 insertReport
エンドポイントは UNIX コマンドテールのように考えることができます。このコマンドを繰り返し呼び出すことにより、時間の経過とともにパイプ上のイベントの完全な履歴を見ることができます。このコマンドは、イベントを見逃さないように頻繁に呼び出す必要があります。その頻度は、ファイルが insertFiles
に送信されるレートにより異なります。
- method
GET
- get url
https://<アカウント識別子>.snowflakecomputing.com/v1/data/pipes/<パイプ名>/insertReport?requestId=<リクエストID>&beginMark=<開始マーク>
- header fields
承諾: テキスト/プレーン、またはアプリケーション/JSON
認証: BEARER <JWTトークン>
- get body
次の属性を持つ JSON オブジェクト。
属性
必須
説明
account_identifier
はい
Snowflakeアカウント用の一意の識別子。
アカウント識別子の推奨形式は次のとおりです。
organization_name-account_name
Snowflake組織とアカウントの名前。詳細については、 形式1(推奨): 組織内のアカウント名。 をご参照ください。
または、必要に応じて、アカウントがホストされている リージョン および クラウドプラットフォーム とともに、 アカウントロケーター を指定します。詳細については、 形式2(レガシー): リージョン内のアカウントロケーター。 をご参照ください。
pipeName
はい
大文字と小文字が区別される完全修飾パイプ名。例:
myDatabase.mySchema.myPipe
。requestId
いいえ
システムを通じてリクエストを追跡するために使用される文字列。UUID などのリクエストごとに、ランダムな文字列を指定することをお勧めします。
beginMark
はい
insertReport
に対する以前の呼び出しによって返されたマーカーは、insertReport
を繰り返し呼び出したときに表示される、繰り返されるイベントの数を減らすために使用できます。これはヒントであり、ときどき繰り返しイベントが返される場合があります。
- response body
応答コード:
200 --- 成功。レポートが返されました。
400 --- 失敗。無効な形式、または制限を超えたため、リクエストは無効です。
404 --- 失敗。
pipeName
は認識されません。このエラーコードは、エンドポイントの呼び出し時に使用されるロールに十分な権限がない場合にも返される可能性があります。詳細については、 アクセス権限の付与 をご参照ください。
429 --- 失敗。リクエストのレート制限を超えました。
500 --- 失敗。内部エラーが発生しました。
応答ペイロード:
成功の応答(200)には、最近テーブルに追加されたファイルに関する情報が含まれています。このレポートは、大きなファイルの一部のみを表す場合があることに注意します。
例:
{ "pipe": "TESTDB.TESTSCHEMA.pipe2", "completeResult": true, "nextBeginMark": "1_39", "files": [ { "path": "data2859002086815673867.csv", "stageLocation": "s3://mybucket/", "fileSize": 57, "timeReceived": "2017-06-21T04:47:41.453Z", "lastInsertTime": "2017-06-21T04:48:28.575Z", "rowsInserted": 1, "rowsParsed": 1, "errorsSeen": 0, "errorLimit": 1, "complete": true, "status": "LOADED" } ] }
応答フィールド:
フィールド
型
説明
pipe
文字列
パイプの完全修飾名。
completeResult
ブール値
提供された
beginMark
とこのレポート履歴にある最初のイベントの間にイベントが欠落した場合は、false
。それ以外の場合は、true
。nextBeginMark
文字列
重複記録の表示を避けるために、
beginMark
を次のリクエストで使用。この値はヒントです。それでも重複が発生する場合があります。files
配列
JSON オブジェクトの配列。履歴応答の一部であるファイルごとに1つのオブジェクト。
path
文字列
ステージの場所に関連するファイルパス。
stageLocation
文字列
パイプで定義されたステージ ID (内部ステージ)またはS3バケット(外部ステージ)のいずれか。
fileSize
ロング
バイト単位のファイルサイズ。
timeReceived
文字列
処理のためにこのファイルが受信された時刻。形式は UTC タイムゾーンの ISO-8601。
lastInsertTime
文字列
このファイルのデータが最後にテーブルに挿入された時刻。形式は UTC タイムゾーンの ISO-8601。
rowsInserted
ロング
ファイルからターゲットテーブルに挿入された行の数。
rowsParsed
ロング
ファイルから解析された行の数。エラーのある行はスキップされる場合があります。
errorsSeen
整数
ファイルに見られるエラーの数
errorLimit
整数
失敗( ON_ERROR コピーオプションに基づく)と見なされるまでにファイルで許可されるエラーの数。
firstError
[1]文字列
このファイルで最初に発生したエラーのエラーメッセージ。
firstErrorLineNum
[1]ロング
最初のエラーの行番号。
firstErrorCharacterPos
[1]ロング
最初のエラーの文字位置。
firstErrorColumnName
[1]文字列
最初のエラーが発生した列名。
systemError
[1]文字列
ファイルが処理されなかった理由を説明する一般エラー。
complete
ブール値
ファイルが完全に正常に処理されたかどうかを示します。
status
文字列
ファイルのロードステータス。
LOAD_IN_PROGRESS :ファイルの一部がテーブルにロードされましたが、ロードプロセスはまだ完了していません。
LOADED :ファイル全体がテーブルにロードされました。
LOAD_FAILED :ファイルのロードに失敗しました。
PARTIALLY_LOADED :このファイルの一部の行は正常にロードされましたが、その他の行はエラーのためロードされませんでした。このファイルの処理は完了しました。
[1] 値は、ファイルにエラーが含まれる場合、これらのフィールドにのみ提供されます。
エンドポイント: loadHistoryScan
¶
内容がテーブルに追加されたインジェスト済みファイルに関するレポートをフェッチします。大きなファイルの場合、レポートはファイルの一部のみの場合があります。このエンドポイントは、2つの時点間の履歴を表示するという点で insertReport
と異なります。最大10,000個のアイテムが返されますが、複数の呼び出しをパブリッシュして、目的の時間範囲をカバーできます。
重要
このエンドポイントは、過剰な呼び出しを避けるためにレート制限されています。レート制限(エラーコード429)を超えないようにするため、 loadHistoryScan
よりも insertReport
に強く依存することをお勧めします。 loadHistoryScan
を呼び出すとき、データロードのセットを含む最も狭い時間範囲を指定します。たとえば、履歴の最後の10分を8分ごとに読むとうまくいきます。毎分過去24時間の履歴を読み取ろうとすると、レート制限に達したことを示す429エラーが発生します。レート制限は、各履歴記録を数回読み取ることができるように設計されています。
これらの制限のない、より包括的なビューのために、Snowflakeは、パイプまたはテーブルのロード履歴を返すInformation Schemaテーブル関数 COPY_HISTORY を提供します。
- method
GET
- get url
https://{account}.snowflakecomputing.com/v1/data/pipes/{pipeName}/loadHistoryScan?startTimeInclusive=<開始時刻>&endTimeExclusive=<終了時刻>&requestId=<リクエストID>
- header fields
承諾: テキスト/プレーン、またはアプリケーション/JSON
認証: BEARER <JWTトークン>
- get body
次の属性を持つ JSON オブジェクト。
属性
必須
説明
account
はい
Snowflakeアカウントの アカウント識別子。
pipeName
はい
大文字と小文字が区別される完全修飾パイプ名。例:
myDatabase.mySchema.myPipe
。startTimeInclusive
はい
ISO-8601形式のタイムスタンプ。ロード履歴データを取得する時間範囲の開始。
endTimeExclusive
いいえ
ISO-8601形式のタイムスタンプ。ロード履歴データを取得する時間範囲の終了。省略した場合、 CURRENT_TIMESTAMP()が範囲の終わりとして使用されます。
requestId
いいえ
システムを通じてリクエストを追跡するために使用される文字列。各リクエストでランダムな文字列を提供することをお勧めします(例:UUID)。
- response body
応答コード:
200 --- 成功。ロード履歴のスキャン結果が返されます。
400 --- 失敗。無効な形式、または制限を超えたため、リクエストは無効です。
404 --- 失敗。
pipeName
は認識されません。429 --- 失敗。リクエストのレート制限を超えました。
500 --- 失敗。内部エラーが発生しました。
応答ペイロード:
成功の応答(200)には、最近テーブルに追加されたファイルに関する情報が含まれています。このレポートは、大きなファイルの一部のみを表す場合があることに注意します。
例:
{ "pipe": "TESTDB.TESTSCHEMA.pipe2", "completeResult": true, "startTimeInclusive": "2017-08-25T18:42:31.081Z", "endTimeExclusive":"2017-08-25T22:43:45.552Z", "rangeStartTime":"2017-08-25T22:43:45.383Z", "rangeEndTime":"2017-08-25T22:43:45.383Z", "files": [ { "path": "data2859002086815673867.csv", "stageLocation": "s3://mystage/", "fileSize": 57, "timeReceived": "2017-08-25T22:43:45.383Z", "lastInsertTime": "2017-08-25T22:43:45.383Z", "rowsInserted": 1, "rowsParsed": 1, "errorsSeen": 0, "errorLimit": 1, "complete": true, "status": "LOADED" } ] }
応答フィールド:
フィールド
型
説明
pipe
文字列
パイプの完全修飾名。
completeResult
ブール値
レポートが不完全な場合(つまり、指定された時間範囲内のエントリの数が10,000エントリの制限を超える場合)は、
false
。false
の場合、ユーザーは、エントリの次のセットに進むための次のリクエストに対するstartTimeInclusive
値として、現在のrangeEndTime
値を指定できます。startTimeInclusive
文字列
リクエストで提供される開始タイムスタンプ( ISO-8601形式)。
endTimeExclusive
文字列
リクエストで提供される終了タイムスタンプ( ISO-8601形式)。
rangeStartTime
文字列
応答に含まれるファイルの最も古いエントリのタイムスタンプ( ISO-8601形式)。
rangeEndTime
文字列
応答に含まれるファイルの最新エントリのタイムスタンプ( ISO-8601形式)。
files
配列
JSON オブジェクトの配列。履歴応答の一部であるファイルごとに1つのオブジェクト。配列内の応答フィールドは、
insertReport
応答で返されるものと同じです。