Snowflake SQLAlchemy ツールキットおよびPythonコネクターの使用

Snowflake SQLAlchemy は、Snowflakeデータベースと SQLAlchemy アプリケーションをつなぐ 方言 として、Python用Snowflakeコネクタの上で実行されます。

このトピックの内容:

前提条件

Python用Snowflakeコネクタ

Snowflake SQLAlchemy の唯一の要件は、Python用Snowflakeコネクタです。ただし、Snowflake SQLAlchemy をインストールするとコネクタが自動的にインストールされるため、コネクタをインストールする必要はありません。

データアナリティクスとウェブアプリケーションフレームワーク(オプション)

Snowflake SQLAlchemy は、 PandasJupyter および Pyramid で使用できます。これらは、データアナリティクスおよびウェブアプリケーション用のより高いレベルのアプリケーションフレームワークを提供します。ただし、作業環境をゼロから構築することは、特に初心者ユーザーにとっては簡単な作業ではありません。フレームワークをインストールするには、Cコンパイラとツールが必要です。適切なツールとバージョンを選択することは、ユーザーがPythonアプリケーションを使用するのを妨げるハードルです。

環境を構築する簡単な方法は、 Anaconda を使用することです。これにより、データアナリストや学生などのPython以外の専門家を含むすべてのユーザーに完全なプリコンパイルテクノロジースタックが提供されます。Anacondaのインストール手順については、 Anacondaのインストールドキュメント をご参照ください。その後、 pip を使用して、Snowflake SQLAlchemy パッケージをAnacondaの上にインストールできます。

Snowflake SQLAlchemy のインストール

Snowflake SQLAlchemy パッケージは、 pip を使用してパブリック PyPI リポジトリからインストールできます。

pip install --upgrade snowflake-sqlalchemy

pip は、Python用Snowflakeコネクタを含むすべての必要なモジュールを自動的にインストールします。

開発者ノートは、ソースコード GitHub にホストされていることに注意してください。

インストールの確認

  1. 次のPythonサンプルコードを含むファイル(例: validate.py)を作成します。これは、Snowflakeに接続し、Snowflakeバージョンを表示します。

    #!/usr/bin/env python
    from sqlalchemy import create_engine
    
    engine = create_engine(
        'snowflake://{user}:{password}@{account}/'.format(
            user='<your_user_login_name>',
            password='<your_password>',
            account='<your_account_name>',
        )
    )
    try:
        connection = engine.connect()
        results = connection.execute('select current_version()').fetchone()
        print(results[0])
    finally:
        connection.close()
        engine.dispose()
    
  2. <ユーザーログイン名><パスワード>、および <アカウント名> をSnowflakeアカウントおよびユーザーの適切な値に置き換えてください。詳細については、 接続パラメーター (このトピック内)をご参照ください。

  3. サンプルコードを実行します。例えば、 validate.py という名前のファイルを作成した場合、

    python validate.py
    

Snowflakeバージョン(例: 1.48.0)が表示されます。

Snowflake固有のパラメーターと動作

Snowflake SQLAlchemy は、可能な限り、 SQLAlchemy アプリケーションに互換性のある機能を提供します。 SQLAlchemyの使用については、 SQLAlchemy のドキュメント をご参照ください。

ただし、Snowflake SQLAlchemy はSnowflake固有のパラメーターと動作も提供します。これについては、次のセクションで説明します。

接続パラメーター

必須パラメーター

Snowflake SQLAlchemy は、次の接続文字列構文を使用してSnowflakeに接続し、セッションを開始します:

'snowflake://<user_login_name>:<password>@<account_name>'

条件:

  • <ユーザーログイン名> は、Snowflakeユーザーのログイン名。

  • <パスワード> は、Snowflakeユーザーのパスワード。

  • <アカウント名> は、アカウントのフルネーム(Snowflakeが提供)。

    完全なアカウント名には、アカウントがホストされている 地域、および クラウドプラットフォーム を識別する 追加 のセグメントが含まれている場合があります。

    地域別のアカウント名の例

    アカウント名が xy12345 の場合、

    クラウドプラットフォーム/地域

    完全なアカウント名

    AWS

    US 西部(オレゴン)

    xy12345

    US 東部(オハイオ)

    xy12345.us-east-2.aws

    US 東部(バージニア北部)

    xy12345.us-east-1

    US 東部(商業組織、バージニア政府北部)

    xy12345.us-east-1-gov.aws

    カナダ(中部)

    xy12345.ca-central-1.aws

    EU (アイルランド)

    xy12345.eu-west-1

    EU (フランクフルト)

    xy12345.eu-central-1

    アジア太平洋(東京)

    xy12345.ap-northeast-1.aws

    アジア太平洋(ムンバイ)

    xy12345.ap-south-1.aws

    アジア太平洋(シンガポール)

    xy12345.ap-southeast-1

    アジア太平洋(シドニー)

    xy12345.ap-southeast-2

    GCP

    US 中央部1(アイオワ)

    xy12345.us-central1.gcp

    ヨーロッパ西部2(ロンドン)

    xy12345.europe-west2.gcp

    ヨーロッパ西部4(オランダ)

    xy12345.europe-west4.gcp

    Azure

    西 US 2(ワシントン)

    xy12345.west-us-2.azure

    東 US 2(バージニア)

    xy12345.east-us-2.azure

    US 政府バージニア

    xy12345.us-gov-virginia.azure

    カナダ中央部(トロント)

    xy12345.canada-central.azure

    西ヨーロッパ(オランダ)

    xy12345.west-europe.azure

    スイス北部(チューリッヒ)

    xy12345.switzerland-north.azure

    東南アジア(シンガポール)

    xy12345.southeast-asia.azure

    オーストラリア東部(ニューサウスウェールズ)

    xy12345.australia-east.azure

    重要

    次のいずれかの条件に該当する場合、アカウント名はこの例の構造とは異なります。

    • Snowflake Editionが VPS の場合、アカウント名の詳細については Snowflakeサポート にお問い合わせください。

    • AWS PrivateLink がアカウントで有効になっている場合、アカウント名には追加の privatelink セグメントが 必要 です。詳細については、 AWS PrivateLink とSnowflake をご参照ください。

    注釈

    アカウント名の一部としてSnowflakeドメイン名(snowflakecomputing.com)を 含めない でください。Snowflakeはアカウント名にドメイン名を自動的に追加して、必要な接続を作成します。

追加の接続パラメーター

オプションで、接続文字列の最後( <アカウント名> の後)に、次の追加情報を含めることができます。

'snowflake://<user_login_name>:<password>@<account_name>/<database_name>/<schema_name>?warehouse=<warehouse_name>&role=<role_name>'

条件:

  • <データベース名> および <スキーマ名> は、スラッシュ(/)で区切られたSnowflakeセッションの初期データベースとスキーマです。

  • warehouse=<ウェアハウス名> および role=<ロール名>' はセッションの初期ウェアハウスおよびロールであり、パラメーター文字列として指定され、疑問符(?)で区切られています。

注釈

ログイン後、接続文字列で指定された初期データベース、スキーマ、ウェアハウス、およびロールは、セッションに対していつでも変更できます。

プロキシサーバーの構成

プロキシサーバーのパラメーターはサポートされていません。代わりに、サポートされている環境変数を使用してプロキシサーバーを構成します。詳細については、 プロキシサーバーの使用 をご参照ください。

接続文字列の例

次の例では、ユーザー名 testuser1、パスワード 0123456、アカウント名 xy12345.us-east-1、データベース testdb、スキーマ public、ウェアハウス testwh、およびロール myrolecreate_engine メソッドを呼び出します。

from sqlalchemy import create_engine
engine = create_engine(
    'snowflake://testuser1:0123456@xy12345.us-east-1/testdb/public?warehouse=testwh&role=myrole'
)

便宜上、 snowflake.sqlalchemy.URL メソッドを使用して接続文字列を作成し、データベースに接続できます。次の例では、前の例と同じ接続文字列を作成します。

from snowflake.sqlalchemy import URL
from sqlalchemy import create_engine

engine = create_engine(URL(
    account = 'xy12345.us-east-1',
    user = 'testuser1',
    password = '0123456',
    database = 'testdb',
    schema = 'public',
    warehouse = 'testwh',
    role='myrole',
))

接続の開始および終了

engine.connect() を実行して接続を開始します。 engine.execute() の使用は避けてください。

# Avoid this.
engine = create_engine(...)
engine.execute(<SQL>)
engine.dispose()

# Do this.
engine = create_engine(...)
connection = engine.connect()
try:
    connection.execute(<SQL>)
finally:
    connection.close()
    engine.dispose()

注釈

engine.dispose() の前に connection.close() を実行して、接続を終了してください。そうでない場合、Python GarbageコレクターがSnowflakeとの通信に必要なリソースを削除するため、Pythonコネクタはセッションを適切に終了できなくなります。

自動インクリメント動作

値を自動インクリメントするには、 Sequence オブジェクトが必要です。新しいレコードが挿入されるたびに値を自動的にインクリメントするには、主キー列に Sequence オブジェクトを含めます。例:

t = Table('mytable', metadata,
    Column('id', Integer, Sequence('id_seq'), primary_key=True),
    Column(...), ...
)

オブジェクト名のケース処理

Snowflakeは、大文字と小文字を区別しないすべてのオブジェクト名を大文字で保存します。対照的に、 SQLAlchemy はすべての小文字のオブジェクト名が大文字と小文字を区別しないと見なします。Snowflake SQLAlchemy は、スキーマレベルの通信中、つまりテーブルとインデックスのリフレクション中にオブジェクト名の大文字と小文字を変換します。大文字のオブジェクト名を使用する場合、 SQLAlchemy は大文字と小文字が区別されると想定し、名前を引用符で囲みます。この動作により、Snowflakeから受信したデータディクショナリデータとの不一致が発生するため、 "TestDb" などの引用符を使用して大文字と小文字を区別する識別子名を作成しない限り、すべての小文字の名前を SQLAlchemy 側で使用する必要があります。

インデックスのサポート

Snowflakeはインデックスを使用しないため、Snowflake SQLAlchemyも使用しません。

Numpyデータ型のサポート

Snowflake SQLAlchemy は、 NumPy データ型のバインドとフェッチをサポートしています。バインディングは常にサポートされています。 NumPy データ型のフェッチを有効にするには、接続パラメーターに numpy=True を追加します。

次の NumPy データ型がサポートされています。

  • numpy.int64

  • numpy.float64

  • numpy.datetime64

次の例は、 numpy.datetime64 データのラウンドトリップを示しています。

import numpy as np
import pandas as pd
engine = create_engine(URL(
    account = 'xy12345',
    user = 'testuser1',
    password = 'pass',
    database = 'db',
    schema = 'public',
    warehouse = 'testwh',
    role='myrole',
    numpy=True,
))

specific_date = np.datetime64('2016-03-04T12:03:05.123456789Z')

connection = engine.connect()
connection.execute(
    "CREATE OR REPLACE TABLE ts_tbl(c1 TIMESTAMP_NTZ)")
connection.execute(
    "INSERT INTO ts_tbl(c1) values(%s)", (specific_date,)
)
df = pd.read_sql_query("SELECT * FROM ts_tbl", engine)
assert df.c1.values[0] == specific_date

キャッシュ列メタデータ

SQLAlchemy は ランタイム検査 API を提供して、さまざまなオブジェクトに関するランタイム情報を取得します。一般的な使用例の1つは、スキーマカタログを構築するために、スキーマ内のすべてのテーブルとその列メタデータを取得することです。例えば、 SQLAlchemy の上にある alembic は、データベーススキーマの移行を管理します。擬似コードフローは次のとおりです:

inspector = inspect(engine)
schema = inspector.default_schema_name
for table_name in inspector.get_table_names(schema):
    column_metadata = inspector.get_columns(table_name, schema)
    primary_keys = inspector.get_primary_keys(table_name, schema)
    foreign_keys = inspector.get_foreign_keys(table_name, schema)
    ...

このフローでは、潜在的な問題として、各テーブルでクエリを実行するのにかなり時間がかかることがあります。結果はキャッシュされますが、列のメタデータの取得には高額な費用がかかります。

この問題を軽減するために、Snowflake SQLAlchemy はフラグ cache_column_metadata=True を取り、 get_table_names が呼び出されたときにすべてのテーブルのすべての列メタデータがキャッシュされ、残りの get_columnsget_primary_keys および get_foreign_keys がキャッシュを利用できるようにします。

engine = create_engine(URL(
    account = 'xy12345',
    user = 'testuser1',
    password = 'pass',
    database = 'db',
    schema = 'public',
    warehouse = 'testwh',
    role='myrole',
    cache_column_metadata=True,
))

注釈

すべての列メタデータが Inspector オブジェクトに関連付けられてキャッシュされるため、メモリ使用量が増加します。すべての列メタデータを取得する必要がある場合にのみ、フラグを使用します。

VARIANT 、ARRAY および OBJECT サポート

Snowflake SQLAlchemy は、 VARIANTARRAY、および OBJECT データ型の取得をサポートしています。すべての型はPythonで str に変換されるため、 json.loads を使用してネイティブデータ型に変換できます。

この例は、 VARIANTARRAY、および OBJECT データ型の列を含むテーブルを作成する方法を示しています。

from snowflake.sqlalchemy import (VARIANT, ARRAY, OBJECT)
...
t = Table('my_semi_structured_datatype_table', metadata,
    Column('va', VARIANT),
    Column('ob', OBJECT),
    Column('ar', ARRAY))
metdata.create_all(engine)

VARIANTARRAY、および OBJECT データ型の列を取得してネイティブPythonデータ型に変換するには、次のようにデータを取得して json.loads メソッドを呼び出します。

import json
connection = engine.connect()
results = connection.execute(select([t]))
row = results.fetchone()
data_variant = json.loads(row[0])
data_object  = json.loads(row[1])
data_array   = json.loads(row[2])

CLUSTER BY のサポート

Snowflake SQLAchemy は、テーブルの CLUSTER BY パラメーターをサポートしています。パラメーターの詳細については、 CREATE TABLE をご参照ください。

この例では、クラスタリングキーとして idname の2つの列を持つテーブルを作成する方法を示します。

t = Table('myuser', metadata,
    Column('id', Integer, primary_key=True),
    Column('name', String),
    snowflake_clusterby=['id', 'name'], ...
)
metadata.create_all(engine)

Alembicのサポート

Alembic は、 SQLAlchemy の上にあるデータベース移行ツールです。Snowflake SQLAlchemy は、AlembicがSnowflake SQLAlchemyを認識できるように、次のコードを alembic/env.py に追加することで機能します。

from alembic.ddl.impl import DefaultImpl

class SnowflakeImpl(DefaultImpl):
    __dialect__ = 'snowflake'

一般的な使用法については、 Alembicドキュメント をご参照ください。