Pythonコネクタの使用

このトピックでは、Snowflakeコネクタを使用して、ユーザーログイン、データベースとテーブルの作成、ウェアハウスの作成、データの挿入/読み込み、クエリなどの標準のSnowflake操作を実行する方法を示す一連の例を提供します。

このトピックの最後にあるサンプルコードは、例を組み合わせて単一の動作するPythonプログラムにします。

このトピックの内容:

SnowCD を使用したSnowflakeへのネットワーク接続の検証

ドライバーを設定したら、 SnowCD を使用して、Snowflakeへのネットワーク接続を評価およびトラブルシューティングできます。

初期設定プロセス中にオンデマンドで SnowCD をいつでも使用して、Snowflakeへのネットワーク接続を評価およびトラブルシューティングできます。

Snowflakeへの接続

snowflake.connector モジュールをインポートします。

import snowflake.connector

環境変数、コマンドライン、構成ファイル、または別の適切なソースからログイン情報を読み取ります。例:

PASSWORD = os.getenv('SNOWSQL_PWD')
WAREHOUSE = os.getenv('WAREHOUSE')
...

ACCOUNT パラメーターには、使用する アカウント識別子 を使用します。アカウント識別子には snowflakecomputing.com サフィックスが含まれて いない ことに注意してください。

詳細と例については、 account パラメーターの使用上の注意(connect メソッドの場合) をご参照ください。

注釈

使用可能なコネクタパラメーターの説明については、 snowflake.connector メソッド をご参照ください。

独自のAmazon S3バケットからデータをコピーする場合は、 AWS_ACCESS_KEY_ID と AWS_SECRET_ACCESS_KEY が必要です。

import os

AWS_ACCESS_KEY_ID = os.getenv('AWS_ACCESS_KEY_ID')
AWS_SECRET_ACCESS_KEY = os.getenv('AWS_SECRET_ACCESS_KEY')

注釈

データがMicrosoft Azureコンテナーに保存されている場合は、 COPY ステートメントで認証情報を直接提供します。

接続情報を読み取った後、デフォルトの認証方式またはフェデレーション認証(有効な場合)を使用して接続します。

セッションパラメーターの設定

Pythonコネクターを使用する場合、 QUERY_TAGなどのセッションパラメーターを設定する方法は複数あります。

Snowflakeに接続するときに、セッションレベルのパラメーターを設定できます。これを行うには、次に示すように、「session_parameters」という名前のオプションの接続パラメーターを渡します。

con = snowflake.connector.connect(
    user='XXXX',
    password='XXXX',
    account='XXXX',
    session_parameters={
        'QUERY_TAG': 'EndOfMonthFinancials',
    }
)

connect()メソッドに渡されるsession_parameters辞書には、1つ以上のセッションレベルのパラメーターを含めることができます。

接続後に SQL ステートメント ALTER SESSION SET ... を実行して、セッションパラメータを設定することもできます。

con.cursor().execute("ALTER SESSION SET QUERY_TAG = 'EndOfMonthFinancials'")

セッションパラメータの詳細については、一般的な パラメーター ページの個々のパラメーターの説明をご参照ください。

デフォルト認証方式を使用した接続

ログインパラメーターを使用してSnowflakeに接続します。

            conn = snowflake.connector.connect(
                user=USER,
                password=PASSWORD,
                account=ACCOUNT,
                warehouse=WAREHOUSE,
                database=DATABASE,
                schema=SCHEMA
                )

これを他の情報で拡張する必要がある場合があります。

認証でのシングルサインオン(SSO)の使用

シングルサインオン(SSO)を使用するようにSnowflakeを構成 している場合、認証に SSO を使用するようにクライアントアプリケーションを構成できます。詳細については Snowflakeに接続するクライアントアプリケーションでの SSO の使用 をご参照ください。

多要素認証(MFA)の使用

Snowflakeは、 MFA トークンキャッシングと SSO の組み合わせを含む、 MFA トークンのキャッシングをサポートしています。

詳細については、 MFA トークンキャッシングを使用して認証中のプロンプトの数を最小限に抑える --- オプション をご参照ください。

キーペア認証およびキーペアローテーションの使用

Pythonコネクタは、キーペア認証とキーローテーションをサポートしています。

キーペア認証とキーローテーションを構成する方法の詳細については、 キーペア認証およびキーペアローテーション をご参照ください。

キーペア認証の構成が完了したら、 connect 関数の private_key パラメーターを秘密キーファイルへのパスに設定します。

  1. 以下のサンプルコードを変更して実行します。コードは秘密キーファイルを復号化し、Snowflakeドライバーに渡して接続を作成します。

サンプルコード

import snowflake.connector
import os
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives.asymmetric import rsa
from cryptography.hazmat.primitives.asymmetric import dsa
from cryptography.hazmat.primitives import serialization
with open("<path>/rsa_key.p8", "rb") as key:
    p_key= serialization.load_pem_private_key(
        key.read(),
        password=os.environ['PRIVATE_KEY_PASSPHRASE'].encode(),
        backend=default_backend()
    )

pkb = p_key.private_bytes(
    encoding=serialization.Encoding.DER,
    format=serialization.PrivateFormat.PKCS8,
    encryption_algorithm=serialization.NoEncryption())

ctx = snowflake.connector.connect(
    user='<user>',
    account='<account_identifier>',
    private_key=pkb,
    warehouse=WAREHOUSE,
    database=DATABASE,
    schema=SCHEMA
    )

cs = ctx.cursor()

プロキシサーバーの使用

プロキシサーバーを使用するには、次の環境変数を構成します。

  • HTTP_PROXY

  • HTTPS_PROXY

  • NO_PROXY

注釈

プロキシパラメーター(つまり、 proxy_hostproxy_portproxy_user、および proxy_password)は非推奨です。代わりに環境変数を使用してください。

例:

Linuxまたは macOS
export HTTP_PROXY='http://username:password@proxyserver.company.com:80'
export HTTPS_PROXY='http://username:password@proxyserver.company.com:80'
Windows
set HTTP_PROXY=http://username:password@proxyserver.company.com:80
set HTTPS_PROXY=http://username:password@proxyserver.company.com:80

ちなみに

Snowflakeのセキュリティモデルは、Secure Sockets Layer(SSL)プロキシを許可しません( HTTPS 証明書を使用)。プロキシサーバーは、公的に利用可能な認証機関(CA)を使用する必要があり、侵害されたプロキシを介した MITM (Man In The Middle)攻撃などの潜在的なセキュリティリスクを低減します。

SSL プロキシを使用する 必要 がある場合は、通信中に証明書が変更されないように、サーバーポリシーを更新してSnowflake証明書を渡すことを強くお勧めします。

オプションで NO_PROXY を使用して、特定の通信のプロキシをバイパスできます。たとえば、Amazon S3へのアクセスは、 NO_PROXY=".amazonaws.com" を指定してプロキシサーバーをバイパスできます。

NO_PROXY はワイルドカードをサポートしていません。指定する各値は、次のいずれかである必要があります。

  • ホスト名の末尾(または完全なホスト名)。例:

    • .amazonaws.com

    • myorganization-myaccount.snowflakecomputing.com

  • IP アドレス。例:

    • 192.196.1.15

複数の値を指定する場合、値はコンマで区切る必要があります。次に例を示します。

localhost,.my_company.com,.snowflakecomputing.com,192.168.1.15,192.168.1.16

OAuth との接続

OAuthを使用して接続するには、接続文字列に oauth に設定された authenticator パラメーターと oauth_access_token に設定された token パラメーターを含める必要があります。詳細については、 OAuth のクライアント、ドライバー、およびコネクター をご参照ください。

ctx = snowflake.connector.connect(
    user="<username>",
    host="<hostname>",
    account="<account_identifier>",
    authenticator="oauth",
    token="<oauth_access_token>",
    warehouse="test_warehouse",
    database="test_db",
    schema="test_schema"
)

OCSP

ドライバーが接続すると、Snowflakeは証明書を送信して、Snowflakeを偽装しているホストではなく、Snowflakeへの接続であることを確認します。ドライバーはその証明書を OCSP (オンライン証明書状態プロトコル)サーバーに送信し、証明書が失効していないことを確認します。

ドライバーが証明書を確認するために OCSP サーバーに到達できない場合、ドライバーは "fail open" or "fail closed" できます。

フェールオープンまたはフェールクローズモードの選択

1.8.0より前のバージョンのPython用Snowflakeコネクタは、デフォルトでフェールクローズモードに設定されています。バージョン1.8.0以降はデフォルトでフェールオープンになります。connect()メソッドを呼び出すときにオプションの接続パラメーター ocsp_fail_open を設定することにより、デフォルトの動作を上書きできます。例:

con = snowflake.connector.connect(
    account=<account_identifier>,
    user=<user>,
    ...,
    ocsp_fail_open=False,
    ...);

OCSP コネクタまたはドライバーバージョンの確認

ドライバーまたはコネクターのバージョンとその構成により、 OCSP の動作が決まります。ドライバーまたはコネクタのバージョン、それらの構成、および OCSP の動作の詳細については、 OCSP 設定 をご参照ください。

OCSP 応答のキャッシュ

すべての通信が安全であることを確認するために、Python用Snowflakeコネクタは HTTPS プロトコルを使用してSnowflakeおよび他のすべてのサービスに接続します(例:データファイルのステージング用のAmazon S3およびフェデレーション認証用のOkta)。通常の HTTPS プロトコルに加えて、コネクタは OCSP(オンライン証明書状態プロトコル)を介して各接続の TLS/SSL 証明書失効ステータスもチェックし、証明書が失効した場合や、OCSP ステータスが信頼できない場合は、接続を中止します。

各Snowflake接続は OCSP サーバーとの最大3つのラウンドトリップをトリガーするため、 OCSP 応答の複数レベルのキャッシュを導入して、接続に追加されるネットワークオーバーヘッドを削減しました。

  • プロセスの存続期間中保持されるメモリキャッシュ。

  • ファイルキャッシュ。キャッシュディレクトリ(例: ~/.cache/snowflake)がパージされるまで保持されます。

  • OCSP 応答サーバーキャッシュ。

キャッシュは、 OCSP サーバーの可用性の問題にも対処します(つまり、実際の OCSP サーバーがダウンした場合)。キャッシュが有効である限り、コネクタは証明書失効ステータスを検証できます。

キャッシュレイヤーに OCSP 応答が含まれていない場合、クライアントは CAの OCSP サーバーから検証ステータスを直接フェッチしようとします。

OCSP 応答ファイルキャッシュの場所の変更

デフォルトでは、ファイルキャッシュは次の場所で有効になっているため、追加の構成タスクは必要ありません。

Linux

~/.cache/snowflake/ocsp_response_cache.json

macOS

~/Library/Caches/Snowflake/ocsp_response_cache.json

Windows

%USERPROFILE%\AppData\Local\Snowflake\Caches\ocsp_response_cache.json

ただし、 OCSP 応答キャッシュファイルに別の場所やファイル名を指定する場合、 connect メソッドは URI 形式の OCSP キャッシュファイルのパスと名前を指定する、 ocsp_response_cache_filename パラメーターを受け入れます。

OCSP 応答キャッシュサーバー

注釈

OCSP 応答キャッシュサーバーは現在、Python用Snowflakeコネクタ1.6.0以降でサポートされています。

OCSP キャッシュのメモリとファイルの種類は、Snowflakeが提供するクライアントの1つを永続的なホストで使用して、Snowflakeに接続されたアプリケーションで良好に機能します。ただし、 AWS LambdaやDockerなどの動的にプロビジョニングされた環境では機能しません。

この状況に対処するために、Snowflakeは3番目のキャッシュレベルである OCSP 応答キャッシュサーバーを提供します。OCSP 応答キャッシュサーバーは、 CAの OCSP サーバーから1時間ごとに OCSP 応答をフェッチし、24時間保存します。クライアントは、このサーバーキャッシュから特定のSnowflake証明書の検証ステータスをリクエストできます。

重要

サーバーポリシーでほとんどまたはすべての外部 IP アドレスおよびウェブサイトへのアクセスが拒否された場合、通常のサービス操作を許可するには、キャッシュサーバーアドレスを 必ず 許可する必要があります。キャッシュサーバー URL は ocsp*.snowflakecomputing.com:80 です。

何らかの理由でキャッシュサーバーを無効化する必要がある場合は、 SF_OCSP_RESPONSE_CACHE_SERVER_ENABLED 環境変数を false に設定します。値は大文字と小文字が区別され、小文字にする必要があることに注意してください。

データベース、スキーマ、ウェアハウスの作成

ログイン後、 CREATE DATABASECREATE SCHEMA、および CREATE WAREHOUSE コマンドを使用して、データベース、スキーマ、およびウェアハウスを作成します(まだ存在しない場合)。

以下の例は、 tiny_warehouse という名前のウェアハウス、 testdb という名前のデータベース、および testschema という名前のスキーマを作成する方法を示しています。スキーマを作成するときは、スキーマを作成するデータベースの名前を指定するか、スキーマを作成するデータベースに既に接続している必要があります。次の例では、 CREATE SCHEMA コマンドの前に USE DATABASE コマンドを実行して、スキーマが正しいデータベースに作成されるようにします。

        conn.cursor().execute("CREATE WAREHOUSE IF NOT EXISTS tiny_warehouse_mg")
        conn.cursor().execute("CREATE DATABASE IF NOT EXISTS testdb_mg")
        conn.cursor().execute("USE DATABASE testdb_mg")
        conn.cursor().execute("CREATE SCHEMA IF NOT EXISTS testschema_mg")

データベース、スキーマ、ウェアハウスの使用

テーブルを作成するデータベースとスキーマを指定します。また、 DML ステートメントおよびクエリを実行するためのリソースを提供するウェアハウスを指定します。

たとえば、データベース testdb、スキーマ testschema およびウェアハウス tiny_warehouse (以前に作成した)を使用するには、

        conn.cursor().execute("USE WAREHOUSE tiny_warehouse_mg")
        conn.cursor().execute("USE DATABASE testdb_mg")
        conn.cursor().execute("USE SCHEMA testdb_mg.testschema_mg")

テーブルの作成およびデータの挿入

CREATE TABLE コマンドを使用してテーブルを作成し、 INSERT コマンドを使用してテーブルにデータを入力します。

たとえば、 testtable という名前のテーブルを作成し、テーブルに2つの行を挿入します。

    conn.cursor().execute(
        "CREATE OR REPLACE TABLE "
        "test_table(col1 integer, col2 string)")

    conn.cursor().execute(
        "INSERT INTO test_table(col1, col2) VALUES " + 
        "    (123, 'test string1'), " + 
        "    (456, 'test string2')")

データのロード

個々の INSERT コマンドを使用してテーブルにデータを挿入する代わりに、内部または外部の場所にステージングされたファイルからデータを一括ロードできます。

内部の場所からのデータのコピー

ホストマシン上のファイルからテーブルにデータを読み込むには、最初に PUT コマンドを使用してファイルを内部の場所にステージングし、次に COPY INTO <テーブル> コマンドを使用してファイル内のデータをテーブルにコピーします。

例:

# Putting Data
con.cursor().execute("PUT file:///tmp/data/file* @%testtable")
con.cursor().execute("COPY INTO testtable")

Linuxまたは macOS 環境の /tmp/data という名前のローカルディレクトリに CSV データが保存され、ディレクトリには file0file1、... file100 という名前のファイルが含まれます。

外部の場所からのデータのコピー

外部の場所(つまり、S3バケット)に既にステージングされているファイルからテーブルにデータをロードするには、 COPY INTO <テーブル> コマンドを使用します。

例:

# Copying Data
con.cursor().execute("""
COPY INTO testtable FROM s3://<s3_bucket>/data/
    STORAGE_INTEGRATION = myint
    FILE_FORMAT=(field_delimiter=',')
""".format(
    aws_access_key_id=AWS_ACCESS_KEY_ID,
    aws_secret_access_key=AWS_SECRET_ACCESS_KEY))

条件:

  • s3://<s3バケット>/data/ はS3バケットの名前を指定します

  • バケット内のファイルの先頭には data が付きます。

  • バケットには、アカウント管理者(つまり、ACCOUNTADMIN のロールを持つユーザー)またはグローバル CREATE INTEGRATION 権限を持つロールによって CREATE STORAGE INTEGRATION を使用して作成されたストレージ統合を使用してアクセスされます。ストレージ統合により、ユーザーはプライベートストレージの場所にアクセスするための認証情報を提供する必要がなくなります。

注釈

この例では、format()関数を使用してステートメントを作成します。ご使用の環境に SQL インジェクション攻撃のリスクがある場合、format()を使用するよりも値をバインドすることをお勧めします。

データのクエリ

Python用Snowflakeコネクタを使用すると、次の情報を送信できます。

  • 同期クエリ。クエリが完了した後にアプリケーションに制御を返します。

  • 非同期クエリ。クエリが完了する前にアプリケーションに制御を返します。

クエリが完了したら、 Cursor オブジェクトを使用して 結果の値をフェッチ します。デフォルトでは、Python用Snowflakeコネクタは、 値を Snowflakeデータ型 からPythonのネイティブデータ型に変換します。(値を文字列として返し、アプリケーションで型変換を実行するように選択することもできます。 データ変換をバイパスすることによるクエリパフォーマンスの改善 をご参照ください。)

注釈

デフォルトでは、 NUMBER 列の値は倍精度浮動小数点値(float64)として返されます。fetch_pandas_all() メソッドと fetch_pandas_batches() メソッドでこれらを10進値(decimal.Decimal)として返すには、 connect() メソッドの arrow_number_to_decimal パラメーターを True に設定します。

同期クエリの実行

同期クエリを実行するには、 Cursor オブジェクトで execute() メソッドを呼び出します。例:

conn = snowflake.connector.connect( ... )
cur = conn.cursor()
cur.execute('select * from products')

cursor の使用による値のフェッチ で説明されているように、 Cursor オブジェクトを使用して結果の値をフェッチします。

非同期クエリの実行

Python用Snowflakeコネクタは、非同期クエリ(つまり、クエリが完了する前にユーザーに制御を返すクエリ)をサポートしています。非同期クエリを送信し、ポーリングを使用して、クエリがいつ完了したかを判断できます。クエリが完了すると、結果を取得できます。

この機能を使用すると、各クエリが完了するのを待たずに、複数のクエリを並行して送信できます。同じセッション中に、同期クエリと非同期クエリを組み合わせて実行することもできます。

最後に、ある接続から非同期クエリを送信し、別の接続からの結果を確認できます。たとえば、ユーザーはアプリケーションから長時間実行されるクエリを開始し、アプリケーションを終了し、後ほどアプリケーションを再起動して結果を確認することができます。

非同期クエリの送信

非同期クエリを送信するには、 Cursor オブジェクトで execute_async() メソッドを呼び出します。例:

conn = snowflake.connector.connect( ... )
cur = conn.cursor()
# Submit an asynchronous query for execution.
cur.execute_async('select count(*) from table(generator(timeLimit => 25))')

クエリを送信した後、

非同期クエリの実行例については、 非同期クエリの例 をご参照ください。

非同期クエリのベストプラクティス

非同期クエリを送信するときは、次のベストプラクティスに従ってください。

  • クエリを並行して実行する前に、どのクエリが他のクエリに依存しているかを確認してください。一部のクエリは相互依存しており、順序に依存するため、並列化には適していません。たとえば、 INSERT ステートメントは、対応する CREATE TABLE ステートメントが終了するまで開始するべきではありません。

  • 使用可能なメモリに対して、実行するクエリが多すぎないことを確認してください。特に複数の結果が同時にメモリに保存されている場合に、複数のクエリを並行して実行すると通常は、より多くのメモリを使用します。

  • ポーリング時に、クエリが成功しないまれなケースを処理します。

  • トランザクション制御ステートメント(BEGIN、 COMMIT、および ROLLBACK)が他のステートメントと並行して実行されないようにします。

Snowflakeクエリ ID の取得

クエリ ID は、Snowflakeによって実行される各クエリを識別します。Python用Snowflakeコネクタを使用してクエリを実行すると、 Cursor オブジェクトの sfqid 属性を介してクエリ ID にアクセスできます。

# Retrieving a Snowflake Query ID
cur = con.cursor()
cur.execute("SELECT * FROM testtable")
print(cur.sfqid)

クエリ ID を使用して次を実行できます。

クエリのステータスの確認

クエリのステータスを確認するには、

  1. Cursor オブジェクトの sfqid フィールドから、クエリ ID を取得します。

  2. クエリ ID を Connection オブジェクトの get_query_status() メソッドに渡して、クエリのステータスを表す QueryStatus 列挙型定数を返します。

    デフォルトでは、クエリの結果がエラーになった場合、 get_query_status() はエラーを発生しません。エラーを発生したい場合は、代わりに get_query_status_throw_if_error() を呼び出してください。

  3. QueryStatus 列挙型定数を使用して、クエリのステータスを確認します。

    • クエリがまだ実行されているかどうかを判断するには(たとえば、これが非同期クエリの場合)、定数を Connection オブジェクトの is_still_running() メソッドに渡します。

    • エラーが発生したかどうかを判断するには、定数を is_an_error() メソッドに渡します。

    列挙型定数の包括的なリストについては、 QueryStatus をご参照ください。

次の例では、非同期クエリを実行し、クエリのステータスを確認します。

import time
...
# Execute a long-running query asynchronously.
cur.execute_async('select count(*) from table(generator(timeLimit => 25))')
...
# Wait for the query to finish running.
query_id = cur.sfqid
while conn.is_still_running(conn.get_query_status(query_id)):
  time.sleep(1)

次の例では、クエリの結果でエラーある場合にエラーが発生します。

from snowflake.connector import ProgrammingError
import time
...
# Wait for the query to finish running and raise an error
# if a problem occurred with the execution of the query.
try:
  query_id = cur.sfqid
  while conn.is_still_running(conn.get_query_status_throw_if_error(query_id)):
    time.sleep(1)
except ProgrammingError as err:
  print('Programming Error: {0}'.format(err))

クエリ ID を使用したクエリの結果の取得

注釈

Cursor オブジェクトで execute() メソッドを呼び出して 同期クエリを実行 した場合は、結果を取得するためにクエリ ID を使用する必要はありません。 cursor の使用による値のフェッチ で説明されているように、結果から値をフェッチするだけです。

非同期クエリまたは以前に送信された同期クエリの結果を取得する場合は、次の手順に従います。

  1. 該当クエリのクエリ ID を取得します。 Snowflakeクエリ ID の取得 をご参照ください。

  2. Cursor オブジェクトで get_results_from_sfqid() メソッドを呼び出して、結果を取得します。

  3. cursor の使用による値のフェッチ で説明されているように、 Cursor オブジェクトを使用して結果の値をフェッチします。

クエリがまだ実行中の場合、フェッチメソッド( fetchone()fetchmany()fetchall() など)はクエリの完了を待機することに注意してください。

例:

# Get the results from a query.
cur.get_results_from_sfqid(query_id)
results = cur.fetchall()
print(f'{results[0]}')

cursor の使用による値のフェッチ

カーソルオブジェクトの反復子メソッドを使用して、テーブルから値をフェッチします。

たとえば、以前に( テーブルの作成およびデータの挿入 で)作成された testtable という名前のテーブルから「col1」および「col2」という名前の列をフェッチするには、次のようなコードを使用します。

    cur = conn.cursor()
    try:
        cur.execute("SELECT col1, col2 FROM test_table ORDER BY col1")
        for (col1, col2) in cur:
            print('{0}, {1}'.format(col1, col2))
    finally:
        cur.close()

または、Python用Snowflakeコネクタが便利なショートカットを提供します。

for (col1, col2) in con.cursor().execute("SELECT col1, col2 FROM testtable"):
    print('{0}, {1}'.format(col1, col2))

単一の結果(つまり単一の行)を取得する必要がある場合は、 fetchone メソッドを使用します。

col1, col2 = con.cursor().execute("SELECT col1, col2 FROM testtable").fetchone()
print('{0}, {1}'.format(col1, col2))

一度に指定された行数を取得する必要がある場合は、行数で fetchmany メソッドを使用します。

cur = con.cursor().execute("SELECT col1, col2 FROM testtable")
ret = cur.fetchmany(3)
print(ret)
while len(ret) > 0:
    ret = cur.fetchmany(3)
    print(ret)

注釈

結果セットが大きすぎてメモリに収まらない場合は、 fetchone または fetchmany を使用します。

すべての結果を一度に取得する必要がある場合、

results = con.cursor().execute("SELECT col1, col2 FROM testtable").fetchall()
for rec in results:
    print('%s, %s' % (rec[0], rec[1]))

クエリのタイムアウトを設定するには、「開始」コマンドを実行し、クエリにタイムアウトパラメーターを含めます。クエリがパラメーター値の長さを超える場合、エラーが生成され、ロールバックが発生します。

次のコードでは、エラー604はクエリがキャンセルされたことを意味します。タイムアウトパラメーターは Timer() を開始し、指定された時間内にクエリが終了しない場合はキャンセルします。

conn.cursor().execute("create or replace table testtbl(a int, b string)")

conn.cursor().execute("begin")
try:
   conn.cursor().execute("insert into testtbl(a,b) values(3, 'test3'), (4,'test4')", timeout=10) # long query

except ProgrammingError as e:
   if e.errno == 604:
      print("timeout")
      conn.cursor().execute("rollback")
   else:
      raise e
else:
   conn.cursor().execute("commit")

DictCursor の使用による列名の値のフェッチ

列名で値をフェッチする場合は、タイプ DictCursorcursor オブジェクトを作成します。

例:

# Querying data by DictCursor
from snowflake.connector import DictCursor
cur = con.cursor(DictCursor)
try:
    cur.execute("SELECT col1, col2 FROM testtable")
    for rec in cur:
        print('{0}, {1}'.format(rec['COL1'], rec['COL2']))
finally:
    cur.close()

非同期クエリの例

以下は、非同期クエリの簡単な例です。

from snowflake.connector import ProgrammingError
import time

conn = snowflake.connector.connect( ... )
cur = conn.cursor()

# Submit an asynchronous query for execution.
cur.execute_async('select count(*) from table(generator(timeLimit => 25))')

# Retrieve the results.
cur.get_results_from_sfqid(query_id)
results = cur.fetchall()
print(f'{results[0]}')

次の例では、ある接続から非同期クエリを送信し、別の接続から結果を取得します。

from snowflake.connector import ProgrammingError
import time

conn = snowflake.connector.connect( ... )
cur = conn.cursor()

# Submit an asynchronous query for execution.
cur.execute_async('select count(*) from table(generator(timeLimit => 25))')

# Get the query ID for the asynchronous query.
query_id = cur.sfqid

# Close the cursor and the connection.
cur.close()
conn.close()

# Open a new connection.
new_conn = snowflake.connector.connect( ... )

# Create a new cursor.
new_cur = new_conn.cursor()

# Retrieve the results.
new_cur.get_results_from_sfqid(query_id)
results = new_cur.fetchall()
print(f'{results[0]}')

クエリ ID ごとのクエリのキャンセル

クエリ ID によるクエリのキャンセル:

cur = cn.cursor()

try:
  cur.execute(r"SELECT SYSTEM$CANCEL_QUERY('queryID')")
  result = cur.fetchall()
  print(len(result))
  print(result[0])
finally:
  cur.close()

文字列「queryID」を実際のクエリ ID に置き換えます。クエリ ID を取得するには、 Snowflakeクエリ ID の取得 をご参照ください。

データ変換をバイパスすることによるクエリパフォーマンスの改善

クエリのパフォーマンスを向上させるには、 snowflake.connector.converter_null モジュールの SnowflakeNoConverterToPython クラスを使用して、Snowflake内部データ型からネイティブPythonデータ型へのデータ変換をバイパスします。例:

from snowflake.connector.converter_null import SnowflakeNoConverterToPython

con = snowflake.connector.connect(
    ...
    converter_class=SnowflakeNoConverterToPython
)
for rec in con.cursor().execute("SELECT * FROM large_table"):
    # rec includes raw Snowflake data

その結果、すべてのデータは文字列形式で表されるため、アプリケーションはそれをネイティブPythonデータ型に変換する必要があります。たとえば、 TIMESTAMP_NTZ および TIMESTAMP_LTZ データは文字列形式で表されるエポック時間であり、 TIMESTAMP_TZ データは文字列形式で表されるエポック時間の後に、空白と分単位の UTC へのオフセットが続きます。

バインドデータに影響はありません。Pythonネイティブデータは引き続き更新用にバインドできます。

データのバインド

SQL ステートメントで使用する値を指定するには、ステートメントにリテラルを含めるか、変数をバインドします。変数をバインドするときは、 SQL ステートメントのテキストに1つ以上のプレースホルダーを配置し、各プレースホルダーに変数(使用する値)を指定します。

次の例は、リテラルとバインドの使用を比較しています。

リテラル:

con.cursor().execute("INSERT INTO testtable(col1, col2) VALUES(789, 'test string3')")

バインド:

con.cursor().execute(
    "INSERT INTO testtable(col1, col2) "
    "VALUES(%s, %s)", (
        789,
        'test string3'
    ))

注釈

バインドできるデータのサイズ、またはバッチで結合できるデータのサイズには上限があります。詳細については、 クエリテキストサイズの制限 をご参照ください。

Snowflakeは、次のタイプのバインドをサポートしています。

これらのそれぞれについて、以下で説明します。

pyformat または format バインディング

pyformat バインディングと format バインディングの両方が、サーバー側ではなくクライアント側でデータをバインドします。

デフォルトでは、Python用Snowflakeコネクタは pyformat および format の両方をサポートしているため、プレースホルダーとして %(name)s または %s を使用できます。例:

  • %(name)s をプレースホルダーとして使用:

        conn.cursor().execute(
            "INSERT INTO test_table(col1, col2) "
            "VALUES(%(col1)s, %(col2)s)", {
                'col1': 789,
                'col2': 'test string3',
                })
    
  • %s をプレースホルダーとして使用。

    con.cursor().execute(
        "INSERT INTO testtable(col1, col2) "
        "VALUES(%s, %s)", (
            789,
            'test string3'
        ))
    

pyformatformat により、リストオブジェクトを使用して IN 演算子のデータをバインドすることもできます。

# Binding data for IN operator
con.cursor().execute(
    "SELECT col1, col2 FROM testtable"
    " WHERE col2 IN (%s)", (
        ['test string1', 'test string3'],
    ))

パーセント文字(「%」)は、 SQL LIKE のワイルドカード文字であり、Pythonの形式バインディング文字でもあります。フォーマットバインディングを使用し、 SQL コマンドにパーセント文字が含まれている場合、パーセント文字をエスケープする必要がある場合があります。例えば、 SQL ステートメントが次の場合:

SELECT col1, col2
    FROM test_table
    WHERE col2 ILIKE '%York' LIMIT 1;  -- Find York, New York, etc.

Pythonコードは次のようになります(元のパーセント記号をエスケープするための追加のパーセント記号に注意)。

        sql_command = "select col1, col2 from test_table "
        sql_command += " where col2 like '%%York' limit %(lim)s"
        parameter_dictionary = {'lim': 1 }
        cur.execute(sql_command, parameter_dictionary)

qmark または numeric バインディング

qmark バインディングと numeric バインディングの両方とも、クライアント側ではなくサーバー側でデータをバインドします。

  • qmark バインディングの場合は、疑問符文字(?)を使用して、変数の値を文字列のどこに挿入するかを指定します。

  • numeric バインディングの場合は、コロン(:)の後に数字を使用して、その位置で置換する変数の位置を指定します。たとえば、 :2 は2番目の変数を指定します。

    数値バインディングを使用して、同じクエリで同じ値を複数回バインドします。たとえば、長い VARCHAR、 BINARY、または 半構造化 の値を複数回使用する場合は、 numeric バインディングにより値をサーバーに1回送信すると、それを複数回使用できるようになります。

次のセクションでは、 qmark および numeric バインディングの使用方法について説明します。

qmark または numeric バインディングの使用

qmark または numeric スタイルのバインディングを使用するには、次のいずれかを実行します。

  • snowflake.connector.paramstyle='qmark'

  • snowflake.connector.paramstyle='numeric'

重要

connect() メソッドを呼び出す 前にparamstyle 属性を設定する必要があります。

paramstyleqmark または numeric に設定する場合は、プレースホルダーとしてそれぞれ ? または :NN は数字に置換)を使用する必要があります。

例:

  • ? をプレースホルダーとして使用。

    import snowflake.connector
    
    snowflake.connector.paramstyle='qmark'
    
    con = snowflake.connector.connect(...)
    
    con.cursor().execute(
        "INSERT INTO testtable(col1, col2) "
        "VALUES(?, ?)", (
            789,
            'test string3'
        ))
    
  • :N をプレースホルダーとして使用。

    import snowflake.connector
    
    snowflake.connector.paramstyle='numeric'
    
    con = snowflake.connector.connect(...)
    
    con.cursor().execute(
        "INSERT INTO testtable(col1, col2) "
        "VALUES(:1, :2)", (
            789,
            'test string3'
        ))
    

    次のクエリは、変数を再利用するための numeric バインディングの使用を示しています。

    con.cursor().execute(
        "INSERT INTO testtable(complete_video, short_sample_of_video) "
        "VALUES(:1, SUBSTRING(:1, :2, :3))", (
            binary_value_that_stores_video,          # variable :1
            starting_offset_in_bytes_of_video_clip,  # variable :2
            length_in_bytes_of_video_clip            # variable :3
        ))
    

qmark または numeric バインディングと datetime の使用

qmark または numeric バインディングを使用してデータをSnowflake TIMESTAMP データ型にバインドする場合は、Snowflakeタイムスタンプデータ型(TIMESTAMP_LTZ または TIMESTAMP_TZ)と値を指定するタプルに、バインド変数を設定します。例:

import snowflake.connector

snowflake.connector.paramstyle='qmark'

con = snowflake.connector.connect(...)

con.cursor().execute(
    "CREATE OR REPLACE TABLE testtable2 ("
    "   col1 int, "
    "   col2 string, "
    "   col3 timestamp_ltz"
    ")"
)

con.cursor().execute(
    "INSERT INTO testtable2(col1,col2,col3) "
    "VALUES(?,?,?)", (
        987,
        'test string4',
        ("TIMESTAMP_LTZ", datetime.now())
    )
 )

クライアント側のバインディングとは異なり、サーバー側のバインディングでは、列にSnowflakeデータ型が必要です。ほとんどの一般的なPythonデータ型には、Snowflakeデータ型への暗黙的なマッピングがすでにあります(例: intFIXED にマッピングされます)。ただし、Python datetime データは、複数のSnowflakeデータ型(TIMESTAMP_NTZTIMESTAMP_LTZ、または TIMESTAMP_TZ)のいずれかにバインドでき、デフォルトのマッピングは TIMESTAMP_NTZ であるため、使用するSnowflakeデータ型を指定する必要があります。

IN 演算子でのバインド変数の使用

qmark および numeric (サーバー側バインディング)は、 IN 演算子を使用したバインド変数の使用をサポートしていません。

IN 演算子でバインド変数を使用する必要がある場合は、 クライアント側バインディングpyformat または format)を使用します。

バッチ挿入の変数に対するバインディングパラメーター

アプリケーションコードで、単一のバッチに複数の行を挿入できます。これを実行するには、 INSERT ステートメントの値のパラメーターを使用します。たとえば、次のステートメントは、 INSERT ステートメントの qmark のバインドにプレースホルダーを使用します。

insert into grocery (item, quantity) values (?, ?)

次に、挿入する必要のあるデータを指定するには、複数のシーケンス(たとえば、タプルのリスト)のシーケンス1つである変数を定義します。

rows_to_insert = [('milk', 2), ('apple', 3), ('egg', 2)]

上記の例に示されているように、リスト内の各項目は、挿入される行の列値を含むタプルです。

バインディングを実行するには、 executemany() メソッドを呼び出し、変数を2番目の引数として渡します。例:

conn = snowflake.connector.connect( ... )
rows_to_insert = [('milk', 2), ('apple', 3), ('egg', 2)]
conn.cursor().executemany(
    "insert into grocery (item, quantity) values (?, ?)",
    rows_to_insert)

サーバー上でデータをバインド している場合(つまり、 qmark または numeric バインドを使用)、コネクタはバインドを通じてバッチ挿入のパフォーマンスを最適化できます。

この手法を使用して多数の値を挿入する場合は、インジェストのためにデータを(ローカルマシン上でファイルを作成することなく)仮ステージにストリーミングすると、ドライバーのパフォーマンスを向上させることができます。値の数がしきい値を超えると、ドライバーはこれを自動的に実行します。

ドライバーによりデータを仮ステージに送信するには、ユーザーはスキーマに対して以下の権限を持っている必要があります。

  • CREATE STAGE

ユーザーにこの権限がない場合、ドライバーはフォールバックして、クエリを含むデータをSnowflakeデータベースに送信します。

さらに、セッションの現在のデータベースとスキーマを設定する必要があります。これらが設定されていない場合、ドライバーによって実行される CREATE TEMPORARY STAGE コマンドは、次のエラーにより失敗する可能性があります。

CREATE TEMPORARY STAGE SYSTEM$BIND file_format=(type=csv field_optionally_enclosed_by='"')
Cannot perform CREATE STAGE. This session does not have a current schema. Call 'USE SCHEMA', or use a qualified name.

注釈

データをSnowflakeデータベースにロードする別の方法( COPY コマンドを使用した一括ロードを含む)については、 Snowflakeへのデータのロード をご参照ください。

SQL インジェクション攻撃の回避

SQL インジェクションの危険があるため、Pythonのフォーマット関数を使用してデータをバインドしないでください。例:

# Binding data (UNSAFE EXAMPLE)
con.cursor().execute(
    "INSERT INTO testtable(col1, col2) "
    "VALUES(%(col1)d, '%(col2)s')" % {
        'col1': 789,
        'col2': 'test string3'
    })
# Binding data (UNSAFE EXAMPLE)
con.cursor().execute(
    "INSERT INTO testtable(col1, col2) "
    "VALUES(%d, '%s')" % (
        789,
        'test string3'
    ))
# Binding data (UNSAFE EXAMPLE)
con.cursor().execute(
    "INSERT INTO testtable(col1, col2) "
    "VALUES({col1}, '{col2}')".format(
        col1=789,
        col2='test string3')
    )

代わりに、変数に値を保存し、それらの値をチェックして(例:文字列内の疑わしいセミコロンを探して)、qmarkまたはnumericバインドスタイルを使用してパラメーターをバインドします。

列のメタデータの取得

結果セットの各列に関するメタデータ(例: 各列の名前、タイプ、精度、スケールなど)を取得するには、次のいずれかの方法を使用します。

  • execute() メソッドを呼び出してクエリを実行した後にメタデータにアクセスするには、 Cursor オブジェクトの description 属性を使用します。

  • クエリを実行 することなく メタデータにアクセスするには、 describe() メソッドを呼び出します。

    describe メソッドは、Python用Snowflakeコネクタ2.4.6以降のバージョンで使用できます。

description 属性は、次のいずれかの値に設定されます。

  • バージョン2.4.5以前: タプルのリスト。

  • バージョン2.4.6以降: ResultMetadata オブジェクトのリスト。(describe メソッドもこのリストを返します。)

各タプルと ResultMetadata オブジェクトには、列のメタデータ(列名、データ型など)が含まれています。 メタデータには、インデックスによりアクセス するか、2.4.6以降のバージョンでは ResultMetadata 属性によりアクセスできます。

次の例は、返されたタプルと ResultMetadata オブジェクトから、メタデータにアクセスする方法を示しています。

例: インデックスにより列名メタデータを取得:

次の例では、クエリの実行後に description 属性を使用して列名のリストを取得します。この例では、タプルの最初の値から列名にアクセスします。

    cur = conn.cursor()
    cur.execute("SELECT * FROM test_table")
    print(','.join([col[0] for col in cur.description]))

例: 属性により列名メタデータを取得(バージョン2.4.6以降):

次の例では、クエリの実行後に description 属性を使用して列名のリストを取得します。この例では、 ResultMetadata オブジェクトの name 属性から列名にアクセスします。

    cur = conn.cursor()
    result_metadata_list = cur.describe("SELECT * FROM test_table")
    print(','.join([col.name for col in result_metadata_list]))

例: クエリの実行することなく、列名のメタデータを取得(バージョン2.4.6以降):

次の例では、 describe メソッドを使用して、クエリを実行することなく列名のリストを取得します。

    cur = conn.cursor()
    result_metadata_list = cur.describe("SELECT * FROM test_table")
    print(','.join([col.name for col in result_metadata_list]))

エラーの処理

アプリケーションは、Snowflakeコネクタから発生した例外を適切に処理し、コードの実行を続行または停止する必要があります。

# Catching the syntax error
cur = con.cursor()
try:
    cur.execute("SELECT * FROM testtable")
except snowflake.connector.errors.ProgrammingError as e:
    # default error message
    print(e)
    # customer error message
    print('Error {0} ({1}): {2} ({3})'.format(e.errno, e.sqlstate, e.msg, e.sfqid))
finally:
    cur.close()

execute_stream の使用による SQL スクリプトの実行

execute_stream 関数を使用すると、ストリームで1つ以上の SQL スクリプトを実行できます。

from codecs import open
with open(sqlfile, 'r', encoding='utf-8') as f:
    for cur in con.execute_stream(f):
        for ret in cur:
            print(ret)

接続の終了

ベストプラクティスとしては、 close メソッドを呼び出して接続を閉じます。

        connection.close()

これにより、収集されたクライアントメトリックがサーバーに送信され、セッションが削除されます。また、 try-finally ブロックは、途中で例外が発生した場合でも接続が確実に閉じられるようにします。

# Connecting to Snowflake
con = snowflake.connector.connect(...)
try:
    # Running queries
    con.cursor().execute(...)
    ...
finally:
    # Closing the connection
    con.close()

コンテキストマネージャーを使用したトランザクションの接続および制御

Python用Snowflakeコネクタは、必要に応じてリソースを割り当てたり解放したりするコンテキストマネージャーをサポートしています。コンテキストマネージャーは、 autocommit が無効になっている場合、ステートメントのステータスに基づいてトランザクションをコミットまたはロールバックするのに役立ちます。

# Connecting to Snowflake using the context manager
with snowflake.connector.connect(
  user=USER,
  password=PASSWORD,
  account=ACCOUNT,
  autocommit=False,
) as con:
    con.cursor().execute("INSERT INTO a VALUES(1, 'test1')")
    con.cursor().execute("INSERT INTO a VALUES(2, 'test2')")
    con.cursor().execute("INSERT INTO a VALUES(not numeric value, 'test3')") # fail

上記の例では、3番目のステートメントが失敗すると、コンテキストマネージャーはトランザクションの変更をロールバックし、接続を閉じます。すべてのステートメントが成功した場合、コンテキストマネージャーは変更をコミットし、接続を閉じます。

try ブロックと except ブロックを持つ同等のコードは次のとおりです。

# Connecting to Snowflake using try and except blocks
con = snowflake.connector.connect(
  user=USER,
  password=PASSWORD,
  account=ACCOUNT,
  autocommit=False)
try:
    con.cursor().execute("INSERT INTO a VALUES(1, 'test1')")
    con.cursor().execute("INSERT INTO a VALUES(2, 'test2')")
    con.cursor().execute("INSERT INTO a VALUES(not numeric value, 'test3')") # fail
    con.commit()
except Exception as e:
    con.rollback()
    raise e
finally:
    con.close()

ログ

Python用Snowflakeコネクタは、標準のPython logging モジュールを活用して定期的にステータスを記録するため、アプリケーションはバックグラウンドで動作しているアクティビティを追跡できます。ログを有効にする最も簡単な方法は、アプリケーションの開始時に logging.basicConfig() を呼び出すことです。

たとえば、ログレベルを INFO に設定し、ログを /tmp/snowflake_python_connector.log という名前のファイルに保存するには、

        logging.basicConfig(
            filename=file_name,
            level=logging.INFO)

次のようにログレベルを DEBUG に設定すると、より包括的なログを有効にできます。

# Logging including the timestamp, thread and the source code location
import logging
for logger_name in ['snowflake.connector', 'botocore', 'boto3']:
    logger = logging.getLogger(logger_name)
    logger.setLevel(logging.DEBUG)
    ch = logging.FileHandler('/tmp/python_connector.log')
    ch.setLevel(logging.DEBUG)
    ch.setFormatter(logging.Formatter('%(asctime)s - %(threadName)s %(filename)s:%(lineno)d - %(funcName)s() - %(levelname)s - %(message)s'))
    logger.addHandler(ch)

オプションですが推奨される SecretDetector フォーマッタークラスにより、Snowflake Python Connectorログファイルに書き込まれる前に、特定の既知の機密情報セットが確実にマスクされるようにします。SecretDetector を使用するには、次のようなコードを使用します。

# Logging including the timestamp, thread and the source code location
import logging
from snowflake.connector.secret_detector import SecretDetector
for logger_name in ['snowflake.connector', 'botocore', 'boto3']:
    logger = logging.getLogger(logger_name)
    logger.setLevel(logging.DEBUG)
    ch = logging.FileHandler('/tmp/python_connector.log')
    ch.setLevel(logging.DEBUG)
    ch.setFormatter(SecretDetector('%(asctime)s - %(threadName)s %(filename)s:%(lineno)d - %(funcName)s() - %(levelname)s - %(message)s'))
    logger.addHandler(ch)

注釈

botocore および boto3 は、Python用 AWS (Amazon Web Services) SDK から入手できます。

サンプルプログラム

次のサンプルコードは、前のセクションで説明した例の多くを有効なpythonプログラムに結合します。この例には2つの部分が含まれています。

  • 親クラス(「python_veritas_base」)には、サーバーへの接続など、多くの一般的な操作のコードが含まれています。

  • 子クラス(「python_connector_example」)は、テーブルのクエリなど、特定のクライアントのカスタム部分を表します。

このサンプルコードは、テストの1つから直接インポートされ、製品の最近のビルドで実行されたことを確認するのに役立ちます。

これはテストから取得されているため、一部のテストで使用される代替ポートとプロトコルを設定するための少量のコードが含まれています。ユーザーは、プロトコルまたはポート番号を設定 しない でください。代わりに、これらを省略してデフォルトを使用します。

これには、ドキュメントに個別にインポートできるコードを識別するためのセクションマーカー(別称「スニペットタグ」)も含まれています。通常、セクションマーカーは次のようになります。

# -- (> ---------------------- SECTION=import_connector ---------------------
...
# -- <) ---------------------------- END_SECTION ----------------------------

これらのセクションマーカーは、ユーザーコードでは必要ありません。

コードサンプルの最初の部分には、次の一般的なサブルーチンが含まれています。

  • 接続情報を含むコマンドライン引数(例:「--warehouse MyWarehouse」)を読み取ります。

  • サーバーに接続します。

  • ウェアハウス、データベース、およびスキーマを作成して使用します。

  • スキーマ、データベース、ウェアハウスを使い終えたらドロップします。


import logging
import os
import sys


# -- (> ---------------------- SECTION=import_connector ---------------------
import snowflake.connector
# -- <) ---------------------------- END_SECTION ----------------------------


class python_veritas_base:

    """
    PURPOSE:
        This is the Base/Parent class for programs that use the Snowflake
        Connector for Python.
        This class is intended primarily for:
            * Sample programs, e.g. in the documentation.
            * Tests.
    """


    def __init__(self, p_log_file_name = None):

        """
        PURPOSE:
            This does any required initialization steps, which in this class is
            basically just turning on logging.
        """

        file_name = p_log_file_name
        if file_name is None:
            file_name = '/tmp/snowflake_python_connector.log'

        # -- (> ---------- SECTION=begin_logging -----------------------------
        logging.basicConfig(
            filename=file_name,
            level=logging.INFO)
        # -- <) ---------- END_SECTION ---------------------------------------


    # -- (> ---------------------------- SECTION=main ------------------------
    def main(self, argv):

        """
        PURPOSE:
            Most tests follow the same basic pattern in this main() method:
               * Create a connection.
               * Set up, e.g. use (or create and use) the warehouse, database,
                 and schema.
               * Run the queries (or do the other tasks, e.g. load data).
               * Clean up. In this test/demo, we drop the warehouse, database,
                 and schema. In a customer scenario, you'd typically clean up
                 temporary tables, etc., but wouldn't drop your database.
               * Close the connection.
        """

        # Read the connection parameters (e.g. user ID) from the command line
        # and environment variables, then connect to Snowflake.
        connection = self.create_connection(argv)

        # Set up anything we need (e.g. a separate schema for the test/demo).
        self.set_up(connection)

        # Do the "real work", for example, create a table, insert rows, SELECT
        # from the table, etc.
        self.do_the_real_work(connection)

        # Clean up. In this case, we drop the temporary warehouse, database, and
        # schema.
        self.clean_up(connection)

        print("\nClosing connection...")
        # -- (> ------------------- SECTION=close_connection -----------------
        connection.close()
        # -- <) ---------------------------- END_SECTION ---------------------

    # -- <) ---------------------------- END_SECTION=main --------------------


    def args_to_properties(self, args):

        """
        PURPOSE:
            Read the command-line arguments and store them in a dictionary.
            Command-line arguments should come in pairs, e.g.:
                "--user MyUser"
        INPUTS:
            The command line arguments (sys.argv).
        RETURNS:
            Returns the dictionary.
        DESIRABLE ENHANCEMENTS:
            Improve error detection and handling.
        """

        connection_parameters = {}

        i = 1
        while i < len(args) - 1:
            property_name = args[i]
            # Strip off the leading "--" from the tag, e.g. from "--user".
            property_name = property_name[2:]
            property_value = args[i + 1]
            connection_parameters[property_name] = property_value
            i += 2

        return connection_parameters


    def create_connection(self, argv):

        """
        PURPOSE:
            This gets account identifier and login information from the
            environment variables and command-line parameters, connects to the
            server, and returns the connection object.
        INPUTS:
            argv: This is usually sys.argv, which contains the command-line
                  parameters. It could be an equivalent substitute if you get
                  the parameter information from another source.
        RETURNS:
            A connection.
        """

        # Get account identifier and login information from environment variables and command-line parameters.
        # For information about account identifiers, see
        # https://docs.snowflake.com/en/user-guide/admin-account-identifier.html .
        # -- (> ----------------------- SECTION=set_login_info ---------------

        # Get the password from an appropriate environment variable, if
        # available.
        PASSWORD = os.getenv('SNOWSQL_PWD')

        # Get the other login info etc. from the command line.
        if len(argv) < 11:
            msg = "ERROR: Please pass the following command-line parameters:\n"
            msg += "--warehouse <warehouse> --database <db> --schema <schema> "
            msg += "--user <user> --account <account_identifier> "
            print(msg)
            sys.exit(-1)
        else:
            connection_parameters = self.args_to_properties(argv)
            USER = connection_parameters["user"]
            ACCOUNT = connection_parameters["account"]
            WAREHOUSE = connection_parameters["warehouse"]
            DATABASE = connection_parameters["database"]
            SCHEMA = connection_parameters["schema"]
            # Optional: for internal testing only.
            try:
                PORT = connection_parameters["port"]
            except:
                PORT = ""
            try:
                PROTOCOL = connection_parameters["protocol"]
            except:
                PROTOCOL = ""

        # If the password is set by both command line and env var, the
        # command-line value takes precedence over (is written over) the
        # env var value.

        # If the password wasn't set either in the environment var or on
        # the command line...
        if PASSWORD is None or PASSWORD == '':
            print("ERROR: Set password, e.g. with SNOWSQL_PWD environment variable")
            sys.exit(-2)
        # -- <) ---------------------------- END_SECTION ---------------------

        # Optional diagnostic:
        #print("USER:", USER)
        #print("ACCOUNT:", ACCOUNT)
        #print("WAREHOUSE:", WAREHOUSE)
        #print("DATABASE:", DATABASE)
        #print("SCHEMA:", SCHEMA)
        #print("PASSWORD:", PASSWORD)
        #print("PROTOCOL:" "'" + PROTOCOL + "'")
        #print("PORT:" + "'" + PORT + "'")

        print("Connecting...")
        # If the PORT is set but the protocol is not, we ignore the PORT (bug!!).
        if PROTOCOL is None or PROTOCOL == "" or PORT is None or PORT == "":
            # -- (> ------------------- SECTION=connect_to_snowflake ---------
            conn = snowflake.connector.connect(
                user=USER,
                password=PASSWORD,
                account=ACCOUNT,
                warehouse=WAREHOUSE,
                database=DATABASE,
                schema=SCHEMA
                )
            # -- <) ---------------------------- END_SECTION -----------------
        else:

            conn = snowflake.connector.connect(
                user=USER,
                password=PASSWORD,
                account=ACCOUNT,
                warehouse=WAREHOUSE,
                database=DATABASE,
                schema=SCHEMA,
                # Optional: for internal testing only.
                protocol=PROTOCOL,
                port=PORT
                )

        return conn


    def set_up(self, connection):

        """
        PURPOSE:
            Set up to run a test. You can override this method with one
            appropriate to your test/demo.
        """

        # Create a temporary warehouse, database, and schema.
        self.create_warehouse_database_and_schema(connection)


    def do_the_real_work(self, conn):

        """
        PURPOSE:
            Your sub-class should override this to include the code required for
            your documentation sample or your test case.
            This default method does a very simple self-test that shows that the
            connection was successful.
        """

        # Create a cursor for this connection.
        cursor1 = conn.cursor()
        # This is an example of an SQL statement we might want to run.
        command = "SELECT PI()"
        # Run the statement.
        cursor1.execute(command)
        # Get the results (should be only one):
        for row in cursor1:
            print(row[0])
        # Close this cursor.
        cursor1.close()


    def clean_up(self, connection):

        """
        PURPOSE:
            Clean up after a test. You can override this method with one
            appropriate to your test/demo.
        """

        # Create a temporary warehouse, database, and schema.
        self.drop_warehouse_database_and_schema(connection)


    def create_warehouse_database_and_schema(self, conn):

        """
        PURPOSE:
            Create the temporary schema, database, and warehouse that we use
            for most tests/demos.
        """

        # Create a database, schema, and warehouse if they don't already exist.
        print("\nCreating warehouse, database, schema...")
        # -- (> ------------- SECTION=create_warehouse_database_schema -------
        conn.cursor().execute("CREATE WAREHOUSE IF NOT EXISTS tiny_warehouse_mg")
        conn.cursor().execute("CREATE DATABASE IF NOT EXISTS testdb_mg")
        conn.cursor().execute("USE DATABASE testdb_mg")
        conn.cursor().execute("CREATE SCHEMA IF NOT EXISTS testschema_mg")
        # -- <) ---------------------------- END_SECTION ---------------------

        # -- (> --------------- SECTION=use_warehouse_database_schema --------
        conn.cursor().execute("USE WAREHOUSE tiny_warehouse_mg")
        conn.cursor().execute("USE DATABASE testdb_mg")
        conn.cursor().execute("USE SCHEMA testdb_mg.testschema_mg")
        # -- <) ---------------------------- END_SECTION ---------------------


    def drop_warehouse_database_and_schema(self, conn):

        """
        PURPOSE:
            Drop the temporary schema, database, and warehouse that we create
            for most tests/demos.
        """

        # -- (> ------------- SECTION=drop_warehouse_database_schema ---------
        conn.cursor().execute("DROP SCHEMA IF EXISTS testschema_mg")
        conn.cursor().execute("DROP DATABASE IF EXISTS testdb_mg")
        conn.cursor().execute("DROP WAREHOUSE IF EXISTS tiny_warehouse_mg")
        # -- <) ---------------------------- END_SECTION ---------------------


# ----------------------------------------------------------------------------

if __name__ == '__main__':
    pvb = python_veritas_base()
    pvb.main(sys.argv)


コードサンプルの2番目の部分では、テーブルの作成、行の挿入などを行います。


import sys

# -- (> ---------------------- SECTION=import_connector ---------------------
import snowflake.connector
# -- <) ---------------------------- END_SECTION ----------------------------


# Import the base class that contains methods used in many tests and code 
# examples.
from python_veritas_base import python_veritas_base


class python_connector_example (python_veritas_base):

  """
  PURPOSE:
      This is a simple example program that shows how to use the Snowflake 
      Python Connector to create and query a table.
  """

  def __init__(self):
    pass


  def do_the_real_work(self, conn):

    """
    INPUTS:
        conn is a Connection object returned from snowflake.connector.connect().
    """

    print("\nCreating table test_table...")
    # -- (> ----------------------- SECTION=create_table ---------------------
    conn.cursor().execute(
        "CREATE OR REPLACE TABLE "
        "test_table(col1 integer, col2 string)")

    conn.cursor().execute(
        "INSERT INTO test_table(col1, col2) VALUES " + 
        "    (123, 'test string1'), " + 
        "    (456, 'test string2')")
    # -- <) ---------------------------- END_SECTION -------------------------


    print("\nSelecting from test_table...")
    # -- (> ----------------------- SECTION=querying_data --------------------
    cur = conn.cursor()
    try:
        cur.execute("SELECT col1, col2 FROM test_table ORDER BY col1")
        for (col1, col2) in cur:
            print('{0}, {1}'.format(col1, col2))
    finally:
        cur.close()
    # -- <) ---------------------------- END_SECTION -------------------------




# ============================================================================

if __name__ == '__main__':

    test_case = python_connector_example()
    test_case.main(sys.argv)

このサンプルを実行するには、次の手順を実行します。

  1. 最初のコードを「python_veritas_base.py」という名前のファイルにコピーします。

  2. 2番目のコードを「python_connector_example.py」という名前のファイルにコピーします

  3. 次のように、 SNOWSQL_PWD 環境変数をパスワードに設定します。

    export SNOWSQL_PWD='MyPassword'
    
  4. 次のようなコマンドラインを使用してプログラムを実行します(もちろん、ユーザーとアカウントの情報を自分のユーザーとアカウントの情報に置き換えます)。

    警告

    これにより、プログラムの最後にウェアハウス、データベース、およびスキーマが削除されます。既存のデータベースの名前は 使用しないでください

    python3 python_connector_example.py --warehouse <unique_warehouse_name> --database <new_warehouse_zzz_test> --schema <new_schema_zzz_test> --account myorganization-myaccount --user MyUserName
    

出力は次のとおりです。

Connecting...

Creating warehouse, database, schema...

Creating table test_table...

Selecting from test_table...
123, test string1
456, test string2

Closing connection...

長い例を次に示します。

注釈

アカウントとログイン情報を設定するセクションで、Snowflakeログイン情報(名前、パスワードなど)に一致するように、必要に応じて変数を置き換えます。

この例では、format()関数を使用してステートメントを作成します。ご使用の環境に SQL インジェクション攻撃のリスクがある場合、format()を使用するよりも値をバインドすることをお勧めします。

#!/usr/bin/env python
#
# Snowflake Connector for Python Sample Program
#

# Logging
import logging
logging.basicConfig(
    filename='/tmp/snowflake_python_connector.log',
    level=logging.INFO)

import snowflake.connector

# Set your account and login information (replace the variables with
# the necessary values).
ACCOUNT = '<account_identifier>'
USER = '<login_name>'
PASSWORD = '<password>'

import os

# Only required if you copy data from your S3 bucket
AWS_ACCESS_KEY_ID = os.getenv('AWS_ACCESS_KEY_ID')
AWS_SECRET_ACCESS_KEY = os.getenv('AWS_SECRET_ACCESS_KEY')

# Connecting to Snowflake
con = snowflake.connector.connect(
  user=USER,
  password=PASSWORD,
  account=ACCOUNT,
)

# Creating a database, schema, and warehouse if none exists
con.cursor().execute("CREATE WAREHOUSE IF NOT EXISTS tiny_warehouse")
con.cursor().execute("CREATE DATABASE IF NOT EXISTS testdb")
con.cursor().execute("USE DATABASE testdb")
con.cursor().execute("CREATE SCHEMA IF NOT EXISTS testschema")

# Using the database, schema and warehouse
con.cursor().execute("USE WAREHOUSE tiny_warehouse")
con.cursor().execute("USE SCHEMA testdb.testschema")

# Creating a table and inserting data
con.cursor().execute(
    "CREATE OR REPLACE TABLE "
    "testtable(col1 integer, col2 string)")
con.cursor().execute(
    "INSERT INTO testtable(col1, col2) "
    "VALUES(123, 'test string1'),(456, 'test string2')")

# Copying data from internal stage (for testtable table)
con.cursor().execute("PUT file:///tmp/data0/file* @%testtable")
con.cursor().execute("COPY INTO testtable")

# Copying data from external stage (S3 bucket -
# replace <s3_bucket> with the name of your bucket)
con.cursor().execute("""
COPY INTO testtable FROM s3://<s3_bucket>/data/
     STORAGE_INTEGRATION = myint
     FILE_FORMAT=(field_delimiter=',')
""".format(
    aws_access_key_id=AWS_ACCESS_KEY_ID,
    aws_secret_access_key=AWS_SECRET_ACCESS_KEY))

# Querying data
cur = con.cursor()
try:
    cur.execute("SELECT col1, col2 FROM testtable")
    for (col1, col2) in cur:
        print('{0}, {1}'.format(col1, col2))
finally:
    cur.close()

# Binding data
con.cursor().execute(
    "INSERT INTO testtable(col1, col2) "
    "VALUES(%(col1)s, %(col2)s)", {
        'col1': 789,
        'col2': 'test string3',
        })

# Retrieving column names
cur = con.cursor()
cur.execute("SELECT * FROM testtable")
print(','.join([col[0] for col in cur.description]))

# Catching syntax errors
cur = con.cursor()
try:
    cur.execute("SELECT * FROM testtable")
except snowflake.connector.errors.ProgrammingError as e:
    # default error message
    print(e)
    # user error message
    print('Error {0} ({1}): {2} ({3})'.format(e.errno, e.sqlstate, e.msg, e.sfqid))
finally:
    cur.close()

# Retrieving the Snowflake query ID
cur = con.cursor()
cur.execute("SELECT * FROM testtable")
print(cur.sfqid)

# Closing the connection
con.close()