Pythonコネクタの使用

このトピックでは、Snowflakeコネクタを使用して、ユーザーログイン、データベースとテーブルの作成、ウェアハウスの作成、データの挿入/読み込み、クエリなどの標準のSnowflake操作を実行する方法を示す一連の例を提供します。

このトピックの最後にあるサンプルコードは、例を組み合わせて単一の動作するPythonプログラムにします。

このトピックの内容:

SnowCD を使用したSnowflakeへのネットワーク接続の検証

ドライバーを設定したら、 SnowCD を使用して、Snowflakeへのネットワーク接続を評価およびトラブルシューティングできます。

初期設定プロセス中にオンデマンドで SnowCD をいつでも使用して、Snowflakeへのネットワーク接続を評価およびトラブルシューティングできます。

Snowflakeへの接続

snowflake.connector モジュールをインポートします。

import snowflake.connector

環境変数、コマンドライン、構成ファイル、または別の適切なソースからログイン情報を読み取ります。例:

PASSWORD = os.getenv('SNOWSQL_PWD')
WAREHOUSE = os.getenv('WAREHOUSE')
...

ACCOUNT パラメーターには、アカウントのある地域とクラウドプラットフォームが次の形式で必要になる場合があります。

'<your_account_name>.<region_id>.<cloud>' (e.g. 'xy12345.east-us-2.azure').

注釈

使用可能なコネクタパラメーターの説明については、 snowflake.connector メソッド をご参照ください。

独自のAmazon S3バケットからデータをコピーする場合、 AWS_ACCESS_KEY_ID および AWS_SECRET_ACCESS_KEYが必要です。

import os

AWS_ACCESS_KEY_ID = os.getenv('AWS_ACCESS_KEY_ID')
AWS_SECRET_ACCESS_KEY = os.getenv('AWS_SECRET_ACCESS_KEY')

注釈

データがMicrosoft Azureコンテナーに保存されている場合は、 COPY ステートメントで認証情報を直接提供します。

接続情報を読み取った後、デフォルトの認証方式またはフェデレーション認証(有効な場合)を使用して接続します。

セッションパラメーターの設定

Pythonコネクターを使用する場合、 QUERY_TAGなどのセッションパラメーターを設定する方法は複数あります。

Snowflakeに接続するときに、セッションレベルのパラメーターを設定できます。これを行うには、次に示すように、「session_parameters」という名前のオプションの接続パラメーターを渡します。

con = snowflake.connector.connect(
    user='XXXX',
    password='XXXX',
    account='XXXX',
    session_parameters={
        'QUERY_TAG': 'EndOfMonthFinancials',
    }
)

connect()メソッドに渡されるsession_parameters辞書には、1つ以上のセッションレベルのパラメーターを含めることができます。

接続後に SQL ステートメント ALTER SESSION SET ... を実行して、セッションパラメータを設定することもできます。

con.cursor().execute("ALTER SESSION SET QUERY_TAG = 'EndOfMonthFinancials'")

セッションパラメータの詳細については、一般的な パラメーター ページの個々のパラメーターの説明をご参照ください。

デフォルト認証方式を使用した接続

ログインパラメーターを使用してSnowflakeに接続します。

            conn = snowflake.connector.connect(
                user=USER,
                password=PASSWORD,
                account=ACCOUNT,
                warehouse=WAREHOUSE,
                database=DATABASE,
                schema=SCHEMA
                )

これを他の情報で拡張する必要がある場合があります。

認証でのシングルサインオン(SSO)の使用

シングルサインオン(SSO)を使用するようにSnowflakeを構成 している場合、認証に SSO を使用するようにクライアントアプリケーションを構成できます。詳細については Snowflakeに接続するクライアントアプリケーションでの SSO の使用 をご参照ください。

キーペア認証の使用

Snowflakeは一般的なユーザー名/パスワード認証ではなく、キーペア認証の使用をサポートしています。この認証方法には、2048ビット(最小)の RSA キーペアが必要です。 OpenSSLを使用して PEM (Privacy Enhanced Mail)公開/秘密キーペアを生成します。公開キーは、Snowflakeクライアントを使用するSnowflakeユーザーに割り当てられます。

公開/秘密キーペアを構成するには:

  1. ターミナルウィンドウのコマンドラインから、秘密キーを生成します。

    秘密キーは、暗号化バージョンまたは非暗号化バージョンのいずれかを生成できます。

    非暗号化バージョンを生成するには、次のコマンドを使用します。

    $ openssl genrsa 2048 | openssl pkcs8 -topk8 -inform PEM -out rsa_key.p8 -nocrypt
    

    暗号化バージョンを生成するには、次の(「-nocrypt」を省略する)コマンドを使用します。

    $ openssl genrsa 2048 | openssl pkcs8 -topk8 -inform PEM -out rsa_key.p8
    

    通常は、暗号化されたバージョンを生成する方が安全です。

    2番目のコマンドを使用して秘密キーを暗号化する場合、 OpenSSL は秘密キーファイルの暗号化に使用されるパスフレーズの入力を求めます。強力なパスフレーズを使用して秘密キーを保護することをお勧めします。このパスフレーズを安全な場所に記録します。Snowflakeに接続するときに入力します。パスフレーズは秘密キーの保護にのみ使用され、Snowflakeには送信されないことに注意してください。

    サンプル PEM 秘密キー

    -----BEGIN ENCRYPTED PRIVATE KEY-----
    MIIE6TAbBgkqhkiG9w0BBQMwDgQILYPyCppzOwECAggABIIEyLiGSpeeGSe3xHP1
    wHLjfCYycUPennlX2bd8yX8xOxGSGfvB+99+PmSlex0FmY9ov1J8H1H9Y3lMWXbL
    ...
    -----END ENCRYPTED PRIVATE KEY-----
    
  2. コマンドラインから、秘密キーを参照して公開キーを生成します:

    秘密キーが暗号化され、「rsa_key.p8」という名前のファイルに含まれていると仮定して、次のコマンドを使用します:

    $ openssl rsa -in rsa_key.p8 -pubout -out rsa_key.pub
    

    サンプル PEM 公開キー

    -----BEGIN PUBLIC KEY-----
    MIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEAy+Fw2qv4Roud3l6tjPH4
    zxybHjmZ5rhtCz9jppCV8UTWvEXxa88IGRIHbJ/PwKW/mR8LXdfI7l/9vCMXX4mk
    ...
    -----END PUBLIC KEY-----
    
  3. 公開キーファイルと秘密キーファイルを保存用のローカルディレクトリにコピーします。ファイルへのパスを記録します。秘密キーは PKCS#8(公開キー暗号化標準)形式を使用して格納され、前の手順で指定したパスフレーズを使用して暗号化されることに注意してください。ただし、オペレーティングシステムが提供するファイル許可メカニズムを使用して、ファイルを不正アクセスから保護する必要があります。ファイルが使用されていない場合、ファイルを保護するのはユーザーの責任です。

  4. ALTER USER を使用して、Snowflakeユーザーに公開キーを割り当てます。例:

    ALTER USER jsmith SET RSA_PUBLIC_KEY='MIIBIjANBgkqh...';
    

    注釈

    • ユーザーを変更できるのは、セキュリティ管理者(つまり、 SECURITYADMIN ロールのユーザー)以上のみです。

    • SQL ステートメントで公開キーのヘッダーとフッターを除外します。

    DESCRIBE USER を使用してユーザーの公開キーの指紋を検証します:

    DESC USER jsmith;
    +-------------------------------+-----------------------------------------------------+---------+-------------------------------------------------------------------------------+
    | property                      | value                                               | default | description                                                                   |
    |-------------------------------+-----------------------------------------------------+---------+-------------------------------------------------------------------------------|
    | NAME                          | JSMITH                                              | null    | Name                                                                          |
    ...
    ...
    | RSA_PUBLIC_KEY_FP             | SHA256:nvnONUsfiuycCLMXIEWG4eTp4FjhVUZQUQbNpbSHXiA= | null    | Fingerprint of user's RSA public key.                                         |
    | RSA_PUBLIC_KEY_2_FP           | null                                                | null    | Fingerprint of user's second RSA public key.                                  |
    +-------------------------------+-----------------------------------------------------+---------+-------------------------------------------------------------------------------+
    

    注釈

    RSA_PUBLIC_KEY_2_FP プロパティについては、 キーローテーション (このトピック)で説明しています。

  5. 以下のサンプルコードを変更して実行します。コードは秘密キーファイルを復号化し、Snowflakeドライバーに渡して接続を作成します。

  • セキュリティパラメーターを更新します。

    • パス: 作成した秘密キーファイルへのローカルパスを指定します。

  • 接続パラメーターを更新します。

    • user: Snowflakeログイン名を指定します。

    • account: アカウントのフルネームを指定します(Snowflakeが提供)。アカウントがホストされているクラウドプラットフォーム(AWS またはAzure)と地域によっては、完全なアカウント名 に 追加 セグメントが必要な場合があります。詳細については、 account パラメーターの使用上の注意( connect メソッドの場合) をご参照ください。

    • region非推奨

      アカウントが存在する 地域 の ID を指定します。このパラメーターは非推奨であり、下位互換性のためにのみ文書化されています。

      新しいコードでは、 account の一部として地域(必要な場合)を指定する必要があります。

サンプルコード

import snowflake.connector
import os
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives.asymmetric import rsa
from cryptography.hazmat.primitives.asymmetric import dsa
from cryptography.hazmat.primitives import serialization
with open("<path>/rsa_key.p8", "rb") as key:
    p_key= serialization.load_pem_private_key(
        key.read(),
        password=os.environ['PRIVATE_KEY_PASSPHRASE'].encode(),
        backend=default_backend()
    )

pkb = p_key.private_bytes(
    encoding=serialization.Encoding.DER,
    format=serialization.PrivateFormat.PKCS8,
    encryption_algorithm=serialization.NoEncryption())

ctx = snowflake.connector.connect(
    user='<user>',
    account='<account>',
    private_key=pkb,
    warehouse=WAREHOUSE,
    database=DATABASE,
    schema=SCHEMA
    )

cs = ctx.cursor()

キーローテーション

Snowflakeは、複数のアクティブキーをサポートして、連続したローテーションを可能にします。内部的に従う有効期限のスケジュールに基づいて、公開キーと秘密キーをローテーションして交換します。

現在、 ALTER USERRSA_PUBLIC_KEY および RSA_PUBLIC_KEY_2 パラメーターを使用して、最大2個の公開キーを1人のユーザーに関連付けることができます。

キーをローテーションするには:

  1. キーペア認証の使用 の手順を完了して:

    • 新しい秘密キーと公開キーのセットを生成します。

    • ユーザーに公開キーを割り当てます。公開キーの値を RSA_PUBLIC_KEY または RSA_PUBLIC_KEY_2 (現在使用されていないキーの値)に設定します。例:

      alter user jsmith set rsa_public_key_2='JERUEHtcve...';
      
  2. Snowflakeに接続するようにコードを更新します。新しい秘密キーを指定します。

    Snowflakeは、接続情報とともに送信された秘密キーに基づいて、認証用の正しいアクティブな公開キーを検証します。

  3. ユーザープロファイルから古い公開キーを削除します。例:

    alter user jsmith unset rsa_public_key;
    

プロキシサーバーの使用

プロキシサーバーを使用するには、次の環境変数を構成します。

  • HTTP_PROXY

  • HTTPS_PROXY

  • NO_PROXY

注釈

プロキシパラメーター(つまり、 proxy_hostproxy_portproxy_user、および proxy_password)は非推奨です。代わりに環境変数を使用してください。

例:

Linuxまたは macOS
export HTTP_PROXY='http://username:password@proxyserver.company.com:80'
export HTTPS_PROXY='http://username:password@proxyserver.company.com:80'
Windows
set HTTP_PROXY=http://username:password@proxyserver.company.com:80
set HTTPS_PROXY=http://username:password@proxyserver.company.com:80

ちなみに

Snowflakeのセキュリティモデルは、Secure Sockets Layer(SSL)プロキシを許可しません( HTTPS 証明書を使用)。プロキシサーバーは、公的に利用可能な認証機関(CA)を使用する必要があり、侵害されたプロキシを介した MITM (Man In The Middle)攻撃などの潜在的なセキュリティリスクを低減します。

SSL プロキシを使用する 必要 がある場合は、通信中に証明書が変更されないように、サーバーポリシーを更新してSnowflake証明書を渡すことを強くお勧めします。

オプションで NO_PROXY を使用して、特定の通信のプロキシをバイパスできます。たとえば、Amazon S3へのアクセスは、 NO_PROXY=".amazonaws.com" を指定してプロキシサーバーをバイパスできます。

NO_PROXY はワイルドカードをサポートしていません。指定する各値は、次のいずれかである必要があります。

  • ホスト名の末尾(または完全なホスト名)。例:

    • .amazonaws.com

    • xy12345.snowflakecomputing.com

  • IP アドレス。例:

    • 192.196.1.15

複数の値を指定する場合、値はコンマで区切る必要があります。次に例を示します。

localhost,.my_company.com,.snowflakecomputing.com,192.168.1.15,192.168.1.16

OAuth との接続

OAuthを使用して接続するには、接続文字列に oauth に設定された authenticator パラメーターと oauth_access_token に設定された token パラメーターを含める必要があります。詳細については、 OAuth のクライアント、ドライバー、およびコネクター をご参照ください。

ctx = snowflake.connector.connect(
    user="<username>",
    host="<hostname>",
    account="<account_name>",
    authenticator="oauth",
    token="<oauth_access_token>",
    warehouse="test_warehouse",
    database="test_db",
    schema="test_schema"
)

OCSP

ドライバーが接続すると、Snowflakeは証明書を送信して、Snowflakeを偽装しているホストではなく、Snowflakeへの接続であることを確認します。ドライバーはその証明書を OCSP (オンライン証明書状態プロトコル)サーバーに送信し、証明書が失効していないことを確認します。

ドライバーが証明書を確認するために OCSP サーバーに到達できない場合、ドライバーは "fail open" or "fail closed" できます。

フェールオープンまたはフェールクローズモードの選択

1.8.0より前のバージョンのPython用Snowflakeコネクタは、デフォルトでフェールクローズモードに設定されています。バージョン1.8.0以降はデフォルトでフェールオープンになります。connect()メソッドを呼び出すときにオプションの接続パラメーター ocsp_fail_open を設定することにより、デフォルトの動作を上書きできます。例:

con = snowflake.connector.connect(
    account=<account>,
    user=<user>,
    ...,
    ocsp_fail_open=False,
    ...);

OCSP コネクタまたはドライバーバージョンの確認

ドライバーまたはコネクターのバージョンとその構成により、 OCSP の動作が決まります。ドライバーまたはコネクタのバージョン、それらの構成、および OCSP の動作の詳細については、 OCSP 設定 をご参照ください。

OCSP 応答のキャッシュ

すべての通信が安全であることを確認するために、Python用Snowflakeコネクタは HTTPS プロトコルを使用してSnowflakeおよび他のすべてのサービスに接続します(例:データファイルのステージング用のAmazon S3およびフェデレーション認証用のOkta)。通常の HTTPS プロトコルに加えて、コネクタは OCSP(オンライン証明書状態プロトコル)を介して各接続の TLS/SSL 証明書失効ステータスもチェックし、証明書が失効した場合や、OCSP ステータスが信頼できない場合は、接続を中止します。

各Snowflake接続は OCSP サーバーとの最大3つのラウンドトリップをトリガーするため、 OCSP 応答の複数レベルのキャッシュを導入して、接続に追加されるネットワークオーバーヘッドを削減しました。

  • プロセスの存続期間中保持されるメモリキャッシュ。

  • ファイルキャッシュ。キャッシュディレクトリ(例: ~/.cache/snowflake)がパージされるまで保持されます。

  • OCSP 応答サーバーキャッシュ。

キャッシュは、 OCSP サーバーの可用性の問題にも対処します(つまり、実際の OCSP サーバーがダウンした場合)。キャッシュが有効である限り、コネクタは証明書失効ステータスを検証できます。

キャッシュレイヤーに OCSP 応答が含まれていない場合、クライアントは CAの OCSP サーバーから検証ステータスを直接フェッチしようとします。

OCSP 応答ファイルキャッシュの場所の変更

デフォルトでは、ファイルキャッシュは次の場所で有効になっているため、追加の構成タスクは必要ありません。

Linux

~/.cache/snowflake/ocsp_response_cache.json

macOS

~/Library/Caches/Snowflake/ocsp_response_cache.json

Windows

%USERPROFILE%\AppData\Local\Snowflake\Caches\ocsp_response_cache.json

ただし、 OCSP 応答キャッシュファイルに別の場所やファイル名を指定する場合、 connect メソッドは URI 形式の OCSP キャッシュファイルのパスと名前を指定する、 ocsp_response_cache_filename パラメーターを受け入れます。

OCSP 応答キャッシュサーバー

注釈

OCSP 応答キャッシュサーバーは現在、Python用Snowflakeコネクタ1.6.0以降でサポートされています。

OCSP キャッシュのメモリとファイルの種類は、Snowflakeが提供するクライアントの1つを永続的なホストで使用して、Snowflakeに接続されたアプリケーションで良好に機能します。ただし、 AWS LambdaやDockerなどの動的にプロビジョニングされた環境では機能しません。

この状況に対処するために、Snowflakeは3番目のキャッシュレベルである OCSP 応答キャッシュサーバーを提供します。OCSP 応答キャッシュサーバーは、 CAの OCSP サーバーから1時間ごとに OCSP 応答をフェッチし、24時間保存します。クライアントは、このサーバーキャッシュから特定のSnowflake証明書の検証ステータスを要求できます。

重要

サーバーポリシーでほとんどまたはすべての外部 IP アドレスおよびウェブサイトへのアクセスが拒否された場合、通常のサービス操作を許可するには、キャッシュサーバーアドレスを 必ず ホワイトリストに登録する必要があります。キャッシュサーバー URL は ocsp*.snowflakecomputing.com:80 です。

何らかの理由でキャッシュサーバーを無効化する必要がある場合は、 SF_OCSP_RESPONSE_CACHE_SERVER_ENABLED 環境変数を false に設定します。値は大文字と小文字が区別され、小文字にする必要があることに注意してください。

データベース、スキーマ、ウェアハウスの作成

ログイン後、 CREATE DATABASECREATE SCHEMA、および CREATE WAREHOUSE コマンドを使用して、データベース、スキーマ、およびウェアハウスを作成します(まだ存在しない場合)。

以下の例は、 tiny_warehouse という名前のウェアハウス、 testdb という名前のデータベース、および testschema という名前のスキーマを作成する方法を示しています。スキーマを作成するときは、スキーマを作成するデータベースの名前を指定するか、スキーマを作成するデータベースに既に接続している必要があります。次の例では、 CREATE SCHEMA コマンドの前に USE DATABASE コマンドを実行して、スキーマが正しいデータベースに作成されるようにします。

        conn.cursor().execute("CREATE WAREHOUSE IF NOT EXISTS tiny_warehouse_mg")
        conn.cursor().execute("CREATE DATABASE IF NOT EXISTS testdb_mg")
        conn.cursor().execute("USE DATABASE testdb_mg")
        conn.cursor().execute("CREATE SCHEMA IF NOT EXISTS testschema_mg")

データベース、スキーマ、ウェアハウスの使用

テーブルを作成するデータベースとスキーマを指定します。また、 DML ステートメントおよびクエリを実行するためのリソースを提供するウェアハウスを指定します。

たとえば、データベース testdb、スキーマ testschema およびウェアハウス tiny_warehouse (以前に作成した)を使用するには、

        conn.cursor().execute("USE WAREHOUSE tiny_warehouse_mg")
        conn.cursor().execute("USE DATABASE testdb_mg")
        conn.cursor().execute("USE SCHEMA testdb_mg.testschema_mg")

テーブルの作成およびデータの挿入

CREATE TABLE コマンドを使用してテーブルを作成し、 INSERT コマンドを使用してテーブルにデータを入力します。

たとえば、 testtable という名前のテーブルを作成し、テーブルに2つの行を挿入します。

    conn.cursor().execute(
        "CREATE OR REPLACE TABLE "
        "test_table(col1 integer, col2 string)")

    conn.cursor().execute(
        "INSERT INTO test_table(col1, col2) VALUES " + 
        "    (123, 'test string1'), " + 
        "    (456, 'test string2')")

データのロード

個々の INSERT コマンドを使用してテーブルにデータを挿入する代わりに、内部または外部の場所にステージングされたファイルからデータを一括ロードできます。

内部の場所からのデータのコピー

ホストマシン上のファイルからテーブルにデータを読み込むには、最初に PUT コマンドを使用してファイルを内部の場所にステージングし、次に COPY INTO <テーブル> コマンドを使用してファイル内のデータをテーブルにコピーします。

例:

# Putting Data
con.cursor().execute("PUT file:///tmp/data/file* @%testtable")
con.cursor().execute("COPY INTO testtable")

Linuxまたは macOS 環境の /tmp/data という名前のローカルディレクトリに CSV データが保存され、ディレクトリには file0file1、... file100 という名前のファイルが含まれます。

外部の場所からのデータのコピー

外部の場所(つまり、独自のS3バケット)に既にステージングされているファイルからテーブルにデータをロードするには、 COPY INTO <テーブル> コマンドを使用します。

例:

# Copying Data
con.cursor().execute("""
COPY INTO testtable FROM s3://<your_s3_bucket>/data/
    STORAGE_INTEGRATION = myint
    FILE_FORMAT=(field_delimiter=',')
""".format(
    aws_access_key_id=AWS_ACCESS_KEY_ID,
    aws_secret_access_key=AWS_SECRET_ACCESS_KEY))

条件:

  • s3://<your_s3_bucket>/data/ はS3バケットの名前を指定します

  • バケット内のファイルの先頭には data が付きます。

  • バケットには、アカウント管理者(つまり、ACCOUNTADMIN のロールを持つユーザー)またはグローバル CREATE INTEGRATION 権限を持つロールによって CREATE STORAGE INTEGRATION を使用して作成されたストレージ統合を使用してアクセスされます。ストレージ統合により、ユーザーはプライベートストレージの場所にアクセスするための認証情報を提供する必要がなくなります。

注釈

この例では、format()関数を使用してステートメントを作成します。ご使用の環境に SQL インジェクション攻撃のリスクがある場合、format()を使用するよりも値をバインドすることをお勧めします。

データのクエリ

cursor の使用による値のフェッチ

カーソルオブジェクトの反復子メソッドを使用して、テーブルから値をフェッチします。

たとえば、以前に( テーブルの作成およびデータの挿入 で)作成された testtable という名前のテーブルから「col1」および「col2」という名前の列をフェッチするには、次のようなコードを使用します。

    cur = conn.cursor()
    try:
        cur.execute("SELECT col1, col2 FROM test_table ORDER BY col1")
        for (col1, col2) in cur:
            print('{0}, {1}'.format(col1, col2))
    finally:
        cur.close()

または、Python用Snowflakeコネクタが便利なショートカットを提供します。

for (col1, col2) in con.cursor().execute("SELECT col1, col2 FROM testtable"):
    print('{0}, {1}'.format(col1, col2))

単一の結果(つまり単一の行)を取得する必要がある場合は、 fetchone メソッドを使用します。

col1, col2 = con.cursor().execute("SELECT col1, col2 FROM testtable").fetchone()
print('{0}, {1}'.format(col1, col2))

一度に指定された行数を取得する必要がある場合は、行数で fetchmany メソッドを使用します。

cur = con.cursor().execute("SELECT col1, col2 FROM testtable")
ret = cur.fetchmany(3)
print(ret)
while len(ret) > 0:
    ret = cur.fetchmany(3)
    print(ret)

注釈

結果セットが大きすぎてメモリに収まらない場合は、 fetchone または fetchmany を使用します。

すべての結果を一度に取得する必要がある場合、

results = con.cursor().execute("SELECT col1, col2 FROM testtable").fetchall()
for rec in results:
    print('%s, %s' % (rec[0], rec[1]))

クエリのタイムアウトを設定するには、「開始」コマンドを実行し、クエリにタイムアウトパラメーターを含めます。クエリがパラメーター値の長さを超える場合、エラーが生成され、ロールバックが発生します。

次のコードでは、エラー604はクエリがキャンセルされたことを意味します。タイムアウトパラメーターは Timer() を開始し、指定された時間内にクエリが終了しない場合はキャンセルします。

conn.cursor().execute("create or replace table testtbl(a int, b string)")

conn.cursor().execute("begin")
try:
   conn.cursor().execute("insert into testtbl(a,b) values(3, 'test3'), (4,'test4')", timeout=10) # long query

except ProgrammingError as e:
   if e.errno == 604:
      print("timeout")
      conn.cursor().execute("rollback")
   else:
      raise e
else:
   conn.cursor().execute("commit")

DictCursor の使用による列名の値のフェッチ

列名で値をフェッチする場合は、タイプ DictCursorcursor オブジェクトを作成します。

例:

# Querying data by DictCursor
from snowflake.connector import DictCursor
cur = con.cursor(DictCursor)
try:
    cur.execute("SELECT col1, col2 FROM testtable")
    for rec in cur:
        print('{0}, {1}'.format(rec['COL1'], rec['COL2']))
finally:
    cur.close()

クエリ ID ごとのクエリのキャンセル

クエリ ID によるクエリのキャンセル:

cur = cn.cursor()

try:
  cur.execute(r"SELECT SYSTEM$CANCEL_QUERY('queryID')")
  result = cur.fetchall()
  print(len(result))
  print(result[0])
finally:
  cur.close()

文字列「queryID」を実際のクエリ IDに置き換えます。

データ変換をバイパスすることによるクエリパフォーマンスの改善

クエリのパフォーマンスを向上させるには、 snowflake.connector.converter_null モジュールの SnowflakeNoConverterToPython クラスを使用して、Snowflake内部データ型からネイティブPythonデータ型へのデータ変換をバイパスします。例:

from snowflake.connector.converter_null import SnowflakeNoConverterToPython

con = snowflake.connector.connect(
    ...
    converter_class=SnowflakeNoConverterToPython
)
for rec in con.cursor().execute("SELECT * FROM large_table"):
    # rec includes raw Snowflake data

その結果、すべてのデータは文字列形式で表されるため、アプリケーションはそれをネイティブPythonデータ型に変換する必要があります。たとえば、 TIMESTAMP_NTZ および TIMESTAMP_LTZ データは文字列形式で表されるエポック時間であり、 TIMESTAMP_TZ データは文字列形式で表されるエポック時間の後に、空白と分単位の UTC へのオフセットが続きます。

バインドデータに影響はありません。Pythonネイティブデータは引き続き更新用にバインドできます。

データのバインド

SQL ステートメントで使用する値を指定するには、ステートメントにリテラルを含めるか、変数をバインドします。変数をバインドするときは、 SQL ステートメントのテキストに1つ以上のプレースホルダーを配置し、各プレースホルダーに変数(使用する値)を指定します。

次の例は、リテラルとバインドの使用を比較しています。

リテラル:

con.cursor().execute("INSERT INTO testtable(col1, col2) VALUES(789, 'test string3')")

バインド:

con.cursor().execute(
    "INSERT INTO testtable(col1, col2) "
    "VALUES(%s, %s)", (
        789,
        'test string3'
    ))

注釈

バインドできるデータのサイズ、またはバッチで結合できるデータのサイズには上限があります。詳細については、 クエリテキストサイズの制限 をご参照ください。

Snowflakeは、次のタイプのバインドをサポートしています。

  • pyformat。

  • format。

  • qmark。

  • numeric。

これらのそれぞれについて、以下で説明します。

pyformat または format バインド

pyformat バインドと format バインドの両方が、サーバー側ではなくクライアント側でデータをバインドします。

デフォルトでは、Python用Snowflakeコネクタは pyformat および format の両方をサポートしているため、プレースホルダーとして %(name)s または %s を使用できます。例:

  • %(name)s をプレースホルダーとして使用:

        conn.cursor().execute(
            "INSERT INTO test_table(col1, col2) "
            "VALUES(%(col1)s, %(col2)s)", {
                'col1': 789,
                'col2': 'test string3',
                })
    
  • %s をプレースホルダーとして使用:

    con.cursor().execute(
        "INSERT INTO testtable(col1, col2) "
        "VALUES(%s, %s)", (
            789,
            'test string3'
        ))
    

リストオブジェクトを使用して、 IN 演算子のデータをバインドすることもできます。

# Binding data for IN operator
con.cursor().execute(
    "SELECT col1, col2 FROM testtable"
    " WHERE col2 IN (%s)", (
        ['test string1', 'test string3'],
    ))

パーセント文字(「%」)は、 SQL LIKE のワイルドカード文字であり、Pythonの形式バインディング文字でもあります。フォーマットバインディングを使用し、 SQL コマンドにパーセント文字が含まれている場合、パーセント文字をエスケープする必要がある場合があります。例えば、 SQL ステートメントが次の場合:

SELECT col1, col2
    FROM test_table
    WHERE col2 ILIKE '%York' LIMIT 1;  -- Find York, New York, etc.

Pythonコードは次のようになります(元のパーセント記号をエスケープするための余分なパーセント記号に注意してください)。

        sql_command = "select col1, col2 from test_table "
        sql_command += " where col2 like '%%York' limit %(lim)s"
        parameter_dictionary = {'lim': 1 }
        cur.execute(sql_command, parameter_dictionary)

qmark または numeric バインド

qmark バインドと numeric バインドの両方が、クライアント側ではなくサーバー側でデータをバインドします。

qmarkまたはnumericスタイルのバインドを使用するには、次を実行してモードを「qmark」に設定します。

snowflake.connector.paramstyle='qmark'

重要

connect() メソッドを呼び出す 前に 、paramstyleを設定する必要があります。

接続パラメーターで paramstyleqmark または numeric として指定されている場合、バインド変数はそれぞれ ? または :N である必要があり、サーバー側でバインドが発生します。

例:

  • ? をプレースホルダーとして使用:

    import snowflake.connector
    
    snowflake.connector.paramstyle='qmark'
    
    con = snowflake.connector.connect(...)
    
    con.cursor().execute(
        "INSERT INTO testtable(col1, col2) "
        "VALUES(?, ?)", (
            789,
            'test string3'
        ))
    
  • :N をプレースホルダーとして使用:

    import snowflake.connector
    
    snowflake.connector.paramstyle='qmark'
    
    con = snowflake.connector.connect(...)
    
    con.cursor().execute(
        "INSERT INTO testtable(col1, col2) "
        "VALUES(:1, :2)", (
            789,
            'test string3'
        ))
    
  • qmarkバインドを使用して datetimeTIMESTAMP にバインド:

    qmark または numeric バインドを使用してデータをSnowflake TIMESTAMP データ型にバインドする場合、Snowflakeタイムスタンプデータ型、つまり TIMESTAMP_LTZ または TIMESTAMP_TZ をタプルの形式で指定します。

    import snowflake.connector
    
    snowflake.connector.paramstyle='qmark'
    
    con = snowflake.connector.connect(...)
    
    con.cursor().execute(
        "CREATE OR REPLACE TABLE testtable2 ("
        "   col1 int, "
        "   col2 string, "
        "   col3 timestamp_ltz"
        ")"
    )
    
    con.cursor().execute(
        "INSERT INTO testtable2(col1,col2,col3) "
        "VALUES(?,?,?)", (
            987,
            'test string4',
            ("TIMESTAMP_LTZ", datetime.now())
        )
    )
    

    クライアント側のバインドとは異なり、サーバー側のバインドでは、列にSnowflakeデータ型が必要です。ほとんどの一般的なPythonデータ型には、Snowflakeデータ型への暗黙的なマッピングが既にあります(例: intFIXED にマッピングされます)。ただし、Python datetime データは複数のSnowflakeデータ型(例: TIMESTAMP_NTZTIMESTAMP_LTZ または TIMESTAMP_TZ)にバインドでき、デフォルトのマッピングは TIMESTAMP_NTZ であるため、Python日時データのバインドには、ユーザーが特定のデータ型(例: TIMESTAMP_LTZ)を指定します。したがって、上記の例に示すようにデータ型を指定する必要があります。

SQL インジェクション攻撃の回避

SQL インジェクションの危険があるため、Pythonのフォーマット関数を使用してデータをバインドしないでください。例:

# Binding data (UNSAFE EXAMPLE)
con.cursor().execute(
    "INSERT INTO testtable(col1, col2) "
    "VALUES(%(col1)d, '%(col2)s')" % {
        'col1': 789,
        'col2': 'test string3'
    })
# Binding data (UNSAFE EXAMPLE)
con.cursor().execute(
    "INSERT INTO testtable(col1, col2) "
    "VALUES(%d, '%s')" % (
        789,
        'test string3'
    ))
# Binding data (UNSAFE EXAMPLE)
con.cursor().execute(
    "INSERT INTO testtable(col1, col2) "
    "VALUES({col1}, '{col2}')".format(
        col1=789,
        col2='test string3')
    )

代わりに、変数に値を保存し、それらの値をチェックして(例:文字列内の疑わしいセミコロンを探して)、qmarkまたはnumericバインドスタイルを使用してパラメーターをバインドします。

列のメタデータの取得

列のメタデータは、 description 属性の Cursor オブジェクトに保存されます。

次の簡単な例では、列名のリストを取得します。

    cur = conn.cursor()
    cur.execute("SELECT * FROM test_table")
    print(','.join([col[0] for col in cur.description]))

Snowflakeクエリ IDsの取得

クエリ ID は、Snowflakeによって実行される各クエリに割り当てられます。Snowflakeウェブインターフェイスでは、クエリ IDs が History History tab ページに表示され、クエリのステータスを確認するときに表示されます。

Python用Snowflakeコネクタは、 Cursor オブジェクトに特別な属性 sfqid を提供します。これにより、ウェブインターフェイスでステータスに関連付けることができます。Snowflakeクエリ ID を取得するには、最初にクエリを実行してから、 sfqid 属性を使用して取得します。

# Retrieving a Snowflake Query ID
cur = con.cursor()
cur.execute("SELECT * FROM testtable")
print(cur.sfqid)

エラーの処理

アプリケーションは、Snowflakeコネクタから発生した例外を適切に処理し、コードの実行を続行または停止する必要があります。

# Catching the syntax error
cur = con.cursor()
try:
    cur.execute("SELECT * FROM testtable")
except snowflake.connector.errors.ProgrammingError as e:
    # default error message
    print(e)
    # customer error message
    print('Error {0} ({1}): {2} ({3})'.format(e.errno, e.sqlstate, e.msg, e.sfqid))
finally:
    cur.close()

execute_stream の使用による SQL スクリプトの実行

execute_stream 関数を使用すると、ストリームで1つ以上の SQL スクリプトを実行できます。

from codecs import open
with open(sqlfile, 'r', encoding='utf-8') as f:
    for cur in con.execute_stream(f):
        for ret in cur:
            print(ret)

接続の終了

ベストプラクティスとしては、 close メソッドを呼び出して接続を閉じます。

        connection.close()

これにより、収集されたクライアントメトリックがサーバーに送信され、セッションが削除されます。また、 try-finally ブロックは、途中で例外が発生した場合でも接続が確実に閉じられるようにします。

# Connecting to Snowflake
con = snowflake.connector.connect(...)
try:
    # Running queries
    con.cursor().execute(...)
    ...
finally:
    # Closing the connection
    con.close()

コンテキストマネージャーを使用したトランザクションの接続および制御

Python用Snowflakeコネクタは、必要に応じてリソースを割り当てたり解放したりするコンテキストマネージャーをサポートしています。コンテキストマネージャーは、 autocommit が無効になっている場合、ステートメントのステータスに基づいてトランザクションをコミットまたはロールバックするのに役立ちます。

# Connecting to Snowflake using the context manager
with snowflake.connector.connect(
  user=USER,
  password=PASSWORD,
  account=ACCOUNT,
  autocommit=False,
) as con:
    con.cursor().execute("INSERT INTO a VALUES(1, 'test1')")
    con.cursor().execute("INSERT INTO a VALUES(2, 'test2')")
    con.cursor().execute("INSERT INTO a VALUES(not numeric value, 'test3')") # fail

上記の例では、3番目のステートメントが失敗すると、コンテキストマネージャーはトランザクションの変更をロールバックし、接続を閉じます。すべてのステートメントが成功した場合、コンテキストマネージャーは変更をコミットし、接続を閉じます。

try ブロックと except ブロックを持つ同等のコードは次のとおりです。

# Connecting to Snowflake using try and except blocks
con = snowflake.connector.connect(
  user=USER,
  password=PASSWORD,
  account=ACCOUNT,
  autocommit=False)
try:
    con.cursor().execute("INSERT INTO a VALUES(1, 'test1')")
    con.cursor().execute("INSERT INTO a VALUES(2, 'test2')")
    con.cursor().execute("INSERT INTO a VALUES(not numeric value, 'test3')") # fail
    con.commit()
except Exception as e:
    con.rollback()
    raise e
finally:
    con.close()

ログ

Python用Snowflakeコネクタは、標準のPython logging モジュールを活用して定期的にステータスを記録するため、アプリケーションはバックグラウンドで動作しているアクティビティを追跡できます。ログを有効にする最も簡単な方法は、アプリケーションの開始時に logging.basicConfig() を呼び出すことです。

たとえば、ログレベルを INFO に設定し、ログを /tmp/snowflake_python_connector.log という名前のファイルに保存するには、

        logging.basicConfig(
            filename=file_name,
            level=logging.INFO)

次のようにログレベルを DEBUG に設定すると、より包括的なログを有効にできます。

# Logging including the timestamp, thread and the source code location
import logging
for logger_name in ['snowflake.connector', 'botocore', 'boto3']:
    logger = logging.getLogger(logger_name)
    logger.setLevel(logging.DEBUG)
    ch = logging.FileHandler('/tmp/python_connector.log')
    ch.setLevel(logging.DEBUG)
    ch.setFormatter(logging.Formatter('%(asctime)s - %(threadName)s %(filename)s:%(lineno)d - %(funcName)s() - %(levelname)s - %(message)s'))
    logger.addHandler(ch)

オプションですが推奨される SecretDetector フォーマッタークラスにより、Snowflake Python Connectorログファイルに書き込まれる前に、特定の既知の機密情報セットが確実にマスクされるようにします。SecretDetector を使用するには、次のようなコードを使用します。

# Logging including the timestamp, thread and the source code location
import logging
from snowflake.connector.secret_detector import SecretDetector
for logger_name in ['snowflake.connector', 'botocore', 'boto3']:
    logger = logging.getLogger(logger_name)
    logger.setLevel(logging.DEBUG)
    ch = logging.FileHandler('/tmp/python_connector.log')
    ch.setLevel(logging.DEBUG)
    ch.setFormatter(SecretDetector('%(asctime)s - %(threadName)s %(filename)s:%(lineno)d - %(funcName)s() - %(levelname)s - %(message)s'))
    logger.addHandler(ch)

注釈

botocore および boto3 は、Python用 AWS (Amazon Web Services) SDK から入手できます。

サンプルプログラム

次のサンプルコードは、前のセクションで説明した例の多くを有効なpythonプログラムに結合します。この例には2つの部分が含まれています。

  • 親クラス(「python_veritas_base」)には、サーバーへの接続など、多くの一般的な操作のコードが含まれています。

  • 子クラス(「python_connector_example」)は、テーブルのクエリなど、特定のクライアントのカスタム部分を表します。

このサンプルコードは、テストの1つから直接インポートされ、製品の最近のビルドで実行されたことを確認するのに役立ちます。

これはテストから取得されるため、一部のテストで使用される代替ポートとプロトコルを設定するための少量のコードが含まれています。お客様は、プロトコルまたはポート番号を 設定しない でください。代わりに、これらを省略してデフォルトを使用します。

これには、ドキュメントに個別にインポートできるコードを識別するためのセクションマーカー(「スニペットタグ」とも呼ばれます)も含まれています。通常、セクションマーカーは次のようになります。

# -- (> ---------------------- SECTION=import_connector ---------------------
...
# -- <) ---------------------------- END_SECTION ----------------------------

これらのセクションマーカーは、顧客のクライアントコードでは必要ありません。

コードサンプルの最初の部分には、次の一般的なサブルーチンが含まれています。

  • 接続情報を含むコマンドライン引数(例:「--warehouse MyWarehouse」)を読み取ります。

  • サーバーに接続します。

  • ウェアハウス、データベース、およびスキーマを作成して使用します。

  • スキーマ、データベース、ウェアハウスを使い終えたらドロップします。


import logging
import os
import sys


# -- (> ---------------------- SECTION=import_connector ---------------------
import snowflake.connector
# -- <) ---------------------------- END_SECTION ----------------------------


class python_veritas_base:

    """
    PURPOSE:
        This is the Base/Parent class for programs that use the Snowflake
        Connector for Python.
        This class is intended primarily for:
            * Sample programs, e.g. in the documentation.
            * Tests.
    """


    def __init__(self, p_log_file_name = None):

        """
        PURPOSE:
            This does any required initialization steps, which in this class is
            basically just turning on logging.
        """

        file_name = p_log_file_name
        if file_name is None:
            file_name = '/tmp/snowflake_python_connector.log'

        # -- (> ---------- SECTION=begin_logging -----------------------------
        logging.basicConfig(
            filename=file_name,
            level=logging.INFO)
        # -- <) ---------- END_SECTION ---------------------------------------


    # -- (> ---------------------------- SECTION=main ------------------------
    def main(self, argv):

        """
        PURPOSE:
            Most tests follow the same basic pattern in this main() method:
               * Create a connection.
               * Set up, e.g. use (or create and use) the warehouse, database,
                 and schema.
               * Run the queries (or do the other tasks, e.g. load data).
               * Clean up. In this test/demo, we drop the warehouse, database,
                 and schema. In a customer scenario, you'd typically clean up
                 temporary tables, etc., but wouldn't drop your database.
               * Close the connection.
        """

        # Read the connection parameters (e.g. user ID) from the command line
        # and environment variables, then connect to Snowflake.
        connection = self.create_connection(argv)

        # Set up anything we need (e.g. a separate schema for the test/demo).
        self.set_up(connection)

        # Do the "real work", for example, create a table, insert rows, SELECT
        # from the table, etc.
        self.do_the_real_work(connection)

        # Clean up. In this case, we drop the temporary warehouse, database, and
        # schema.
        self.clean_up(connection)

        print("\nClosing connection...")
        # -- (> ------------------- SECTION=close_connection -----------------
        connection.close()
        # -- <) ---------------------------- END_SECTION ---------------------

    # -- <) ---------------------------- END_SECTION=main --------------------


    def args_to_properties(self, args):

        """
        PURPOSE:
            Read the command-line arguments and store them in a dictionary.
            Command-line arguments should come in pairs, e.g.:
                "--user MyUser"
        INPUTS:
            The command line arguments (sys.argv).
        RETURNS:
            Returns the dictionary.
        DESIRABLE ENHANCEMENTS:
            Improve error detection and handling.
        """

        connection_parameters = {}

        i = 1
        while i < len(args) - 1:
            property_name = args[i]
            # Strip off the leading "--" from the tag, e.g. from "--user".
            property_name = property_name[2:]
            property_value = args[i + 1]
            connection_parameters[property_name] = property_value
            i += 2

        return connection_parameters


    def create_connection(self, argv):

        """
        PURPOSE:
            This connects gets account and login information from the
            environment variables and command-line parameters, connects to the
            server, and returns the connection object.
        INPUTS:
            argv: This is usually sys.argv, which contains the command-line
                  parameters. It could be an equivalent substitute if you get
                  the parameter information from another source.
        RETURNS:
            A connection.
        """

        # Get account and login information from environment variables and
        # command-line parameters.
        # Note that ACCOUNT might require the region and cloud platform where
        # your account is located, in the form of
        #     '<your_account_name>.<region>.<cloud>'
        # for example
        #     'xy12345.us-east-1.azure')
        # -- (> ----------------------- SECTION=set_login_info ---------------

        # Get the password from an appropriate environment variable, if
        # available.
        PASSWORD = os.getenv('SNOWSQL_PWD')

        # Get the other login info etc. from the command line.
        if len(argv) < 11:
            msg = "ERROR: Please pass the following command-line parameters:\n"
            msg += "--warehouse <warehouse> --database <db> --schema <schema> "
            msg += "--user <user> --account <account> "
            print(msg)
            sys.exit(-1)
        else:
            connection_parameters = self.args_to_properties(argv)
            USER = connection_parameters["user"]
            ACCOUNT = connection_parameters["account"]
            WAREHOUSE = connection_parameters["warehouse"]
            DATABASE = connection_parameters["database"]
            SCHEMA = connection_parameters["schema"]
            # Optional: for internal testing only.
            try:
                PORT = connection_parameters["port"]
                PROTOCOL = connection_parameters["protocol"]
            except:
                PORT = ""
                PROTOCOL = ""

        # If the password is set by both command line and env var, the
        # command-line value takes precedence over (is written over) the
        # env var value.

        # If the password wasn't set either in the environment var or on
        # the command line...
        if PASSWORD is None or PASSWORD == '':
            print("ERROR: Set password, e.g. with SNOWSQL_PWD environment variable")
            sys.exit(-2)
        # -- <) ---------------------------- END_SECTION ---------------------

        # Optional diagnostic:
        #print("USER:", USER)
        #print("ACCOUNT:", ACCOUNT)
        #print("WAREHOUSE:", WAREHOUSE)
        #print("DATABASE:", DATABASE)
        #print("SCHEMA:", SCHEMA)
        #print("PASSWORD:", PASSWORD)
        #print("PROTOCOL:" "'" + PROTOCOL + "'")
        #print("PORT:" + "'" + PORT + "'")

        print("Connecting...")
        if PROTOCOL is None or PROTOCOL == "" or PORT is None or PORT == "":
            # -- (> ------------------- SECTION=connect_to_snowflake ---------
            conn = snowflake.connector.connect(
                user=USER,
                password=PASSWORD,
                account=ACCOUNT,
                warehouse=WAREHOUSE,
                database=DATABASE,
                schema=SCHEMA
                )
            # -- <) ---------------------------- END_SECTION -----------------
        else:
            conn = snowflake.connector.connect(
                user=USER,
                password=PASSWORD,
                account=ACCOUNT,
                warehouse=WAREHOUSE,
                database=DATABASE,
                schema=SCHEMA,
                # Optional: for internal testing only.
                protocol=PROTOCOL,
                port=PORT
                )

        return conn


    def set_up(self, connection):

        """
        PURPOSE:
            Set up to run a test. You can override this method with one
            appropriate to your test/demo.
        """

        # Create a temporary warehouse, database, and schema.
        self.create_warehouse_database_and_schema(connection)


    def do_the_real_work(self, conn):

        """
        PURPOSE:
            Your sub-class should override this to include the code required for
            your documentation sample or your test case.
            This default method does a very simple self-test that shows that the
            connection was successful.
        """

        # Create a cursor for this connection.
        cursor1 = conn.cursor()
        # This is an example of an SQL statement we might want to run.
        command = "SELECT PI()"
        # Run the statement.
        cursor1.execute(command)
        # Get the results (should be only one):
        for row in cursor1:
            print(row[0])
        # Close this cursor.
        cursor1.close()


    def clean_up(self, connection):

        """
        PURPOSE:
            Clean up after a test. You can override this method with one
            appropriate to your test/demo.
        """

        # Create a temporary warehouse, database, and schema.
        self.drop_warehouse_database_and_schema(connection)


    def create_warehouse_database_and_schema(self, conn):

        """
        PURPOSE:
            Create the temporary schema, database, and warehouse that we use
            for most tests/demos.
        """

        # Create a database, schema, and warehouse if they don't already exist.
        print("\nCreating warehouse, database, schema...")
        # -- (> ------------- SECTION=create_warehouse_database_schema -------
        conn.cursor().execute("CREATE WAREHOUSE IF NOT EXISTS tiny_warehouse_mg")
        conn.cursor().execute("CREATE DATABASE IF NOT EXISTS testdb_mg")
        conn.cursor().execute("USE DATABASE testdb_mg")
        conn.cursor().execute("CREATE SCHEMA IF NOT EXISTS testschema_mg")
        # -- <) ---------------------------- END_SECTION ---------------------

        # -- (> --------------- SECTION=use_warehouse_database_schema --------
        conn.cursor().execute("USE WAREHOUSE tiny_warehouse_mg")
        conn.cursor().execute("USE DATABASE testdb_mg")
        conn.cursor().execute("USE SCHEMA testdb_mg.testschema_mg")
        # -- <) ---------------------------- END_SECTION ---------------------


    def drop_warehouse_database_and_schema(self, conn):

        """
        PURPOSE:
            Drop the temporary schema, database, and warehouse that we create
            for most tests/demos.
        """

        # -- (> ------------- SECTION=drop_warehouse_database_schema ---------
        conn.cursor().execute("DROP SCHEMA IF EXISTS testschema_mg")
        conn.cursor().execute("DROP DATABASE IF EXISTS testdb_mg")
        conn.cursor().execute("DROP WAREHOUSE IF EXISTS tiny_warehouse_mg")
        # -- <) ---------------------------- END_SECTION ---------------------


# ----------------------------------------------------------------------------

if __name__ == '__main__':
    pvb = python_veritas_base()
    pvb.main(sys.argv)


コードサンプルの2番目の部分では、テーブルの作成、行の挿入などを行います。


import sys

# -- (> ---------------------- SECTION=import_connector ---------------------
import snowflake.connector
# -- <) ---------------------------- END_SECTION ----------------------------


# Import the base class that contains methods used in many tests and code 
# examples.
from python_veritas_base import python_veritas_base


class python_connector_example (python_veritas_base):

  """
  PURPOSE:
      This is a simple example program that shows how to use the Snowflake 
      Python Connector to create and query a table.
  """

  def __init__(self):
    pass


  def do_the_real_work(self, conn):

    """
    INPUTS:
        conn is a Connection object returned from snowflake.connector.connect().
    """

    print("\nCreating table test_table...")
    # -- (> ----------------------- SECTION=create_table ---------------------
    conn.cursor().execute(
        "CREATE OR REPLACE TABLE "
        "test_table(col1 integer, col2 string)")

    conn.cursor().execute(
        "INSERT INTO test_table(col1, col2) VALUES " + 
        "    (123, 'test string1'), " + 
        "    (456, 'test string2')")
    # -- <) ---------------------------- END_SECTION -------------------------


    print("\nSelecting from test_table...")
    # -- (> ----------------------- SECTION=querying_data --------------------
    cur = conn.cursor()
    try:
        cur.execute("SELECT col1, col2 FROM test_table ORDER BY col1")
        for (col1, col2) in cur:
            print('{0}, {1}'.format(col1, col2))
    finally:
        cur.close()
    # -- <) ---------------------------- END_SECTION -------------------------




# ============================================================================

if __name__ == '__main__':

    test_case = python_connector_example()
    test_case.main(sys.argv)

このサンプルを実行するには、次の手順を実行します。

  1. 最初のコードを「python_veritas_base.py」という名前のファイルにコピーします。

  2. 2番目のコードを「python_connector_example.py」という名前のファイルにコピーします

  3. 次のように、 SNOWSQL_PWD 環境変数をパスワードに設定します。

    export SNOWSQL_PWD='MyPassword'
    
  4. 次のようなコマンドラインを使用してプログラムを実行します(もちろん、ユーザーとアカウントの情報を自分のユーザーとアカウントの情報に置き換えます)。

    警告

    これにより、プログラムの最後にウェアハウス、データベース、およびスキーマが削除されます。既存のデータベースの名前は 使用しないでください

    python3 python_connector_example.py --warehouse <unique_warehouse_name> --database <new_warehouse_zzz_test> --schema <new_schema_zzz_test> --account xy98764 --user MyUserName
    

出力は次のとおりです。

Connecting...

Creating warehouse, database, schema...

Creating table test_table...

Selecting from test_table...
123, test string1
456, test string2

Closing connection...

長い例を次に示します。

注釈

アカウントとログイン情報を設定するセクションで、Snowflakeログイン情報(名前、パスワードなど)に一致するように、必要に応じて変数を置き換えます。

この例では、format()関数を使用してステートメントを作成します。ご使用の環境に SQL インジェクション攻撃のリスクがある場合、format()を使用するよりも値をバインドすることをお勧めします。

#!/usr/bin/env python
#
# Snowflake Connector for Python Sample Program
#

# Logging
import logging
logging.basicConfig(
    filename='/tmp/snowflake_python_connector.log',
    level=logging.INFO)

import snowflake.connector

# Set your account and login information (replace the variables with
# the necessary values). Note that ACCOUNT might also require the
# region and cloud platform where your account is located, in the form of
# '<your_account_name>.<region_id>.<cloud_platform>' (e.g. 'xy12345.east-us-2.azure')
ACCOUNT = '<your_account_name>'
USER = '<your_login_name>'
PASSWORD = '<your_password>'

import os

# Only required if you copy data from your own S3 bucket
AWS_ACCESS_KEY_ID = os.getenv('AWS_ACCESS_KEY_ID')
AWS_SECRET_ACCESS_KEY = os.getenv('AWS_SECRET_ACCESS_KEY')

# Connecting to Snowflake
con = snowflake.connector.connect(
  user=USER,
  password=PASSWORD,
  account=ACCOUNT,
)

# Creating a database, schema, and warehouse if none exists
con.cursor().execute("CREATE WAREHOUSE IF NOT EXISTS tiny_warehouse")
con.cursor().execute("CREATE DATABASE IF NOT EXISTS testdb")
con.cursor().execute("USE DATABASE testdb")
con.cursor().execute("CREATE SCHEMA IF NOT EXISTS testschema")

# Using the database, schema and warehouse
con.cursor().execute("USE WAREHOUSE tiny_warehouse")
con.cursor().execute("USE SCHEMA testdb.testschema")

# Creating a table and inserting data
con.cursor().execute(
    "CREATE OR REPLACE TABLE "
    "testtable(col1 integer, col2 string)")
con.cursor().execute(
    "INSERT INTO testtable(col1, col2) "
    "VALUES(123, 'test string1'),(456, 'test string2')")

# Copying data from internal stage (for testtable table)
con.cursor().execute("PUT file:///tmp/data0/file* @%testtable")
con.cursor().execute("COPY INTO testtable")

# Copying data from external stage (S3 bucket -
# replace <your_s3_bucket> with the name of your bucket)
con.cursor().execute("""
COPY INTO testtable FROM s3://<your_s3_bucket>/data/
     STORAGE_INTEGRATION = myint
     FILE_FORMAT=(field_delimiter=',')
""".format(
    aws_access_key_id=AWS_ACCESS_KEY_ID,
    aws_secret_access_key=AWS_SECRET_ACCESS_KEY))

# Querying data
cur = con.cursor()
try:
    cur.execute("SELECT col1, col2 FROM testtable")
    for (col1, col2) in cur:
        print('{0}, {1}'.format(col1, col2))
finally:
    cur.close()

# Binding data
con.cursor().execute(
    "INSERT INTO testtable(col1, col2) "
    "VALUES(%(col1)s, %(col2)s)", {
        'col1': 789,
        'col2': 'test string3',
        })

# Retrieving column names
cur = con.cursor()
cur.execute("SELECT * FROM testtable")
print(','.join([col[0] for col in cur.description]))

# Catching syntax errors
cur = con.cursor()
try:
    cur.execute("SELECT * FROM testtable")
except snowflake.connector.errors.ProgrammingError as e:
    # default error message
    print(e)
    # customer error message
    print('Error {0} ({1}): {2} ({3})'.format(e.errno, e.sqlstate, e.msg, e.sfqid))
finally:
    cur.close()

# Retrieving the Snowflake query ID
cur = con.cursor()
cur.execute("SELECT * FROM testtable")
print(cur.sfqid)

# Closing the connection
con.close()