Pythonでのデータのロードとアンロードのリソースの管理¶
Pythonを使用して、外部ボリューム、パイプ、ステージなど、Snowflakeのデータロードおよびアンロードリソースを管理できます。
前提条件¶
このトピックの例では、Snowflakeと接続するコードを追加して Root オブジェクトを作成し、そこからSnowflake Python Snowflake Python APIs を使用することを想定しています。
たとえば、以下のコードでは、構成ファイルで定義された接続パラメーターを使用してSnowflakeへの接続を作成します。
from snowflake.core import Root
from snowflake.snowpark import Session
session = Session.builder.config("connection_name", "myconnection").create()
root = Root(session)
出来上がった Session オブジェクトを使って、コードは API のタイプとメソッドを使う Root オブジェクトを作成します。詳細については、 Snowflake Python APIs によるSnowflakeへの接続 をご参照ください。
ステージの管理¶
クラウドストレージ内のデータファイルの場所であるSnowflakeステージを管理できます。ステージの概要については、 データのロードの概要 をご参照ください。
Snowflake Python APIs は、ステージを2つの別々のタイプで表します。
Stage: ステージ名、暗号化タイプ、認証情報、ディレクトリテーブル設定などのステージのプロパティを公開します。StageResource: 対応するStageオブジェクトの取得、ステージ上のファイルのアップロードと一覧表示、ステージのドロップに使用できるメソッドを公開します。
ステージの作成¶
ステージを作成するには、まず Stage オブジェクトを作成して、 API Root オブジェクトから StageCollection オブジェクトを作成します。 StageCollection.create を使って、新しいステージをSnowflakeに追加します。
以下の例のコードは、 SNOWFLAKE_SSE (サーバー側の暗号化のみ)の暗号化タイプで、 my_stage という名前のステージを表す Stage オブジェクトを作成します。
from snowflake.core.stage import Stage, StageEncryption
my_stage = Stage(
name="my_stage",
encryption=StageEncryption(type="SNOWFLAKE_SSE")
)
stages = root.databases["my_db"].schemas["my_schema"].stages
stages.create(my_stage)
コードは、 StageCollection 変数 stages を作成し、 StageCollection.create を使用してSnowflakeに新しいステージを作成します。
ステージ詳細の取得¶
Stage オブジェクトを返す StageResource.fetch メソッドを呼び出すことで、ステージに関する情報を取得できます。
次の例のコードは、 my_stage という名前のステージの情報を取得します。
my_stage = root.databases["my_db"].schemas["my_schema"].stages["my_stage"].fetch()
print(my_stage.to_dict())
ステージの一覧表示¶
Stage オブジェクトの PagedIter 反復子を返す StageCollection.iter メソッドを使用して、ステージを一覧表示することができます。
次の例のコードは、名前に my というテキストを含むステージを一覧表示し、以下のそれぞれの名前を印刷します。
from snowflake.core.stage import StageCollection
stages: StageCollection = root.databases["my_db"].schemas["my_schema"].stages
stage_iter = stages.iter(like="my%") # returns a PagedIter[Stage]
for stage_obj in stage_iter:
print(stage_obj.name)
ステージ操作の実行¶
ステージへのファイルのアップロードやステージ上のファイルの一覧表示など、一般的なステージ操作は StageResource オブジェクトで実行できます。
ステージ・リソースでできるいくつかの操作を示すために、次の例のコードでは次のようにしています。
指定された自動圧縮と上書きオプションで、
my-file.yamlという名前のファイルをmy_stageステージにアップロードします。ステージ上のすべてのファイルを一覧表示し、ファイルが正常にアップロードされたことを確認します。
ステージをドロップします。
my_stage_res = root.databases["my_db"].schemas["my_schema"].stages["my_stage"]
my_stage_res.put("./my-file.yaml", "/", auto_compress=False, overwrite=True)
stageFiles = root.databases["my_db"].schemas["my_schema"].stages["my_stage"].list_files()
for stageFile in stageFiles:
print(stageFile)
my_stage_res.drop()
パイプの管理¶
Snowflake パイプを管理することができます。Snowflake パイプは、Snowpipe がインジェスションキューからテーブルにデータをロードするために使用する COPY INTO ステートメントを含む、名前付きのファーストクラスの Snowflake オブジェクトです。パイプの概要については、 Snowpipe をご参照ください。
Snowflake Python APIs は、パイプを2つの別々のタイプで表します。
Pipe: Snowpipeが使用するパイプ名や COPY INTO ステートメントなどのパイプのプロパティを公開します。PipeResource: 対応するPipeオブジェクトを取得し、ステージングされたデータファイルでパイプをリフレッシュし、パイプをドロップするために使用できるメソッドを公開します。
パイプの作成¶
ウェアハウスを作成するには、まず Pipe オブジェクトを作成して、 API Root オブジェクトから PipeCollection オブジェクトを作成します。 PipeCollection.create を使って、新しいパイプをSnowflakeに追加します。
以下の例のコードは、指定された COPY INTO ステートメントを持つ my_pipe という名前のパイプを表す Pipe オブジェクトを作成します。
from snowflake.core.pipe import Pipe
my_pipe = Pipe(
name="my_pipe",
comment="creating my pipe",
copy_statement="COPY INTO my_table FROM @mystage FILE_FORMAT = (TYPE = 'JSON')",
)
pipes = root.databases["my_db"].schemas["my_schema"].pipes
pipes.create(my_pipe)
コードは、 PipeCollection 変数 pipes を作成し、 PipeCollection.create を使用してSnowflakeに新しいパイプを作成します。
パイプの詳細の取得¶
Pipe オブジェクトを返す PipeResource.fetch メソッドを呼び出すことで、パイプに関する情報を取得できます。
次の例のコードは、 my_pipe という名前のパイプの情報を取得します。
my_pipe = root.databases["my_db"].schemas["my_schema"].pipes["my_pipe"].fetch()
print(my_pipe.to_dict())
パイプの一覧表示¶
Pipe オブジェクトの PagedIter 反復子を返す PipeCollection.iter メソッドを使用して、パイプを一覧表示することができます。
次の例のコードは、名前が my で始まるパイプを一覧表示し、それぞれの名前を印刷します。
from snowflake.core.pipe import PipeCollection
pipes: PipeCollection = root.databases["my_db"].schemas["my_schema"].pipes
pipe_iter = pipes.iter(like="my%") # returns a PagedIter[Pipe]
for pipe_obj in pipe_iter:
print(pipe_obj.name)
パイプ操作の実行¶
PipeResource により、パイプのリフレッシュやドロップなど、一般的なパイプ操作を行うことができます。
注釈
現在のところ、 ALTER PIPE の REFRESH 機能のみがサポートされています。
パイプ・リソースでできる操作を示すために、次の例のコードでは次のようにしています。
my_pipeパイプ リソース オブジェクトを取得します。指定したオプションの接頭辞(またはパス)を持つステージングされたデータファイルでパイプをリフレッシュします。
パイプをドロップします。
my_pipe_res = root.databases["my_db"].schemas["my_schema"].pipes["my_pipe"]
# equivalent to: ALTER PIPE my_pipe REFRESH PREFIX = 'dir3/'
my_pipe_res.refresh(prefix="dir3/")
my_pipe_res.drop()
外部ボリュームの管理¶
外部ボリュームは名前付きのアカウントレベルのSnowflakeオブジェクトで、Snowflakeを Apache Iceberg™ テーブル用の外部クラウドストレージに接続するために使用します。詳細については、 Apache Iceberg™ テーブル の 外部ボリューム セクションをご参照ください。
Snowflake Python APIs は外部ボリュームを2つの別々のタイプで表します。
ExternalVolume: 外部ボリュームのプロパティ、例えばその名前や保存場所を公開します。ExternalVolumeResource: 対応するExternalVolumeオブジェクトを取得し、外部ボリュームをドロップまたは復元するために使用できるメソッドを公開します。
外部ボリュームの作成¶
外部ボリュームを作成するには、まず ExternalVolume オブジェクトを作成し、次に API Root オブジェクトから ExternalVolumeCollection オブジェクトを作成します。 ExternalVolumeCollection.create を使用して、新しい外部ボリュームをSnowflakeに追加します。
次の例のコードは、指定された AWS S3ストレージロケーションを持つ、 my_external_volume という外部ボリュームを表す ExternalVolume オブジェクトを作成します。
from snowflake.core.external_volume import (
ExternalVolume,
StorageLocationS3,
)
my_external_volume = ExternalVolume(
name="my_external_volume",
storage_locations=[
StorageLocationS3(
name="my-s3-us-west-1",
storage_base_url="s3://MY_EXAMPLE_BUCKET/",
storage_aws_role_arn="arn:aws:iam::123456789012:role/myrole",
encryption=Encryption(type="AWS_SSE_KMS", kms_key_id="1234abcd-12ab-34cd-56ef-1234567890ab"),
),
StorageLocationS3(
name="my-s3-us-west-2",
storage_base_url="s3://MY_EXAMPLE_BUCKET/",
storage_aws_role_arn="arn:aws:iam::123456789012:role/myrole",
encryption=Encryption(type="AWS_SSE_KMS", kms_key_id="1234abcd-12ab-34cd-56ef-1234567890ab"),
),
]
)
root.external_volumes.create(my_external_volume)
外部ボリュームの詳細の取得¶
ExternalVolume オブジェクトを返す ExternalVolumeResource.fetch メソッドを呼び出すことで、外部ボリュームに関する情報を取得できます。
次の例のコードは、 my_external_volume という名前の外部ボリュームに関する情報を取得します。
my_external_volume = root.external_volumes["my_external_volume"].fetch()
print(my_external_volume.to_dict())
外部ボリュームの一覧表示¶
ExternalVolume オブジェクトの PagedIter 反復子を返す ExternalVolumeCollection.iter メソッドを使用して、外部ボリュームを一覧表示することができます。
以下の例のコードは、名前が my で始まる外部ボリュームを一覧表示し、それぞれの名前を印刷します。
external_volume_iter = root.external_volumes.iter(like="my%")
for external_volume_obj in external_volume_iter:
print(external_volume_obj.name)
外部ボリューム操作の実行¶
外部ボリュームのドロップや復元など、一般的な外部ボリューム操作は ExternalVolumeResource オブジェクトで実行できます。
外部ボリューム・リソースでできる操作を示すために、次の例のコードでは次のようにしています。
my_external_volume外部ボリューム・リソース・オブジェクトを取得します。外部ボリュームをドロップします。
ドロップされた外部ボリュームの最新バージョンを復元します。
my_external_volume_res = root.external_volumes["my_external_volume"]
my_external_volume_res.drop()
my_external_volume_res.undrop()