Pythonコネクタの使用¶
このトピックでは、Snowflakeコネクタを使用して、ユーザーログイン、データベースとテーブルの作成、ウェアハウスの作成、データの挿入/読み込み、クエリなどの標準のSnowflake操作を実行する方法を示す一連の例を提供します。
このトピックの最後にあるサンプルコードは、例を組み合わせて単一の動作するPythonプログラムにします。
注釈
Snowflakeは、データベース、スキーマ、テーブル、タスク、ウェアハウスなど、Snowflakeのコアリソースを SQL を使用することなく管理するためのファーストクラスのPython APIs を提供します。詳細については、 Snowflake Python API: PythonによるSnowflakeオブジェクトの管理 をご参照ください。
このトピックの内容:
データベース、スキーマ、ウェアハウスの作成¶
ログイン後、 CREATE DATABASE、 CREATE SCHEMA、および CREATE WAREHOUSE コマンドを使用して、データベース、スキーマ、およびウェアハウスを作成します(まだ存在しない場合)。
以下の例は、 tiny_warehouse
という名前のウェアハウス、 testdb
という名前のデータベース、および testschema
という名前のスキーマを作成する方法を示しています。スキーマを作成するときは、スキーマを作成するデータベースの名前を指定するか、スキーマを作成するデータベースに既に接続している必要があります。次の例では、 CREATE SCHEMA
コマンドの前に USE DATABASE
コマンドを実行して、スキーマが正しいデータベースに作成されるようにします。
conn.cursor().execute("CREATE WAREHOUSE IF NOT EXISTS tiny_warehouse_mg") conn.cursor().execute("CREATE DATABASE IF NOT EXISTS testdb_mg") conn.cursor().execute("USE DATABASE testdb_mg") conn.cursor().execute("CREATE SCHEMA IF NOT EXISTS testschema_mg")
データベース、スキーマ、ウェアハウスの使用¶
テーブルを作成するデータベースとスキーマを指定します。また、 DML ステートメントおよびクエリを実行するためのリソースを提供するウェアハウスを指定します。
たとえば、データベース testdb
、スキーマ testschema
およびウェアハウス tiny_warehouse
(以前に作成した)を使用するには、
conn.cursor().execute("USE WAREHOUSE tiny_warehouse_mg") conn.cursor().execute("USE DATABASE testdb_mg") conn.cursor().execute("USE SCHEMA testdb_mg.testschema_mg")
テーブルの作成およびデータの挿入¶
CREATE TABLE コマンドを使用してテーブルを作成し、 INSERT コマンドを使用してテーブルにデータを入力します。
たとえば、 testtable
という名前のテーブルを作成し、テーブルに2つの行を挿入します。
conn.cursor().execute( "CREATE OR REPLACE TABLE " "test_table(col1 integer, col2 string)") conn.cursor().execute( "INSERT INTO test_table(col1, col2) VALUES " + " (123, 'test string1'), " + " (456, 'test string2')")
データのロード¶
個々の INSERT コマンドを使用してテーブルにデータを挿入する代わりに、内部または外部の場所にステージングされたファイルからデータを一括ロードできます。
内部の場所からのデータのコピー¶
ホストマシン上のファイルからテーブルにデータを読み込むには、最初に PUT コマンドを使用してファイルを内部の場所にステージングし、次に COPY INTO <テーブル> コマンドを使用してファイル内のデータをテーブルにコピーします。
例:
# Putting Data con.cursor().execute("PUT file:///tmp/data/file* @%testtable") con.cursor().execute("COPY INTO testtable")Linuxまたは macOS 環境の
/tmp/data
という名前のローカルディレクトリに CSV データが保存され、ディレクトリにはfile0
、file1
、...file100
という名前のファイルが含まれます。
外部の場所からのデータのコピー¶
外部の場所(つまり、S3バケット)に既にステージングされているファイルからテーブルにデータをロードするには、 COPY INTO <テーブル> コマンドを使用します。
例:
# Copying Data con.cursor().execute(""" COPY INTO testtable FROM s3://<s3_bucket>/data/ STORAGE_INTEGRATION = myint FILE_FORMAT=(field_delimiter=',') """.format( aws_access_key_id=AWS_ACCESS_KEY_ID, aws_secret_access_key=AWS_SECRET_ACCESS_KEY))条件:
s3://<s3バケット>/data/
はS3バケットの名前を指定しますバケット内のファイルの先頭には
data
が付きます。バケットには、アカウント管理者(つまり、ACCOUNTADMIN のロールを持つユーザー)またはグローバル CREATE INTEGRATION 権限を持つロールによって CREATE STORAGE INTEGRATION を使用して作成されたストレージ統合を使用してアクセスされます。ストレージ統合により、ユーザーはプライベートストレージの場所にアクセスするための認証情報を提供する必要がなくなります。
注釈
この例では、format()関数を使用してステートメントを作成します。ご使用の環境に SQL インジェクション攻撃のリスクがある場合、format()を使用するよりも値をバインドすることをお勧めします。
データのクエリ¶
Python用Snowflakeコネクタを使用すると、次の情報を送信できます。
クエリが完了したら、 Cursor
オブジェクトを使用して 結果の値をフェッチ します。デフォルトでは、Python用Snowflakeコネクタは、 値を Snowflakeデータ型 からPythonのネイティブデータ型に変換します。(値を文字列として返し、アプリケーションで型変換を実行するように選択することもできます。 データ変換をバイパスすることによるクエリパフォーマンスの改善 をご参照ください。)
注釈
デフォルトでは、 NUMBER 列の値は倍精度浮動小数点値(float64
)として返されます。fetch_pandas_all()
メソッドと fetch_pandas_batches()
メソッドでこれらを10進値(decimal.Decimal
)として返すには、 connect()
メソッドの arrow_number_to_decimal
パラメーターを True
に設定します。
同期クエリの実行¶
同期クエリを実行するには、 Cursor
オブジェクトで execute()
メソッドを呼び出します。例:
conn = snowflake.connector.connect( ... )
cur = conn.cursor()
cur.execute('select * from products')
cursor を使用した値のフェッチ で説明されているように、 Cursor
オブジェクトを使用して結果の値をフェッチします。
非同期クエリの実行¶
Python用Snowflakeコネクタは、非同期クエリ(つまり、クエリが完了する前にユーザーに制御を返すクエリ)をサポートしています。非同期クエリを送信し、ポーリングを使用して、クエリがいつ完了したかを判断できます。クエリが完了すると、結果を取得できます。
注釈
非同期クエリを実行するには、 ABORT_DETACHED_QUERY
構成パラメーターが FALSE
(デフォルト値)であることを確認する必要があります。
Snowflakeは、一定期間(デフォルト: 5分)後に接続を自動的に閉じます。これにより、アクティブなクエリが孤立します。値が TRUE
の場合、Snowflakeはこれらの孤立したクエリを終了するため、非同期クエリに影響を与える可能性があります。
この機能を使用すると、各クエリが完了するのを待たずに、複数のクエリを並行して送信できます。同じセッション中に、同期クエリと非同期クエリを組み合わせて実行することもできます。
最後に、ある接続から非同期クエリを送信し、別の接続からの結果を確認できます。たとえば、ユーザーはアプリケーションから長時間実行されるクエリを開始し、アプリケーションを終了し、後ほどアプリケーションを再起動して結果を確認することができます。
非同期クエリの送信¶
非同期クエリを送信するには、 Cursor
オブジェクトで execute_async()
メソッドを呼び出します。例:
conn = snowflake.connector.connect( ... )
cur = conn.cursor()
# Submit an asynchronous query for execution.
cur.execute_async('select count(*) from table(generator(timeLimit => 25))')
クエリを送信した後、
クエリがまだ実行されているかどうかを確認するには、 クエリのステータスの確認 をご参照ください。
クエリの結果を取得するには、 クエリ ID を使用したクエリの結果の取得 をご参照ください。
非同期クエリの実行例については、 非同期クエリの例 をご参照ください。
非同期クエリのベストプラクティス¶
非同期クエリを送信するときは、次のベストプラクティスに従ってください。
クエリを並行して実行する前に、どのクエリが他のクエリに依存しているかを確認してください。たとえば、 INSERT ステートメントは、対応する CREATE TABLE ステートメントが終了するまで開始するべきではありません。
使用可能なメモリに対して、実行するクエリが多すぎないことを確認してください。特に複数の結果が同時にメモリに格納されている場合に複数のクエリを並行して実行すると、通常はより多くのメモリを使用します。
ポーリング時に、クエリが成功しないまれなケースを処理します。
トランザクション制御ステートメント(BEGIN、 COMMIT、および ROLLBACK)が他のステートメントと並行して実行されないようにします。
Snowflakeクエリ ID の取得¶
クエリ ID は、Snowflakeによって実行される各クエリを識別します。Python用Snowflakeコネクタを使用してクエリを実行すると、 Cursor
オブジェクトの sfqid
属性を介してクエリ ID にアクセスできます。
# Retrieving a Snowflake Query ID cur = con.cursor() cur.execute("SELECT * FROM testtable") print(cur.sfqid)
クエリ ID を使用して次を実行できます。
ウェブインターフェイスでクエリのステータスを確認。
Classic Console では、クエリ IDs は History ページに表示されます。 履歴ページを使用してクエリを監視 をご参照ください。
プログラムでクエリのステータスを確認(例:非同期クエリが完了したかどうかを判断するため)。
クエリのステータスの確認 をご参照ください。
非同期クエリまたは以前に送信された同期クエリの結果を取得。
クエリ ID を使用したクエリの結果の取得 をご参照ください。
実行中のクエリをキャンセル。
クエリ ID ごとのクエリのキャンセル をご参照ください。
クエリのステータスの確認¶
クエリのステータスを確認するには、
Cursor
オブジェクトのsfqid
フィールドから、クエリ ID を取得します。クエリ ID を
Connection
オブジェクトのget_query_status()
メソッドに渡して、クエリのステータスを表すQueryStatus
列挙型定数を返します。デフォルトでは、クエリの結果がエラーになった場合、
get_query_status()
はエラーを発生しません。エラーを発生したい場合は、代わりにget_query_status_throw_if_error()
を呼び出してください。QueryStatus
列挙型定数を使用して、クエリのステータスを確認します。クエリがまだ実行されているかどうかを判断するには(たとえば、これが非同期クエリの場合)、定数を
Connection
オブジェクトのis_still_running()
メソッドに渡します。エラーが発生したかどうかを判断するには、定数を
is_an_error()
メソッドに渡します。
列挙型定数の包括的なリストについては、
QueryStatus
をご参照ください。
次の例では、非同期クエリを実行し、クエリのステータスを確認します。
import time
...
# Execute a long-running query asynchronously.
cur.execute_async('select count(*) from table(generator(timeLimit => 25))')
...
# Wait for the query to finish running.
query_id = cur.sfqid
while conn.is_still_running(conn.get_query_status(query_id)):
time.sleep(1)
次の例では、クエリの結果でエラーある場合にエラーが発生します。
from snowflake.connector import ProgrammingError
import time
...
# Wait for the query to finish running and raise an error
# if a problem occurred with the execution of the query.
try:
query_id = cur.sfqid
while conn.is_still_running(conn.get_query_status_throw_if_error(query_id)):
time.sleep(1)
except ProgrammingError as err:
print('Programming Error: {0}'.format(err))
クエリ ID を使用したクエリの結果の取得¶
注釈
Cursor
オブジェクトで execute()
メソッドを呼び出して 同期クエリを実行 した場合は、結果を取得するためにクエリ ID を使用する必要はありません。 cursor を使用した値のフェッチ で説明されているように、結果から値をフェッチするだけです。
非同期クエリまたは以前に送信された同期クエリの結果を取得する場合は、次の手順に従います。
該当クエリのクエリ ID を取得します。 Snowflakeクエリ ID の取得 をご参照ください。
Cursor
オブジェクトでget_results_from_sfqid()
メソッドを呼び出して、結果を取得します。cursor を使用した値のフェッチ で説明されているように、
Cursor
オブジェクトを使用して結果の値をフェッチします。
クエリがまだ実行中の場合、フェッチメソッド( fetchone()
、 fetchmany()
、 fetchall()
など)はクエリの完了を待機することに注意してください。
例:
# Get the results from a query.
cur.get_results_from_sfqid(query_id)
results = cur.fetchall()
print(f'{results[0]}')
cursor
を使用した値のフェッチ¶
カーソルオブジェクトの反復子メソッドを使用して、テーブルから値をフェッチします。
たとえば、以前に( テーブルの作成およびデータの挿入 で)作成された testtable
という名前のテーブルから「col1」および「col2」という名前の列をフェッチするには、次のようなコードを使用します。
cur = conn.cursor() try: cur.execute("SELECT col1, col2 FROM test_table ORDER BY col1") for (col1, col2) in cur: print('{0}, {1}'.format(col1, col2)) finally: cur.close()
または、Python用Snowflakeコネクタが便利なショートカットを提供します。
for (col1, col2) in con.cursor().execute("SELECT col1, col2 FROM testtable"): print('{0}, {1}'.format(col1, col2))
単一の結果(つまり単一の行)を取得する必要がある場合は、 fetchone
メソッドを使用します。
col1, col2 = con.cursor().execute("SELECT col1, col2 FROM testtable").fetchone() print('{0}, {1}'.format(col1, col2))
一度に指定された行数を取得する必要がある場合は、行数で fetchmany
メソッドを使用します。
cur = con.cursor().execute("SELECT col1, col2 FROM testtable") ret = cur.fetchmany(3) print(ret) while len(ret) > 0: ret = cur.fetchmany(3) print(ret)注釈
結果セットが大きすぎてメモリに収まらない場合は、
fetchone
またはfetchmany
を使用します。
すべての結果を一度に取得する必要がある場合、
results = con.cursor().execute("SELECT col1, col2 FROM testtable").fetchall() for rec in results: print('%s, %s' % (rec[0], rec[1]))
クエリのタイムアウトを設定するには、「開始」コマンドを実行し、クエリにタイムアウトパラメーターを含めます。クエリがパラメーター値の長さを超える場合、エラーが生成され、ロールバックが発生します。
次のコードでは、エラー604はクエリがキャンセルされたことを意味します。タイムアウトパラメーターは Timer()
を開始し、指定された時間内にクエリが終了しない場合はキャンセルします。
conn.cursor().execute("create or replace table testtbl(a int, b string)") conn.cursor().execute("begin") try: conn.cursor().execute("insert into testtbl(a,b) values(3, 'test3'), (4,'test4')", timeout=10) # long query except ProgrammingError as e: if e.errno == 604: print("timeout") conn.cursor().execute("rollback") else: raise e else: conn.cursor().execute("commit")
DictCursor
を使用した列名の値のフェッチ¶
列名で値をフェッチする場合は、タイプ DictCursor
の cursor
オブジェクトを作成します。
例:
# Querying data by DictCursor from snowflake.connector import DictCursor cur = con.cursor(DictCursor) try: cur.execute("SELECT col1, col2 FROM testtable") for rec in cur: print('{0}, {1}'.format(rec['COL1'], rec['COL2'])) finally: cur.close()
非同期クエリの例¶
以下は、非同期クエリの簡単な例です。
from snowflake.connector import ProgrammingError
import time
conn = snowflake.connector.connect( ... )
cur = conn.cursor()
# Submit an asynchronous query for execution.
cur.execute_async('select count(*) from table(generator(timeLimit => 25))')
# Retrieve the results.
cur.get_results_from_sfqid(query_id)
results = cur.fetchall()
print(f'{results[0]}')
次の例では、ある接続から非同期クエリを送信し、別の接続から結果を取得します。
from snowflake.connector import ProgrammingError
import time
conn = snowflake.connector.connect( ... )
cur = conn.cursor()
# Submit an asynchronous query for execution.
cur.execute_async('select count(*) from table(generator(timeLimit => 25))')
# Get the query ID for the asynchronous query.
query_id = cur.sfqid
# Close the cursor and the connection.
cur.close()
conn.close()
# Open a new connection.
new_conn = snowflake.connector.connect( ... )
# Create a new cursor.
new_cur = new_conn.cursor()
# Retrieve the results.
new_cur.get_results_from_sfqid(query_id)
results = new_cur.fetchall()
print(f'{results[0]}')
クエリ ID ごとのクエリのキャンセル¶
クエリ ID によるクエリのキャンセル:
cur = cn.cursor() try: cur.execute(r"SELECT SYSTEM$CANCEL_QUERY('queryID')") result = cur.fetchall() print(len(result)) print(result[0]) finally: cur.close()
文字列「queryID」を実際のクエリ ID に置き換えます。クエリ ID を取得するには、 Snowflakeクエリ ID の取得 をご参照ください。
データ変換をバイパスすることによるクエリパフォーマンスの改善¶
クエリのパフォーマンスを向上させるには、 snowflake.connector.converter_null
モジュールの SnowflakeNoConverterToPython
クラスを使用して、Snowflake内部データ型からネイティブPythonデータ型へのデータ変換をバイパスします。例:
from snowflake.connector.converter_null import SnowflakeNoConverterToPython con = snowflake.connector.connect( ... converter_class=SnowflakeNoConverterToPython ) for rec in con.cursor().execute("SELECT * FROM large_table"): # rec includes raw Snowflake data
その結果、すべてのデータは文字列形式で表されるため、アプリケーションはそれをネイティブPythonデータ型に変換する必要があります。たとえば、 TIMESTAMP_NTZ
および TIMESTAMP_LTZ
データは文字列形式で表されるエポック時間であり、 TIMESTAMP_TZ
データは文字列形式で表されるエポック時間の後に、空白と分単位の UTC へのオフセットが続きます。
バインドデータに影響はありません。Pythonネイティブデータは引き続き更新用にバインドできます。
データのバインド¶
SQL ステートメントで使用する値を指定するには、ステートメントにリテラルを含めるか、 変数をバインドします。変数をバインドするときは、 SQL ステートメントのテキストに1つ以上のプレースホルダーを配置し、各プレースホルダーに変数(使用する値)を指定します。
次の例は、リテラルとバインドの使用を比較しています。
リテラル:
con.cursor().execute("INSERT INTO testtable(col1, col2) VALUES(789, 'test string3')")バインド:
con.cursor().execute( "INSERT INTO testtable(col1, col2) " "VALUES(%s, %s)", ( 789, 'test string3' ))
注釈
バインドできるデータのサイズ、またはバッチで結合できるデータのサイズには上限があります。詳細については、 クエリテキストサイズの制限 をご参照ください。
Snowflakeは、次のタイプのバインドをサポートしています。
pyformat
およびformat
。これは クライアント上のデータをバインド します。qmark
およびnumeric
。これは サーバー上のデータをバインド します。
これらのそれぞれについて、以下で説明します。
pyformat
または format
バインド¶
pyformat
バインディングと format
バインディングの両方が、サーバー側ではなくクライアント側でデータをバインドします。
デフォルトでは、Python用Snowflakeコネクタは pyformat
および format
の両方をサポートしているため、プレースホルダーとして %(name)s
または %s
を使用できます。例:
%(name)s
をプレースホルダーとして使用:conn.cursor().execute( "INSERT INTO test_table(col1, col2) " "VALUES(%(col1)s, %(col2)s)", { 'col1': 789, 'col2': 'test string3', })
%s
をプレースホルダーとして使用。con.cursor().execute( "INSERT INTO testtable(col1, col2) " "VALUES(%s, %s)", ( 789, 'test string3' ))
pyformat
と format
により、リストオブジェクトを使用して IN 演算子のデータをバインドすることもできます。
# Binding data for IN operator con.cursor().execute( "SELECT col1, col2 FROM testtable" " WHERE col2 IN (%s)", ( ['test string1', 'test string3'], ))
パーセント文字(「%」)は、 SQL LIKE のワイルドカード文字であり、Pythonの形式バインディング文字でもあります。フォーマットバインディングを使用し、 SQL コマンドにパーセント文字が含まれている場合、パーセント文字をエスケープする必要がある場合があります。例えば、 SQL ステートメントが次の場合:
SELECT col1, col2 FROM test_table WHERE col2 ILIKE '%York' LIMIT 1; -- Find York, New York, etc.
Pythonコードは次のようになります(元のパーセント記号をエスケープするための追加のパーセント記号に注意)。
sql_command = "select col1, col2 from test_table " sql_command += " where col2 like '%%York' limit %(lim)s" parameter_dictionary = {'lim': 1 } cur.execute(sql_command, parameter_dictionary)
qmark
または numeric
バインド¶
qmark
バインディングと numeric
バインディングの両方とも、クライアント側ではなくサーバー側でデータをバインドします。
qmark
バインディングの場合は、疑問符文字(?
)を使用して、変数の値を文字列のどこに挿入するかを指定します。numeric
バインディングの場合は、コロン(:
)の後に数字を使用して、その位置で置換する変数の位置を指定します。たとえば、:2
は2番目の変数を指定します。数値バインディングを使用して、同じクエリで同じ値を複数回バインドします。たとえば、長い VARCHAR、 BINARY、または 半構造化 の値を複数回使用する場合は、
numeric
バインディングにより値をサーバーに1回送信すると、それを複数回使用できるようになります。
次のセクションでは、 qmark
および numeric
バインディングの使用方法について説明します。
qmark
または numeric
バインドの使用¶
qmark
または numeric
スタイルのバインディングを使用するには、次のいずれかを実行します。
snowflake.connector.paramstyle='qmark'
snowflake.connector.paramstyle='numeric'
重要
connect()
メソッドを呼び出す 前に、 paramstyle
属性を設定する必要があります。
paramstyle
を qmark
または numeric
に設定する場合は、プレースホルダーとしてそれぞれ ?
または :N
(N
は数字に置換)を使用する必要があります。
例:
?
をプレースホルダーとして使用。import snowflake.connector snowflake.connector.paramstyle='qmark' con = snowflake.connector.connect(...) con.cursor().execute( "INSERT INTO testtable(col1, col2) " "VALUES(?, ?)", ( 789, 'test string3' ))
:N
をプレースホルダーとして使用。import snowflake.connector snowflake.connector.paramstyle='numeric' con = snowflake.connector.connect(...) con.cursor().execute( "INSERT INTO testtable(col1, col2) " "VALUES(:1, :2)", ( 789, 'test string3' ))
次のクエリは、
numeric
バインドを使用して変数を再利用する方法を示しています。con.cursor().execute( "INSERT INTO testtable(complete_video, short_sample_of_video) " "VALUES(:1, SUBSTRING(:1, :2, :3))", ( binary_value_that_stores_video, # variable :1 starting_offset_in_bytes_of_video_clip, # variable :2 length_in_bytes_of_video_clip # variable :3 ))
datetime
での qmark
または numeric
バインドの使用¶
qmark
または numeric
バインディングを使用してデータをSnowflake TIMESTAMP データ型にバインドする場合は、Snowflakeタイムスタンプデータ型(TIMESTAMP_LTZ
または TIMESTAMP_TZ
)と値を指定するタプルに、バインド変数を設定します。例:
import snowflake.connector snowflake.connector.paramstyle='qmark' con = snowflake.connector.connect(...) con.cursor().execute( "CREATE OR REPLACE TABLE testtable2 (" " col1 int, " " col2 string, " " col3 timestamp_ltz" ")" ) con.cursor().execute( "INSERT INTO testtable2(col1,col2,col3) " "VALUES(?,?,?)", ( 987, 'test string4', ("TIMESTAMP_LTZ", datetime.now()) ) )
クライアント側のバインディングとは異なり、サーバー側のバインディングでは、列にSnowflakeデータ型が必要です。ほとんどの一般的なPythonデータ型には、Snowflakeデータ型への暗黙的なマッピングがすでにあります(例: int
は FIXED
にマッピングされます)。ただし、Python datetime
データは、複数のSnowflakeデータ型(TIMESTAMP_NTZ
、 TIMESTAMP_LTZ
、または TIMESTAMP_TZ
)のいずれかにバインドでき、デフォルトのマッピングは TIMESTAMP_NTZ
であるため、使用するSnowflakeデータ型を指定する必要があります。
IN 演算子でのバインド変数の使用¶
qmark
および numeric
(サーバー側バインディング)は、 IN 演算子を使用したバインド変数の使用をサポートしていません。
IN 演算子でバインド変数を使用する必要がある場合は、 クライアント側バインディング (pyformat
または format
)を使用します。
バッチ挿入の変数に対するバインドパラメーター¶
アプリケーションコードで、単一のバッチに複数の行を挿入できます。これを実行するには、 INSERT ステートメントの値のパラメーターを使用します。たとえば、次のステートメントは、 INSERT ステートメントの qmark
のバインドにプレースホルダーを使用します。
insert into grocery (item, quantity) values (?, ?)
次に、挿入する必要のあるデータを指定するには、複数のシーケンス(たとえば、タプルのリスト)のシーケンス1つである変数を定義します。
rows_to_insert = [('milk', 2), ('apple', 3), ('egg', 2)]
上記の例に示されているように、リスト内の各項目は、挿入される行の列値を含むタプルです。
バインディングを実行するには、 executemany()
メソッドを呼び出し、変数を2番目の引数として渡します。例:
conn = snowflake.connector.connect( ... ) rows_to_insert = [('milk', 2), ('apple', 3), ('egg', 2)] conn.cursor().executemany( "insert into grocery (item, quantity) values (?, ?)", rows_to_insert)
サーバー上でデータをバインド している場合(つまり、 qmark
または numeric
バインドを使用)、コネクタはバインドを通じてバッチ挿入のパフォーマンスを最適化できます。
この手法を使用して多数の値を挿入する場合は、インジェスチョンのためにデータを(ローカルマシン上でファイルを作成することなく)仮ステージにストリーミングすると、ドライバーのパフォーマンスを向上させることができます。値の数がしきい値を超えると、ドライバーはこれを自動的に実行します。
さらに、セッションの現在のデータベースとスキーマを設定する必要があります。これらが設定されていない場合、ドライバーによって実行される CREATE TEMPORARY STAGE コマンドは、次のエラーにより失敗する可能性があります。
CREATE TEMPORARY STAGE SYSTEM$BIND file_format=(type=csv field_optionally_enclosed_by='"')
Cannot perform CREATE STAGE. This session does not have a current schema. Call 'USE SCHEMA', or use a qualified name.
注釈
データをSnowflakeデータベースにロードする別の方法( COPY コマンドを使用した一括ロードを含む)については、 Snowflakeにデータをロードする をご参照ください。
SQL インジェクション攻撃を回避する¶
SQL インジェクションの危険があるため、Pythonのフォーマット関数を使用してデータをバインドしないでください。例:
# Binding data (UNSAFE EXAMPLE) con.cursor().execute( "INSERT INTO testtable(col1, col2) " "VALUES(%(col1)d, '%(col2)s')" % { 'col1': 789, 'col2': 'test string3' })# Binding data (UNSAFE EXAMPLE) con.cursor().execute( "INSERT INTO testtable(col1, col2) " "VALUES(%d, '%s')" % ( 789, 'test string3' ))# Binding data (UNSAFE EXAMPLE) con.cursor().execute( "INSERT INTO testtable(col1, col2) " "VALUES({col1}, '{col2}')".format( col1=789, col2='test string3') )
代わりに、値を変数に格納してから、qmarkまたは数値バインドスタイルを使用してそれらの変数をバインドします。
列のメタデータの取得¶
結果セットの各列に関するメタデータ(例: 各列の名前、タイプ、精度、スケールなど)を取得するには、次のいずれかの方法を使用します。
execute()
メソッドを呼び出してクエリを実行した後にメタデータにアクセスするには、Cursor
オブジェクトのdescription
属性を使用します。クエリを実行 することなく メタデータにアクセスするには、
describe()
メソッドを呼び出します。describe
メソッドは、Python用Snowflakeコネクタ2.4.6以降のバージョンで使用できます。
description
属性は、次のいずれかの値に設定されます。
バージョン2.4.5以前: タプルのリスト。
バージョン2.4.6以降: ResultMetadata オブジェクトのリスト。(
describe
メソッドもこのリストを返します。)
各タプルと ResultMetadata
オブジェクトには、列のメタデータ(列名、データ型など)が含まれています。 メタデータには、インデックスによりアクセス するか、2.4.6以降のバージョンでは ResultMetadata
属性によりアクセスできます。
次の例は、返されたタプルと ResultMetadata
オブジェクトから、メタデータにアクセスする方法を示しています。
例: インデックスにより列名メタデータを取得(バージョン2.4.5以前):
次の例では、クエリの実行後に description
属性を使用して列名のリストを取得します。属性はタプルのリストであり、この例では、各タプルの最初の値から列名にアクセスします。
cur = conn.cursor() cur.execute("SELECT * FROM test_table") print(','.join([col[0] for col in cur.description]))
例: 属性により列名メタデータを取得(バージョン2.4.6以降):
次の例では、クエリの実行後に description
属性を使用して列名のリストを取得します。属性は ResultMetaData オブジェクトのリストであり、この例では、各 ResultMetadata
オブジェクトの name
属性から列名にアクセスします。
cur = conn.cursor() cur.execute("SELECT * FROM test_table") print(','.join([col.name for col in cur.description]))
例: クエリの実行することなく、列名のメタデータを取得(バージョン2.4.6以降):
次の例では、 describe
メソッドを使用して、クエリを実行することなく列名のリストを取得します。 describe()
メソッドは ResultMetaData オブジェクトのリストを返し、この例では各 ResultMetadata
オブジェクトの name
属性から列名にアクセスします。
cur = conn.cursor() result_metadata_list = cur.describe("SELECT * FROM test_table") print(','.join([col.name for col in result_metadata_list]))
エラーの処理¶
アプリケーションは、Snowflakeコネクタから発生した例外を適切に処理し、コードの実行を続行または停止する必要があります。
# Catching the syntax error cur = con.cursor() try: cur.execute("SELECT * FROM testtable") except snowflake.connector.errors.ProgrammingError as e: # default error message print(e) # customer error message print('Error {0} ({1}): {2} ({3})'.format(e.errno, e.sqlstate, e.msg, e.sfqid)) finally: cur.close()
execute_stream
を使用した SQL スクリプトの実行¶
execute_stream
関数を使用すると、ストリームで1つ以上の SQL スクリプトを実行できます。
from codecs import open with open(sqlfile, 'r', encoding='utf-8') as f: for cur in con.execute_stream(f): for ret in cur: print(ret)
接続の終了¶
ベストプラクティスとしては、 close
メソッドを呼び出して接続を閉じます。
connection.close()
これにより、収集されたクライアントメトリックがサーバーに送信され、セッションが削除されます。また、 try-finally
ブロックは、途中で例外が発生した場合でも接続が確実に閉じられるようにします。
# Connecting to Snowflake con = snowflake.connector.connect(...) try: # Running queries con.cursor().execute(...) ... finally: # Closing the connection con.close()
コンテキストマネージャーを使用したトランザクションの接続および制御¶
Python用Snowflakeコネクタは、必要に応じてリソースを割り当てたり解放したりするコンテキストマネージャーをサポートしています。コンテキストマネージャーは、 autocommit
が無効になっている場合、ステートメントのステータスに基づいてトランザクションをコミットまたはロールバックするのに役立ちます。
# Connecting to Snowflake using the context manager with snowflake.connector.connect( user=USER, password=PASSWORD, account=ACCOUNT, autocommit=False, ) as con: con.cursor().execute("INSERT INTO a VALUES(1, 'test1')") con.cursor().execute("INSERT INTO a VALUES(2, 'test2')") con.cursor().execute("INSERT INTO a VALUES(not numeric value, 'test3')") # fail
上記の例では、3番目のステートメントが失敗すると、コンテキストマネージャーはトランザクションの変更をロールバックし、接続を閉じます。すべてのステートメントが成功した場合、コンテキストマネージャーは変更をコミットし、接続を閉じます。
try
ブロックと except
ブロックを持つ同等のコードは次のとおりです。
# Connecting to Snowflake using try and except blocks con = snowflake.connector.connect( user=USER, password=PASSWORD, account=ACCOUNT, autocommit=False) try: con.cursor().execute("INSERT INTO a VALUES(1, 'test1')") con.cursor().execute("INSERT INTO a VALUES(2, 'test2')") con.cursor().execute("INSERT INTO a VALUES(not numeric value, 'test3')") # fail con.commit() except Exception as e: con.rollback() raise e finally: con.close()
ログ¶
Python用Snowflakeコネクタは、標準のPython logging
モジュールを活用して定期的にステータスを記録するため、アプリケーションはバックグラウンドで動作しているアクティビティを追跡できます。ログを有効にする最も簡単な方法は、アプリケーションの開始時に logging.basicConfig()
を呼び出すことです。
たとえば、ログレベルを INFO
に設定し、ログを /tmp/snowflake_python_connector.log
という名前のファイルに保存するには、
logging.basicConfig( filename=file_name, level=logging.INFO)
次のようにログレベルを DEBUG
に設定すると、より包括的なログを有効にできます。
# Logging including the timestamp, thread and the source code location import logging for logger_name in ['snowflake.connector', 'botocore', 'boto3']: logger = logging.getLogger(logger_name) logger.setLevel(logging.DEBUG) ch = logging.FileHandler('/tmp/python_connector.log') ch.setLevel(logging.DEBUG) ch.setFormatter(logging.Formatter('%(asctime)s - %(threadName)s %(filename)s:%(lineno)d - %(funcName)s() - %(levelname)s - %(message)s')) logger.addHandler(ch)オプションですが推奨される SecretDetector フォーマッタークラスにより、Snowflake Python Connectorログファイルに書き込まれる前に、特定の既知の機密情報セットが確実にマスクされるようにします。SecretDetector を使用するには、次のようなコードを使用します。
# Logging including the timestamp, thread and the source code location import logging from snowflake.connector.secret_detector import SecretDetector for logger_name in ['snowflake.connector', 'botocore', 'boto3']: logger = logging.getLogger(logger_name) logger.setLevel(logging.DEBUG) ch = logging.FileHandler('/tmp/python_connector.log') ch.setLevel(logging.DEBUG) ch.setFormatter(SecretDetector('%(asctime)s - %(threadName)s %(filename)s:%(lineno)d - %(funcName)s() - %(levelname)s - %(message)s')) logger.addHandler(ch)注釈
botocore
およびboto3
は、Python用 AWS (Amazon Web Services) SDK から入手できます。
サンプルプログラム¶
次のサンプルコードは、前のセクションで説明した例の多くを有効なpythonプログラムに結合します。この例には2つの部分が含まれています。
親クラス(「python_veritas_base」)には、サーバーへの接続など、多くの一般的な操作のコードが含まれています。
子クラス(「python_connector_example」)は、テーブルのクエリなど、特定のクライアントのカスタム部分を表します。
このサンプルコードは、テストの1つから直接インポートされ、製品の最近のビルドで実行されたことを確認するのに役立ちます。
これはテストから取得されているため、一部のテストで使用される代替ポートとプロトコルを設定するための少量のコードが含まれています。ユーザーは、プロトコルまたはポート番号を設定 しない でください。代わりに、これらを省略してデフォルトを使用します。
これには、ドキュメントに個別にインポートできるコードを識別するためのセクションマーカー(別称「スニペットタグ」)も含まれています。セクションマーカーは通常、次と類似しています。
# -- (> ---------------------- SECTION=import_connector ---------------------
...
# -- <) ---------------------------- END_SECTION ----------------------------
これらのセクションマーカーは、ユーザーコードでは必要ありません。
コードサンプルの最初の部分には、次の一般的なサブルーチンが含まれています。
接続情報を含むコマンドライン引数(例:「--warehouse MyWarehouse」)を読み取ります。
サーバーに接続します。
ウェアハウス、データベース、およびスキーマを作成して使用します。
スキーマ、データベース、ウェアハウスを使い終えたらドロップします。
import logging
import os
import sys
# -- (> ---------------------- SECTION=import_connector ---------------------
import snowflake.connector
# -- <) ---------------------------- END_SECTION ----------------------------
class python_veritas_base:
"""
PURPOSE:
This is the Base/Parent class for programs that use the Snowflake
Connector for Python.
This class is intended primarily for:
* Sample programs, e.g. in the documentation.
* Tests.
"""
def __init__(self, p_log_file_name = None):
"""
PURPOSE:
This does any required initialization steps, which in this class is
basically just turning on logging.
"""
file_name = p_log_file_name
if file_name is None:
file_name = '/tmp/snowflake_python_connector.log'
# -- (> ---------- SECTION=begin_logging -----------------------------
logging.basicConfig(
filename=file_name,
level=logging.INFO)
# -- <) ---------- END_SECTION ---------------------------------------
# -- (> ---------------------------- SECTION=main ------------------------
def main(self, argv):
"""
PURPOSE:
Most tests follow the same basic pattern in this main() method:
* Create a connection.
* Set up, e.g. use (or create and use) the warehouse, database,
and schema.
* Run the queries (or do the other tasks, e.g. load data).
* Clean up. In this test/demo, we drop the warehouse, database,
and schema. In a customer scenario, you'd typically clean up
temporary tables, etc., but wouldn't drop your database.
* Close the connection.
"""
# Read the connection parameters (e.g. user ID) from the command line
# and environment variables, then connect to Snowflake.
connection = self.create_connection(argv)
# Set up anything we need (e.g. a separate schema for the test/demo).
self.set_up(connection)
# Do the "real work", for example, create a table, insert rows, SELECT
# from the table, etc.
self.do_the_real_work(connection)
# Clean up. In this case, we drop the temporary warehouse, database, and
# schema.
self.clean_up(connection)
print("\nClosing connection...")
# -- (> ------------------- SECTION=close_connection -----------------
connection.close()
# -- <) ---------------------------- END_SECTION ---------------------
# -- <) ---------------------------- END_SECTION=main --------------------
def args_to_properties(self, args):
"""
PURPOSE:
Read the command-line arguments and store them in a dictionary.
Command-line arguments should come in pairs, e.g.:
"--user MyUser"
INPUTS:
The command line arguments (sys.argv).
RETURNS:
Returns the dictionary.
DESIRABLE ENHANCEMENTS:
Improve error detection and handling.
"""
connection_parameters = {}
i = 1
while i < len(args) - 1:
property_name = args[i]
# Strip off the leading "--" from the tag, e.g. from "--user".
property_name = property_name[2:]
property_value = args[i + 1]
connection_parameters[property_name] = property_value
i += 2
return connection_parameters
def create_connection(self, argv):
"""
PURPOSE:
This gets account identifier and login information from the
environment variables and command-line parameters, connects to the
server, and returns the connection object.
INPUTS:
argv: This is usually sys.argv, which contains the command-line
parameters. It could be an equivalent substitute if you get
the parameter information from another source.
RETURNS:
A connection.
"""
# Get account identifier and login information from environment variables and command-line parameters.
# For information about account identifiers, see
# https://docs.snowflake.com/en/user-guide/admin-account-identifier.html .
# -- (> ----------------------- SECTION=set_login_info ---------------
# Get the password from an appropriate environment variable, if
# available.
PASSWORD = os.getenv('SNOWSQL_PWD')
# Get the other login info etc. from the command line.
if len(argv) < 11:
msg = "ERROR: Please pass the following command-line parameters:\n"
msg += "--warehouse <warehouse> --database <db> --schema <schema> "
msg += "--user <user> --account <account_identifier> "
print(msg)
sys.exit(-1)
else:
connection_parameters = self.args_to_properties(argv)
USER = connection_parameters["user"]
ACCOUNT = connection_parameters["account"]
WAREHOUSE = connection_parameters["warehouse"]
DATABASE = connection_parameters["database"]
SCHEMA = connection_parameters["schema"]
# Optional: for internal testing only.
try:
PORT = connection_parameters["port"]
except:
PORT = ""
try:
PROTOCOL = connection_parameters["protocol"]
except:
PROTOCOL = ""
# If the password is set by both command line and env var, the
# command-line value takes precedence over (is written over) the
# env var value.
# If the password wasn't set either in the environment var or on
# the command line...
if PASSWORD is None or PASSWORD == '':
print("ERROR: Set password, e.g. with SNOWSQL_PWD environment variable")
sys.exit(-2)
# -- <) ---------------------------- END_SECTION ---------------------
# Optional diagnostic:
#print("USER:", USER)
#print("ACCOUNT:", ACCOUNT)
#print("WAREHOUSE:", WAREHOUSE)
#print("DATABASE:", DATABASE)
#print("SCHEMA:", SCHEMA)
#print("PASSWORD:", PASSWORD)
#print("PROTOCOL:" "'" + PROTOCOL + "'")
#print("PORT:" + "'" + PORT + "'")
print("Connecting...")
# If the PORT is set but the protocol is not, we ignore the PORT (bug!!).
if PROTOCOL is None or PROTOCOL == "" or PORT is None or PORT == "":
# -- (> ------------------- SECTION=connect_to_snowflake ---------
conn = snowflake.connector.connect(
user=USER,
password=PASSWORD,
account=ACCOUNT,
warehouse=WAREHOUSE,
database=DATABASE,
schema=SCHEMA
)
# -- <) ---------------------------- END_SECTION -----------------
else:
conn = snowflake.connector.connect(
user=USER,
password=PASSWORD,
account=ACCOUNT,
warehouse=WAREHOUSE,
database=DATABASE,
schema=SCHEMA,
# Optional: for internal testing only.
protocol=PROTOCOL,
port=PORT
)
return conn
def set_up(self, connection):
"""
PURPOSE:
Set up to run a test. You can override this method with one
appropriate to your test/demo.
"""
# Create a temporary warehouse, database, and schema.
self.create_warehouse_database_and_schema(connection)
def do_the_real_work(self, conn):
"""
PURPOSE:
Your sub-class should override this to include the code required for
your documentation sample or your test case.
This default method does a very simple self-test that shows that the
connection was successful.
"""
# Create a cursor for this connection.
cursor1 = conn.cursor()
# This is an example of an SQL statement we might want to run.
command = "SELECT PI()"
# Run the statement.
cursor1.execute(command)
# Get the results (should be only one):
for row in cursor1:
print(row[0])
# Close this cursor.
cursor1.close()
def clean_up(self, connection):
"""
PURPOSE:
Clean up after a test. You can override this method with one
appropriate to your test/demo.
"""
# Create a temporary warehouse, database, and schema.
self.drop_warehouse_database_and_schema(connection)
def create_warehouse_database_and_schema(self, conn):
"""
PURPOSE:
Create the temporary schema, database, and warehouse that we use
for most tests/demos.
"""
# Create a database, schema, and warehouse if they don't already exist.
print("\nCreating warehouse, database, schema...")
# -- (> ------------- SECTION=create_warehouse_database_schema -------
conn.cursor().execute("CREATE WAREHOUSE IF NOT EXISTS tiny_warehouse_mg")
conn.cursor().execute("CREATE DATABASE IF NOT EXISTS testdb_mg")
conn.cursor().execute("USE DATABASE testdb_mg")
conn.cursor().execute("CREATE SCHEMA IF NOT EXISTS testschema_mg")
# -- <) ---------------------------- END_SECTION ---------------------
# -- (> --------------- SECTION=use_warehouse_database_schema --------
conn.cursor().execute("USE WAREHOUSE tiny_warehouse_mg")
conn.cursor().execute("USE DATABASE testdb_mg")
conn.cursor().execute("USE SCHEMA testdb_mg.testschema_mg")
# -- <) ---------------------------- END_SECTION ---------------------
def drop_warehouse_database_and_schema(self, conn):
"""
PURPOSE:
Drop the temporary schema, database, and warehouse that we create
for most tests/demos.
"""
# -- (> ------------- SECTION=drop_warehouse_database_schema ---------
conn.cursor().execute("DROP SCHEMA IF EXISTS testschema_mg")
conn.cursor().execute("DROP DATABASE IF EXISTS testdb_mg")
conn.cursor().execute("DROP WAREHOUSE IF EXISTS tiny_warehouse_mg")
# -- <) ---------------------------- END_SECTION ---------------------
# ----------------------------------------------------------------------------
if __name__ == '__main__':
pvb = python_veritas_base()
pvb.main(sys.argv)
コードサンプルの2番目の部分では、テーブルを作成したり、それに行を挿入したりします。
import sys
# -- (> ---------------------- SECTION=import_connector ---------------------
import snowflake.connector
# -- <) ---------------------------- END_SECTION ----------------------------
# Import the base class that contains methods used in many tests and code
# examples.
from python_veritas_base import python_veritas_base
class python_connector_example (python_veritas_base):
"""
PURPOSE:
This is a simple example program that shows how to use the Snowflake
Python Connector to create and query a table.
"""
def __init__(self):
pass
def do_the_real_work(self, conn):
"""
INPUTS:
conn is a Connection object returned from snowflake.connector.connect().
"""
print("\nCreating table test_table...")
# -- (> ----------------------- SECTION=create_table ---------------------
conn.cursor().execute(
"CREATE OR REPLACE TABLE "
"test_table(col1 integer, col2 string)")
conn.cursor().execute(
"INSERT INTO test_table(col1, col2) VALUES " +
" (123, 'test string1'), " +
" (456, 'test string2')")
# -- <) ---------------------------- END_SECTION -------------------------
print("\nSelecting from test_table...")
# -- (> ----------------------- SECTION=querying_data --------------------
cur = conn.cursor()
try:
cur.execute("SELECT col1, col2 FROM test_table ORDER BY col1")
for (col1, col2) in cur:
print('{0}, {1}'.format(col1, col2))
finally:
cur.close()
# -- <) ---------------------------- END_SECTION -------------------------
# ============================================================================
if __name__ == '__main__':
test_case = python_connector_example()
test_case.main(sys.argv)
このサンプルを実行するには、次を実行します。
最初のコードを「python_veritas_base.py」という名前のファイルにコピーします。
2番目のコードを「python_connector_example.py」という名前のファイルにコピーします
次のように、 SNOWSQL_PWD 環境変数をパスワードに設定します。
export SNOWSQL_PWD='MyPassword'次のようなコマンドラインを使用してプログラムを実行します(もちろん、ユーザーとアカウントの情報を自分のユーザーとアカウントの情報に置き換えます)。
警告
これにより、プログラムの最後にウェアハウス、データベース、およびスキーマが削除されます。既存のデータベースの名前は 使用しないでください 。
python3 python_connector_example.py --warehouse <unique_warehouse_name> --database <new_warehouse_zzz_test> --schema <new_schema_zzz_test> --account myorganization-myaccount --user MyUserName
出力は次のとおりです。
Connecting...
Creating warehouse, database, schema...
Creating table test_table...
Selecting from test_table...
123, test string1
456, test string2
Closing connection...
長い例を次に示します。
注釈
アカウントとログイン情報を設定するセクションで、Snowflakeログイン情報(名前、パスワードなど)に一致するように、必要に応じて変数を置き換えます。
この例では、format()関数を使用してステートメントを作成します。ご使用の環境に SQL インジェクション攻撃のリスクがある場合、format()を使用するよりも値をバインドすることをお勧めします。
#!/usr/bin/env python
#
# Snowflake Connector for Python Sample Program
#
# Logging
import logging
logging.basicConfig(
filename='/tmp/snowflake_python_connector.log',
level=logging.INFO)
import snowflake.connector
# Set your account and login information (replace the variables with
# the necessary values).
ACCOUNT = '<account_identifier>'
USER = '<login_name>'
PASSWORD = '<password>'
import os
# Only required if you copy data from your S3 bucket
AWS_ACCESS_KEY_ID = os.getenv('AWS_ACCESS_KEY_ID')
AWS_SECRET_ACCESS_KEY = os.getenv('AWS_SECRET_ACCESS_KEY')
# Connecting to Snowflake
con = snowflake.connector.connect(
user=USER,
password=PASSWORD,
account=ACCOUNT,
)
# Creating a database, schema, and warehouse if none exists
con.cursor().execute("CREATE WAREHOUSE IF NOT EXISTS tiny_warehouse")
con.cursor().execute("CREATE DATABASE IF NOT EXISTS testdb")
con.cursor().execute("USE DATABASE testdb")
con.cursor().execute("CREATE SCHEMA IF NOT EXISTS testschema")
# Using the database, schema and warehouse
con.cursor().execute("USE WAREHOUSE tiny_warehouse")
con.cursor().execute("USE SCHEMA testdb.testschema")
# Creating a table and inserting data
con.cursor().execute(
"CREATE OR REPLACE TABLE "
"testtable(col1 integer, col2 string)")
con.cursor().execute(
"INSERT INTO testtable(col1, col2) "
"VALUES(123, 'test string1'),(456, 'test string2')")
# Copying data from internal stage (for testtable table)
con.cursor().execute("PUT file:///tmp/data0/file* @%testtable")
con.cursor().execute("COPY INTO testtable")
# Copying data from external stage (S3 bucket -
# replace <s3_bucket> with the name of your bucket)
con.cursor().execute("""
COPY INTO testtable FROM s3://<s3_bucket>/data/
STORAGE_INTEGRATION = myint
FILE_FORMAT=(field_delimiter=',')
""".format(
aws_access_key_id=AWS_ACCESS_KEY_ID,
aws_secret_access_key=AWS_SECRET_ACCESS_KEY))
# Querying data
cur = con.cursor()
try:
cur.execute("SELECT col1, col2 FROM testtable")
for (col1, col2) in cur:
print('{0}, {1}'.format(col1, col2))
finally:
cur.close()
# Binding data
con.cursor().execute(
"INSERT INTO testtable(col1, col2) "
"VALUES(%(col1)s, %(col2)s)", {
'col1': 789,
'col2': 'test string3',
})
# Retrieving column names
cur = con.cursor()
cur.execute("SELECT * FROM testtable")
print(','.join([col[0] for col in cur.description]))
# Catching syntax errors
cur = con.cursor()
try:
cur.execute("SELECT * FROM testtable")
except snowflake.connector.errors.ProgrammingError as e:
# default error message
print(e)
# user error message
print('Error {0} ({1}): {2} ({3})'.format(e.errno, e.sqlstate, e.msg, e.sfqid))
finally:
cur.close()
# Retrieving the Snowflake query ID
cur = con.cursor()
cur.execute("SELECT * FROM testtable")
print(cur.sfqid)
# Closing the connection
con.close()