Snowpipe REST API

REST エンドポイントを呼び出すことにより、パイプと対話します。このトピックでは、インジェストするファイルのリストを定義し、ロード履歴のレポートを取得するためのSnowpipe REST API について説明します。

Snowflakeは、Snowpipe REST APIでの作業を簡素化するJavaおよびPython APIs も提供します。

このトピックの内容:

データファイルの取り込み

Snowpipe API は、取り込むファイルのリストを定義するための REST エンドポイントを提供します。

エンドポイント: insertFiles

テーブルにインジェストするファイルについてSnowflakeに通知します。このエンドポイントからの正常な応答は、Snowflakeがテーブルに追加するファイルのリストを記録したことを意味します。必ずしもファイルがインジェストされたことを意味するわけではありません。詳細については、以下の応答コードをご参照ください。

ほとんどの場合、Snowflakeは数分以内に新しいデータをターゲットテーブルに挿入します。

method

POST

post url

https://{account}.snowflakecomputing.com/v1/data/pipes/{pipeName}/insertFiles?requestId={requestId}

post body

次の属性を持つ JSON オブジェクト。

属性

必須

説明

account

はい

アカウントの名前(Snowflakeが提供)。

pipeName

はい

大文字と小文字が区別される完全修飾パイプ名。例: myDatabase.mySchema.myPipe

requestId

いいえ

システムを通じてリクエストを追跡するために使用される文字列。UUID などのリクエストごとに、ランダムな文字列を指定することをお勧めします。

content-type

text/plain

application/json

header fields

承諾: text/plain または application/json

認証: BEARER <JWTトークン>

  • text/plain の場合、コンテンツは1行に1つずつのパス名のリストです。

  • application/json の場合、ペイロードの例は次のとおりです。

    {
      "files":[
        {
          "path":"filePath/file1.csv"
        },
        {
          "path":"filePath/file2.csv"
        }
      ]
    }
    

論理的で詳細なパスを使用し、ステージでデータをパーティション分割することにより、推奨されるベストプラクティスに従う場合、ペイロードのパス値にはステージングされたファイルへの完全なパスが含まれます。

注釈

  • 投稿には最大5000個のファイルを含めることができます。

  • 指定された各ファイルパスは、 UTF-8としてシリアル化される場合、<= 1024バイトの長さである必要があります。

response body

応答コード:

  • 200 --- 成功。インジェストするファイルのキューにファイルが追加されます。

  • 400 --- 失敗。無効な形式、または制限を超えたため、リクエストは無効です。

  • 404 --- 失敗。 pipeName は認識されません。

  • 429 --- 失敗。リクエストのレート制限を超えました。

  • 500 --- 失敗。内部エラーが発生しました。

応答ペイロード:

API リクエストが成功すると(つまり、コード200)、応答ペイロードには requestId および status 要素が JSON 形式で含まれます。エラーが発生した場合は、応答ペイロードにエラーの詳細が含まれる場合があります。

パイプ定義の COPY INTO <テーブル> ステートメントに PATTERN コピーオプションが含まれている場合、 unmatchedPatternFiles 属性には、ヘッダー内で送信され、正規表現と一致 しない ためにスキップされたファイルが一覧表示されます。

ロード履歴レポート

Snowpipe API は、ロードレポートをフェッチするための REST エンドポイントを提供します。

エンドポイント: insertReport

内容が最近テーブルにインジェストされた insertFiles を介して送信されたファイルのレポートを取得します。大きなファイルの場合、レポートはファイルの一部のみの場合があります。

このエンドポイントには次の制限があります。

  • 10、000の最新のイベントが保持されます。

  • イベントは最大10分間保持されます。

insertFiles を介して送信されたファイルのデータがテーブルにコミットされ、クエリで使用できるようになると、イベントが発生します。 insertReport エンドポイントは UNIX コマンドテールのように考えることができます。このコマンドを繰り返し呼び出すことにより、時間の経過とともにパイプ上のイベントの完全な履歴を見ることができます。このコマンドは、イベントを見逃さないように頻繁に呼び出す必要があります。その頻度は、ファイルが insertFiles に送信されるレートにより異なります。

method

GET

get url

https://<アカウント>.snowflakecomputing.com/v1/data/pipes/<パイプ名>/insertReport?requestId=<リクエストID>&beginMark=<開始マーク>

header fields

承諾:テキスト/プレーン、またはアプリケーション/json

認証: BEARER <JWTトークン>

get body

次の属性を持つ JSON オブジェクト。

属性

必須

説明

account

はい

Snowflakeによってアカウントに割り当てられた名前。Snowflakeから受け取った URL では、アカウント名はドメインの最初のセグメントです(例: https://xy12345.snowflakecomputing.comxy12345)。

pipeName

はい

大文字と小文字が区別される完全修飾パイプ名。例: myDatabase.mySchema.myPipe

requestId

いいえ

システムを通じてリクエストを追跡するために使用される文字列。UUID などのリクエストごとに、ランダムな文字列を指定することをお勧めします。

beginMark

いいえ

insertReport に対する以前の呼び出しによって返されたマーカーは、 insertReport を繰り返し呼び出したときに表示される、繰り返されるイベントの数を減らすために使用できます。これはヒントであり、ときどき繰り返しイベントが返される場合があります。

response body

応答コード:

  • 200 --- 成功。レポートが返されました。

  • 400 --- 失敗。無効な形式、または制限を超えたため、リクエストは無効です。

  • 404 --- 失敗。 pipeName は認識されません。

  • 429 --- 失敗。リクエストのレート制限を超えました。

  • 500 --- 失敗。内部エラーが発生しました。

応答ペイロード:

成功の応答(200)には、最近テーブルに追加されたファイルに関する情報が含まれています。このレポートは、大きなファイルの一部のみを表す場合があることに注意します。

例:

{
  "pipe": "TESTDB.TESTSCHEMA.pipe2",
  "completeResult": true,
  "nextBeginMark": "1_39",
  "files": [
    {
      "path": "data2859002086815673867.csv",
      "stageLocation": "s3://mybucket/",
      "fileSize": 57,
      "timeReceived": "2017-06-21T04:47:41.453Z",
      "lastInsertTime": "2017-06-21T04:48:28.575Z",
      "rowsInserted": 1,
      "rowsParsed": 1,
      "errorsSeen": 0,
      "errorLimit": 1,
      "complete": true,
      "status": "LOADED"
    }
  ]
}

応答フィールド:

フィールド

説明

pipe

文字列

パイプの完全修飾名。

completeResult

ブール値

提供された beginMark とこのレポート履歴にある最初のイベントの間にイベントが欠落した場合は、 false。それ以外の場合は、 true

nextBeginMark

文字列

重複レコードの表示を避けるために、 beginMark を次のリクエストで使用。この値はヒントです。それでも重複が発生する場合があります。

files

配列

JSON オブジェクトの配列。履歴応答の一部であるファイルごとに1つのオブジェクト。

path

文字列

ステージの場所に関連するファイルパス。

stageLocation

文字列

パイプで定義されたステージ ID (内部ステージ)またはS3バケット(外部ステージ)のいずれか。

fileSize

ロング

バイト単位のファイルサイズ。

timeReceived

文字列

処理のためにこのファイルが受信された時刻。形式は UTC タイムゾーンの ISO-8601。

lastInsertTime

文字列

このファイルのデータが最後にテーブルに挿入された時刻。形式は UTC タイムゾーンの ISO-8601。

rowsInserted

ロング

ファイルからターゲットテーブルに挿入された行の数。

rowsParsed

ロング

ファイルから解析された行の数。エラーのある行はスキップされる場合があります。

errorsSeen

整数

ファイルに見られるエラーの数

errorLimit

整数

失敗( ON_ERROR コピーオプションに基づく)と見なされるまでにファイルで許可されるエラーの数。

firstError [1]

文字列

このファイルで最初に発生したエラーのエラーメッセージ。

firstErrorLineNum [1]

ロング

最初のエラーの行番号。

firstErrorCharacterPos [1]

ロング

最初のエラーの文字位置。

firstErrorColumnName [1]

文字列

最初のエラーが発生した列名。

systemError [1]

文字列

ファイルが処理されなかった理由を説明する一般エラー。

complete

ブール値

ファイルが完全に正常に処理されたかどうかを示します。

status

文字列

ファイルのロードステータス。

  • LOAD_IN_PROGRESS :ファイルの一部がテーブルにロードされましたが、ロードプロセスはまだ完了していません。

  • LOADED :ファイル全体がテーブルにロードされました。

  • LOAD_FAILED :ファイルのロードに失敗しました。

  • PARTIALLY_LOADED :このファイルの一部の行は正常にロードされましたが、その他の行はエラーのためロードされませんでした。このファイルの処理は完了しました。

[1]値は、ファイルにエラーが含まれる場合、これらのフィールドのみに提供されます。

エンドポイント: loadHistoryScan

内容がテーブルに追加されたインジェスト済みファイルに関するレポートをフェッチします。大きなファイルの場合、レポートはファイルの一部のみの場合があります。このエンドポイントは、2つの時点間の履歴を表示するという点で insertReport と異なります。最大10,000個のアイテムが返されますが、複数の呼び出しをパブリッシュして、目的の時間範囲をカバーできます。

重要

このエンドポイントは、過剰な呼び出しを避けるためにレート制限されています。レート制限(エラーコード429)を超えないようにするため、 loadHistoryScan よりも insertReport に強く依存することをお勧めします。 loadHistoryScan を呼び出すとき、データロードのセットを含む最も狭い時間範囲を指定します。たとえば、履歴の最後の10分を8分ごとに読むとうまくいきます。毎分過去24時間の履歴を読み取ろうとすると、レート制限に達したことを示す429エラーが発生します。レート制限は、各履歴レコードを数回読み取ることができるように設計されています。

これらの制限のない、より包括的なビューのために、Snowflakeは、パイプまたはテーブルのロード履歴を返すInformation Schemaテーブル関数 COPY_HISTORY を提供します。

method

GET

get url

https://{account}.snowflakecomputing.com/v1/data/pipes/{pipeName}/loadHistoryScan?startTimeInclusive=<開始時刻>&endTimeExclusive=<終了時刻>&requestId=<リクエストID>

header fields

承諾:テキスト/プレーン、またはアプリケーション/json

認証: BEARER <JWTトークン>

get body

次の属性を持つ JSON オブジェクト。

属性

必須

説明

account

はい

Snowflakeによってアカウントに割り当てられた名前。Snowflakeから受け取った URL では、アカウント名はドメインの最初のセグメントです(例: https://xy12345.snowflakecomputing.comxy12345)。

pipeName

はい

大文字と小文字が区別される完全修飾パイプ名。例: myDatabase.mySchema.myPipe

startTimeInclusive

はい

ISO-8601形式のタイムスタンプ。ロード履歴データを取得する時間範囲の開始。

endTimeExclusive

いいえ

ISO-8601形式のタイムスタンプ。ロード履歴データを取得する時間範囲の終了。省略した場合、 CURRENT_TIMESTAMP()が範囲の終わりとして使用されます。

requestId

いいえ

システムを通じてリクエストを追跡するために使用される文字列。各リクエストでランダムな文字列を提供することをお勧めします(例:UUID)。

response body

応答コード:

  • 200 --- 成功。ロード履歴のスキャン結果が返されます。

  • 400 --- 失敗。無効な形式、または制限を超えたため、リクエストは無効です。

  • 404 --- 失敗。 pipeName は認識されません。

  • 429 --- 失敗。リクエストのレート制限を超えました。

  • 500 --- 失敗。内部エラーが発生しました。

応答ペイロード:

成功の応答(200)には、最近テーブルに追加されたファイルに関する情報が含まれています。このレポートは、大きなファイルの一部のみを表す場合があることに注意します。

例:

{
  "pipe": "TESTDB.TESTSCHEMA.pipe2",
  "completeResult": true,
  "startTimeInclusive": "2017-08-25T18:42:31.081Z",
  "endTimeExclusive":"2017-08-25T22:43:45.552Z",
  "rangeStartTime":"2017-08-25T22:43:45.383Z",
  "rangeEndTime":"2017-08-25T22:43:45.383Z",
  "files": [
    {
      "path": "data2859002086815673867.csv",
      "stageLocation": "s3://mystage/",
      "fileSize": 57,
      "timeReceived": "2017-08-25T22:43:45.383Z",
      "lastInsertTime": "2017-08-25T22:43:45.383Z",
      "rowsInserted": 1,
      "rowsParsed": 1,
      "errorsSeen": 0,
      "errorLimit": 1,
      "complete": true,
      "status": "LOADED"
    }
  ]
}

応答フィールド:

フィールド

説明

pipe

文字列

パイプの完全修飾名。

completeResult

ブール値

レポートが不完全な場合(つまり、指定された時間範囲内のエントリの数が10,000エントリの制限を超える場合)は、 falsefalse の場合、ユーザーは、エントリの次のセットに進むための次のリクエストに対する startTimeInclusive 値として、現在の rangeEndTime 値を指定できます。

startTimeInclusive

文字列

リクエストで提供される開始タイムスタンプ( ISO-8601形式)。

endTimeExclusive

文字列

リクエストで提供される終了タイムスタンプ( ISO-8601形式)。

rangeStartTime

文字列

応答に含まれるファイルの最も古いエントリのタイムスタンプ( ISO-8601形式)。

rangeEndTime

文字列

応答に含まれるファイルの最新エントリのタイムスタンプ( ISO-8601形式)。

files

配列

JSON オブジェクトの配列。履歴応答の一部であるファイルごとに1つのオブジェクト。配列内の応答フィールドは、 insertReport 応答で返されるものと同じです。