Snowflake SQLAlchemy ツールキットでのPythonコネクタの使用

Snowflake SQLAlchemy は、Snowflakeデータベースと SQLAlchemy アプリケーションをつなぐ 方言 として、Python用Snowflakeコネクタの上で実行されます。

このトピックの内容:

前提条件

Python用Snowflakeコネクタ

Snowflake SQLAlchemy の唯一の要件は、Python用Snowflakeコネクタです。ただし、Snowflake SQLAlchemy をインストールするとコネクタが自動的にインストールされるため、コネクタをインストールする必要はありません。

データアナリティクスとウェブアプリケーションフレームワーク(オプション)

Snowflake SQLAlchemy は、 PandasJupyter および Pyramid で使用できます。これらは、データアナリティクスおよびウェブアプリケーション用により高次のアプリケーションフレームワークを提供します。ただし、作業環境をゼロから構築することは、特に初心者ユーザーにとっては簡単な作業ではありません。フレームワークをインストールするには、Cコンパイラとツールが必要です。適切なツールとバージョンの選択は、ユーザーがPythonアプリケーションを使用することを妨げる障害になりかねません。

環境を構築する簡単な方法は、 Anaconda を使用することです。これにより、データアナリストや学生などのPython以外の専門家を含むすべてのユーザーに完全なプリコンパイルテクノロジースタックが提供されます。Anacondaのインストール手順については、 Anacondaのインストールドキュメント をご参照ください。その後、 pip を使用して、Snowflake SQLAlchemy パッケージをAnacondaの上にインストールできます。

Snowflake SQLAlchemy のインストール

Snowflake SQLAlchemy パッケージは、 pip を使用してパブリック PyPI リポジトリからインストールできます。

pip install --upgrade snowflake-sqlalchemy
Copy

pip は、Python用Snowflakeコネクタを含むすべての必要なモジュールを自動的にインストールします。

開発者ノートは、ソースコード GitHub にホストされていることに注意してください。

インストールの確認

  1. 次のPythonサンプルコードを含むファイル(例: validate.py)を作成します。これは、Snowflakeに接続し、Snowflakeバージョンを表示します。

    #!/usr/bin/env python
    from sqlalchemy import create_engine
    
    engine = create_engine(
        'snowflake://{user}:{password}@{account_identifier}/'.format(
            user='<user_login_name>',
            password='<password>',
            account_identifier='<account_identifier>',
        )
    )
    try:
        connection = engine.connect()
        results = connection.execute('select current_version()').fetchone()
        print(results[0])
    finally:
        connection.close()
        engine.dispose()
    
    Copy
  2. <ユーザーログイン名><パスワード>、および <アカウント識別子> をSnowflakeアカウントおよびユーザーの適切な値に置き換えます。詳細については、 接続パラメーター (このトピック内)をご参照ください。

  3. サンプルコードを実行します。例えば、 validate.py という名前のファイルを作成した場合、

    python validate.py
    
    Copy

Snowflakeバージョン(例: 1.48.0)が表示されます。

Snowflake固有のパラメーターと動作

Snowflake SQLAlchemy は、可能な限り、 SQLAlchemy アプリケーションに互換性のある機能を提供します。 SQLAlchemyの使用については、 SQLAlchemy のドキュメント をご参照ください。

ただし、Snowflake SQLAlchemy はSnowflake固有のパラメーターと動作も提供します。これについては、次のセクションで説明します。

接続パラメーター

必須パラメーター

Snowflake SQLAlchemy は、次の接続文字列構文を使用してSnowflakeに接続し、セッションを開始します:

'snowflake://<user_login_name>:<password>@<account_identifier>'
Copy

条件:

  • <ユーザーログイン名> は、Snowflakeユーザーのログイン名。

  • <パスワード> は、Snowflakeユーザーのパスワード。

  • <アカウント識別子> は、使用するアカウント識別子です。 アカウント識別子 をご参照ください。

    注釈

    使用するアカウント識別子の一部に、 snowflakecomputing.com ドメイン名を 含めない でください。Snowflakeは、使用するアカウント識別子にドメイン名を自動的に追加して、必要な接続を作成します。

追加の接続パラメーター

オプションで、接続文字列の最後( <アカウント名> の後)に、次の追加情報を含めることができます。

'snowflake://<user_login_name>:<password>@<account_identifier>/<database_name>/<schema_name>?warehouse=<warehouse_name>&role=<role_name>'
Copy

条件:

  • <データベース名> および <スキーマ名> は、スラッシュ(/)で区切られたSnowflakeセッションの初期データベースとスキーマです。

  • warehouse=<ウェアハウス名> および role=<ロール名>' はセッションの初期ウェアハウスおよびロールであり、パラメーター文字列として指定され、疑問符(?)で区切られています。

注釈

ログイン後、接続文字列で指定された初期データベース、スキーマ、ウェアハウス、およびロールは、セッションに対していつでも変更できます。

プロキシサーバーの構成

プロキシサーバーのパラメーターはサポートされていません。代わりに、サポートされている環境変数を使用してプロキシサーバーを構成します。詳細については、 プロキシサーバーの使用 をご参照ください。

接続文字列の例

次の例では、ユーザー名 testuser1、パスワード 0123456、アカウント識別子 myorganization-myaccount、データベース testdb、スキーマ public、ウェアハウス testwh、およびロール myrolecreate_engine メソッドを呼び出します。

from sqlalchemy import create_engine
engine = create_engine(
    'snowflake://testuser1:0123456@myorganization-myaccount/testdb/public?warehouse=testwh&role=myrole'
)
Copy

便宜上、 snowflake.sqlalchemy.URL メソッドを使用して接続文字列を作成し、データベースに接続できます。次の例では、前の例と同じ接続文字列を作成します。

from snowflake.sqlalchemy import URL
from sqlalchemy import create_engine

engine = create_engine(URL(
    account = 'myorganization-myaccount',
    user = 'testuser1',
    password = '0123456',
    database = 'testdb',
    schema = 'public',
    warehouse = 'testwh',
    role='myrole',
))
Copy

接続の開始および終了

engine.connect() を実行して接続を開始します。 engine.execute() の使用は避けてください。

# Avoid this.
engine = create_engine(...)
engine.execute(<SQL>)
engine.dispose()

# Do this.
engine = create_engine(...)
connection = engine.connect()
try:
    connection.execute(<SQL>)
finally:
    connection.close()
    engine.dispose()
Copy

注釈

engine.dispose() の前に connection.close() を実行して、接続を終了してください。そうしない場合は、Python GarbageコレクターがSnowflakeとの通信に必要なリソースを削除するため、Pythonコネクタはセッションを適切に終了できなくなります。

明示的なトランザクションを使用する場合は、 SQLAlchemy 内の AUTOCOMMIT 実行オプション を無効にする必要があります。

デフォルトでは、 SQLAlchemy はこのオプションを有効にします。このオプションを有効にすると、 INSERT、 UPDATE、および DELETE ステートメントは、明示的なトランザクション内で実行された場合でも、実行時に自動的にコミットされます。

AUTOCOMMIT を無効にするには、 autocommit=FalseConnection.execution_options() メソッドに渡します。例:

# Disable AUTOCOMMIT if you need to use an explicit transaction.
with engine.connect().execution_options(autocommit=False) as connection:

  try:
    connection.execute("BEGIN")
    connection.execute("INSERT INTO test_table VALUES (88888, 'X', 434354)")
    connection.execute("INSERT INTO test_table VALUES (99999, 'Y', 453654654)")
    connection.execute("COMMIT")
  except Exception as e:
    connection.execute("ROLLBACK")
  finally:
    connection.close()

engine.dispose()
Copy

自動インクリメントの動作

値を自動インクリメントするには、 Sequence オブジェクトが必要です。新しいレコードが挿入されるたびに値を自動的にインクリメントするには、主キー列に Sequence オブジェクトを含めます。例:

t = Table('mytable', metadata,
    Column('id', Integer, Sequence('id_seq'), primary_key=True),
    Column(...), ...
)
Copy

オブジェクト名の大文字小文字の処理

Snowflakeは、大文字と小文字を区別しないすべてのオブジェクト名を大文字で保存します。対照的に、 SQLAlchemy はすべての小文字のオブジェクト名が大文字と小文字を区別しないと見なします。Snowflake SQLAlchemy は、スキーマレベルの通信中(つまり、テーブルとインデックスのリフレクション中)に、オブジェクト名の大文字と小文字を変換します。大文字のオブジェクト名を使用する場合、 SQLAlchemy は大文字と小文字が区別されると想定し、名前を引用符で囲みます。この動作により、Snowflakeから受信したデータディクショナリデータとの不一致が発生するため、引用符(例: "TestDb")を使用して大文字と小文字を区別する識別子名を作成しない限り、SQLAlchemy 側では、すべて小文字の名前を使用する必要があります。

インデックスのサポート

Snowflakeはインデックスを使用しないため、Snowflake SQLAlchemyも使用しません。

Numpyデータ型のサポート

Snowflake SQLAlchemy は、 NumPy データ型のバインドとフェッチをサポートしています。バインディングは常にサポートされています。 NumPy データ型のフェッチを有効にするには、接続パラメーターに numpy=True を追加します。

次の NumPy データ型がサポートされています。

  • numpy.int64

  • numpy.float64

  • numpy.datetime64

次の例は、 numpy.datetime64 データのラウンドトリップを示しています。

import numpy as np
import pandas as pd
engine = create_engine(URL(
    account = 'myorganization-myaccount',
    user = 'testuser1',
    password = 'pass',
    database = 'db',
    schema = 'public',
    warehouse = 'testwh',
    role='myrole',
    numpy=True,
))

specific_date = np.datetime64('2016-03-04T12:03:05.123456789Z')

connection = engine.connect()
connection.execute(
    "CREATE OR REPLACE TABLE ts_tbl(c1 TIMESTAMP_NTZ)")
connection.execute(
    "INSERT INTO ts_tbl(c1) values(%s)", (specific_date,)
)
df = pd.read_sql_query("SELECT * FROM ts_tbl", engine)
assert df.c1.values[0] == specific_date
Copy

列メタデータのキャッシュ

SQLAlchemy は ランタイム検査 API を提供して、さまざまなオブジェクトに関するランタイム情報を取得します。一般的な使用例の1つは、スキーマカタログを構築するために、スキーマ内のすべてのテーブルとその列メタデータを取得することです。例えば、 SQLAlchemy の上にある alembic は、データベーススキーマの移行を管理します。擬似コードフローは次のとおりです:

inspector = inspect(engine)
schema = inspector.default_schema_name
for table_name in inspector.get_table_names(schema):
    column_metadata = inspector.get_columns(table_name, schema)
    primary_keys = inspector.get_primary_keys(table_name, schema)
    foreign_keys = inspector.get_foreign_keys(table_name, schema)
    ...
Copy

このフローでは、潜在的な問題として、各テーブルでクエリを実行するのにかなり時間がかかることがあります。結果はキャッシュされますが、列のメタデータの取得には高額な費用がかかります。

この問題を軽減するために、Snowflake SQLAlchemy はフラグ cache_column_metadata=True を取り、 get_table_names が呼び出されたときにすべてのテーブルのすべての列メタデータがキャッシュされ、残りの get_columnsget_primary_keys および get_foreign_keys がキャッシュを利用できるようにします。

engine = create_engine(URL(
    account = 'myorganization-myaccount',
    user = 'testuser1',
    password = 'pass',
    database = 'db',
    schema = 'public',
    warehouse = 'testwh',
    role='myrole',
    cache_column_metadata=True,
))
Copy

注釈

すべての列メタデータが Inspector オブジェクトに関連付けられてキャッシュされるため、メモリ使用量が増加します。すべての列メタデータを取得する必要がある場合にのみ、フラグを使用します。

VARIANT 、ARRAY、および OBJECT のサポート

Snowflake SQLAlchemy は、 VARIANTARRAY、および OBJECT データ型の取得をサポートしています。すべての型はPythonで str に変換されるため、 json.loads を使用してネイティブデータ型に変換できます。

この例は、 VARIANTARRAY、および OBJECT データ型の列を含むテーブルを作成する方法を示しています。

from snowflake.sqlalchemy import (VARIANT, ARRAY, OBJECT)
...
t = Table('my_semi_structured_datatype_table', metadata,
    Column('va', VARIANT),
    Column('ob', OBJECT),
    Column('ar', ARRAY))
metdata.create_all(engine)
Copy

VARIANTARRAY、および OBJECT データ型の列を取得してネイティブPythonデータ型に変換するには、次のようにデータを取得して json.loads メソッドを呼び出します。

import json
connection = engine.connect()
results = connection.execute(select([t]))
row = results.fetchone()
data_variant = json.loads(row[0])
data_object  = json.loads(row[1])
data_array   = json.loads(row[2])
Copy

CLUSTER BY のサポート

Snowflake SQLAlchemy は、テーブルの CLUSTER BY パラメーターをサポートしています。パラメーターの詳細については、 CREATE TABLE をご参照ください。

この例では、クラスタリングキーとして idname の2つの列を持つテーブルを作成する方法を示します。

t = Table('myuser', metadata,
    Column('id', Integer, primary_key=True),
    Column('name', String),
    snowflake_clusterby=['id', 'name'], ...
)
metadata.create_all(engine)
Copy

Alembicのサポート

Alembic は、 SQLAlchemy の上にあるデータベース移行ツールです。Snowflake SQLAlchemy は、AlembicがSnowflake SQLAlchemyを認識できるように、次のコードを alembic/env.py に追加することで機能します。

from alembic.ddl.impl import DefaultImpl

class SnowflakeImpl(DefaultImpl):
    __dialect__ = 'snowflake'
Copy

一般的な使用法については、 Alembicドキュメント をご参照ください。

キーペア認証のサポート

Snowflake SQLAlchemy は、Python用Snowflakeコネクタの機能を利用してキーペア認証をサポートします。秘密キーと公開キーを作成するステップについては、 キーペア認証とキーペアローテーションの使用 をご参照ください。

秘密キーパラメーターは、次のように connect_args を介して渡されます。

...
from snowflake.sqlalchemy import URL
from sqlalchemy import create_engine

from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives.asymmetric import rsa
from cryptography.hazmat.primitives.asymmetric import dsa
from cryptography.hazmat.primitives import serialization

with open("rsa_key.p8", "rb") as key:
    p_key= serialization.load_pem_private_key(
        key.read(),
        password=os.environ['PRIVATE_KEY_PASSPHRASE'].encode(),
        backend=default_backend()
    )

pkb = p_key.private_bytes(
    encoding=serialization.Encoding.DER,
    format=serialization.PrivateFormat.PKCS8,
    encryption_algorithm=serialization.NoEncryption())

engine = create_engine(URL(
    account='abc123',
    user='testuser1',
    ),
    connect_args={
        'private_key': pkb,
        },
    )
Copy

PRIVATE_KEY_PASSPHRASE は、秘密キーファイル rsa_key.p8 を復号化するためのパスフレーズです。

snowflake.sqlalchemy.URL メソッドは秘密キーパラメーターをサポートしていません。

マージコマンドのサポート

Snowflake SQLAlchemy は、その MergeInto カスタム式によるアップサートの実行をサポートしています。包括的なドキュメントについては、 MERGE をご参照ください。

次のように使用します。

from sqlalchemy.orm import sessionmaker
from sqlalchemy import MetaData, create_engine
from snowflake.sqlalchemy import MergeInto

engine = create_engine(db.url, echo=False)
session = sessionmaker(bind=engine)()
connection = engine.connect()

meta = MetaData()
meta.reflect(bind=session.bind)
t1 = meta.tables['t1']
t2 = meta.tables['t2']

merge = MergeInto(target=t1, source=t2, on=t1.c.t1key == t2.c.t2key)
merge.when_matched_then_delete().where(t2.c.marked == 1)
merge.when_matched_then_update().where(t2.c.isnewstatus == 1).values(val = t2.c.newval, status=t2.c.newstatus)
merge.when_matched_then_update().values(val=t2.c.newval)
merge.when_not_matched_then_insert().values(val=t2.c.newval, status=t2.c.newstatus)
connection.execute(merge)
Copy

CopyIntoStorage のサポート

Snowflake SQLAlchemy は、カスタム CopyIntoStorage 式を使用して、テーブルとクエリ結果をさまざまなSnowflakeステージ、Azureコンテナー、および AWS バケットに保存することをサポートしています。包括的なドキュメントについては、 COPY INTO <場所> をご参照ください。

次のように使用します。

from sqlalchemy.orm import sessionmaker
from sqlalchemy import MetaData, create_engine
from snowflake.sqlalchemy import CopyIntoStorage, AWSBucket, CSVFormatter

engine = create_engine(db.url, echo=False)
session = sessionmaker(bind=engine)()
connection = engine.connect()

meta = MetaData()
meta.reflect(bind=session.bind)
users = meta.tables['users']

copy_into = CopyIntoStorage(from_=users,
                            into=AWSBucket.from_uri('s3://my_private_backup').encryption_aws_sse_kms('1234abcd-12ab-34cd-56ef-1234567890ab'),
                            formatter=CSVFormatter().null_if(['null', 'Null']))
connection.execute(copy_into)
Copy