Pythonコネクタの使用

このトピックでは、Snowflakeコネクタを使用して、ユーザーログイン、データベースとテーブルの作成、ウェアハウスの作成、データの挿入/読み込み、クエリなどの標準のSnowflake操作を実行する方法を示す一連の例を提供します。

このトピックの最後にあるサンプルコードは、例を組み合わせて単一の動作するPythonプログラムにします。

このトピックの内容:

データベース、スキーマ、ウェアハウスの作成

ログイン後、 CREATE DATABASECREATE SCHEMA、および CREATE WAREHOUSE コマンドを使用して、データベース、スキーマ、およびウェアハウスを作成します(まだ存在しない場合)。

以下の例は、 tiny_warehouse という名前のウェアハウス、 testdb という名前のデータベース、および testschema という名前のスキーマを作成する方法を示しています。スキーマを作成するときは、スキーマを作成するデータベースの名前を指定するか、スキーマを作成するデータベースに既に接続している必要があります。次の例では、 CREATE SCHEMA コマンドの前に USE DATABASE コマンドを実行して、スキーマが正しいデータベースに作成されるようにします。

conn.cursor().execute("CREATE WAREHOUSE IF NOT EXISTS tiny_warehouse_mg")
conn.cursor().execute("CREATE DATABASE IF NOT EXISTS testdb_mg")
conn.cursor().execute("USE DATABASE testdb_mg")
conn.cursor().execute("CREATE SCHEMA IF NOT EXISTS testschema_mg")
Copy

データベース、スキーマ、ウェアハウスの使用

テーブルを作成するデータベースとスキーマを指定します。また、 DML ステートメントおよびクエリを実行するためのリソースを提供するウェアハウスを指定します。

たとえば、データベース testdb、スキーマ testschema およびウェアハウス tiny_warehouse (以前に作成した)を使用するには、

conn.cursor().execute("USE WAREHOUSE tiny_warehouse_mg")
conn.cursor().execute("USE DATABASE testdb_mg")
conn.cursor().execute("USE SCHEMA testdb_mg.testschema_mg")
Copy

テーブルの作成およびデータの挿入

CREATE TABLE コマンドを使用してテーブルを作成し、 INSERT コマンドを使用してテーブルにデータを入力します。

たとえば、 testtable という名前のテーブルを作成し、テーブルに2つの行を挿入します。

conn.cursor().execute(
    "CREATE OR REPLACE TABLE "
    "test_table(col1 integer, col2 string)")

conn.cursor().execute(
    "INSERT INTO test_table(col1, col2) VALUES " + 
    "    (123, 'test string1'), " + 
    "    (456, 'test string2')")
Copy

データのロード

個々の INSERT コマンドを使用してテーブルにデータを挿入する代わりに、内部または外部の場所にステージングされたファイルからデータを一括ロードできます。

内部の場所からのデータのコピー

ホストマシン上のファイルからテーブルにデータを読み込むには、最初に PUT コマンドを使用してファイルを内部の場所にステージングし、次に COPY INTO <テーブル> コマンドを使用してファイル内のデータをテーブルにコピーします。

例:

# Putting Data
con.cursor().execute("PUT file:///tmp/data/file* @%testtable")
con.cursor().execute("COPY INTO testtable")
Copy

Linuxまたは macOS 環境の /tmp/data という名前のローカルディレクトリに CSV データが保存され、ディレクトリには file0file1、... file100 という名前のファイルが含まれます。

外部の場所からのデータのコピー

外部の場所(つまり、S3バケット)に既にステージングされているファイルからテーブルにデータをロードするには、 COPY INTO <テーブル> コマンドを使用します。

例:

# Copying Data
con.cursor().execute("""
COPY INTO testtable FROM s3://<s3_bucket>/data/
    STORAGE_INTEGRATION = myint
    FILE_FORMAT=(field_delimiter=',')
""".format(
    aws_access_key_id=AWS_ACCESS_KEY_ID,
    aws_secret_access_key=AWS_SECRET_ACCESS_KEY))
Copy

条件:

  • s3://<s3バケット>/data/ はS3バケットの名前を指定します

  • バケット内のファイルの先頭には data が付きます。

  • バケットには、アカウント管理者(つまり、ACCOUNTADMIN のロールを持つユーザー)またはグローバル CREATE INTEGRATION 権限を持つロールによって CREATE STORAGE INTEGRATION を使用して作成されたストレージ統合を使用してアクセスされます。ストレージ統合により、ユーザーはプライベートストレージの場所にアクセスするための認証情報を提供する必要がなくなります。

注釈

この例では、format()関数を使用してステートメントを作成します。ご使用の環境に SQL インジェクション攻撃のリスクがある場合、format()を使用するよりも値をバインドすることをお勧めします。

データのクエリ

Python用Snowflakeコネクタを使用すると、次の情報を送信できます。

  • 同期クエリ。クエリが完了した後にアプリケーションに制御を返します。

  • 非同期クエリ。クエリが完了する前にアプリケーションに制御を返します。

クエリが完了したら、 Cursor オブジェクトを使用して 結果の値をフェッチ します。デフォルトでは、Python用Snowflakeコネクタは、 値を Snowflakeデータ型 からPythonのネイティブデータ型に変換します。(値を文字列として返し、アプリケーションで型変換を実行するように選択することもできます。 データ変換をバイパスすることによるクエリパフォーマンスの改善 をご参照ください。)

注釈

デフォルトでは、 NUMBER 列の値は倍精度浮動小数点値(float64)として返されます。fetch_pandas_all() メソッドと fetch_pandas_batches() メソッドでこれらを10進値(decimal.Decimal)として返すには、 connect() メソッドの arrow_number_to_decimal パラメーターを True に設定します。

同期クエリの実行

同期クエリを実行するには、 Cursor オブジェクトで execute() メソッドを呼び出します。例:

conn = snowflake.connector.connect( ... )
cur = conn.cursor()
cur.execute('select * from products')
Copy

cursor を使用した値のフェッチ で説明されているように、 Cursor オブジェクトを使用して結果の値をフェッチします。

非同期クエリの実行

Python用Snowflakeコネクタは、非同期クエリ(つまり、クエリが完了する前にユーザーに制御を返すクエリ)をサポートしています。非同期クエリを送信し、ポーリングを使用して、クエリがいつ完了したかを判断できます。クエリが完了すると、結果を取得できます。

注釈

非同期クエリを実行するには、 ABORT_DETACHED_QUERY 構成パラメーターが FALSE (デフォルト値)であることを確認する必要があります。

Snowflakeは、一定期間(デフォルト: 5分)後に接続を自動的に閉じます。これにより、アクティブなクエリが孤立します。値が TRUE の場合、Snowflakeはこれらの孤立したクエリを終了するため、非同期クエリに影響を与える可能性があります。

この機能を使用すると、各クエリが完了するのを待たずに、複数のクエリを並行して送信できます。同じセッション中に、同期クエリと非同期クエリを組み合わせて実行することもできます。

最後に、ある接続から非同期クエリを送信し、別の接続からの結果を確認できます。たとえば、ユーザーはアプリケーションから長時間実行されるクエリを開始し、アプリケーションを終了し、後ほどアプリケーションを再起動して結果を確認することができます。

非同期クエリの送信

非同期クエリを送信するには、 Cursor オブジェクトで execute_async() メソッドを呼び出します。例:

conn = snowflake.connector.connect( ... )
cur = conn.cursor()
# Submit an asynchronous query for execution.
cur.execute_async('select count(*) from table(generator(timeLimit => 25))')
Copy

クエリを送信した後、

非同期クエリの実行例については、 非同期クエリの例 をご参照ください。

非同期クエリのベストプラクティス

非同期クエリを送信するときは、次のベストプラクティスに従ってください。

  • クエリを並行して実行する前に、どのクエリが他のクエリに依存しているかを確認してください。たとえば、 INSERT ステートメントは、対応する CREATE TABLE ステートメントが終了するまで開始するべきではありません。

  • 使用可能なメモリに対して、実行するクエリが多すぎないことを確認してください。特に複数の結果が同時にメモリに格納されている場合に複数のクエリを並行して実行すると、通常はより多くのメモリを使用します。

  • ポーリング時に、クエリが成功しないまれなケースを処理します。

  • トランザクション制御ステートメント(BEGIN、 COMMIT、および ROLLBACK)が他のステートメントと並行して実行されないようにします。

Snowflakeクエリ ID の取得

クエリ ID は、Snowflakeによって実行される各クエリを識別します。Python用Snowflakeコネクタを使用してクエリを実行すると、 Cursor オブジェクトの sfqid 属性を介してクエリ ID にアクセスできます。

# Retrieving a Snowflake Query ID
cur = con.cursor()
cur.execute("SELECT * FROM testtable")
print(cur.sfqid)
Copy

クエリ ID を使用して次を実行できます。

クエリのステータスの確認

クエリのステータスを確認するには、

  1. Cursor オブジェクトの sfqid フィールドから、クエリ ID を取得します。

  2. クエリ ID を Connection オブジェクトの get_query_status() メソッドに渡して、クエリのステータスを表す QueryStatus 列挙型定数を返します。

    デフォルトでは、クエリの結果がエラーになった場合、 get_query_status() はエラーを発生しません。エラーを発生したい場合は、代わりに get_query_status_throw_if_error() を呼び出してください。

  3. QueryStatus 列挙型定数を使用して、クエリのステータスを確認します。

    • クエリがまだ実行されているかどうかを判断するには(たとえば、これが非同期クエリの場合)、定数を Connection オブジェクトの is_still_running() メソッドに渡します。

    • エラーが発生したかどうかを判断するには、定数を is_an_error() メソッドに渡します。

    列挙型定数の包括的なリストについては、 QueryStatus をご参照ください。

次の例では、非同期クエリを実行し、クエリのステータスを確認します。

import time
...
# Execute a long-running query asynchronously.
cur.execute_async('select count(*) from table(generator(timeLimit => 25))')
...
# Wait for the query to finish running.
query_id = cur.sfqid
while conn.is_still_running(conn.get_query_status(query_id)):
  time.sleep(1)
Copy

次の例では、クエリの結果でエラーある場合にエラーが発生します。

from snowflake.connector import ProgrammingError
import time
...
# Wait for the query to finish running and raise an error
# if a problem occurred with the execution of the query.
try:
  query_id = cur.sfqid
  while conn.is_still_running(conn.get_query_status_throw_if_error(query_id)):
    time.sleep(1)
except ProgrammingError as err:
  print('Programming Error: {0}'.format(err))
Copy

クエリ ID を使用したクエリの結果の取得

注釈

Cursor オブジェクトで execute() メソッドを呼び出して 同期クエリを実行 した場合は、結果を取得するためにクエリ ID を使用する必要はありません。 cursor を使用した値のフェッチ で説明されているように、結果から値をフェッチするだけです。

非同期クエリまたは以前に送信された同期クエリの結果を取得する場合は、次の手順に従います。

  1. 該当クエリのクエリ ID を取得します。 Snowflakeクエリ ID の取得 をご参照ください。

  2. Cursor オブジェクトで get_results_from_sfqid() メソッドを呼び出して、結果を取得します。

  3. cursor を使用した値のフェッチ で説明されているように、 Cursor オブジェクトを使用して結果の値をフェッチします。

クエリがまだ実行中の場合、フェッチメソッド( fetchone()fetchmany()fetchall() など)はクエリの完了を待機することに注意してください。

例:

# Get the results from a query.
cur.get_results_from_sfqid(query_id)
results = cur.fetchall()
print(f'{results[0]}')
Copy

cursor を使用した値のフェッチ

カーソルオブジェクトの反復子メソッドを使用して、テーブルから値をフェッチします。

たとえば、以前に( テーブルの作成およびデータの挿入 で)作成された testtable という名前のテーブルから「col1」および「col2」という名前の列をフェッチするには、次のようなコードを使用します。

cur = conn.cursor()
try:
    cur.execute("SELECT col1, col2 FROM test_table ORDER BY col1")
    for (col1, col2) in cur:
        print('{0}, {1}'.format(col1, col2))
finally:
    cur.close()
Copy

または、Python用Snowflakeコネクタが便利なショートカットを提供します。

for (col1, col2) in con.cursor().execute("SELECT col1, col2 FROM testtable"):
    print('{0}, {1}'.format(col1, col2))
Copy

単一の結果(つまり単一の行)を取得する必要がある場合は、 fetchone メソッドを使用します。

col1, col2 = con.cursor().execute("SELECT col1, col2 FROM testtable").fetchone()
print('{0}, {1}'.format(col1, col2))
Copy

一度に指定された行数を取得する必要がある場合は、行数で fetchmany メソッドを使用します。

cur = con.cursor().execute("SELECT col1, col2 FROM testtable")
ret = cur.fetchmany(3)
print(ret)
while len(ret) > 0:
    ret = cur.fetchmany(3)
    print(ret)
Copy

注釈

結果セットが大きすぎてメモリに収まらない場合は、 fetchone または fetchmany を使用します。

すべての結果を一度に取得する必要がある場合、

results = con.cursor().execute("SELECT col1, col2 FROM testtable").fetchall()
for rec in results:
    print('%s, %s' % (rec[0], rec[1]))
Copy

クエリのタイムアウトを設定するには、「開始」コマンドを実行し、クエリにタイムアウトパラメーターを含めます。クエリがパラメーター値の長さを超える場合、エラーが生成され、ロールバックが発生します。

次のコードでは、エラー604はクエリがキャンセルされたことを意味します。タイムアウトパラメーターは Timer() を開始し、指定された時間内にクエリが終了しない場合はキャンセルします。

conn.cursor().execute("create or replace table testtbl(a int, b string)")

conn.cursor().execute("begin")
try:
   conn.cursor().execute("insert into testtbl(a,b) values(3, 'test3'), (4,'test4')", timeout=10) # long query

except ProgrammingError as e:
   if e.errno == 604:
      print("timeout")
      conn.cursor().execute("rollback")
   else:
      raise e
else:
   conn.cursor().execute("commit")
Copy

DictCursor を使用した列名の値のフェッチ

列名で値をフェッチする場合は、タイプ DictCursorcursor オブジェクトを作成します。

例:

# Querying data by DictCursor
from snowflake.connector import DictCursor
cur = con.cursor(DictCursor)
try:
    cur.execute("SELECT col1, col2 FROM testtable")
    for rec in cur:
        print('{0}, {1}'.format(rec['COL1'], rec['COL2']))
finally:
    cur.close()
Copy

非同期クエリの例

以下は、非同期クエリの簡単な例です。

from snowflake.connector import ProgrammingError
import time

conn = snowflake.connector.connect( ... )
cur = conn.cursor()

# Submit an asynchronous query for execution.
cur.execute_async('select count(*) from table(generator(timeLimit => 25))')

# Retrieve the results.
cur.get_results_from_sfqid(query_id)
results = cur.fetchall()
print(f'{results[0]}')
Copy

次の例では、ある接続から非同期クエリを送信し、別の接続から結果を取得します。

from snowflake.connector import ProgrammingError
import time

conn = snowflake.connector.connect( ... )
cur = conn.cursor()

# Submit an asynchronous query for execution.
cur.execute_async('select count(*) from table(generator(timeLimit => 25))')

# Get the query ID for the asynchronous query.
query_id = cur.sfqid

# Close the cursor and the connection.
cur.close()
conn.close()

# Open a new connection.
new_conn = snowflake.connector.connect( ... )

# Create a new cursor.
new_cur = new_conn.cursor()

# Retrieve the results.
new_cur.get_results_from_sfqid(query_id)
results = new_cur.fetchall()
print(f'{results[0]}')
Copy

クエリ ID ごとのクエリのキャンセル

クエリ ID によるクエリのキャンセル:

cur = cn.cursor()

try:
  cur.execute(r"SELECT SYSTEM$CANCEL_QUERY('queryID')")
  result = cur.fetchall()
  print(len(result))
  print(result[0])
finally:
  cur.close()
Copy

文字列「queryID」を実際のクエリ ID に置き換えます。クエリ ID を取得するには、 Snowflakeクエリ ID の取得 をご参照ください。

データ変換をバイパスすることによるクエリパフォーマンスの改善

クエリのパフォーマンスを向上させるには、 snowflake.connector.converter_null モジュールの SnowflakeNoConverterToPython クラスを使用して、Snowflake内部データ型からネイティブPythonデータ型へのデータ変換をバイパスします。例:

from snowflake.connector.converter_null import SnowflakeNoConverterToPython

con = snowflake.connector.connect(
    ...
    converter_class=SnowflakeNoConverterToPython
)
for rec in con.cursor().execute("SELECT * FROM large_table"):
    # rec includes raw Snowflake data
Copy

その結果、すべてのデータは文字列形式で表されるため、アプリケーションはそれをネイティブPythonデータ型に変換する必要があります。たとえば、 TIMESTAMP_NTZ および TIMESTAMP_LTZ データは文字列形式で表されるエポック時間であり、 TIMESTAMP_TZ データは文字列形式で表されるエポック時間の後に、空白と分単位の UTC へのオフセットが続きます。

バインドデータに影響はありません。Pythonネイティブデータは引き続き更新用にバインドできます。

データのバインド

SQL ステートメントで使用する値を指定するには、ステートメントにリテラルを含めるか、 変数をバインドします。変数をバインドするときは、 SQL ステートメントのテキストに1つ以上のプレースホルダーを配置し、各プレースホルダーに変数(使用する値)を指定します。

次の例は、リテラルとバインドの使用を比較しています。

リテラル:

con.cursor().execute("INSERT INTO testtable(col1, col2) VALUES(789, 'test string3')")
Copy

バインド:

con.cursor().execute(
    "INSERT INTO testtable(col1, col2) "
    "VALUES(%s, %s)", (
        789,
        'test string3'
    ))
Copy

注釈

バインドできるデータのサイズ、またはバッチで結合できるデータのサイズには上限があります。詳細については、 クエリテキストサイズの制限 をご参照ください。

Snowflakeは、次のタイプのバインドをサポートしています。

これらのそれぞれについて、以下で説明します。

pyformat または format バインド

pyformat バインディングと format バインディングの両方が、サーバー側ではなくクライアント側でデータをバインドします。

デフォルトでは、Python用Snowflakeコネクタは pyformat および format の両方をサポートしているため、プレースホルダーとして %(name)s または %s を使用できます。例:

  • %(name)s をプレースホルダーとして使用:

    conn.cursor().execute(
        "INSERT INTO test_table(col1, col2) "
        "VALUES(%(col1)s, %(col2)s)", {
            'col1': 789,
            'col2': 'test string3',
            })
    
    Copy
  • %s をプレースホルダーとして使用。

    con.cursor().execute(
        "INSERT INTO testtable(col1, col2) "
        "VALUES(%s, %s)", (
            789,
            'test string3'
        ))
    
    Copy

pyformatformat により、リストオブジェクトを使用して IN 演算子のデータをバインドすることもできます。

# Binding data for IN operator
con.cursor().execute(
    "SELECT col1, col2 FROM testtable"
    " WHERE col2 IN (%s)", (
        ['test string1', 'test string3'],
    ))
Copy

パーセント文字(「%」)は、 SQL LIKE のワイルドカード文字であり、Pythonの形式バインディング文字でもあります。フォーマットバインディングを使用し、 SQL コマンドにパーセント文字が含まれている場合、パーセント文字をエスケープする必要がある場合があります。例えば、 SQL ステートメントが次の場合:

SELECT col1, col2
    FROM test_table
    WHERE col2 ILIKE '%York' LIMIT 1;  -- Find York, New York, etc.
Copy

Pythonコードは次のようになります(元のパーセント記号をエスケープするための追加のパーセント記号に注意)。

sql_command = "select col1, col2 from test_table "
sql_command += " where col2 like '%%York' limit %(lim)s"
parameter_dictionary = {'lim': 1 }
cur.execute(sql_command, parameter_dictionary)
Copy

qmark または numeric バインド

qmark バインディングと numeric バインディングの両方とも、クライアント側ではなくサーバー側でデータをバインドします。

  • qmark バインディングの場合は、疑問符文字(?)を使用して、変数の値を文字列のどこに挿入するかを指定します。

  • numeric バインディングの場合は、コロン(:)の後に数字を使用して、その位置で置換する変数の位置を指定します。たとえば、 :2 は2番目の変数を指定します。

    数値バインディングを使用して、同じクエリで同じ値を複数回バインドします。たとえば、長い VARCHAR、 BINARY、または 半構造化 の値を複数回使用する場合は、 numeric バインディングにより値をサーバーに1回送信すると、それを複数回使用できるようになります。

次のセクションでは、 qmark および numeric バインディングの使用方法について説明します。

qmark または numeric バインドの使用

qmark または numeric スタイルのバインディングを使用するには、次のいずれかを実行します。

  • snowflake.connector.paramstyle='qmark'

  • snowflake.connector.paramstyle='numeric'

重要

connect() メソッドを呼び出す 前にparamstyle 属性を設定する必要があります。

paramstyleqmark または numeric に設定する場合は、プレースホルダーとしてそれぞれ ? または :NN は数字に置換)を使用する必要があります。

例:

  • ? をプレースホルダーとして使用。

    import snowflake.connector
    
    snowflake.connector.paramstyle='qmark'
    
    con = snowflake.connector.connect(...)
    
    con.cursor().execute(
        "INSERT INTO testtable(col1, col2) "
        "VALUES(?, ?)", (
            789,
            'test string3'
        ))
    
    Copy
  • :N をプレースホルダーとして使用。

    import snowflake.connector
    
    snowflake.connector.paramstyle='numeric'
    
    con = snowflake.connector.connect(...)
    
    con.cursor().execute(
        "INSERT INTO testtable(col1, col2) "
        "VALUES(:1, :2)", (
            789,
            'test string3'
        ))
    
    Copy

    次のクエリは、 numeric バインドを使用して変数を再利用する方法を示しています。

    con.cursor().execute(
        "INSERT INTO testtable(complete_video, short_sample_of_video) "
        "VALUES(:1, SUBSTRING(:1, :2, :3))", (
            binary_value_that_stores_video,          # variable :1
            starting_offset_in_bytes_of_video_clip,  # variable :2
            length_in_bytes_of_video_clip            # variable :3
        ))
    
    Copy

datetime での qmark または numeric バインドの使用

qmark または numeric バインディングを使用してデータをSnowflake TIMESTAMP データ型にバインドする場合は、Snowflakeタイムスタンプデータ型(TIMESTAMP_LTZ または TIMESTAMP_TZ)と値を指定するタプルに、バインド変数を設定します。例:

import snowflake.connector

snowflake.connector.paramstyle='qmark'

con = snowflake.connector.connect(...)

con.cursor().execute(
    "CREATE OR REPLACE TABLE testtable2 ("
    "   col1 int, "
    "   col2 string, "
    "   col3 timestamp_ltz"
    ")"
)

con.cursor().execute(
    "INSERT INTO testtable2(col1,col2,col3) "
    "VALUES(?,?,?)", (
        987,
        'test string4',
        ("TIMESTAMP_LTZ", datetime.now())
    )
 )
Copy

クライアント側のバインディングとは異なり、サーバー側のバインディングでは、列にSnowflakeデータ型が必要です。ほとんどの一般的なPythonデータ型には、Snowflakeデータ型への暗黙的なマッピングがすでにあります(例: intFIXED にマッピングされます)。ただし、Python datetime データは、複数のSnowflakeデータ型(TIMESTAMP_NTZTIMESTAMP_LTZ、または TIMESTAMP_TZ)のいずれかにバインドでき、デフォルトのマッピングは TIMESTAMP_NTZ であるため、使用するSnowflakeデータ型を指定する必要があります。

IN 演算子でのバインド変数の使用

qmark および numeric (サーバー側バインディング)は、 IN 演算子を使用したバインド変数の使用をサポートしていません。

IN 演算子でバインド変数を使用する必要がある場合は、 クライアント側バインディングpyformat または format)を使用します。

バッチ挿入の変数に対するバインドパラメーター

アプリケーションコードで、単一のバッチに複数の行を挿入できます。これを実行するには、 INSERT ステートメントの値のパラメーターを使用します。たとえば、次のステートメントは、 INSERT ステートメントの qmark のバインドにプレースホルダーを使用します。

insert into grocery (item, quantity) values (?, ?)
Copy

次に、挿入する必要のあるデータを指定するには、複数のシーケンス(たとえば、タプルのリスト)のシーケンス1つである変数を定義します。

rows_to_insert = [('milk', 2), ('apple', 3), ('egg', 2)]
Copy

上記の例に示されているように、リスト内の各項目は、挿入される行の列値を含むタプルです。

バインディングを実行するには、 executemany() メソッドを呼び出し、変数を2番目の引数として渡します。例:

conn = snowflake.connector.connect( ... )
rows_to_insert = [('milk', 2), ('apple', 3), ('egg', 2)]
conn.cursor().executemany(
    "insert into grocery (item, quantity) values (?, ?)",
    rows_to_insert)
Copy

サーバー上でデータをバインド している場合(つまり、 qmark または numeric バインドを使用)、コネクタはバインドを通じてバッチ挿入のパフォーマンスを最適化できます。

この手法を使用して多数の値を挿入する場合は、インジェスチョンのためにデータを(ローカルマシン上でファイルを作成することなく)仮ステージにストリーミングすると、ドライバーのパフォーマンスを向上させることができます。値の数がしきい値を超えると、ドライバーはこれを自動的に実行します。

さらに、セッションの現在のデータベースとスキーマを設定する必要があります。これらが設定されていない場合、ドライバーによって実行される CREATE TEMPORARY STAGE コマンドは、次のエラーにより失敗する可能性があります。

CREATE TEMPORARY STAGE SYSTEM$BIND file_format=(type=csv field_optionally_enclosed_by='"')
Cannot perform CREATE STAGE. This session does not have a current schema. Call 'USE SCHEMA', or use a qualified name.
Copy

注釈

データをSnowflakeデータベースにロードする別の方法( COPY コマンドを使用した一括ロードを含む)については、 Snowflakeにデータをロードする をご参照ください。

SQL インジェクション攻撃を回避する

SQL インジェクションの危険があるため、Pythonのフォーマット関数を使用してデータをバインドしないでください。例:

# Binding data (UNSAFE EXAMPLE)
con.cursor().execute(
    "INSERT INTO testtable(col1, col2) "
    "VALUES(%(col1)d, '%(col2)s')" % {
        'col1': 789,
        'col2': 'test string3'
    })
Copy
# Binding data (UNSAFE EXAMPLE)
con.cursor().execute(
    "INSERT INTO testtable(col1, col2) "
    "VALUES(%d, '%s')" % (
        789,
        'test string3'
    ))
Copy
# Binding data (UNSAFE EXAMPLE)
con.cursor().execute(
    "INSERT INTO testtable(col1, col2) "
    "VALUES({col1}, '{col2}')".format(
        col1=789,
        col2='test string3')
    )
Copy

代わりに、値を変数に格納してから、qmarkまたは数値バインドスタイルを使用してそれらの変数をバインドします。

列のメタデータの取得

結果セットの各列に関するメタデータ(例: 各列の名前、タイプ、精度、スケールなど)を取得するには、次のいずれかの方法を使用します。

  • execute() メソッドを呼び出してクエリを実行した後にメタデータにアクセスするには、 Cursor オブジェクトの description 属性を使用します。

  • クエリを実行 することなく メタデータにアクセスするには、 describe() メソッドを呼び出します。

    describe メソッドは、Python用Snowflakeコネクタ2.4.6以降のバージョンで使用できます。

description 属性は、次のいずれかの値に設定されます。

  • バージョン2.4.5以前: タプルのリスト。

  • バージョン2.4.6以降: ResultMetadata オブジェクトのリスト。(describe メソッドもこのリストを返します。)

各タプルと ResultMetadata オブジェクトには、列のメタデータ(列名、データ型など)が含まれています。 メタデータには、インデックスによりアクセス するか、2.4.6以降のバージョンでは ResultMetadata 属性によりアクセスできます。

次の例は、返されたタプルと ResultMetadata オブジェクトから、メタデータにアクセスする方法を示しています。

例: インデックスにより列名メタデータを取得(バージョン2.4.5以前):

次の例では、クエリの実行後に description 属性を使用して列名のリストを取得します。属性はタプルのリストであり、この例では、各タプルの最初の値から列名にアクセスします。

cur = conn.cursor()
cur.execute("SELECT * FROM test_table")
print(','.join([col[0] for col in cur.description]))
Copy

例: 属性により列名メタデータを取得(バージョン2.4.6以降):

次の例では、クエリの実行後に description 属性を使用して列名のリストを取得します。属性は ResultMetaData オブジェクトのリストであり、この例では、各 ResultMetadata オブジェクトの name 属性から列名にアクセスします。

cur = conn.cursor()
cur.execute("SELECT * FROM test_table")
print(','.join([col.name for col in cur.description]))
Copy

例: クエリの実行することなく、列名のメタデータを取得(バージョン2.4.6以降):

次の例では、 describe メソッドを使用して、クエリを実行することなく列名のリストを取得します。 describe() メソッドは ResultMetaData オブジェクトのリストを返し、この例では各 ResultMetadata オブジェクトの name 属性から列名にアクセスします。

cur = conn.cursor()
result_metadata_list = cur.describe("SELECT * FROM test_table")
print(','.join([col.name for col in result_metadata_list]))
Copy

エラーの処理

アプリケーションは、Snowflakeコネクタから発生した例外を適切に処理し、コードの実行を続行または停止する必要があります。

# Catching the syntax error
cur = con.cursor()
try:
    cur.execute("SELECT * FROM testtable")
except snowflake.connector.errors.ProgrammingError as e:
    # default error message
    print(e)
    # customer error message
    print('Error {0} ({1}): {2} ({3})'.format(e.errno, e.sqlstate, e.msg, e.sfqid))
finally:
    cur.close()
Copy

execute_stream を使用した SQL スクリプトの実行

execute_stream 関数を使用すると、ストリームで1つ以上の SQL スクリプトを実行できます。

from codecs import open
with open(sqlfile, 'r', encoding='utf-8') as f:
    for cur in con.execute_stream(f):
        for ret in cur:
            print(ret)
Copy

接続の終了

ベストプラクティスとしては、 close メソッドを呼び出して接続を閉じます。

connection.close()
Copy

これにより、収集されたクライアントメトリックがサーバーに送信され、セッションが削除されます。また、 try-finally ブロックは、途中で例外が発生した場合でも接続が確実に閉じられるようにします。

# Connecting to Snowflake
con = snowflake.connector.connect(...)
try:
    # Running queries
    con.cursor().execute(...)
    ...
finally:
    # Closing the connection
    con.close()
Copy

コンテキストマネージャーを使用したトランザクションの接続および制御

Python用Snowflakeコネクタは、必要に応じてリソースを割り当てたり解放したりするコンテキストマネージャーをサポートしています。コンテキストマネージャーは、 autocommit が無効になっている場合、ステートメントのステータスに基づいてトランザクションをコミットまたはロールバックするのに役立ちます。

# Connecting to Snowflake using the context manager
with snowflake.connector.connect(
  user=USER,
  password=PASSWORD,
  account=ACCOUNT,
  autocommit=False,
) as con:
    con.cursor().execute("INSERT INTO a VALUES(1, 'test1')")
    con.cursor().execute("INSERT INTO a VALUES(2, 'test2')")
    con.cursor().execute("INSERT INTO a VALUES(not numeric value, 'test3')") # fail
Copy

上記の例では、3番目のステートメントが失敗すると、コンテキストマネージャーはトランザクションの変更をロールバックし、接続を閉じます。すべてのステートメントが成功した場合、コンテキストマネージャーは変更をコミットし、接続を閉じます。

try ブロックと except ブロックを持つ同等のコードは次のとおりです。

# Connecting to Snowflake using try and except blocks
con = snowflake.connector.connect(
  user=USER,
  password=PASSWORD,
  account=ACCOUNT,
  autocommit=False)
try:
    con.cursor().execute("INSERT INTO a VALUES(1, 'test1')")
    con.cursor().execute("INSERT INTO a VALUES(2, 'test2')")
    con.cursor().execute("INSERT INTO a VALUES(not numeric value, 'test3')") # fail
    con.commit()
except Exception as e:
    con.rollback()
    raise e
finally:
    con.close()
Copy

ログ

Python用Snowflakeコネクタは、標準のPython logging モジュールを活用して定期的にステータスを記録するため、アプリケーションはバックグラウンドで動作しているアクティビティを追跡できます。ログを有効にする最も簡単な方法は、アプリケーションの開始時に logging.basicConfig() を呼び出すことです。

たとえば、ログレベルを INFO に設定し、ログを /tmp/snowflake_python_connector.log という名前のファイルに保存するには、

logging.basicConfig(
    filename=file_name,
    level=logging.INFO)
Copy

次のようにログレベルを DEBUG に設定すると、より包括的なログを有効にできます。

# Logging including the timestamp, thread and the source code location
import logging
for logger_name in ['snowflake.connector', 'botocore', 'boto3']:
    logger = logging.getLogger(logger_name)
    logger.setLevel(logging.DEBUG)
    ch = logging.FileHandler('/tmp/python_connector.log')
    ch.setLevel(logging.DEBUG)
    ch.setFormatter(logging.Formatter('%(asctime)s - %(threadName)s %(filename)s:%(lineno)d - %(funcName)s() - %(levelname)s - %(message)s'))
    logger.addHandler(ch)
Copy

オプションですが推奨される SecretDetector フォーマッタークラスにより、Snowflake Python Connectorログファイルに書き込まれる前に、特定の既知の機密情報セットが確実にマスクされるようにします。SecretDetector を使用するには、次のようなコードを使用します。

# Logging including the timestamp, thread and the source code location
import logging
from snowflake.connector.secret_detector import SecretDetector
for logger_name in ['snowflake.connector', 'botocore', 'boto3']:
    logger = logging.getLogger(logger_name)
    logger.setLevel(logging.DEBUG)
    ch = logging.FileHandler('/tmp/python_connector.log')
    ch.setLevel(logging.DEBUG)
    ch.setFormatter(SecretDetector('%(asctime)s - %(threadName)s %(filename)s:%(lineno)d - %(funcName)s() - %(levelname)s - %(message)s'))
    logger.addHandler(ch)
Copy

注釈

botocore および boto3 は、Python用 AWS (Amazon Web Services) SDK から入手できます。

サンプルプログラム

次のサンプルコードは、前のセクションで説明した例の多くを有効なpythonプログラムに結合します。この例には2つの部分が含まれています。

  • 親クラス(「python_veritas_base」)には、サーバーへの接続など、多くの一般的な操作のコードが含まれています。

  • 子クラス(「python_connector_example」)は、テーブルのクエリなど、特定のクライアントのカスタム部分を表します。

このサンプルコードは、テストの1つから直接インポートされ、製品の最近のビルドで実行されたことを確認するのに役立ちます。

これはテストから取得されているため、一部のテストで使用される代替ポートとプロトコルを設定するための少量のコードが含まれています。ユーザーは、プロトコルまたはポート番号を設定 しない でください。代わりに、これらを省略してデフォルトを使用します。

これには、ドキュメントに個別にインポートできるコードを識別するためのセクションマーカー(別称「スニペットタグ」)も含まれています。セクションマーカーは通常、次と類似しています。

# -- (> ---------------------- SECTION=import_connector ---------------------
...
# -- <) ---------------------------- END_SECTION ----------------------------
Copy

これらのセクションマーカーは、ユーザーコードでは必要ありません。

コードサンプルの最初の部分には、次の一般的なサブルーチンが含まれています。

  • 接続情報を含むコマンドライン引数(例:「--warehouse MyWarehouse」)を読み取ります。

  • サーバーに接続します。

  • ウェアハウス、データベース、およびスキーマを作成して使用します。

  • スキーマ、データベース、ウェアハウスを使い終えたらドロップします。


import logging
import os
import sys


# -- (> ---------------------- SECTION=import_connector ---------------------
import snowflake.connector
# -- <) ---------------------------- END_SECTION ----------------------------


class python_veritas_base:

    """
    PURPOSE:
        This is the Base/Parent class for programs that use the Snowflake
        Connector for Python.
        This class is intended primarily for:
            * Sample programs, e.g. in the documentation.
            * Tests.
    """


    def __init__(self, p_log_file_name = None):

        """
        PURPOSE:
            This does any required initialization steps, which in this class is
            basically just turning on logging.
        """

        file_name = p_log_file_name
        if file_name is None:
            file_name = '/tmp/snowflake_python_connector.log'

        # -- (> ---------- SECTION=begin_logging -----------------------------
        logging.basicConfig(
            filename=file_name,
            level=logging.INFO)
        # -- <) ---------- END_SECTION ---------------------------------------


    # -- (> ---------------------------- SECTION=main ------------------------
    def main(self, argv):

        """
        PURPOSE:
            Most tests follow the same basic pattern in this main() method:
               * Create a connection.
               * Set up, e.g. use (or create and use) the warehouse, database,
                 and schema.
               * Run the queries (or do the other tasks, e.g. load data).
               * Clean up. In this test/demo, we drop the warehouse, database,
                 and schema. In a customer scenario, you'd typically clean up
                 temporary tables, etc., but wouldn't drop your database.
               * Close the connection.
        """

        # Read the connection parameters (e.g. user ID) from the command line
        # and environment variables, then connect to Snowflake.
        connection = self.create_connection(argv)

        # Set up anything we need (e.g. a separate schema for the test/demo).
        self.set_up(connection)

        # Do the "real work", for example, create a table, insert rows, SELECT
        # from the table, etc.
        self.do_the_real_work(connection)

        # Clean up. In this case, we drop the temporary warehouse, database, and
        # schema.
        self.clean_up(connection)

        print("\nClosing connection...")
        # -- (> ------------------- SECTION=close_connection -----------------
        connection.close()
        # -- <) ---------------------------- END_SECTION ---------------------

    # -- <) ---------------------------- END_SECTION=main --------------------


    def args_to_properties(self, args):

        """
        PURPOSE:
            Read the command-line arguments and store them in a dictionary.
            Command-line arguments should come in pairs, e.g.:
                "--user MyUser"
        INPUTS:
            The command line arguments (sys.argv).
        RETURNS:
            Returns the dictionary.
        DESIRABLE ENHANCEMENTS:
            Improve error detection and handling.
        """

        connection_parameters = {}

        i = 1
        while i < len(args) - 1:
            property_name = args[i]
            # Strip off the leading "--" from the tag, e.g. from "--user".
            property_name = property_name[2:]
            property_value = args[i + 1]
            connection_parameters[property_name] = property_value
            i += 2

        return connection_parameters


    def create_connection(self, argv):

        """
        PURPOSE:
            This gets account identifier and login information from the
            environment variables and command-line parameters, connects to the
            server, and returns the connection object.
        INPUTS:
            argv: This is usually sys.argv, which contains the command-line
                  parameters. It could be an equivalent substitute if you get
                  the parameter information from another source.
        RETURNS:
            A connection.
        """

        # Get account identifier and login information from environment variables and command-line parameters.
        # For information about account identifiers, see
        # https://docs.snowflake.com/en/user-guide/admin-account-identifier.html .
        # -- (> ----------------------- SECTION=set_login_info ---------------

        # Get the password from an appropriate environment variable, if
        # available.
        PASSWORD = os.getenv('SNOWSQL_PWD')

        # Get the other login info etc. from the command line.
        if len(argv) < 11:
            msg = "ERROR: Please pass the following command-line parameters:\n"
            msg += "--warehouse <warehouse> --database <db> --schema <schema> "
            msg += "--user <user> --account <account_identifier> "
            print(msg)
            sys.exit(-1)
        else:
            connection_parameters = self.args_to_properties(argv)
            USER = connection_parameters["user"]
            ACCOUNT = connection_parameters["account"]
            WAREHOUSE = connection_parameters["warehouse"]
            DATABASE = connection_parameters["database"]
            SCHEMA = connection_parameters["schema"]
            # Optional: for internal testing only.
            try:
                PORT = connection_parameters["port"]
            except:
                PORT = ""
            try:
                PROTOCOL = connection_parameters["protocol"]
            except:
                PROTOCOL = ""

        # If the password is set by both command line and env var, the
        # command-line value takes precedence over (is written over) the
        # env var value.

        # If the password wasn't set either in the environment var or on
        # the command line...
        if PASSWORD is None or PASSWORD == '':
            print("ERROR: Set password, e.g. with SNOWSQL_PWD environment variable")
            sys.exit(-2)
        # -- <) ---------------------------- END_SECTION ---------------------

        # Optional diagnostic:
        #print("USER:", USER)
        #print("ACCOUNT:", ACCOUNT)
        #print("WAREHOUSE:", WAREHOUSE)
        #print("DATABASE:", DATABASE)
        #print("SCHEMA:", SCHEMA)
        #print("PASSWORD:", PASSWORD)
        #print("PROTOCOL:" "'" + PROTOCOL + "'")
        #print("PORT:" + "'" + PORT + "'")

        print("Connecting...")
        # If the PORT is set but the protocol is not, we ignore the PORT (bug!!).
        if PROTOCOL is None or PROTOCOL == "" or PORT is None or PORT == "":
            # -- (> ------------------- SECTION=connect_to_snowflake ---------
            conn = snowflake.connector.connect(
                user=USER,
                password=PASSWORD,
                account=ACCOUNT,
                warehouse=WAREHOUSE,
                database=DATABASE,
                schema=SCHEMA
                )
            # -- <) ---------------------------- END_SECTION -----------------
        else:

            conn = snowflake.connector.connect(
                user=USER,
                password=PASSWORD,
                account=ACCOUNT,
                warehouse=WAREHOUSE,
                database=DATABASE,
                schema=SCHEMA,
                # Optional: for internal testing only.
                protocol=PROTOCOL,
                port=PORT
                )

        return conn


    def set_up(self, connection):

        """
        PURPOSE:
            Set up to run a test. You can override this method with one
            appropriate to your test/demo.
        """

        # Create a temporary warehouse, database, and schema.
        self.create_warehouse_database_and_schema(connection)


    def do_the_real_work(self, conn):

        """
        PURPOSE:
            Your sub-class should override this to include the code required for
            your documentation sample or your test case.
            This default method does a very simple self-test that shows that the
            connection was successful.
        """

        # Create a cursor for this connection.
        cursor1 = conn.cursor()
        # This is an example of an SQL statement we might want to run.
        command = "SELECT PI()"
        # Run the statement.
        cursor1.execute(command)
        # Get the results (should be only one):
        for row in cursor1:
            print(row[0])
        # Close this cursor.
        cursor1.close()


    def clean_up(self, connection):

        """
        PURPOSE:
            Clean up after a test. You can override this method with one
            appropriate to your test/demo.
        """

        # Create a temporary warehouse, database, and schema.
        self.drop_warehouse_database_and_schema(connection)


    def create_warehouse_database_and_schema(self, conn):

        """
        PURPOSE:
            Create the temporary schema, database, and warehouse that we use
            for most tests/demos.
        """

        # Create a database, schema, and warehouse if they don't already exist.
        print("\nCreating warehouse, database, schema...")
        # -- (> ------------- SECTION=create_warehouse_database_schema -------
        conn.cursor().execute("CREATE WAREHOUSE IF NOT EXISTS tiny_warehouse_mg")
        conn.cursor().execute("CREATE DATABASE IF NOT EXISTS testdb_mg")
        conn.cursor().execute("USE DATABASE testdb_mg")
        conn.cursor().execute("CREATE SCHEMA IF NOT EXISTS testschema_mg")
        # -- <) ---------------------------- END_SECTION ---------------------

        # -- (> --------------- SECTION=use_warehouse_database_schema --------
        conn.cursor().execute("USE WAREHOUSE tiny_warehouse_mg")
        conn.cursor().execute("USE DATABASE testdb_mg")
        conn.cursor().execute("USE SCHEMA testdb_mg.testschema_mg")
        # -- <) ---------------------------- END_SECTION ---------------------


    def drop_warehouse_database_and_schema(self, conn):

        """
        PURPOSE:
            Drop the temporary schema, database, and warehouse that we create
            for most tests/demos.
        """

        # -- (> ------------- SECTION=drop_warehouse_database_schema ---------
        conn.cursor().execute("DROP SCHEMA IF EXISTS testschema_mg")
        conn.cursor().execute("DROP DATABASE IF EXISTS testdb_mg")
        conn.cursor().execute("DROP WAREHOUSE IF EXISTS tiny_warehouse_mg")
        # -- <) ---------------------------- END_SECTION ---------------------


# ----------------------------------------------------------------------------

if __name__ == '__main__':
    pvb = python_veritas_base()
    pvb.main(sys.argv)


Copy

コードサンプルの2番目の部分では、テーブルを作成したり、それに行を挿入したりします。


import sys

# -- (> ---------------------- SECTION=import_connector ---------------------
import snowflake.connector
# -- <) ---------------------------- END_SECTION ----------------------------


# Import the base class that contains methods used in many tests and code 
# examples.
from python_veritas_base import python_veritas_base


class python_connector_example (python_veritas_base):

  """
  PURPOSE:
      This is a simple example program that shows how to use the Snowflake 
      Python Connector to create and query a table.
  """

  def __init__(self):
    pass


  def do_the_real_work(self, conn):

    """
    INPUTS:
        conn is a Connection object returned from snowflake.connector.connect().
    """

    print("\nCreating table test_table...")
    # -- (> ----------------------- SECTION=create_table ---------------------
    conn.cursor().execute(
        "CREATE OR REPLACE TABLE "
        "test_table(col1 integer, col2 string)")

    conn.cursor().execute(
        "INSERT INTO test_table(col1, col2) VALUES " + 
        "    (123, 'test string1'), " + 
        "    (456, 'test string2')")
    # -- <) ---------------------------- END_SECTION -------------------------


    print("\nSelecting from test_table...")
    # -- (> ----------------------- SECTION=querying_data --------------------
    cur = conn.cursor()
    try:
        cur.execute("SELECT col1, col2 FROM test_table ORDER BY col1")
        for (col1, col2) in cur:
            print('{0}, {1}'.format(col1, col2))
    finally:
        cur.close()
    # -- <) ---------------------------- END_SECTION -------------------------




# ============================================================================

if __name__ == '__main__':

    test_case = python_connector_example()
    test_case.main(sys.argv)

Copy

このサンプルを実行するには、次を実行します。

  1. 最初のコードを「python_veritas_base.py」という名前のファイルにコピーします。

  2. 2番目のコードを「python_connector_example.py」という名前のファイルにコピーします

  3. 次のように、 SNOWSQL_PWD 環境変数をパスワードに設定します。

    export SNOWSQL_PWD='MyPassword'
    
    Copy
  4. 次のようなコマンドラインを使用してプログラムを実行します(もちろん、ユーザーとアカウントの情報を自分のユーザーとアカウントの情報に置き換えます)。

    警告

    これにより、プログラムの最後にウェアハウス、データベース、およびスキーマが削除されます。既存のデータベースの名前は 使用しないでください

    python3 python_connector_example.py --warehouse <unique_warehouse_name> --database <new_warehouse_zzz_test> --schema <new_schema_zzz_test> --account myorganization-myaccount --user MyUserName
    
    Copy

出力は次のとおりです。

Connecting...

Creating warehouse, database, schema...

Creating table test_table...

Selecting from test_table...
123, test string1
456, test string2

Closing connection...
Copy

長い例を次に示します。

注釈

アカウントとログイン情報を設定するセクションで、Snowflakeログイン情報(名前、パスワードなど)に一致するように、必要に応じて変数を置き換えます。

この例では、format()関数を使用してステートメントを作成します。ご使用の環境に SQL インジェクション攻撃のリスクがある場合、format()を使用するよりも値をバインドすることをお勧めします。

#!/usr/bin/env python
#
# Snowflake Connector for Python Sample Program
#

# Logging
import logging
logging.basicConfig(
    filename='/tmp/snowflake_python_connector.log',
    level=logging.INFO)

import snowflake.connector

# Set your account and login information (replace the variables with
# the necessary values).
ACCOUNT = '<account_identifier>'
USER = '<login_name>'
PASSWORD = '<password>'

import os

# Only required if you copy data from your S3 bucket
AWS_ACCESS_KEY_ID = os.getenv('AWS_ACCESS_KEY_ID')
AWS_SECRET_ACCESS_KEY = os.getenv('AWS_SECRET_ACCESS_KEY')

# Connecting to Snowflake
con = snowflake.connector.connect(
  user=USER,
  password=PASSWORD,
  account=ACCOUNT,
)

# Creating a database, schema, and warehouse if none exists
con.cursor().execute("CREATE WAREHOUSE IF NOT EXISTS tiny_warehouse")
con.cursor().execute("CREATE DATABASE IF NOT EXISTS testdb")
con.cursor().execute("USE DATABASE testdb")
con.cursor().execute("CREATE SCHEMA IF NOT EXISTS testschema")

# Using the database, schema and warehouse
con.cursor().execute("USE WAREHOUSE tiny_warehouse")
con.cursor().execute("USE SCHEMA testdb.testschema")

# Creating a table and inserting data
con.cursor().execute(
    "CREATE OR REPLACE TABLE "
    "testtable(col1 integer, col2 string)")
con.cursor().execute(
    "INSERT INTO testtable(col1, col2) "
    "VALUES(123, 'test string1'),(456, 'test string2')")

# Copying data from internal stage (for testtable table)
con.cursor().execute("PUT file:///tmp/data0/file* @%testtable")
con.cursor().execute("COPY INTO testtable")

# Copying data from external stage (S3 bucket -
# replace <s3_bucket> with the name of your bucket)
con.cursor().execute("""
COPY INTO testtable FROM s3://<s3_bucket>/data/
     STORAGE_INTEGRATION = myint
     FILE_FORMAT=(field_delimiter=',')
""".format(
    aws_access_key_id=AWS_ACCESS_KEY_ID,
    aws_secret_access_key=AWS_SECRET_ACCESS_KEY))

# Querying data
cur = con.cursor()
try:
    cur.execute("SELECT col1, col2 FROM testtable")
    for (col1, col2) in cur:
        print('{0}, {1}'.format(col1, col2))
finally:
    cur.close()

# Binding data
con.cursor().execute(
    "INSERT INTO testtable(col1, col2) "
    "VALUES(%(col1)s, %(col2)s)", {
        'col1': 789,
        'col2': 'test string3',
        })

# Retrieving column names
cur = con.cursor()
cur.execute("SELECT * FROM testtable")
print(','.join([col[0] for col in cur.description]))

# Catching syntax errors
cur = con.cursor()
try:
    cur.execute("SELECT * FROM testtable")
except snowflake.connector.errors.ProgrammingError as e:
    # default error message
    print(e)
    # user error message
    print('Error {0} ({1}): {2} ({3})'.format(e.errno, e.sqlstate, e.msg, e.sfqid))
finally:
    cur.close()

# Retrieving the Snowflake query ID
cur = con.cursor()
cur.execute("SELECT * FROM testtable")
print(cur.sfqid)

# Closing the connection
con.close()
Copy