Pythonでのデータのロードとアンロードのリソースの管理

Pythonを使用して、外部ボリューム、パイプ、ステージなど、Snowflakeのデータロードおよびアンロードリソースを管理できます。

前提条件

このトピックの例では、Snowflakeと接続するコードを追加して Root オブジェクトを作成し、そこからSnowflake Python Snowflake Python APIs を使用することを想定しています。

たとえば、以下のコードでは、構成ファイルで定義された接続パラメーターを使用してSnowflakeへの接続を作成します。

from snowflake.core import Root
from snowflake.snowpark import Session

session = Session.builder.config("connection_name", "myconnection").create()
root = Root(session)
Copy

出来上がった Session オブジェクトを使って、コードは API のタイプとメソッドを使う Root オブジェクトを作成します。詳細については、 Snowflake Python APIs によるSnowflakeへの接続 をご参照ください。

ステージの管理

クラウドストレージ内のデータファイルの場所であるSnowflakeステージを管理できます。ステージの概要については、 データのロードの概要 をご参照ください。

Snowflake Python APIs は、ステージを2つの別々のタイプで表します。

  • Stage: ステージ名、暗号化タイプ、認証情報、ディレクトリテーブル設定などのステージのプロパティを公開します。

  • StageResource: 対応する Stage オブジェクトの取得、ステージ上のファイルのアップロードと一覧表示、ステージのドロップに使用できるメソッドを公開します。

ステージの作成

ステージを作成するには、まず Stage オブジェクトを作成して、 API Root オブジェクトから StageCollection オブジェクトを作成します。 StageCollection.create を使って、新しいステージをSnowflakeに追加します。

以下の例のコードは、 SNOWFLAKE_SSE (サーバー側の暗号化のみ)の暗号化タイプで、 my_stage という名前のステージを表す Stage オブジェクトを作成します。

from snowflake.core.stage import Stage, StageEncryption

my_stage = Stage(
  name="my_stage",
  encryption=StageEncryption(type="SNOWFLAKE_SSE")
)
stages = root.databases["my_db"].schemas["my_schema"].stages
stages.create(my_stage)
Copy

コードは、 StageCollection 変数 stages を作成し、 StageCollection.create を使用してSnowflakeに新しいステージを作成します。

ステージ詳細の取得

Stage オブジェクトを返す StageResource.fetch メソッドを呼び出すことで、ステージに関する情報を取得できます。

次の例のコードは、 my_stage という名前のステージの情報を取得します。

my_stage = root.databases["my_db"].schemas["my_schema"].stages["my_stage"].fetch()
print(my_stage.to_dict())
Copy

ステージの一覧表示

Stage オブジェクトの PagedIter 反復子を返す StageCollection.iter メソッドを使用して、ステージを一覧表示することができます。

次の例のコードは、名前に my というテキストを含むステージを一覧表示し、以下のそれぞれの名前を印刷します。

from snowflake.core.stage import StageCollection

stages: StageCollection = root.databases["my_db"].schemas["my_schema"].stages
stage_iter = stages.iter(like="my%")  # returns a PagedIter[Stage]
for stage_obj in stage_iter:
  print(stage_obj.name)
Copy

ステージ操作の実行

ステージへのファイルのアップロードやステージ上のファイルの一覧表示など、一般的なステージ操作は StageResource オブジェクトで実行できます。

ステージ・リソースでできるいくつかの操作を示すために、次の例のコードでは次のようにしています。

  1. 指定された自動圧縮と上書きオプションで、 my-file.yaml という名前のファイルを my_stage ステージにアップロードします。

  2. ステージ上のすべてのファイルを一覧表示し、ファイルが正常にアップロードされたことを確認します。

  3. ステージをドロップします。

my_stage_res = root.databases["my_db"].schemas["my_schema"].stages["my_stage"]

my_stage_res.put("./my-file.yaml", "/", auto_compress=False, overwrite=True)

stageFiles = root.databases["my_db"].schemas["my_schema"].stages["my_stage"].list_files()
for stageFile in stageFiles:
  print(stageFile)

my_stage_res.drop()
Copy

パイプの管理

Snowflake パイプを管理することができます。Snowflake パイプは、Snowpipe がインジェスションキューからテーブルにデータをロードするために使用する COPY INTO ステートメントを含む、名前付きのファーストクラスの Snowflake オブジェクトです。パイプの概要については、 Snowpipe をご参照ください。

Snowflake Python APIs は、パイプを2つの別々のタイプで表します。

  • Pipe: Snowpipeが使用するパイプ名や COPY INTO ステートメントなどのパイプのプロパティを公開します。

  • PipeResource: 対応する Pipe オブジェクトを取得し、ステージングされたデータファイルでパイプをリフレッシュし、パイプをドロップするために使用できるメソッドを公開します。

パイプの作成

ウェアハウスを作成するには、まず Pipe オブジェクトを作成して、 API Root オブジェクトから PipeCollection オブジェクトを作成します。 PipeCollection.create を使って、新しいパイプをSnowflakeに追加します。

以下の例のコードは、指定された COPY INTO ステートメントを持つ my_pipe という名前のパイプを表す Pipe オブジェクトを作成します。

from snowflake.core.pipe import Pipe

my_pipe = Pipe(
  name="my_pipe",
  comment="creating my pipe",
  copy_statement="COPY INTO my_table FROM @mystage FILE_FORMAT = (TYPE = 'JSON')",
)

pipes = root.databases["my_db"].schemas["my_schema"].pipes
pipes.create(my_pipe)
Copy

コードは、 PipeCollection 変数 pipes を作成し、 PipeCollection.create を使用してSnowflakeに新しいパイプを作成します。

パイプの詳細の取得

Pipe オブジェクトを返す PipeResource.fetch メソッドを呼び出すことで、パイプに関する情報を取得できます。

次の例のコードは、 my_pipe という名前のパイプの情報を取得します。

my_pipe = root.databases["my_db"].schemas["my_schema"].pipes["my_pipe"].fetch()
print(my_pipe.to_dict())
Copy

パイプの一覧表示

Pipe オブジェクトの PagedIter 反復子を返す PipeCollection.iter メソッドを使用して、パイプを一覧表示することができます。

次の例のコードは、名前が my で始まるパイプを一覧表示し、それぞれの名前を印刷します。

from snowflake.core.pipe import PipeCollection

pipes: PipeCollection = root.databases["my_db"].schemas["my_schema"].pipes
pipe_iter = pipes.iter(like="my%")  # returns a PagedIter[Pipe]
for pipe_obj in pipe_iter:
  print(pipe_obj.name)
Copy

パイプ操作の実行

PipeResource により、パイプのリフレッシュやドロップなど、一般的なパイプ操作を行うことができます。

注釈

現在のところ、 ALTER PIPE の REFRESH 機能のみがサポートされています。

パイプ・リソースでできる操作を示すために、次の例のコードでは次のようにしています。

  1. my_pipe パイプ・リソース・オブジェクトを取得します。

  2. 指定したオプションの接頭辞(またはパス)を持つステージングされたデータファイルでパイプをリフレッシュします。

  3. パイプをドロップします。

my_pipe_res = root.databases["my_db"].schemas["my_schema"].pipes["my_pipe"]

# equivalent to: ALTER PIPE my_pipe REFRESH PREFIX = 'dir3/'
my_pipe_res.refresh(prefix="dir3/")

my_pipe_res.drop()
Copy

外部ボリュームの管理

外部ボリュームは名前付きのアカウントレベルのSnowflakeオブジェクトで、Snowflakeを Apache Iceberg™ テーブル用の外部クラウドストレージに接続するために使用します。詳細については、 Apache Iceberg™ テーブル外部ボリューム セクションをご参照ください。

Snowflake Python APIs は外部ボリュームを2つの別々のタイプで表します。

  • ExternalVolume: 外部ボリュームの名前や保存場所などのプロパティを公開します。

  • ExternalVolumeResource: 対応する ExternalVolume オブジェクトの取得、外部ボリュームのドロップとドロップ解除に使用できるメソッドを公開します。

外部ボリュームの作成

外部ボリュームを作成するには、まず ExternalVolume オブジェクトを作成し、次に API Root オブジェクトから ExternalVolumeCollection オブジェクトを作成します。 ExternalVolumeCollection.create を使用して、新しい外部ボリュームをSnowflakeに追加します。

次の例のコードは、指定された AWS S3ストレージロケーションを持つ、 my_external_volume という外部ボリュームを表す ExternalVolume オブジェクトを作成します。

from snowflake.core.external_volume import (
    ExternalVolume,
    StorageLocationS3,
)

my_external_volume = ExternalVolume(
    name="my_external_volume",
    storage_locations=[
        StorageLocationS3(
            name="my-s3-us-west-1",
            storage_base_url="s3://MY_EXAMPLE_BUCKET/",
            storage_aws_role_arn="arn:aws:iam::123456789012:role/myrole",
            encryption=Encryption(type="AWS_SSE_KMS", kms_key_id="1234abcd-12ab-34cd-56ef-1234567890ab"),
        ),
        StorageLocationS3(
            name="my-s3-us-west-2",
            storage_base_url="s3://MY_EXAMPLE_BUCKET/",
            storage_aws_role_arn="arn:aws:iam::123456789012:role/myrole",
            encryption=Encryption(type="AWS_SSE_KMS", kms_key_id="1234abcd-12ab-34cd-56ef-1234567890ab"),
        ),
    ]
)

root.external_volumes.create(my_external_volume)
Copy

外部ボリュームの詳細の取得

ExternalVolume オブジェクトを返す ExternalVolumeResource.fetch メソッドを呼び出すことで、外部ボリュームに関する情報を取得できます。

次の例のコードは、 my_external_volume という名前の外部ボリュームに関する情報を取得します。

my_external_volume = root.external_volumes["my_external_volume"].fetch()
print(my_external_volume.to_dict())
Copy

外部ボリュームの一覧表示

ExternalVolume オブジェクトの PagedIter 反復子を返す ExternalVolumeCollection.iter メソッドを使用して、外部ボリュームを一覧表示することができます。

以下の例のコードは、名前が my で始まる外部ボリュームを一覧表示し、それぞれの名前を印刷します。

external_volume_iter = root.external_volumes.iter(like="my%")
for external_volume_obj in external_volume_iter:
  print(external_volume_obj.name)
Copy

外部ボリューム操作の実行

外部ボリュームのドロップやアンドロップなど、一般的な外部ボリューム操作は ExternalVolumeResource オブジェクトで実行できます。

外部ボリューム・リソースでできる操作を示すために、次の例のコードでは次のようにしています。

  1. my_external_volume 外部ボリューム・リソース・オブジェクトを取得します。

  2. 外部ボリュームをドロップします。

  3. 外部ボリュームをアンドロップします。

my_external_volume_res = root.external_volumes["my_external_volume"]
my_external_volume_res.drop()
my_external_volume_res.undrop()
Copy