Snowflake SQLAlchemy ツールキットでのPythonコネクタの使用¶
Snowflake SQLAlchemy は、Snowflakeデータベースと SQLAlchemy アプリケーションをつなぐ 方言 として、Python用Snowflakeコネクタの上で実行されます。
このトピックの内容:
前提条件¶
Python用Snowflakeコネクタ¶
Snowflake SQLAlchemy の唯一の要件は、Python用Snowflakeコネクタです。ただし、Snowflake SQLAlchemy をインストールするとコネクタが自動的にインストールされるため、コネクタをインストールする必要はありません。
データアナリティクスとウェブアプリケーションフレームワーク(オプション)¶
Snowflake SQLAlchemy は、 Pandas、 Jupyter および Pyramid で使用できます。これらは、データアナリティクスおよびウェブアプリケーション用により高次のアプリケーションフレームワークを提供します。ただし、作業環境をゼロから構築することは、特に初心者ユーザーにとっては簡単な作業ではありません。フレームワークをインストールするには、Cコンパイラとツールが必要です。適切なツールとバージョンの選択は、ユーザーがPythonアプリケーションを使用することを妨げる障害になりかねません。
環境を構築する簡単な方法は、 Anaconda を使用することです。これにより、データアナリストや学生などのPython以外の専門家を含むすべてのユーザーに完全なプリコンパイルテクノロジースタックが提供されます。Anacondaのインストール手順については、 Anacondaのインストールドキュメント をご参照ください。その後、 pip を使用して、Snowflake SQLAlchemy パッケージをAnacondaの上にインストールできます。
Snowflake SQLAlchemy のインストール¶
Snowflake SQLAlchemy パッケージは、 pip
を使用してパブリック PyPI リポジトリからインストールできます。
pip install --upgrade snowflake-sqlalchemy
pip
は、Python用Snowflakeコネクタを含むすべての必要なモジュールを自動的にインストールします。
開発者ノートは、ソースコード GitHub にホストされていることに注意してください。
インストールの確認¶
次のPythonサンプルコードを含むファイル(例:
validate.py
)を作成します。これは、Snowflakeに接続し、Snowflakeバージョンを表示します。#!/usr/bin/env python from sqlalchemy import create_engine engine = create_engine( 'snowflake://{user}:{password}@{account_identifier}/'.format( user='<user_login_name>', password='<password>', account_identifier='<account_identifier>', ) ) try: connection = engine.connect() results = connection.execute('select current_version()').fetchone() print(results[0]) finally: connection.close() engine.dispose()
<ユーザーログイン名>
、<パスワード>
、および<アカウント識別子>
をSnowflakeアカウントおよびユーザーの適切な値に置き換えます。詳細については、 接続パラメーター (このトピック内)をご参照ください。サンプルコードを実行します。例えば、
validate.py
という名前のファイルを作成した場合、python validate.py
Snowflakeバージョン(例: 1.48.0
)が表示されます。
Snowflake固有のパラメーターと動作¶
Snowflake SQLAlchemy は、可能な限り、 SQLAlchemy アプリケーションに互換性のある機能を提供します。 SQLAlchemyの使用については、 SQLAlchemy のドキュメント をご参照ください。
ただし、Snowflake SQLAlchemy はSnowflake固有のパラメーターと動作も提供します。これについては、次のセクションで説明します。
接続パラメーター¶
必須パラメーター¶
Snowflake SQLAlchemy は、次の接続文字列構文を使用してSnowflakeに接続し、セッションを開始します:
'snowflake://<user_login_name>:<password>@<account_identifier>'
条件:
<ユーザーログイン名>
は、Snowflakeユーザーのログイン名。<パスワード>
は、Snowflakeユーザーのパスワード。<アカウント識別子>
は、使用するアカウント識別子です。 アカウント識別子 をご参照ください。注釈
使用するアカウント識別子の一部に、
snowflakecomputing.com
ドメイン名を 含めない でください。Snowflakeは、使用するアカウント識別子にドメイン名を自動的に追加して、必要な接続を作成します。
追加の接続パラメーター¶
オプションで、接続文字列の最後( <アカウント名>
の後)に、次の追加情報を含めることができます。
'snowflake://<user_login_name>:<password>@<account_identifier>/<database_name>/<schema_name>?warehouse=<warehouse_name>&role=<role_name>'
条件:
<データベース名>
および<スキーマ名>
は、スラッシュ(/
)で区切られたSnowflakeセッションの初期データベースとスキーマです。warehouse=<ウェアハウス名>
およびrole=<ロール名>'
はセッションの初期ウェアハウスおよびロールであり、パラメーター文字列として指定され、疑問符(?
)で区切られています。
注釈
ログイン後、接続文字列で指定された初期データベース、スキーマ、ウェアハウス、およびロールは、セッションに対していつでも変更できます。
プロキシサーバーの構成¶
プロキシサーバーのパラメーターはサポートされていません。代わりに、サポートされている環境変数を使用してプロキシサーバーを構成します。詳細については、 プロキシサーバーの使用 をご参照ください。
接続文字列の例¶
次の例では、ユーザー名 testuser1
、パスワード 0123456
、アカウント識別子 myorganization-myaccount
、データベース testdb
、スキーマ public
、ウェアハウス testwh
、およびロール myrole
で create_engine
メソッドを呼び出します。
from sqlalchemy import create_engine engine = create_engine( 'snowflake://testuser1:0123456@myorganization-myaccount/testdb/public?warehouse=testwh&role=myrole' )
便宜上、 snowflake.sqlalchemy.URL
メソッドを使用して接続文字列を作成し、データベースに接続できます。次の例では、前の例と同じ接続文字列を作成します。
from snowflake.sqlalchemy import URL from sqlalchemy import create_engine engine = create_engine(URL( account = 'myorganization-myaccount', user = 'testuser1', password = '0123456', database = 'testdb', schema = 'public', warehouse = 'testwh', role='myrole', ))
接続の開始および終了¶
engine.connect()
を実行して接続を開始します。 engine.execute()
の使用は避けてください。
# Avoid this. engine = create_engine(...) engine.execute(<SQL>) engine.dispose() # Do this. engine = create_engine(...) connection = engine.connect() try: connection.execute(<SQL>) finally: connection.close() engine.dispose()
注釈
engine.dispose()
の前に connection.close()
を実行して、接続を終了してください。そうしない場合は、Python GarbageコレクターがSnowflakeとの通信に必要なリソースを削除するため、Pythonコネクタはセッションを適切に終了できなくなります。
明示的なトランザクションを使用する場合は、 SQLAlchemy 内の AUTOCOMMIT 実行オプション を無効にする必要があります。
デフォルトでは、 SQLAlchemy はこのオプションを有効にします。このオプションを有効にすると、 INSERT、 UPDATE、および DELETE ステートメントは、明示的なトランザクション内で実行された場合でも、実行時に自動的にコミットされます。
AUTOCOMMIT を無効にするには、 autocommit=False
を Connection.execution_options()
メソッドに渡します。例:
# Disable AUTOCOMMIT if you need to use an explicit transaction.
with engine.connect().execution_options(autocommit=False) as connection:
try:
connection.execute("BEGIN")
connection.execute("INSERT INTO test_table VALUES (88888, 'X', 434354)")
connection.execute("INSERT INTO test_table VALUES (99999, 'Y', 453654654)")
connection.execute("COMMIT")
except Exception as e:
connection.execute("ROLLBACK")
finally:
connection.close()
engine.dispose()
自動インクリメントの動作¶
値を自動インクリメントするには、 Sequence
オブジェクトが必要です。新しいレコードが挿入されるたびに値を自動的にインクリメントするには、主キー列に Sequence
オブジェクトを含めます。例:
t = Table('mytable', metadata, Column('id', Integer, Sequence('id_seq'), primary_key=True), Column(...), ... )
オブジェクト名の大文字小文字の処理¶
Snowflakeは、大文字と小文字を区別しないすべてのオブジェクト名を大文字で保存します。対照的に、 SQLAlchemy はすべての小文字のオブジェクト名が大文字と小文字を区別しないと見なします。Snowflake SQLAlchemy は、スキーマレベルの通信中(つまり、テーブルとインデックスのリフレクション中)に、オブジェクト名の大文字と小文字を変換します。大文字のオブジェクト名を使用する場合、 SQLAlchemy は大文字と小文字が区別されると想定し、名前を引用符で囲みます。この動作により、Snowflakeから受信したデータディクショナリデータとの不一致が発生するため、引用符(例: "TestDb"
)を使用して大文字と小文字を区別する識別子名を作成しない限り、SQLAlchemy 側では、すべて小文字の名前を使用する必要があります。
インデックスのサポート¶
Snowflakeはインデックスを使用しないため、Snowflake SQLAlchemyも使用しません。
Numpyデータ型のサポート¶
Snowflake SQLAlchemy は、 NumPy
データ型のバインドとフェッチをサポートしています。バインディングは常にサポートされています。 NumPy
データ型のフェッチを有効にするには、接続パラメーターに numpy=True
を追加します。
次の NumPy
データ型がサポートされています。
numpy.int64
numpy.float64
numpy.datetime64
次の例は、 numpy.datetime64
データのラウンドトリップを示しています。
import numpy as np import pandas as pd engine = create_engine(URL( account = 'myorganization-myaccount', user = 'testuser1', password = 'pass', database = 'db', schema = 'public', warehouse = 'testwh', role='myrole', numpy=True, )) specific_date = np.datetime64('2016-03-04T12:03:05.123456789Z') connection = engine.connect() connection.execute( "CREATE OR REPLACE TABLE ts_tbl(c1 TIMESTAMP_NTZ)") connection.execute( "INSERT INTO ts_tbl(c1) values(%s)", (specific_date,) ) df = pd.read_sql_query("SELECT * FROM ts_tbl", engine) assert df.c1.values[0] == specific_date
列メタデータのキャッシュ¶
SQLAlchemy は ランタイム検査 API を提供して、さまざまなオブジェクトに関するランタイム情報を取得します。一般的な使用例の1つは、スキーマカタログを構築するために、スキーマ内のすべてのテーブルとその列メタデータを取得することです。例えば、 SQLAlchemy の上にある alembic は、データベーススキーマの移行を管理します。擬似コードフローは次のとおりです:
inspector = inspect(engine) schema = inspector.default_schema_name for table_name in inspector.get_table_names(schema): column_metadata = inspector.get_columns(table_name, schema) primary_keys = inspector.get_primary_keys(table_name, schema) foreign_keys = inspector.get_foreign_keys(table_name, schema) ...
このフローでは、潜在的な問題として、各テーブルでクエリを実行するのにかなり時間がかかることがあります。結果はキャッシュされますが、列のメタデータの取得には高額な費用がかかります。
この問題を軽減するために、Snowflake SQLAlchemy はフラグ cache_column_metadata=True
を取り、 get_table_names
が呼び出されたときにすべてのテーブルのすべての列メタデータがキャッシュされ、残りの get_columns
、 get_primary_keys
および get_foreign_keys
がキャッシュを利用できるようにします。
engine = create_engine(URL( account = 'myorganization-myaccount', user = 'testuser1', password = 'pass', database = 'db', schema = 'public', warehouse = 'testwh', role='myrole', cache_column_metadata=True, ))
注釈
すべての列メタデータが Inspector
オブジェクトに関連付けられてキャッシュされるため、メモリ使用量が増加します。すべての列メタデータを取得する必要がある場合にのみ、フラグを使用します。
VARIANT 、ARRAY、および OBJECT のサポート¶
Snowflake SQLAlchemy は、 VARIANT
、 ARRAY
、および OBJECT
データ型の取得をサポートしています。すべての型はPythonで str
に変換されるため、 json.loads
を使用してネイティブデータ型に変換できます。
この例は、 VARIANT
、 ARRAY
、および OBJECT
データ型の列を含むテーブルを作成する方法を示しています。
from snowflake.sqlalchemy import (VARIANT, ARRAY, OBJECT) ... t = Table('my_semi_structured_datatype_table', metadata, Column('va', VARIANT), Column('ob', OBJECT), Column('ar', ARRAY)) metdata.create_all(engine)
VARIANT
、 ARRAY
、および OBJECT
データ型の列を取得してネイティブPythonデータ型に変換するには、次のようにデータを取得して json.loads
メソッドを呼び出します。
import json connection = engine.connect() results = connection.execute(select([t])) row = results.fetchone() data_variant = json.loads(row[0]) data_object = json.loads(row[1]) data_array = json.loads(row[2])
CLUSTER BY のサポート¶
Snowflake SQLAlchemy は、テーブルの CLUSTER BY
パラメーターをサポートしています。パラメーターの詳細については、 CREATE TABLE をご参照ください。
この例では、クラスタリングキーとして id
と name
の2つの列を持つテーブルを作成する方法を示します。
t = Table('myuser', metadata, Column('id', Integer, primary_key=True), Column('name', String), snowflake_clusterby=['id', 'name'], ... ) metadata.create_all(engine)
Alembicのサポート¶
Alembic は、 SQLAlchemy
の上にあるデータベース移行ツールです。Snowflake SQLAlchemy は、AlembicがSnowflake SQLAlchemyを認識できるように、次のコードを alembic/env.py
に追加することで機能します。
from alembic.ddl.impl import DefaultImpl class SnowflakeImpl(DefaultImpl): __dialect__ = 'snowflake'
一般的な使用法については、 Alembicドキュメント をご参照ください。
キーペア認証のサポート¶
Snowflake SQLAlchemy は、Python用Snowflakeコネクタの機能を利用してキーペア認証をサポートします。秘密キーと公開キーを作成するステップについては、 キーペア認証とキーペアローテーションの使用 をご参照ください。
秘密キーパラメーターは、次のように connect_args
を介して渡されます。
... from snowflake.sqlalchemy import URL from sqlalchemy import create_engine from cryptography.hazmat.backends import default_backend from cryptography.hazmat.primitives.asymmetric import rsa from cryptography.hazmat.primitives.asymmetric import dsa from cryptography.hazmat.primitives import serialization with open("rsa_key.p8", "rb") as key: p_key= serialization.load_pem_private_key( key.read(), password=os.environ['PRIVATE_KEY_PASSPHRASE'].encode(), backend=default_backend() ) pkb = p_key.private_bytes( encoding=serialization.Encoding.DER, format=serialization.PrivateFormat.PKCS8, encryption_algorithm=serialization.NoEncryption()) engine = create_engine(URL( account='abc123', user='testuser1', ), connect_args={ 'private_key': pkb, }, )
PRIVATE_KEY_PASSPHRASE
は、秘密キーファイル rsa_key.p8
を復号化するためのパスフレーズです。
snowflake.sqlalchemy.URL
メソッドは秘密キーパラメーターをサポートしていません。
マージコマンドのサポート¶
Snowflake SQLAlchemy は、その MergeInto
カスタム式によるアップサートの実行をサポートしています。包括的なドキュメントについては、 MERGE をご参照ください。
次のように使用します。
from sqlalchemy.orm import sessionmaker from sqlalchemy import MetaData, create_engine from snowflake.sqlalchemy import MergeInto engine = create_engine(db.url, echo=False) session = sessionmaker(bind=engine)() connection = engine.connect() meta = MetaData() meta.reflect(bind=session.bind) t1 = meta.tables['t1'] t2 = meta.tables['t2'] merge = MergeInto(target=t1, source=t2, on=t1.c.t1key == t2.c.t2key) merge.when_matched_then_delete().where(t2.c.marked == 1) merge.when_matched_then_update().where(t2.c.isnewstatus == 1).values(val = t2.c.newval, status=t2.c.newstatus) merge.when_matched_then_update().values(val=t2.c.newval) merge.when_not_matched_then_insert().values(val=t2.c.newval, status=t2.c.newstatus) connection.execute(merge)
CopyIntoStorage のサポート¶
Snowflake SQLAlchemy は、カスタム CopyIntoStorage
式を使用して、テーブルとクエリ結果をさまざまなSnowflakeステージ、Azureコンテナー、および AWS バケットに保存することをサポートしています。包括的なドキュメントについては、 COPY INTO <場所> をご参照ください。
次のように使用します。
from sqlalchemy.orm import sessionmaker from sqlalchemy import MetaData, create_engine from snowflake.sqlalchemy import CopyIntoStorage, AWSBucket, CSVFormatter engine = create_engine(db.url, echo=False) session = sessionmaker(bind=engine)() connection = engine.connect() meta = MetaData() meta.reflect(bind=session.bind) users = meta.tables['users'] copy_into = CopyIntoStorage(from_=users, into=AWSBucket.from_uri('s3://my_private_backup').encryption_aws_sse_kms('1234abcd-12ab-34cd-56ef-1234567890ab'), formatter=CSVFormatter().null_if(['null', 'Null'])) connection.execute(copy_into)