Pythonでのデータのロードとアンロードのリソースの管理¶
Pythonを使用して、外部ボリューム、パイプ、ステージなど、Snowflakeのデータロードおよびアンロードリソースを管理できます。
前提条件¶
このトピックの例では、Snowflakeと接続するコードを追加して Root
オブジェクトを作成し、そこからSnowflake Python Snowflake Python APIs を使用することを想定しています。
たとえば、以下のコードでは、構成ファイルで定義された接続パラメーターを使用してSnowflakeへの接続を作成します。
from snowflake.core import Root
from snowflake.snowpark import Session
session = Session.builder.config("connection_name", "myconnection").create()
root = Root(session)
出来上がった Session
オブジェクトを使って、コードは API のタイプとメソッドを使う Root
オブジェクトを作成します。詳細については、 Snowflake Python APIs によるSnowflakeへの接続 をご参照ください。
ステージの管理¶
クラウドストレージ内のデータファイルの場所であるSnowflakeステージを管理できます。ステージの概要については、 データのロードの概要 をご参照ください。
Snowflake Python APIs は、ステージを2つの別々のタイプで表します。
Stage
: ステージ名、暗号化タイプ、認証情報、ディレクトリテーブル設定などのステージのプロパティを公開します。StageResource
: 対応するStage
オブジェクトの取得、ステージ上のファイルのアップロードと一覧表示、ステージのドロップに使用できるメソッドを公開します。
ステージの作成¶
ステージを作成するには、まず Stage
オブジェクトを作成して、 API Root
オブジェクトから StageCollection
オブジェクトを作成します。 StageCollection.create
を使って、新しいステージをSnowflakeに追加します。
以下の例のコードは、 SNOWFLAKE_SSE
(サーバー側の暗号化のみ)の暗号化タイプで、 my_stage
という名前のステージを表す Stage
オブジェクトを作成します。
from snowflake.core.stage import Stage, StageEncryption
my_stage = Stage(
name="my_stage",
encryption=StageEncryption(type="SNOWFLAKE_SSE")
)
stages = root.databases["my_db"].schemas["my_schema"].stages
stages.create(my_stage)
コードは、 StageCollection
変数 stages
を作成し、 StageCollection.create
を使用してSnowflakeに新しいステージを作成します。
ステージ詳細の取得¶
Stage
オブジェクトを返す StageResource.fetch
メソッドを呼び出すことで、ステージに関する情報を取得できます。
次の例のコードは、 my_stage
という名前のステージの情報を取得します。
my_stage = root.databases["my_db"].schemas["my_schema"].stages["my_stage"].fetch()
print(my_stage.to_dict())
ステージの一覧表示¶
Stage
オブジェクトの PagedIter
反復子を返す StageCollection.iter
メソッドを使用して、ステージを一覧表示することができます。
次の例のコードは、名前に my
というテキストを含むステージを一覧表示し、以下のそれぞれの名前を印刷します。
from snowflake.core.stage import StageCollection
stages: StageCollection = root.databases["my_db"].schemas["my_schema"].stages
stage_iter = stages.iter(like="my%") # returns a PagedIter[Stage]
for stage_obj in stage_iter:
print(stage_obj.name)
ステージ操作の実行¶
ステージへのファイルのアップロードやステージ上のファイルの一覧表示など、一般的なステージ操作は StageResource
オブジェクトで実行できます。
ステージ・リソースでできるいくつかの操作を示すために、次の例のコードでは次のようにしています。
指定された自動圧縮と上書きオプションで、
my-file.yaml
という名前のファイルをmy_stage
ステージにアップロードします。ステージ上のすべてのファイルを一覧表示し、ファイルが正常にアップロードされたことを確認します。
ステージをドロップします。
my_stage_res = root.databases["my_db"].schemas["my_schema"].stages["my_stage"]
my_stage_res.put("./my-file.yaml", "/", auto_compress=False, overwrite=True)
stageFiles = root.databases["my_db"].schemas["my_schema"].stages["my_stage"].list_files()
for stageFile in stageFiles:
print(stageFile)
my_stage_res.drop()
パイプの管理¶
Snowflake パイプを管理することができます。Snowflake パイプは、Snowpipe がインジェスションキューからテーブルにデータをロードするために使用する COPY INTO ステートメントを含む、名前付きのファーストクラスの Snowflake オブジェクトです。パイプの概要については、 Snowpipe をご参照ください。
Snowflake Python APIs は、パイプを2つの別々のタイプで表します。
Pipe
: Snowpipeが使用するパイプ名や COPY INTO ステートメントなどのパイプのプロパティを公開します。PipeResource
: 対応するPipe
オブジェクトを取得し、ステージングされたデータファイルでパイプをリフレッシュし、パイプをドロップするために使用できるメソッドを公開します。
パイプの作成¶
ウェアハウスを作成するには、まず Pipe
オブジェクトを作成して、 API Root
オブジェクトから PipeCollection
オブジェクトを作成します。 PipeCollection.create
を使って、新しいパイプをSnowflakeに追加します。
以下の例のコードは、指定された COPY INTO ステートメントを持つ my_pipe
という名前のパイプを表す Pipe
オブジェクトを作成します。
from snowflake.core.pipe import Pipe
my_pipe = Pipe(
name="my_pipe",
comment="creating my pipe",
copy_statement="COPY INTO my_table FROM @mystage FILE_FORMAT = (TYPE = 'JSON')",
)
pipes = root.databases["my_db"].schemas["my_schema"].pipes
pipes.create(my_pipe)
コードは、 PipeCollection
変数 pipes
を作成し、 PipeCollection.create
を使用してSnowflakeに新しいパイプを作成します。
パイプの詳細の取得¶
Pipe
オブジェクトを返す PipeResource.fetch
メソッドを呼び出すことで、パイプに関する情報を取得できます。
次の例のコードは、 my_pipe
という名前のパイプの情報を取得します。
my_pipe = root.databases["my_db"].schemas["my_schema"].pipes["my_pipe"].fetch()
print(my_pipe.to_dict())
パイプの一覧表示¶
Pipe
オブジェクトの PagedIter
反復子を返す PipeCollection.iter
メソッドを使用して、パイプを一覧表示することができます。
次の例のコードは、名前が my
で始まるパイプを一覧表示し、それぞれの名前を印刷します。
from snowflake.core.pipe import PipeCollection
pipes: PipeCollection = root.databases["my_db"].schemas["my_schema"].pipes
pipe_iter = pipes.iter(like="my%") # returns a PagedIter[Pipe]
for pipe_obj in pipe_iter:
print(pipe_obj.name)
パイプ操作の実行¶
PipeResource
により、パイプのリフレッシュやドロップなど、一般的なパイプ操作を行うことができます。
注釈
現在のところ、 ALTER PIPE の REFRESH 機能のみがサポートされています。
パイプ・リソースでできる操作を示すために、次の例のコードでは次のようにしています。
my_pipe
パイプ・リソース・オブジェクトを取得します。指定したオプションの接頭辞(またはパス)を持つステージングされたデータファイルでパイプをリフレッシュします。
パイプをドロップします。
my_pipe_res = root.databases["my_db"].schemas["my_schema"].pipes["my_pipe"]
# equivalent to: ALTER PIPE my_pipe REFRESH PREFIX = 'dir3/'
my_pipe_res.refresh(prefix="dir3/")
my_pipe_res.drop()
外部ボリュームの管理¶
外部ボリュームは名前付きのアカウントレベルのSnowflakeオブジェクトで、Snowflakeを Apache Iceberg™ テーブル用の外部クラウドストレージに接続するために使用します。詳細については、 Apache Iceberg™ テーブル の 外部ボリューム セクションをご参照ください。
Snowflake Python APIs は外部ボリュームを2つの別々のタイプで表します。
ExternalVolume
: 外部ボリュームの名前や保存場所などのプロパティを公開します。ExternalVolumeResource
: 対応するExternalVolume
オブジェクトの取得、外部ボリュームのドロップとドロップ解除に使用できるメソッドを公開します。
外部ボリュームの作成¶
外部ボリュームを作成するには、まず ExternalVolume
オブジェクトを作成し、次に API Root
オブジェクトから ExternalVolumeCollection
オブジェクトを作成します。 ExternalVolumeCollection.create
を使用して、新しい外部ボリュームをSnowflakeに追加します。
次の例のコードは、指定された AWS S3ストレージロケーションを持つ、 my_external_volume
という外部ボリュームを表す ExternalVolume
オブジェクトを作成します。
from snowflake.core.external_volume import (
ExternalVolume,
StorageLocationS3,
)
my_external_volume = ExternalVolume(
name="my_external_volume",
storage_locations=[
StorageLocationS3(
name="my-s3-us-west-1",
storage_base_url="s3://MY_EXAMPLE_BUCKET/",
storage_aws_role_arn="arn:aws:iam::123456789012:role/myrole",
encryption=Encryption(type="AWS_SSE_KMS", kms_key_id="1234abcd-12ab-34cd-56ef-1234567890ab"),
),
StorageLocationS3(
name="my-s3-us-west-2",
storage_base_url="s3://MY_EXAMPLE_BUCKET/",
storage_aws_role_arn="arn:aws:iam::123456789012:role/myrole",
encryption=Encryption(type="AWS_SSE_KMS", kms_key_id="1234abcd-12ab-34cd-56ef-1234567890ab"),
),
]
)
root.external_volumes.create(my_external_volume)
外部ボリュームの詳細の取得¶
ExternalVolume
オブジェクトを返す ExternalVolumeResource.fetch
メソッドを呼び出すことで、外部ボリュームに関する情報を取得できます。
次の例のコードは、 my_external_volume
という名前の外部ボリュームに関する情報を取得します。
my_external_volume = root.external_volumes["my_external_volume"].fetch()
print(my_external_volume.to_dict())
外部ボリュームの一覧表示¶
ExternalVolume
オブジェクトの PagedIter
反復子を返す ExternalVolumeCollection.iter
メソッドを使用して、外部ボリュームを一覧表示することができます。
以下の例のコードは、名前が my
で始まる外部ボリュームを一覧表示し、それぞれの名前を印刷します。
external_volume_iter = root.external_volumes.iter(like="my%")
for external_volume_obj in external_volume_iter:
print(external_volume_obj.name)
外部ボリューム操作の実行¶
外部ボリュームのドロップやアンドロップなど、一般的な外部ボリューム操作は ExternalVolumeResource
オブジェクトで実行できます。
外部ボリューム・リソースでできる操作を示すために、次の例のコードでは次のようにしています。
my_external_volume
外部ボリューム・リソース・オブジェクトを取得します。外部ボリュームをドロップします。
外部ボリュームをアンドロップします。
my_external_volume_res = root.external_volumes["my_external_volume"]
my_external_volume_res.drop()
my_external_volume_res.undrop()