Verwenden des Python-Konnektors¶
Unter diesem Thema werden zahlreiche Beispiele bereitgestellt, die veranschaulichen, wie Sie den Snowflake-Konnektor verwenden können, um Snowflake-Standardoperationen wie Benutzeranmeldung, Datenbank- und Tabellenerstellung, Warehouse-Erstellung, Einfügen/Laden von Daten sowie Abfragen durchzuführen.
Im Beispielcode am Ende dieses Themas werden einzelne Beispiele zu einem einzigen, funktionierenden Python-Programm zusammengeführt.
Bemerkung
Snowflake bietet jetzt erstklassige Python-APIs für die Verwaltung von Snowflake-Kernressourcen wie Datenbanken, Schemas, Tabellen, Aufgaben und Warehouses, ohne SQL zu verwenden. Weitere Informationen dazu finden Sie unter Snowflake-Python-API: Verwalten von Snowflake-Objekten mit Python.
Unter diesem Thema:
Erstellen einer Datenbank, eines Schemas und eines Warehouse¶
Erstellen Sie nach der Anmeldung mit den Befehlen CREATE DATABASE, CREATE SCHEMA und CREATE WAREHOUSE eine Datenbank, ein Schema und ein Warehouse, falls diese noch nicht vorhanden sind.
Das folgende Beispiel zeigt, wie sich ein Warehouse mit dem Namen tiny_warehouse
, eine Datenbank mit dem Namen testdb
und ein Schema mit dem Namen testschema
erstellen lassen. Beachten Sie, dass Sie beim Erstellen des Schemas entweder den Namen der Datenbank angeben müssen, in der das Schema erstellt werden soll, oder bereits mit der Datenbank verbunden sein müssen, in der das Schema erstellt werden soll. Im folgenden Beispiel wird vor dem Befehl CREATE SCHEMA
ein USE DATABASE
-Befehl ausgeführt, um sicherzustellen, dass das Schema in der korrekten Datenbank erstellt wird.
conn.cursor().execute("CREATE WAREHOUSE IF NOT EXISTS tiny_warehouse_mg") conn.cursor().execute("CREATE DATABASE IF NOT EXISTS testdb_mg") conn.cursor().execute("USE DATABASE testdb_mg") conn.cursor().execute("CREATE SCHEMA IF NOT EXISTS testschema_mg")
Verwenden von Datenbank, Schema und Warehouse¶
Geben Sie Datenbank und Schema an, in denen Sie Tabellen erstellen möchten. Geben Sie auch das Warehouse an, das Ressourcen für die Ausführung von DML-Anweisungen und -Abfragen bereitstellen soll.
Folgendes Beispiel zeigt die Verwendung von Datenbank testdb
, Schema testschema
und Warehouse tiny_warehouse
(zuvor erstellt):
conn.cursor().execute("USE WAREHOUSE tiny_warehouse_mg") conn.cursor().execute("USE DATABASE testdb_mg") conn.cursor().execute("USE SCHEMA testdb_mg.testschema_mg")
Erstellen von Tabellen und Einfügen von Daten¶
Verwenden Sie den Befehl CREATE TABLE, um Tabellen zu erstellen, und den Befehl INSERT, um die Tabellen mit Daten zu füllen.
Erstellen Sie beispielsweise eine Tabelle mit dem Namen testtable
, und fügen Sie zwei Zeilen in die Tabelle ein:
conn.cursor().execute( "CREATE OR REPLACE TABLE " "test_table(col1 integer, col2 string)") conn.cursor().execute( "INSERT INTO test_table(col1, col2) VALUES " + " (123, 'test string1'), " + " (456, 'test string2')")
Laden von Daten¶
Anstatt Daten mit einzelnen INSERT-Befehlen in Tabellen einzufügen, können Sie Daten aus Dateien, die entweder an einem internen oder externen Speicherort bereitgestellt werden, per Massenladen hinzufügen.
Kopieren von Daten von einem internen Speicherort¶
Um Daten aus Dateien auf Ihrem Hostcomputer in eine Tabelle zu laden, verwenden Sie zunächst den Befehl PUT für das Staging der Datei an einem internen Speicherort. Verwenden Sie dann den Befehl COPY INTO <Tabelle>, um die Daten aus den Dateien in die Tabelle zu kopieren.
Beispiel:
# Putting Data con.cursor().execute("PUT file:///tmp/data/file* @%testtable") con.cursor().execute("COPY INTO testtable")Hier werden die CSV-Daten in einem lokalen Verzeichnis namens
/tmp/data
in einer Linux- oder macOS-Umgebung gespeichert. Das Verzeichnis enthält dann Dateien mit den Namenfile0
,file1
, …file100
.
Kopieren von Daten von einem externen Speicherort¶
Um Daten aus Dateien, die bereits in einem externen Stagingbereich (z. B. Ihr S3-Bucket) bereitgestellt wurden, in eine Tabelle zu laden, verwenden Sie den Befehl COPY INTO <Tabelle>.
Beispiel:
# Copying Data con.cursor().execute(""" COPY INTO testtable FROM s3://<s3_bucket>/data/ STORAGE_INTEGRATION = myint FILE_FORMAT=(field_delimiter=',') """.format( aws_access_key_id=AWS_ACCESS_KEY_ID, aws_secret_access_key=AWS_SECRET_ACCESS_KEY))Wobei:
s3://<S3-Bucket>/data/
gibt den Namen Ihres S3-Buckets an.Die Dateien im Bucket erhalten das Präfix
data
.Der Zugriff auf den Bucket erfolgt über eine Speicherintegration, die mit CREATE STORAGE INTEGRATION von einem Kontoadministrator (d. h. einem Benutzer mit der Rolle ACCOUNTADMIN) oder einer Rolle mit der globalen Berechtigung CREATE INTEGRATION erstellt wurde. Bei Verwendung einer Speicherintegration benötigen Benutzer für den Zugriff auf einen privaten Speicherort keine Anmeldeinformationen mehr.
Bemerkung
In diesem Beispiel wird die Anweisung mit der Funktion format() zusammengestellt. Wenn in Ihrer Umgebung das Risiko von Angriffen durch Einschleusung von SQL-Befehlen besteht, sollten Sie möglicherweise das Binden von Werten vorziehen, anstatt format() zu verwenden.
Abfragen von Daten¶
Mit dem Snowflake-Konnektor für Python können Sie Folgendes übermitteln:
Eine synchrone Abfrage, die die Kontrolle an Ihre Anwendung zurückgibt, nachdem die Abfrage abgeschlossen ist.
Eine asynchrone Abfrage, die die Kontrolle an Ihre Anwendung zurückgibt, bevor die Abfrage abgeschlossen ist.
Nachdem die Abfrage abgeschlossen wurde, verwenden Sie das Cursor
-Objekt, um die Werte in den Ergebnissen abzurufen. Standardmäßig konvertiert der Snowflake-Konnektor für Python die Werte von Snowflake-Datentypen in native Python-Datentypen. (Beachten Sie, dass Sie die Werte auch als Zeichenfolgen zurückgeben und die Typkonvertierungen in Ihrer Anwendung durchführen können. Siehe Verbessern der Abfrageleistung durch Umgehen der Datenkonvertierung.)
Bemerkung
Standardmäßig werden die Werte aus NUMBER-Spalten als Gleitkommawerten mit doppelter Genauigkeit (float64
) zurückgegeben. Um diese als Dezimalwerte (decimal.Decimal
) in den Methoden fetch_pandas_all()
und fetch_pandas_batches()
zurückzugeben, setzen Sie den Parameter arrow_number_to_decimal
in der Methode connect()
auf True
.
Ausführen einer synchronen Abfrage¶
Um eine synchrone Abfrage auszuführen, rufen Sie die Methode execute()
im Cursor
-Objekt auf. Beispiel:
conn = snowflake.connector.connect( ... )
cur = conn.cursor()
cur.execute('select * from products')
Verwenden Sie das Cursor
-Objekt, um die Werte in den Ergebnissen abzurufen, wie unter Verwenden von cursor zum Abrufen von Werten erläutert.
Ausführen einer asynchronen Abfrage¶
Der Snowflake-Konnektor für Python unterstützt asynchrone Abfragen (d. h. Abfragen, die dem Benutzer die Kontrolle zurückgeben, bevor die Abfrage abgeschlossen ist). Sie können eine asynchrone Abfrage übermitteln und mithilfe von Abrufen (Polling) feststellen, wann die Abfrage abgeschlossen ist. Nachdem die Abfrage abgeschlossen ist, können Sie die Ergebnisse abrufen.
Bemerkung
Um asynchrone Abfragen auszuführen, müssen Sie sicherstellen, dass der Konfigurationsparameter ABORT_DETACHED_QUERY
den Wert FALSE
hat (Standardwert).
Snowflake schließt Verbindungen automatisch nach einer bestimmten Zeit (Standard: 5 Minuten), wodurch alle aktiven Abfragen verwaist sind. Wenn der Wert TRUE
ist, bricht Snowflake diese verwaisten Abfragen ab, was sich auf asynchrone Abfragen auswirken kann.
Mit dieser Funktion können Sie mehrere Abfragen parallel übermitteln, ohne auf den Abschluss jeder Abfrage warten zu müssen. Sie können innerhalb derselben Sitzung auch Kombinationen von synchronen und asynchronen Abfragen ausführen.
Schließlich können Sie eine asynchrone Abfrage über eine Verbindung übermitteln und die Ergebnisse über eine andere Verbindung abrufen. Beispielsweise kann ein Benutzer eine Abfrage mit langer Laufzeit über Ihre Anwendung initiieren, die Anwendung beenden und die Anwendung zu einem späteren Zeitpunkt erneut starten, um die Ergebnisse zu überprüfen.
Übermitteln einer asynchronen Abfrage¶
Um eine asynchrone Abfrage zu übermitteln, rufen Sie die Methode execute_async()
im Cursor
-Objekt auf. Beispiel:
conn = snowflake.connector.connect( ... )
cur = conn.cursor()
# Submit an asynchronous query for execution.
cur.execute_async('select count(*) from table(generator(timeLimit => 25))')
Nach dem Übermitteln der Abfrage:
Um festzustellen, ob die Abfrage noch ausgeführt wird, siehe Überprüfen des Status einer Abfrage.
Um die Ergebnisse der Abfrage abzurufen, siehe Verwenden der Abfrage-ID zum Abrufen von Abfrageergebnissen.
Beispiele für das Ausführen asynchroner Abfragen finden Sie unter Beispiele für asynchrone Abfragen.
Best Practices für asynchrone Abfragen¶
Beachten Sie beim Übermitteln von asynchronen Abfrage die folgenden bewährten Verfahren:
Stellen Sie vor der parallelen Ausführung von Abfragen sicher, dass Sie die Abhängigkeiten der Abfragen von anderen Abfragen genau kennen. Einige Abfragen sind voneinander und von der Ausführungsreihenfolge abhängig und sind daher nicht für die Parallelisierung geeignet. Beispielsweise kann eine INSERT-Anweisung offensichtlich erst beginnen, nachdem die entsprechende CREATE TABLE-Anweisung abgeschlossen wurde.
Stellen Sie sicher, dass die Anzahl der gestarteten Abfragen auf den verfügbaren Arbeitsspeicher abgestimmt ist. Das parallele Ausführen mehrerer Abfragen beansprucht normalerweise mehr Speicher, insbesondere wenn mehrere Ergebnis-Datasets gleichzeitig im Arbeitsspeicher gespeichert sind.
Behandeln Sie beim Abrufen (Polling) die seltenen Fälle, in denen eine Abfrage nicht erfolgreich ist.
Stellen Sie sicher, dass Anweisungen zur Transaktionssteuerung (BEGIN, COMMIT und ROLLBACK) nicht parallel zu anderen Anweisungen ausgeführt werden.
Abrufen der Snowflake-Abfrage-ID¶
Eine Abfrage-ID kennzeichnet jede von Snowflake ausgeführte Abfrage. Wenn Sie den Snowflake-Konnektor für Python zum Ausführen einer Abfrage verwenden, können Sie auf die Abfrage-ID über das Attribut sfqid
im Cursor
-Objekt zugreifen:
# Retrieving a Snowflake Query ID cur = con.cursor() cur.execute("SELECT * FROM testtable") print(cur.sfqid)
Sie können die Abfrage-ID für Folgendes verwenden:
Überprüfen des Status von Abfragen über die Weboberfläche.
Auf der klassischen Weboberfläche werden die Abfrage-IDs auf der Seite History angezeigt. Siehe Verwenden der Verlaufsseite zum Überwachen von Abfragen.
Programmgesteuertes Prüfen des Status einer Abfrage (z. B. um festzustellen, ob eine asynchrone Abfrage abgeschlossen ist).
Abrufen der Ergebnisse einer asynchronen Abfrage oder einer zuvor übermittelten synchronen Abfrage.
Siehe Verwenden der Abfrage-ID zum Abrufen von Abfrageergebnissen.
Abbrechen einer in Ausführung befindlichen Abfrage.
Überprüfen des Status einer Abfrage¶
So überprüfen Sie den Status einer Abfrage:
Rufen Sie die Abfrage-ID aus dem Feld
sfqid
im ObjektCursor
ab.Übergeben Sie die Abfrage-ID an die Methode
get_query_status()
desConnection
-Objekts, um die EnumerationskonstanteQueryStatus
zurückzugeben, die den Status der Abfrage darstellt.Standardmäßig löst
get_query_status()
keinen Fehler aus, wenn die Abfrage zu einem Fehler geführt hat. Wenn Sie möchten, dass ein Fehler ausgelöst wird, rufen Sie stattdessenget_query_status_throw_if_error()
auf.Verwenden Sie die Enumerationskonstante
QueryStatus
, um den Status der Abfrage zu überprüfen.Um festzustellen, ob die Abfrage noch ausgeführt wird (z. B. wenn es sich um eine asynchrone Abfrage handelt), übergeben Sie die Konstante an die Methode
is_still_running()
desConnection
-Objekts.Um festzustellen, ob ein Fehler aufgetreten ist, übergeben Sie die Konstante an die Methode
is_an_error()
.
Eine vollständige Liste der Enumerationskonstanten erhalten Sie mit
QueryStatus
.
Im folgenden Beispiel wird eine asynchrone Abfrage ausgeführt und der Status der Abfrage überprüft:
import time
...
# Execute a long-running query asynchronously.
cur.execute_async('select count(*) from table(generator(timeLimit => 25))')
...
# Wait for the query to finish running.
query_id = cur.sfqid
while conn.is_still_running(conn.get_query_status(query_id)):
time.sleep(1)
Im folgenden Beispiel wird ein Fehler ausgelöst, wenn die Abfrage zu einem Fehler geführt hat:
from snowflake.connector import ProgrammingError
import time
...
# Wait for the query to finish running and raise an error
# if a problem occurred with the execution of the query.
try:
query_id = cur.sfqid
while conn.is_still_running(conn.get_query_status_throw_if_error(query_id)):
time.sleep(1)
except ProgrammingError as err:
print('Programming Error: {0}'.format(err))
Verwenden der Abfrage-ID zum Abrufen von Abfrageergebnissen¶
Bemerkung
Wenn Sie eine synchrone Abfrage ausgeführt haben, indem Sie die Methode execute()
für ein Cursor
-Objekt aufgerufen haben, ist die Abfrage-ID zum Abrufen der Ergebnisse nicht erforderlich. Sie können einfach die Werte aus den Ergebnissen abrufen, wie unter Verwenden von cursor zum Abrufen von Werten erläutert.
Wenn Sie die Ergebnisse einer asynchronen Abfrage oder einer zuvor übermittelten synchronen Abfrage abrufen möchten, führen Sie die folgenden Schritte aus:
Rufen Sie die Abfrage-ID der Abfrage ab. Siehe Abrufen der Snowflake-Abfrage-ID.
Rufen Sie die Methode
get_results_from_sfqid()
imCursor
-Objekt auf, um die Ergebnisse abzurufen.Verwenden Sie das
Cursor
-Objekt, um die Werte in den Ergebnissen abzurufen, wie unter Verwenden von cursor zum Abrufen von Werten erläutert.
Falls die Abfrage noch ausgeführt wird, müssen Sie beachten, dass die Fetch-Methoden (fetchone()
, fetchmany()
, fetchall()
usw.) auf den Abschluss der Abfrage warten.
Beispiel:
# Get the results from a query.
cur.get_results_from_sfqid(query_id)
results = cur.fetchall()
print(f'{results[0]}')
Verwenden von cursor
zum Abrufen von Werten¶
Rufen Sie Werte aus einer Tabelle mit der Cursorobjektiterator-Methode ab.
Um beispielsweise Spalten mit dem Namen „col1“ und „col2“ aus der zuvor mit testtable
erstellten Tabelle (siehe Erstellen von Tabellen und Einfügen von Daten) abzurufen, verwenden Sie Code, der dem folgenden ähnelt:
cur = conn.cursor() try: cur.execute("SELECT col1, col2 FROM test_table ORDER BY col1") for (col1, col2) in cur: print('{0}, {1}'.format(col1, col2)) finally: cur.close()
Alternativ bietet der Snowflake-Konnektor für Python ein abgekürztes Verfahren:
for (col1, col2) in con.cursor().execute("SELECT col1, col2 FROM testtable"): print('{0}, {1}'.format(col1, col2))
Wenn Sie ein einzelnes Ergebnis (d. h. eine einzelnen Zeile) erhalten möchten, verwenden Sie die Methode fetchone
:
col1, col2 = con.cursor().execute("SELECT col1, col2 FROM testtable").fetchone() print('{0}, {1}'.format(col1, col2))
Wenn Sie die angegebene Anzahl von Zeilen auf einmal erhalten möchten, verwenden Sie die Methode fetchmany
mit der Anzahl der Zeilen:
cur = con.cursor().execute("SELECT col1, col2 FROM testtable") ret = cur.fetchmany(3) print(ret) while len(ret) > 0: ret = cur.fetchmany(3) print(ret)Bemerkung
Verwenden Sie
fetchone
oderfetchmany
, wenn das Resultset zu groß für den verfügbaren Speicher ist.
Wenn Sie alle Ergebnisse auf einmal erhalten müssen:
results = con.cursor().execute("SELECT col1, col2 FROM testtable").fetchall() for rec in results: print('%s, %s' % (rec[0], rec[1]))
Um ein Zeitlimit (Timeout) für eine Abfrage festzulegen, führen Sie einen „begin“-Befehl aus, und fügen Sie einen Timeout-Parameter in die Abfrage ein. Wenn die Abfrage die Zeitdauer des Parameterwertes überschreitet, wird ein Fehler generiert und ein Rollback durchgeführt.
Im folgenden Code bedeutet der Fehler 604, dass die Abfrage abgebrochen wurde. Der Timeout-Parameter startet Timer()
und bricht ab, wenn die Abfrage nicht innerhalb der angegebenen Zeit beendet wird.
conn.cursor().execute("create or replace table testtbl(a int, b string)") conn.cursor().execute("begin") try: conn.cursor().execute("insert into testtbl(a,b) values(3, 'test3'), (4,'test4')", timeout=10) # long query except ProgrammingError as e: if e.errno == 604: print("timeout") conn.cursor().execute("rollback") else: raise e else: conn.cursor().execute("commit")
Verwenden von DictCursor
zum Abrufen von Werten nach Spaltenname¶
Wenn Sie einen Wert anhand des Spaltennamens abrufen möchten, erstellen Sie ein cursor
-Objekt vom Typ DictCursor
.
Beispiel:
# Querying data by DictCursor from snowflake.connector import DictCursor cur = con.cursor(DictCursor) try: cur.execute("SELECT col1, col2 FROM testtable") for rec in cur: print('{0}, {1}'.format(rec['COL1'], rec['COL2'])) finally: cur.close()
Beispiele für asynchrone Abfragen¶
Im Folgenden finden Sie ein einfaches Beispiel für eine asynchrone Abfrage:
from snowflake.connector import ProgrammingError
import time
conn = snowflake.connector.connect( ... )
cur = conn.cursor()
# Submit an asynchronous query for execution.
cur.execute_async('select count(*) from table(generator(timeLimit => 25))')
# Retrieve the results.
cur.get_results_from_sfqid(query_id)
results = cur.fetchall()
print(f'{results[0]}')
Im nächsten Beispiel wird eine asynchrone Abfrage von einer Verbindung aus übermittelt und das Ergebnis von einer anderen Verbindung aus abgerufen:
from snowflake.connector import ProgrammingError
import time
conn = snowflake.connector.connect( ... )
cur = conn.cursor()
# Submit an asynchronous query for execution.
cur.execute_async('select count(*) from table(generator(timeLimit => 25))')
# Get the query ID for the asynchronous query.
query_id = cur.sfqid
# Close the cursor and the connection.
cur.close()
conn.close()
# Open a new connection.
new_conn = snowflake.connector.connect( ... )
# Create a new cursor.
new_cur = new_conn.cursor()
# Retrieve the results.
new_cur.get_results_from_sfqid(query_id)
results = new_cur.fetchall()
print(f'{results[0]}')
Abbrechen einer Abfrage über die Abfrage-ID¶
So brechen Sie eine Abfrage über die Abfrage-ID ab:
cur = cn.cursor() try: cur.execute(r"SELECT SYSTEM$CANCEL_QUERY('queryID')") result = cur.fetchall() print(len(result)) print(result[0]) finally: cur.close()
Ersetzen Sie die Zeichenfolge „queryID“ durch die tatsächliche Abfrage-ID. Um die ID für eine Abfrage zu erhalten, siehe Abrufen der Snowflake-Abfrage-ID.
Verbessern der Abfrageleistung durch Umgehen der Datenkonvertierung¶
Verwenden Sie zur Verbesserung der Abfrageleistung die Klasse SnowflakeNoConverterToPython
im Modul snowflake.connector.converter_null
, um die Datenkonvertierung vom internen Snowflake-Datentyp in den systemeigenen Python-Datentyp zu umgehen, z. B.:
from snowflake.connector.converter_null import SnowflakeNoConverterToPython con = snowflake.connector.connect( ... converter_class=SnowflakeNoConverterToPython ) for rec in con.cursor().execute("SELECT * FROM large_table"): # rec includes raw Snowflake data
Im Ergebnis werden alle Daten als Zeichenfolgen dargestellt, sodass die Anwendung für die Konvertierung in die systemeigenen Python-Datentypen verantwortlich ist. Beispielsweise sind TIMESTAMP_NTZ
- und TIMESTAMP_LTZ
-Daten die als Zeichenfolgen dargestellte Epochenzeit, und TIMESTAMP_TZ
-Daten sind die Epochenzeit, gefolgt von einem Leerzeichen, gefolgt vom Offset zu UTC in Minuten, dargestellt als Zeichenfolge.
Das Binden der Daten wird nicht beeinflusst. Python-native Daten können weiterhin für Updates gebunden werden.
Binden von Daten¶
Um Werte anzugeben, die in einer SQL-Anweisung verwendet werden sollen, können Sie Literale in die Anweisung einschließen oder Variablen binden. Wenn Sie Variablen binden, fügen Sie einen oder mehrere Platzhalter in den Text der SQL-Anweisung ein, und geben Sie dann für jeden Platzhalter die Variable (den zu verwendenden Wert) an.
Im folgenden Beispiel wird die Verwendung von Literalen und die Bindung gegenübergestellt:
Literale:
con.cursor().execute("INSERT INTO testtable(col1, col2) VALUES(789, 'test string3')")Bindung:
con.cursor().execute( "INSERT INTO testtable(col1, col2) " "VALUES(%s, %s)", ( 789, 'test string3' ))
Bemerkung
Es gibt eine Obergrenze für die Datengröße, die Sie binden oder in einem Batch kombinieren können. Weitere Details dazu finden Sie unter Begrenzung der Abfragetextgröße.
Snowflake unterstützt die folgenden Bindungstypen:
pyformat
undformat
, die Daten auf dem Client binden.qmark
undnumeric
, die Daten auf dem Server binden.
Jeder Typ wird unten erklärt.
pyformat
- oder format
-Bindung¶
Sowohl die pyformat
-Bindung als auch die format
-Bindung binden Daten auf der Clientseite und nicht auf der Serverseite.
Standardmäßig unterstützt der Snowflake-Konnektor für Python sowohl pyformat
als auch format
, sodass Sie %(name)s
oder %s
als Platzhalter verwenden können. Beispiel:
Verwenden von
%(name)s
als Platzhalter:conn.cursor().execute( "INSERT INTO test_table(col1, col2) " "VALUES(%(col1)s, %(col2)s)", { 'col1': 789, 'col2': 'test string3', })
Verwenden von
%s
als Platzhalter:con.cursor().execute( "INSERT INTO testtable(col1, col2) " "VALUES(%s, %s)", ( 789, 'test string3' ))
Mit pyformat
und format
können Sie auch ein Listenobjekt verwenden, um Daten für den IN-Operator zu binden:
# Binding data for IN operator con.cursor().execute( "SELECT col1, col2 FROM testtable" " WHERE col2 IN (%s)", ( ['test string1', 'test string3'], ))
Das Prozentzeichen („%“) ist sowohl ein Platzhalterzeichen für SQL LIKE als auch ein Formatbindungszeichen für Python. Wenn Sie die Formatbindung verwenden und Ihr SQL-Befehl das Prozentzeichen enthält, müssen Sie möglicherweise das Prozentzeichen maskieren. Wenn Ihre SQL-Anweisung beispielsweise wie folgt lautet:
SELECT col1, col2 FROM test_table WHERE col2 ILIKE '%York' LIMIT 1; -- Find York, New York, etc.
Dann sollte Ihr Python-Code wie folgt aussehen (beachten Sie das zusätzliche Prozentzeichen, um das ursprüngliche Prozentzeichen zu umgehen):
sql_command = "select col1, col2 from test_table " sql_command += " where col2 like '%%York' limit %(lim)s" parameter_dictionary = {'lim': 1 } cur.execute(sql_command, parameter_dictionary)
qmark
- oder numeric
-Bindung¶
Sowohl die qmark
-Bindung als auch die numeric
-Bindung binden Daten eher auf der Serverseite als auf der Clientseite:
Verwenden Sie für die
qmark
-Bindung ein Fragezeichenzeichen (?
), um anzugeben, an welcher Stelle in der Zeichenfolge der Wert einer Variablen eingefügt werden soll.Verwenden Sie für die
numeric
-Bindung einen Doppelpunkt (:
), gefolgt von einer Zahl, mit der die Position der Variablen angegeben wird, die an dieser Position ersetzt werden soll. So gibt beispielsweise:2
die zweite Variable an.Verwenden Sie die numerische Bindung, um denselben Wert mehr als einmal in derselben Abfrage zu binden. Wenn Sie z. B. einen langen VARCHAR- oder BINARY- oder semistrukturierten Wert haben, den Sie mehr als einmal verwenden möchten, dann können Sie mit der
numeric
-Bindung den Wert einmal an den Server senden und ihn mehrfach verwenden.
In den nächsten Abschnitten wird erklärt, wie Sie qmark
- und numeric
-Bindung verwenden:
Verwenden von qmark
- oder numeric
-Bindung¶
Um die Stilbindung qmark
oder numeric
zu verwenden, führen Sie einen der folgenden Schritte aus:
snowflake.connector.paramstyle='qmark'
snowflake.connector.paramstyle='numeric'
Wichtig
Sie müssen das paramstyle
-Attribut festlegen, bevor Sie die connect()
-Methode aufrufen.
Wenn Sie paramstyle
auf qmark
oder numeric
setzen, müssen Sie ?
oder :N
(wobei N
durch eine Zahl ersetzt wird) als Platzhalter verwenden.
Beispiel:
Verwenden von
?
als Platzhalter:import snowflake.connector snowflake.connector.paramstyle='qmark' con = snowflake.connector.connect(...) con.cursor().execute( "INSERT INTO testtable(col1, col2) " "VALUES(?, ?)", ( 789, 'test string3' ))
Verwenden von
:N
als Platzhalter:import snowflake.connector snowflake.connector.paramstyle='numeric' con = snowflake.connector.connect(...) con.cursor().execute( "INSERT INTO testtable(col1, col2) " "VALUES(:1, :2)", ( 789, 'test string3' ))
Die folgende Abfrage zeigt die Verwendung der
numeric
-Bindung zur Wiederverwendung einer Variablen:con.cursor().execute( "INSERT INTO testtable(complete_video, short_sample_of_video) " "VALUES(:1, SUBSTRING(:1, :2, :3))", ( binary_value_that_stores_video, # variable :1 starting_offset_in_bytes_of_video_clip, # variable :2 length_in_bytes_of_video_clip # variable :3 ))
Verwenden von qmark
- oder numeric
-Bindung mit datetime
-Objekten¶
Wenn Sie die Bindung qmark
oder numeric
verwenden, um Daten an einen Snowflake-Datentyp TIMESTAMP zu binden, setzen Sie die Bindungsvariable auf ein Tupel, das den Datentyp des Snowflake-Zeitstempels (TIMESTAMP_LTZ
oder TIMESTAMP_TZ
) und den Wert angibt. Beispiel:
import snowflake.connector snowflake.connector.paramstyle='qmark' con = snowflake.connector.connect(...) con.cursor().execute( "CREATE OR REPLACE TABLE testtable2 (" " col1 int, " " col2 string, " " col3 timestamp_ltz" ")" ) con.cursor().execute( "INSERT INTO testtable2(col1,col2,col3) " "VALUES(?,?,?)", ( 987, 'test string4', ("TIMESTAMP_LTZ", datetime.now()) ) )
Im Gegensatz zur clientseitigen Bindung benötigt die serverseitige Bindung den Snowflake-Datentyp für die Spalte. Die meisten gängigen Python-Datentypen weisen bereits implizite Zuordnungen zu Snowflake-Datentypen auf (z. B. int
ist FIXED
zugeordnet). Da die Python-datetime
-Daten jedoch an einen von mehreren Snowflake-Datentypen (TIMESTAMP_NTZ
, TIMESTAMP_LTZ
oder TIMESTAMP_TZ
) gebunden sein können und die Standardzuordnung TIMESTAMP_NTZ
ist, müssen Sie den zu verwendenden Snowflake-Datentyp angeben.
Verwenden von Bindungsvariablen mit dem IN-Operator¶
qmark
und numeric
(serverseitige Bindung) unterstützen nicht die Verwendung von Bindungsvariablen mit dem IN-Operator.
Wenn Sie Bindungsvariablen mit dem IN-Operator verwenden müssen, verwenden Sie clientseitige Bindung (pyformat
oder format
).
Binden von Parametern an Variablen für Batcheinfügungen¶
In Ihrem Anwendungscode können Sie mehrere Zeilen in einen einzelnen Batch einfügen. Verwenden Sie dazu Parameter für Werte in einer INSERT-Anweisung. In der folgenden Anweisung werden z. B. Platzhalter für qmark
-Bindungen in einer INSERT-Anweisung verwendet:
insert into grocery (item, quantity) values (?, ?)
Zum Spezifizieren der einzufügenden Daten definieren Sie dann eine Variable, die eine Sequenz von Sequenzen ist (z. B. eine Liste von Tupeln):
rows_to_insert = [('milk', 2), ('apple', 3), ('egg', 2)]
Wie im obigen Beispiel gezeigt, ist jedes Element in der Liste ein Tupel, das die Spaltenwerte für eine einzufügende Zeile enthält.
Um die Bindung durchzuführen, rufen Sie die Methode executemany()
auf und übergeben die Variable als zweites Argument. Beispiel:
conn = snowflake.connector.connect( ... ) rows_to_insert = [('milk', 2), ('apple', 3), ('egg', 2)] conn.cursor().executemany( "insert into grocery (item, quantity) values (?, ?)", rows_to_insert)
Durch Binden von Daten auf dem Server (d. h. mittels qmark
- oder numeric
-Bindung) kann der Konnektor die Leistung von Batcheinfügungen optimieren.
Wenn Sie dieses Verfahren verwenden, um eine große Anzahl von Werten einzufügen, kann die Treiberleistung verbessert werden, indem die Daten (ohne Erstellen von Dateien auf dem lokalen Computer) an einen temporären Stagingbereich gestreamt werden. Der Treiber führt dies automatisch durch, wenn die Anzahl der Werte einen Schwellenwert überschreitet.
Außerdem müssen die aktuelle Datenbank und das aktuelle Schema für die Sitzung festgelegt sein. Wenn diese nicht festgelegt sind, kann der vom Treiber ausgeführte CREATE TEMPORARY STAGE-Befehl folgenden Fehler generieren:
CREATE TEMPORARY STAGE SYSTEM$BIND file_format=(type=csv field_optionally_enclosed_by='"')
Cannot perform CREATE STAGE. This session does not have a current schema. Call 'USE SCHEMA', or use a qualified name.
Bemerkung
Alternative Möglichkeiten zum Laden von Daten in die Snowflake-Datenbank (einschließlich Massenladen mit dem COPY-Befehl) finden Sie unter Daten in Snowflake laden.
Angriffe durch Einschleusung von SQL-Befehlen verhindern¶
Binden Sie Daten nicht mit der Formatierungsfunktion von Python, da Sie eine Einschleusung von SQL-Befehlen riskieren. Beispiel:
# Binding data (UNSAFE EXAMPLE) con.cursor().execute( "INSERT INTO testtable(col1, col2) " "VALUES(%(col1)d, '%(col2)s')" % { 'col1': 789, 'col2': 'test string3' })# Binding data (UNSAFE EXAMPLE) con.cursor().execute( "INSERT INTO testtable(col1, col2) " "VALUES(%d, '%s')" % ( 789, 'test string3' ))# Binding data (UNSAFE EXAMPLE) con.cursor().execute( "INSERT INTO testtable(col1, col2) " "VALUES({col1}, '{col2}')".format( col1=789, col2='test string3') )
Speichern Sie stattdessen die Werte in Variablen, und binden Sie diese Variablen dann mit qmark oder einem numerischen Bindungsformat ein.
Abrufen von Spaltenmetadaten¶
Für das Abrufen der Metadaten jeder Spalte im Resultset abzurufen (z. B. Name, Typ, Genauigkeit, Dezimalstellenzahl usw. jeder Spalte) gibt es folgende Möglichkeiten:
Nach dem Aufruf der Methode
execute()
zur Ausführung der Abfrage können Sie mit dem Attributdescription
desCursor
-Objekts auf die Metadaten zuzugreifen.Wenn Sie auf die Metadaten zugreifen möchten, ohne eine Abfrage ausführen zu müssen, rufen Sie die Methode
describe()
auf.Die Methode
describe
ist im Snowflake-Konnektor für Python ab Version 2.4.6 verfügbar.
Das Attribut description
wird auf einen der folgenden Werte gesetzt:
Version 2.4.5 und früher: Eine Liste von Tupeln.
Versionen 2.4.6 und später: Eine Liste von ResultMetadata-Objekten. (Die Methode
describe
gibt ebenfalls diese Liste zurück.)
Jedes Tupel- und ResultMetadata
-Objekt enthält die Metadaten zu einer Spalte (Spaltenname, Datentyp usw.). Sie können auf die Metadaten über den Index zugreifen oder ab Version 2.4.6 auch über das Attribut ResultMetadata
.
Die folgenden Beispiele demonstrieren, wie Sie auf die Metadaten der zurückgegebenen Tupel und ResultMetadata
-Objekte zugreifen können.
Beispiel: Abrufen der Spaltennamen-Metadaten nach Index (ab Version 2.4.5):
Im folgenden Beispiel wird das Attribut description
verwendet, um die Liste der Spaltennamen nach der Ausführung einer Abfrage abzurufen. Das Attribut ist eine Liste von Tupeln. Das Beispiel greift auf den Spaltennamen des ersten Wertes in jedem Tupel zu.
cur = conn.cursor() cur.execute("SELECT * FROM test_table") print(','.join([col[0] for col in cur.description]))
Beispiel: Abrufen der Spaltennamen-Metadaten nach Attribut (ab Version 2.4.6):
Im folgenden Beispiel wird das Attribut description
verwendet, um die Liste der Spaltennamen nach der Ausführung einer Abfrage abzurufen. Das Attribut ist eine Liste von ResultMetaData-Objekten. Das Beispiel greift auf den Spaltennamen aus dem Attribut name
jedes ResultMetadata
-Objekts zu.
cur = conn.cursor() cur.execute("SELECT * FROM test_table") print(','.join([col.name for col in cur.description]))
Beispiel: Abrufen der Spaltennamen-Metadaten ohne Ausführen der Abfrage (ab Version 2.4.6):
Das folgende Beispiel verwendet die Methode describe
, um die Liste der Spaltennamen abzurufen, ohne eine Abfrage ausführen zu müssen. Die Methode describe()
gibt eine Liste von ResultMetaData-Objekten zurück. Das Beispiel greift auf den Spaltennamen aus dem Attribut name
jedes ResultMetadata
-Objekts zu.
cur = conn.cursor() result_metadata_list = cur.describe("SELECT * FROM test_table") print(','.join([col.name for col in result_metadata_list]))
Fehlerbehandlung¶
Die Anwendung muss auf Ausnahmen, die vom Snowflake-Konnektor ausgelöst werden, korrekt reagieren und entscheiden, ob die Codeausführung fortgesetzt oder abgebrochen wird.
# Catching the syntax error cur = con.cursor() try: cur.execute("SELECT * FROM testtable") except snowflake.connector.errors.ProgrammingError as e: # default error message print(e) # customer error message print('Error {0} ({1}): {2} ({3})'.format(e.errno, e.sqlstate, e.msg, e.sfqid)) finally: cur.close()
Verwenden von execute_stream
zum Ausführen von SQL-Skripten¶
Mit der Funktion execute_stream
können Sie ein oder mehrere SQL-Skripte in einem Stream ausführen:
from codecs import open with open(sqlfile, 'r', encoding='utf-8') as f: for cur in con.execute_stream(f): for ret in cur: print(ret)
Schließen der Verbindung¶
Als Best Practice schließen Sie die Verbindung, indem Sie die Methode close
aufrufen:
connection.close()
Dadurch wird sichergestellt, dass die gesammelten Clientmetriken an den Server übermittelt werden und die Sitzung gelöscht wird. Außerdem helfen try-finally
Blöcke sicherzustellen, dass die Verbindung geschlossen wird, auch wenn mittendrin eine Ausnahme ausgelöst wird:
# Connecting to Snowflake con = snowflake.connector.connect(...) try: # Running queries con.cursor().execute(...) ... finally: # Closing the connection con.close()
Verwenden eines Kontextmanagers zum Verbinden und Steuern von Transaktionen¶
Der Snowflake-Konnektor für Python unterstützt einen Kontextmanager, der bei Bedarf Ressourcen zuweist und freigibt. Der Kontextmanager ist nützlich, um für Transaktionen auf Basis des Anweisungsstatus ein Commit oder ein Rollback auszuführen, wenn autocommit
deaktiviert ist.
# Connecting to Snowflake using the context manager with snowflake.connector.connect( user=USER, password=PASSWORD, account=ACCOUNT, autocommit=False, ) as con: con.cursor().execute("INSERT INTO a VALUES(1, 'test1')") con.cursor().execute("INSERT INTO a VALUES(2, 'test2')") con.cursor().execute("INSERT INTO a VALUES(not numeric value, 'test3')") # fail
Im obigen Beispiel führt der Kontextmanager nach Fehlschlagen der dritten Anweisung ein Rollback der Änderungen in der Transaktion aus und schließt die Verbindung. Wenn alle Anweisungen erfolgreich waren, würde der Kontextmanager die Änderungen committen und die Verbindung schließen.
Der äquivalente Code mit try
- und except
-Blöcken ist wie folgt:
# Connecting to Snowflake using try and except blocks con = snowflake.connector.connect( user=USER, password=PASSWORD, account=ACCOUNT, autocommit=False) try: con.cursor().execute("INSERT INTO a VALUES(1, 'test1')") con.cursor().execute("INSERT INTO a VALUES(2, 'test2')") con.cursor().execute("INSERT INTO a VALUES(not numeric value, 'test3')") # fail con.commit() except Exception as e: con.rollback() raise e finally: con.close()
Protokollieren¶
Der Snowflake-Konnektor für Python nutzt das logging
-Standardmodul von Python, um den Status in regelmäßigen Abständen zu protokollieren, sodass die Anwendung ihre Aktivitäten im Hintergrund verfolgen kann. Der einfachste Weg, die Protokollierung zu aktivieren, ist der Aufruf von logging.basicConfig()
am Anfang der Anwendung.
Im folgenden Beispiel wird der Protokolliergrad auf INFO
eingestellt, und die Protokolle werden in einer Datei namens /tmp/snowflake_python_connector.log
gespeichert.
logging.basicConfig( filename=file_name, level=logging.INFO)
Eine umfassendere Protokollierung kann durch Einstellen des Protokolliergrads auf DEBUG
wie folgt aktiviert werden:
# Logging including the timestamp, thread and the source code location import logging for logger_name in ['snowflake.connector', 'botocore', 'boto3']: logger = logging.getLogger(logger_name) logger.setLevel(logging.DEBUG) ch = logging.FileHandler('/tmp/python_connector.log') ch.setLevel(logging.DEBUG) ch.setFormatter(logging.Formatter('%(asctime)s - %(threadName)s %(filename)s:%(lineno)d - %(funcName)s() - %(levelname)s - %(message)s')) logger.addHandler(ch)Die optionale, aber empfohlene Formatierungsklasse SecretDetector stellt sicher, dass ein bestimmter Satz bekannter vertraulicher Informationen maskiert wird, bevor diese in die Protokolldateien des Python-Konnektors von Snowflake geschrieben werden. Verwenden Sie für SecretDetector folgenden Code:
# Logging including the timestamp, thread and the source code location import logging from snowflake.connector.secret_detector import SecretDetector for logger_name in ['snowflake.connector', 'botocore', 'boto3']: logger = logging.getLogger(logger_name) logger.setLevel(logging.DEBUG) ch = logging.FileHandler('/tmp/python_connector.log') ch.setLevel(logging.DEBUG) ch.setFormatter(SecretDetector('%(asctime)s - %(threadName)s %(filename)s:%(lineno)d - %(funcName)s() - %(levelname)s - %(message)s')) logger.addHandler(ch)Bemerkung
botocore
undboto3
sind über das AWS (Amazon Web Services) SDK für Python verfügbar.
Beispielprogramm¶
Der folgende Beispielcode kombiniert die meisten der in den vorangegangenen Abschnitten beschriebenen Beispiele zu einem funktionierenden Python-Programm: Dieses Beispiel enthält zwei Teile:
Eine übergeordnete Klasse („python_veritas_base“) enthält den Code für viele gängige Operationen, z. B. das Herstellen einer Verbindung zum Server.
Eine untergeordnete Klasse („python_connector_example“) repräsentiert die kundenspezifischen Teile eines bestimmten Clients, z. B. zum Abfragen einer Tabelle.
Dieser Beispielcode wird direkt aus einem unserer Tests importiert, um sicherzustellen, dass er in einem neueren Build des Produkts ausgeführt wurde.
Da dies aus einem Test stammt, enthält es eine kleine Menge Code, um einen alternativen Port und ein alternatives Protokoll festzulegen, die in einigen Tests verwendet werden. Benutzer sollten Protokoll oder Portnummer nicht festlegen. Lassen Sie diese Werte stattdessen aus, und verwenden Sie die Standardeinstellungen.
Dies enthält auch einige Abschnittsmarkierungen (manchmal als „Snippet-Tags“ bezeichnet), um Code zu identifizieren, der unabhängig in die Dokumentation importiert werden kann. Abschnittsmarkierungen sehen normalerweise ähnlich aus wie:
# -- (> ---------------------- SECTION=import_connector ---------------------
...
# -- <) ---------------------------- END_SECTION ----------------------------
Diese Abschnittsmarkierungen sind im Benutzercode nicht erforderlich.
Der erste Teil des Codebeispiels enthält die allgemeinen Unterroutinen für:
Lesen Sie Befehlszeilenargumente (z. B. „–warehouse MyWarehouse“), die Verbindungsinformationen enthalten.
Stellen Sie eine Verbindung zum Server her.
Erstellen und verwenden Sie ein Warehouse, eine Datenbank und ein Schema.
Löschen Sie das Schema, die Datenbank und das Warehouse, wenn Sie damit fertig sind.
import logging
import os
import sys
# -- (> ---------------------- SECTION=import_connector ---------------------
import snowflake.connector
# -- <) ---------------------------- END_SECTION ----------------------------
class python_veritas_base:
"""
PURPOSE:
This is the Base/Parent class for programs that use the Snowflake
Connector for Python.
This class is intended primarily for:
* Sample programs, e.g. in the documentation.
* Tests.
"""
def __init__(self, p_log_file_name = None):
"""
PURPOSE:
This does any required initialization steps, which in this class is
basically just turning on logging.
"""
file_name = p_log_file_name
if file_name is None:
file_name = '/tmp/snowflake_python_connector.log'
# -- (> ---------- SECTION=begin_logging -----------------------------
logging.basicConfig(
filename=file_name,
level=logging.INFO)
# -- <) ---------- END_SECTION ---------------------------------------
# -- (> ---------------------------- SECTION=main ------------------------
def main(self, argv):
"""
PURPOSE:
Most tests follow the same basic pattern in this main() method:
* Create a connection.
* Set up, e.g. use (or create and use) the warehouse, database,
and schema.
* Run the queries (or do the other tasks, e.g. load data).
* Clean up. In this test/demo, we drop the warehouse, database,
and schema. In a customer scenario, you'd typically clean up
temporary tables, etc., but wouldn't drop your database.
* Close the connection.
"""
# Read the connection parameters (e.g. user ID) from the command line
# and environment variables, then connect to Snowflake.
connection = self.create_connection(argv)
# Set up anything we need (e.g. a separate schema for the test/demo).
self.set_up(connection)
# Do the "real work", for example, create a table, insert rows, SELECT
# from the table, etc.
self.do_the_real_work(connection)
# Clean up. In this case, we drop the temporary warehouse, database, and
# schema.
self.clean_up(connection)
print("\nClosing connection...")
# -- (> ------------------- SECTION=close_connection -----------------
connection.close()
# -- <) ---------------------------- END_SECTION ---------------------
# -- <) ---------------------------- END_SECTION=main --------------------
def args_to_properties(self, args):
"""
PURPOSE:
Read the command-line arguments and store them in a dictionary.
Command-line arguments should come in pairs, e.g.:
"--user MyUser"
INPUTS:
The command line arguments (sys.argv).
RETURNS:
Returns the dictionary.
DESIRABLE ENHANCEMENTS:
Improve error detection and handling.
"""
connection_parameters = {}
i = 1
while i < len(args) - 1:
property_name = args[i]
# Strip off the leading "--" from the tag, e.g. from "--user".
property_name = property_name[2:]
property_value = args[i + 1]
connection_parameters[property_name] = property_value
i += 2
return connection_parameters
def create_connection(self, argv):
"""
PURPOSE:
This gets account identifier and login information from the
environment variables and command-line parameters, connects to the
server, and returns the connection object.
INPUTS:
argv: This is usually sys.argv, which contains the command-line
parameters. It could be an equivalent substitute if you get
the parameter information from another source.
RETURNS:
A connection.
"""
# Get account identifier and login information from environment variables and command-line parameters.
# For information about account identifiers, see
# https://docs.snowflake.com/en/user-guide/admin-account-identifier.html .
# -- (> ----------------------- SECTION=set_login_info ---------------
# Get the password from an appropriate environment variable, if
# available.
PASSWORD = os.getenv('SNOWSQL_PWD')
# Get the other login info etc. from the command line.
if len(argv) < 11:
msg = "ERROR: Please pass the following command-line parameters:\n"
msg += "--warehouse <warehouse> --database <db> --schema <schema> "
msg += "--user <user> --account <account_identifier> "
print(msg)
sys.exit(-1)
else:
connection_parameters = self.args_to_properties(argv)
USER = connection_parameters["user"]
ACCOUNT = connection_parameters["account"]
WAREHOUSE = connection_parameters["warehouse"]
DATABASE = connection_parameters["database"]
SCHEMA = connection_parameters["schema"]
# Optional: for internal testing only.
try:
PORT = connection_parameters["port"]
except:
PORT = ""
try:
PROTOCOL = connection_parameters["protocol"]
except:
PROTOCOL = ""
# If the password is set by both command line and env var, the
# command-line value takes precedence over (is written over) the
# env var value.
# If the password wasn't set either in the environment var or on
# the command line...
if PASSWORD is None or PASSWORD == '':
print("ERROR: Set password, e.g. with SNOWSQL_PWD environment variable")
sys.exit(-2)
# -- <) ---------------------------- END_SECTION ---------------------
# Optional diagnostic:
#print("USER:", USER)
#print("ACCOUNT:", ACCOUNT)
#print("WAREHOUSE:", WAREHOUSE)
#print("DATABASE:", DATABASE)
#print("SCHEMA:", SCHEMA)
#print("PASSWORD:", PASSWORD)
#print("PROTOCOL:" "'" + PROTOCOL + "'")
#print("PORT:" + "'" + PORT + "'")
print("Connecting...")
# If the PORT is set but the protocol is not, we ignore the PORT (bug!!).
if PROTOCOL is None or PROTOCOL == "" or PORT is None or PORT == "":
# -- (> ------------------- SECTION=connect_to_snowflake ---------
conn = snowflake.connector.connect(
user=USER,
password=PASSWORD,
account=ACCOUNT,
warehouse=WAREHOUSE,
database=DATABASE,
schema=SCHEMA
)
# -- <) ---------------------------- END_SECTION -----------------
else:
conn = snowflake.connector.connect(
user=USER,
password=PASSWORD,
account=ACCOUNT,
warehouse=WAREHOUSE,
database=DATABASE,
schema=SCHEMA,
# Optional: for internal testing only.
protocol=PROTOCOL,
port=PORT
)
return conn
def set_up(self, connection):
"""
PURPOSE:
Set up to run a test. You can override this method with one
appropriate to your test/demo.
"""
# Create a temporary warehouse, database, and schema.
self.create_warehouse_database_and_schema(connection)
def do_the_real_work(self, conn):
"""
PURPOSE:
Your sub-class should override this to include the code required for
your documentation sample or your test case.
This default method does a very simple self-test that shows that the
connection was successful.
"""
# Create a cursor for this connection.
cursor1 = conn.cursor()
# This is an example of an SQL statement we might want to run.
command = "SELECT PI()"
# Run the statement.
cursor1.execute(command)
# Get the results (should be only one):
for row in cursor1:
print(row[0])
# Close this cursor.
cursor1.close()
def clean_up(self, connection):
"""
PURPOSE:
Clean up after a test. You can override this method with one
appropriate to your test/demo.
"""
# Create a temporary warehouse, database, and schema.
self.drop_warehouse_database_and_schema(connection)
def create_warehouse_database_and_schema(self, conn):
"""
PURPOSE:
Create the temporary schema, database, and warehouse that we use
for most tests/demos.
"""
# Create a database, schema, and warehouse if they don't already exist.
print("\nCreating warehouse, database, schema...")
# -- (> ------------- SECTION=create_warehouse_database_schema -------
conn.cursor().execute("CREATE WAREHOUSE IF NOT EXISTS tiny_warehouse_mg")
conn.cursor().execute("CREATE DATABASE IF NOT EXISTS testdb_mg")
conn.cursor().execute("USE DATABASE testdb_mg")
conn.cursor().execute("CREATE SCHEMA IF NOT EXISTS testschema_mg")
# -- <) ---------------------------- END_SECTION ---------------------
# -- (> --------------- SECTION=use_warehouse_database_schema --------
conn.cursor().execute("USE WAREHOUSE tiny_warehouse_mg")
conn.cursor().execute("USE DATABASE testdb_mg")
conn.cursor().execute("USE SCHEMA testdb_mg.testschema_mg")
# -- <) ---------------------------- END_SECTION ---------------------
def drop_warehouse_database_and_schema(self, conn):
"""
PURPOSE:
Drop the temporary schema, database, and warehouse that we create
for most tests/demos.
"""
# -- (> ------------- SECTION=drop_warehouse_database_schema ---------
conn.cursor().execute("DROP SCHEMA IF EXISTS testschema_mg")
conn.cursor().execute("DROP DATABASE IF EXISTS testdb_mg")
conn.cursor().execute("DROP WAREHOUSE IF EXISTS tiny_warehouse_mg")
# -- <) ---------------------------- END_SECTION ---------------------
# ----------------------------------------------------------------------------
if __name__ == '__main__':
pvb = python_veritas_base()
pvb.main(sys.argv)
Der zweite Teil des Codebeispiels erstellt eine Tabelle, fügt Zeilen ein usw.:
import sys
# -- (> ---------------------- SECTION=import_connector ---------------------
import snowflake.connector
# -- <) ---------------------------- END_SECTION ----------------------------
# Import the base class that contains methods used in many tests and code
# examples.
from python_veritas_base import python_veritas_base
class python_connector_example (python_veritas_base):
"""
PURPOSE:
This is a simple example program that shows how to use the Snowflake
Python Connector to create and query a table.
"""
def __init__(self):
pass
def do_the_real_work(self, conn):
"""
INPUTS:
conn is a Connection object returned from snowflake.connector.connect().
"""
print("\nCreating table test_table...")
# -- (> ----------------------- SECTION=create_table ---------------------
conn.cursor().execute(
"CREATE OR REPLACE TABLE "
"test_table(col1 integer, col2 string)")
conn.cursor().execute(
"INSERT INTO test_table(col1, col2) VALUES " +
" (123, 'test string1'), " +
" (456, 'test string2')")
# -- <) ---------------------------- END_SECTION -------------------------
print("\nSelecting from test_table...")
# -- (> ----------------------- SECTION=querying_data --------------------
cur = conn.cursor()
try:
cur.execute("SELECT col1, col2 FROM test_table ORDER BY col1")
for (col1, col2) in cur:
print('{0}, {1}'.format(col1, col2))
finally:
cur.close()
# -- <) ---------------------------- END_SECTION -------------------------
# ============================================================================
if __name__ == '__main__':
test_case = python_connector_example()
test_case.main(sys.argv)
Gehen Sie folgendermaßen vor, um dieses Beispiel auszuführen:
Kopieren Sie den ersten Teil des Codes in eine Datei mit dem Namen „python_veritas_base.py“.
Kopieren Sie den zweiten Teil des Codes in eine Datei mit dem Namen „python_connector_example.py“.
Setzen Sie die Umgebungsvariable SNOWSQL_PWD auf Ihr Kennwort, zum Beispiel:
export SNOWSQL_PWD='MyPassword'Führen Sie das Programm über eine Befehlszeile ähnlich der folgenden aus (ersetzen Sie die Benutzer- und Kontoinformationen natürlich durch Ihre eigenen Benutzer- und Kontoinformationen).
Warnung
Dies löscht das Warehouse, die Datenbank und das Schema am Ende des Programms! Verwenden Sie nicht den Namen einer vorhandenen Datenbank, da Sie diese verlieren werden!
python3 python_connector_example.py --warehouse <unique_warehouse_name> --database <new_warehouse_zzz_test> --schema <new_schema_zzz_test> --account myorganization-myaccount --user MyUserName
Und hier ist die Ausgabe:
Connecting...
Creating warehouse, database, schema...
Creating table test_table...
Selecting from test_table...
123, test string1
456, test string2
Closing connection...
Hier ist ein längeres Beispiel:
Bemerkung
Achten Sie darauf, dass Sie in dem Abschnitt, in dem Sie Ihre Konto- und Anmeldeinformationen festlegen, die Variablen bei Bedarf so ersetzen, dass sie Ihren Snowflake-Anmeldeinformationen (Name, Kennwort usw.) entsprechen.
In diesem Beispiel wird die Anweisung mit der Funktion format() zusammengestellt. Wenn in Ihrer Umgebung das Risiko von Angriffen durch Einschleusung von SQL-Befehlen besteht, sollten Sie möglicherweise das Binden von Werten vorziehen, anstatt format() zu verwenden.
#!/usr/bin/env python
#
# Snowflake Connector for Python Sample Program
#
# Logging
import logging
logging.basicConfig(
filename='/tmp/snowflake_python_connector.log',
level=logging.INFO)
import snowflake.connector
# Set your account and login information (replace the variables with
# the necessary values).
ACCOUNT = '<account_identifier>'
USER = '<login_name>'
PASSWORD = '<password>'
import os
# Only required if you copy data from your S3 bucket
AWS_ACCESS_KEY_ID = os.getenv('AWS_ACCESS_KEY_ID')
AWS_SECRET_ACCESS_KEY = os.getenv('AWS_SECRET_ACCESS_KEY')
# Connecting to Snowflake
con = snowflake.connector.connect(
user=USER,
password=PASSWORD,
account=ACCOUNT,
)
# Creating a database, schema, and warehouse if none exists
con.cursor().execute("CREATE WAREHOUSE IF NOT EXISTS tiny_warehouse")
con.cursor().execute("CREATE DATABASE IF NOT EXISTS testdb")
con.cursor().execute("USE DATABASE testdb")
con.cursor().execute("CREATE SCHEMA IF NOT EXISTS testschema")
# Using the database, schema and warehouse
con.cursor().execute("USE WAREHOUSE tiny_warehouse")
con.cursor().execute("USE SCHEMA testdb.testschema")
# Creating a table and inserting data
con.cursor().execute(
"CREATE OR REPLACE TABLE "
"testtable(col1 integer, col2 string)")
con.cursor().execute(
"INSERT INTO testtable(col1, col2) "
"VALUES(123, 'test string1'),(456, 'test string2')")
# Copying data from internal stage (for testtable table)
con.cursor().execute("PUT file:///tmp/data0/file* @%testtable")
con.cursor().execute("COPY INTO testtable")
# Copying data from external stage (S3 bucket -
# replace <s3_bucket> with the name of your bucket)
con.cursor().execute("""
COPY INTO testtable FROM s3://<s3_bucket>/data/
STORAGE_INTEGRATION = myint
FILE_FORMAT=(field_delimiter=',')
""".format(
aws_access_key_id=AWS_ACCESS_KEY_ID,
aws_secret_access_key=AWS_SECRET_ACCESS_KEY))
# Querying data
cur = con.cursor()
try:
cur.execute("SELECT col1, col2 FROM testtable")
for (col1, col2) in cur:
print('{0}, {1}'.format(col1, col2))
finally:
cur.close()
# Binding data
con.cursor().execute(
"INSERT INTO testtable(col1, col2) "
"VALUES(%(col1)s, %(col2)s)", {
'col1': 789,
'col2': 'test string3',
})
# Retrieving column names
cur = con.cursor()
cur.execute("SELECT * FROM testtable")
print(','.join([col[0] for col in cur.description]))
# Catching syntax errors
cur = con.cursor()
try:
cur.execute("SELECT * FROM testtable")
except snowflake.connector.errors.ProgrammingError as e:
# default error message
print(e)
# user error message
print('Error {0} ({1}): {2} ({3})'.format(e.errno, e.sqlstate, e.msg, e.sfqid))
finally:
cur.close()
# Retrieving the Snowflake query ID
cur = con.cursor()
cur.execute("SELECT * FROM testtable")
print(cur.sfqid)
# Closing the connection
con.close()