Verwenden des Python-Konnektors¶
Unter diesem Thema werden zahlreiche Beispiele bereitgestellt, die veranschaulichen, wie Sie den Snowflake-Konnektor verwenden können, um Snowflake-Standardoperationen wie Benutzeranmeldung, Datenbank- und Tabellenerstellung, Warehouse-Erstellung, Einfügen/Laden von Daten sowie Abfragen durchzuführen.
Im Beispielcode am Ende dieses Themas werden einzelne Beispiele zu einem einzigen, funktionierenden Python-Programm zusammengeführt.
Unter diesem Thema:
Erstellen einer Datenbank, eines Schemas und eines Warehouse
-
Verwenden eines Kontextmanagers zum Verbinden und Steuern von Transaktionen
Überprüfen der Netzwerkverbindung zu Snowflake mit SnowCD¶
Nach der Konfiguration des Treibers können Sie die Netzwerkkonnektivität zu Snowflake mit SnowCD testen und Probleme beheben.
Sie können während der Erstkonfiguration und bei Bedarf jederzeit SnowCD verwenden, um Ihre Netzwerkverbindung zu Snowflake zu testen und Probleme zu beheben.
Verbinden mit Snowflake¶
Importieren Sie das snowflake.connector
-Modul:
import snowflake.connector
Ermitteln Sie die Anmeldeinformationen aus Umgebungsvariablen, über die Befehlszeile, aus einer Konfigurationsdatei oder einer anderen geeigneten Quelle. Beispiel:
PASSWORD = os.getenv('SNOWSQL_PWD')
WAREHOUSE = os.getenv('WAREHOUSE')
...
Verwenden Sie für den Parameter ACCOUNT Ihren Kontobezeichner. Beachten Sie, dass der Kontobezeichner nicht das Suffix snowflakecomputing.com
enthält.
Weitere Details und Beispiele dazu finden Sie unter Nutzungshinweise für die Verwendung des account-Parameters (für die connect-Methode).
Bemerkung
Beschreibungen zu den verfügbaren Konnektorparametern finden Sie unter den snowflake.connector
-Methoden.
Wenn Sie Daten aus Ihrem eigenen Amazon S3-Bucket kopieren möchten, benötigen Sie die AWS_ACCESS_KEY_ID und AWS_SECRET_ACCESS_KEY.
import os AWS_ACCESS_KEY_ID = os.getenv('AWS_ACCESS_KEY_ID') AWS_SECRET_ACCESS_KEY = os.getenv('AWS_SECRET_ACCESS_KEY')Bemerkung
Wenn Ihre Daten in einem Microsoft Azure-Container gespeichert sind, geben Sie die Anmeldeinformationen direkt in der COPY-Anweisung an.
Stellen Sie nach dem Lesen der Verbindungsinformationen eine Verbindung mit dem Standardauthentifikator oder der Verbundauthentifizierung her (falls aktiviert).
Einstellen der Sitzungsparameter¶
Es gibt mehrere Möglichkeiten, Sitzungsparameter festzulegen, z. B. QUERY_TAG, wenn Sie den Python-Konnektor verwenden.
Sie können Parameter auf Sitzungsebene festlegen, wenn Sie eine Verbindung zu Snowflake herstellen. Übergeben Sie dazu den optionalen Verbindungsparameter „session_parameters“ wie unten gezeigt:
con = snowflake.connector.connect(
user='XXXX',
password='XXXX',
account='XXXX',
session_parameters={
'QUERY_TAG': 'EndOfMonthFinancials',
}
)
Das an die connect()-Methode übergebene „session_parameters“-Wörterbuch kann einen oder mehrere Parameter auf Sitzungsebene enthalten.
Sie können Sitzungsparameter auch festlegen, indem Sie nach dem Herstellen der Verbindung die SQL-Anweisung ALTER SESSION SET ...
ausführen:
con.cursor().execute("ALTER SESSION SET QUERY_TAG = 'EndOfMonthFinancials'")
Weitere Informationen zu Sitzungsparametern finden Sie in den Beschreibungen der einzelnen Parameter auf der allgemeinen Seite Parameter.
Verbinden mit dem Standardauthentifikator¶
Stellen Sie eine Verbindung zu Snowflake unter Verwendung der erforderlichen Anmeldeparameter her:
conn = snowflake.connector.connect( user=USER, password=PASSWORD, account=ACCOUNT, warehouse=WAREHOUSE, database=DATABASE, schema=SCHEMA )
Möglicherweise müssen Sie dies mit anderen Informationen erweitern.
Verwenden von Single Sign-On (SSO) zur Authentifizierung¶
Wenn Sie Snowflake für die Verwendung von Single Sign-On (SSO) konfiguriert haben, können Sie Ihre Clientanwendung so konfigurieren, dass SSO für die Authentifizierung verwendet wird. Weitere Informationen dazu finden Sie unter Verwenden von SSO bei Clientanwendungen, die sich mit Snowflake verbinden.
Verwenden der mehrstufige Authentifizierung (MFA)¶
Snowflake unterstützt das Caching von MFA-Token, einschließlich der Kombination der von MFA-Token-Caching mit SSO.
Weitere Informationen dazu finden Sie unter Verwenden von MFA-Token-Caching zur Minimierung der Anzahl von Eingabeaufforderungen bei der Authentifizierung – Optional.
Verwenden von Schlüsselpaar-Authentifizierung und Schlüsselpaar-Rotation¶
Der Python-Konnektor unterstützt Schlüsselpaar-Authentifizierung und Schlüsselrotation.
Weitere Informationen zur Konfiguration der Schlüsselpaar-Authentifizierung und der Schlüsselrotation finden Sie unter Schlüsselpaar-Authentifizierung und Schlüsselpaar-Rotation.
Nachdem Sie die Konfiguration der Schlüsselpaar-Authentifizierung abgeschlossen haben, setzen Sie den Parameter private_key
in der Funktion connect
auf den Pfad zur Datei des privaten Schlüssels.
Ändern Sie den unten stehenden Beispielcode, und führen Sie ihn aus. Der Code entschlüsselt die Datei, die den privaten Schlüssel enthält, und übergibt sie an den Snowflake-Treiber, um eine Verbindung herzustellen:
Aktualisieren Sie die Sicherheitsparameter:
path
: Gibt den lokalen Pfad zu der von Ihnen erstellten Datei mit dem privaten Schlüssel an.Aktualisieren Sie die Verbindungsparameter:
user
: Gibt Ihren Snowflake-Anmeldenamen an.
account_identifier
: Gibt Ihren Kontobezeichner an.Weitere Details dazu finden Sie unter Nutzungshinweise für die Verwendung des account-Parameters (für die connect-Methode).
Beispielcode
import snowflake.connector import os from cryptography.hazmat.backends import default_backend from cryptography.hazmat.primitives.asymmetric import rsa from cryptography.hazmat.primitives.asymmetric import dsa from cryptography.hazmat.primitives import serialization with open("<path>/rsa_key.p8", "rb") as key: p_key= serialization.load_pem_private_key( key.read(), password=os.environ['PRIVATE_KEY_PASSPHRASE'].encode(), backend=default_backend() ) pkb = p_key.private_bytes( encoding=serialization.Encoding.DER, format=serialization.PrivateFormat.PKCS8, encryption_algorithm=serialization.NoEncryption()) ctx = snowflake.connector.connect( user='<user>', account='<account_identifier>', private_key=pkb, warehouse=WAREHOUSE, database=DATABASE, schema=SCHEMA ) cs = ctx.cursor()
Verwenden eines Proxyservers¶
Um einen Proxyserver zu verwenden, konfigurieren Sie die folgenden Umgebungsvariablen:
HTTP_PROXY
HTTPS_PROXY
NO_PROXY
Beispiel:
- Linux oder macOS
export HTTP_PROXY='http://username:password@proxyserver.company.com:80' export HTTPS_PROXY='http://username:password@proxyserver.company.com:80'
- Windows
set HTTP_PROXY=http://username:password@proxyserver.company.com:80 set HTTPS_PROXY=http://username:password@proxyserver.company.com:80
Tipp
Das Sicherheitsmodell von Snowflake erlaubt keine Secure Sockets Layer (SSL)-Proxys (unter Verwendung eines HTTPS-Zertifikats). Ihr Proxyserver muss eine öffentlich zugängliche Zertifizierungsstelle (CA) verwenden, um potenzielle Sicherheitsrisiken wie einen MITM (Man In The Middle)-Angriff durch einen kompromittierten Proxy zu reduzieren.
Wenn Sie Ihren SSL-Proxy verwenden müssen, empfehlen wir Ihnen dringend, die Serverrichtlinie zu aktualisieren, um die Snowflake-Zertifikatsprüfung zu bestehen, sodass mitten in der Kommunikation kein Zertifikat geändert wird.
Optional kann NO_PROXY
verwendet werden, um den Proxy für bestimmte Arten der Kommunikation zu umgehen. Beispielsweise kann für den Zugriff auf Amazon S3 der Proxyserver durch Angabe von NO_PROXY=".amazonaws.com"
umgangen werden.
NO_PROXY
unterstützt keine Platzhalter. Jeder angegebene Wert sollte einer der folgenden Werte sein:
Das Ende eines Hostnamens (oder eines vollständigen Hostnamens), zum Beispiel:
.amazonaws.com
meineOrganisation-meinKonto.snowflakecomputing.com
Eine IP-Adresse, zum Beispiel:
192.196.1.15
Wenn mehrere Werte angegeben werden, müssen diese durch Kommas getrennt sein. Beispiel:
localhost,.my_company.com,.snowflakecomputing.com,192.168.1.15,192.168.1.16
Verbinden mit OAuth¶
Um eine Verbindung mit OAuth herzustellen, muss die Verbindungszeichenfolge den Parameter authenticator
mit dem Wert oauth
und den Parameter token
mit dem Wert oauth_access_token
enthalten. Weitere Informationen dazu finden Sie unter Clients, Treiber und Konnektoren.
ctx = snowflake.connector.connect(
user="<username>",
host="<hostname>",
account="<account_identifier>",
authenticator="oauth",
token="<oauth_access_token>",
warehouse="test_warehouse",
database="test_db",
schema="test_schema"
)
OCSP¶
Wenn der Treiber eine Verbindung herstellt, sendet Snowflake ein Zertifikat, um zu bestätigen, dass die Verbindung zu Snowflake und nicht zu einem Host besteht, der sich als Snowflake ausgibt. Der Treiber sendet dieses Zertifikat an einen OCSP (Online Certificate Status Protocol)-Server, um zu überprüfen, ob das Zertifikat widerrufen wurde.
Wenn der Treiber den OCSP-Server nicht erreichen kann, um das Zertifikat zu überprüfen, kann beim Treiber „Fail-open“ oder „Fail-close“ auftreten.
Auswahl des Fail-open- oder Fail-close-Modus¶
Versionen des Snowflake-Konnektors für Python vor 1.8.0 verwenden standardmäßig den Fail-close-Modus. Bei Versionen ab 1.8.0 wird standardmäßig der Fail-open-Modus verwendet. Sie können das Standardverhalten überschreiben, indem Sie beim Aufrufen der connect() -Methode den optionalen Verbindungsparameter ocsp_fail_open
einstellen. Beispiel:
con = snowflake.connector.connect(
account=<account_identifier>,
user=<user>,
...,
ocsp_fail_open=False,
...);
Überprüfen der Version von OCSP-Konnektor/Treiber¶
Die Version von Treiber oder Konnektor bestimmt zusammen mit deren Konfiguration das OCSP-Verhalten. Weitere Informationen zur Treiber- oder Konnektor-Version, ihrer Konfiguration und zum OCSP-Verhalten finden Sie unter OCSP-Konfiguration.
Zwischenspeichern von OCSP-Antworten¶
Um die Sicherheit der gesamten Kommunikation zu gewährleisten, verwendet der Snowflake-Konnektor für Python das HTTPS-Protokoll für die Verbindung zu Snowflake und zu allen anderen Services (z. B. Amazon S3 für das Staging von Datendateien und Okta für die Verbundauthentifizierung). Zusätzlich zum regulären HTTPS-Protokoll überprüft der Konnektor bei jeder Verbindung über OCSP (Online Certificate Status Protocol) auch den Status der TLS/SSL-Zertifikatssperre und bricht die Verbindung ab, wenn er feststellt, dass das Zertifikat gesperrt wurde oder der OCSP-Status nicht zuverlässig ist.
Da jede Snowflake-Verbindung bis zu drei Roundtrips mit dem OCSP-Server auslöst, wurden mehrere Ebenen des Cache für OCSP-Antworten eingeführt, um die zusätzliche Netzwerklast der Verbindung zu reduzieren:
Speichercache, der während der gesamten Lebensdauer des Prozesses beibehalten wird.
Dateicache, der so lange beibehalten wird, bis das Cacheverzeichnis (z. B.
~/.cache/snowflake
) gelöscht ist.OCSP-Antwort-Servercache.
Das Caching behebt auch Verfügbarkeitsprobleme für OCSP-Server (d. h. für den Fall, dass der eigentliche OCSP-Server ausfällt). Solange der Cache gültig ist, kann der Konnektor den Status der Zertifikatssperre noch überprüfen.
Wenn keine der Cacheschichten die OCSP-Antwort enthält, versucht der Client, den Validierungsstatus direkt vom OCSP-Server der CA abzurufen.
Ändern des Speicherortes für den OCSP-Antwort-Dateicache.¶
Standardmäßig ist der Dateicache an den folgenden Speicherorten aktiviert, sodass keine zusätzlichen Konfigurationsaufgaben erforderlich sind:
- Linux
~/.cache/snowflake/ocsp_response_cache.json
- macOS
~/Library/Caches/Snowflake/ocsp_response_cache.json
- Windows
%USERPROFILE%\AppData\Local\Snowflake\Caches\ocsp_response_cache.json
Wenn Sie jedoch einen anderen Speicherort und/oder Dateinamen für die OCSP-Antwort-Cachedatei angeben möchten, akzeptiert die connect
-Methode den Parameter ocsp_response_cache_filename
, der Pfad und Namen der OCSP-Cachedatei in Form einer URI angibt.
OCSP-Antwort-Cacheserver¶
Bemerkung
Der OCSP-Antwort-Cacheserver wird derzeit vom Snowflake-Konnektor für Python 1.6.0 und höher unterstützt.
Die Speicher- und Dateitypen des OCSP-Cache eignen sich gut für Anwendungen, die mit Snowflake über einen der von uns bereitgestellten Clients mit einem persistenten Host verbunden sind. Sie funktionieren jedoch nicht in dynamisch bereitgestellten Umgebungen wie AWS Lambda oder Docker.
Um dieser Situation zu begegnen, bietet Snowflake eine dritte Schicht des Caching: den OCSP-Antwort-Cacheserver. Der OCSP-Antwort-Cacheserver holt OCSP-Antworten stündlich von den OCSP-Servern des CA und speichert sie 24 Stunden lang. Clients können dann den Validierungsstatus eines bestimmten Snowflake-Zertifikats von diesem Servercache anfordern.
Wichtig
Wenn Ihre Serverrichtlinie den Zugriff auf die meisten oder alle externen IP-Adressen und Websites verweigert, müssen Sie die Adresse des Cacheservers auf die Zulassungsliste setzen, um einen normalen Servicebetrieb zu ermöglichen. Die Cacheserver-URL ist ocsp*.snowflakecomputing.com:80
.
Wenn Sie den Cacheserver aus irgendeinem Grund deaktivieren müssen, setzen Sie die Umgebungsvariable SF_OCSP_RESPONSE_CACHE_SERVER_ENABLED
auf false
. Beachten Sie, dass der Wert zwischen Groß- und Kleinschreibung unterscheidet und in Kleinbuchstaben angegeben werden muss.
Erstellen einer Datenbank, eines Schemas und eines Warehouse¶
Erstellen Sie nach der Anmeldung mit den Befehlen CREATE DATABASE, CREATE SCHEMA und CREATE WAREHOUSE eine Datenbank, ein Schema und ein Warehouse, falls diese noch nicht vorhanden sind.
Das folgende Beispiel zeigt, wie sich ein Warehouse mit dem Namen tiny_warehouse
, eine Datenbank mit dem Namen testdb
und ein Schema mit dem Namen testschema
erstellen lassen. Beachten Sie, dass Sie beim Erstellen des Schemas entweder den Namen der Datenbank angeben müssen, in der das Schema erstellt werden soll, oder bereits mit der Datenbank verbunden sein müssen, in der das Schema erstellt werden soll. Im folgenden Beispiel wird vor dem Befehl CREATE SCHEMA
ein USE DATABASE
-Befehl ausgeführt, um sicherzustellen, dass das Schema in der korrekten Datenbank erstellt wird.
conn.cursor().execute("CREATE WAREHOUSE IF NOT EXISTS tiny_warehouse_mg") conn.cursor().execute("CREATE DATABASE IF NOT EXISTS testdb_mg") conn.cursor().execute("USE DATABASE testdb_mg") conn.cursor().execute("CREATE SCHEMA IF NOT EXISTS testschema_mg")
Verwenden von Datenbank, Schema und Warehouse¶
Geben Sie Datenbank und Schema an, in denen Sie Tabellen erstellen möchten. Geben Sie auch das Warehouse an, das Ressourcen für die Ausführung von DML-Anweisungen und -Abfragen bereitstellen soll.
Folgendes Beispiel zeigt die Verwendung von Datenbank testdb
, Schema testschema
und Warehouse tiny_warehouse
(zuvor erstellt):
conn.cursor().execute("USE WAREHOUSE tiny_warehouse_mg") conn.cursor().execute("USE DATABASE testdb_mg") conn.cursor().execute("USE SCHEMA testdb_mg.testschema_mg")
Erstellen von Tabellen und Einfügen von Daten¶
Verwenden Sie den Befehl CREATE TABLE, um Tabellen zu erstellen, und den Befehl INSERT, um die Tabellen mit Daten zu füllen.
Erstellen Sie beispielsweise eine Tabelle mit dem Namen testtable
, und fügen Sie zwei Zeilen in die Tabelle ein:
conn.cursor().execute( "CREATE OR REPLACE TABLE " "test_table(col1 integer, col2 string)") conn.cursor().execute( "INSERT INTO test_table(col1, col2) VALUES " + " (123, 'test string1'), " + " (456, 'test string2')")
Laden von Daten¶
Anstatt Daten mit einzelnen INSERT-Befehlen in Tabellen einzufügen, können Sie Daten aus Dateien, die entweder an einem internen oder externen Speicherort bereitgestellt werden, per Massenladen hinzufügen.
Kopieren von Daten von einem internen Speicherort¶
Um Daten aus Dateien auf Ihrem Hostcomputer in eine Tabelle zu laden, verwenden Sie zunächst den Befehl PUT für das Staging der Datei an einem internen Speicherort. Verwenden Sie dann den Befehl COPY INTO <Tabelle>, um die Daten aus den Dateien in die Tabelle zu kopieren.
Beispiel:
# Putting Data con.cursor().execute("PUT file:///tmp/data/file* @%testtable") con.cursor().execute("COPY INTO testtable")Hier werden die CSV-Daten in einem lokalen Verzeichnis namens
/tmp/data
in einer Linux- oder macOS-Umgebung gespeichert. Das Verzeichnis enthält dann Dateien mit den Namenfile0
,file1
, …file100
.
Kopieren von Daten von einem externen Speicherort¶
Um Daten aus Dateien, die bereits in einem externen Stagingbereich (z. B. Ihr S3-Bucket) bereitgestellt wurden, in eine Tabelle zu laden, verwenden Sie den Befehl COPY INTO <Tabelle>.
Beispiel:
# Copying Data con.cursor().execute(""" COPY INTO testtable FROM s3://<s3_bucket>/data/ STORAGE_INTEGRATION = myint FILE_FORMAT=(field_delimiter=',') """.format( aws_access_key_id=AWS_ACCESS_KEY_ID, aws_secret_access_key=AWS_SECRET_ACCESS_KEY))Wobei:
s3://<S3-Bucket>/data/
gibt den Namen Ihres S3-Buckets an.Den Dateien im Bucket wird
data
vorangestellt.Der Zugriff auf den Bucket erfolgt über eine Speicherintegration, die mit CREATE STORAGE INTEGRATION von einem Kontoadministrator (d. h. einem Benutzer mit der Rolle ACCOUNTADMIN) oder einer Rolle mit der globalen Berechtigung CREATE INTEGRATION erstellt wurde. Bei Verwendung einer Speicherintegration benötigen Benutzer für den Zugriff auf einen privaten Speicherort keine Anmeldeinformationen mehr.
Bemerkung
In diesem Beispiel wird die Anweisung mit der Funktion format() zusammengestellt. Wenn in Ihrer Umgebung das Risiko von Angriffen durch Einschleusung von SQL-Befehlen besteht, sollten Sie möglicherweise das Binden von Werten vorziehen, anstatt format() zu verwenden.
Abfragen von Daten¶
Mit dem Snowflake-Konnektor für Python können Sie Folgendes übermitteln:
Eine synchrone Abfrage, die die Kontrolle an Ihre Anwendung zurückgibt, nachdem die Abfrage abgeschlossen ist.
Eine asynchrone Abfrage, die die Kontrolle an Ihre Anwendung zurückgibt, bevor die Abfrage abgeschlossen ist.
Nachdem die Abfrage abgeschlossen wurde, verwenden Sie das Cursor
-Objekt, um die Werte in den Ergebnissen abzurufen. Standardmäßig konvertiert der Snowflake-Konnektor für Python die Werte von Snowflake-Datentypen in native Python-Datentypen. (Beachten Sie, dass Sie die Werte auch als Zeichenfolgen zurückgeben und die Typkonvertierungen in Ihrer Anwendung durchführen können. Siehe Verbessern der Abfrageleistung durch Umgehen der Datenkonvertierung.)
Bemerkung
Standardmäßig werden die Werte aus NUMBER-Spalten als Gleitkommawerten mit doppelter Genauigkeit (float64
) zurückgegeben. Um diese als Dezimalwerte (decimal.Decimal
) in den Methoden fetch_pandas_all()
und fetch_pandas_batches()
zurückzugeben, setzen Sie den Parameter arrow_number_to_decimal
in der Methode connect()
auf True
.
Ausführen einer synchronen Abfrage¶
Um eine synchrone Abfrage auszuführen, rufen Sie die Methode execute()
im Cursor
-Objekt auf. Beispiel:
conn = snowflake.connector.connect( ... )
cur = conn.cursor()
cur.execute('select * from products')
Verwenden Sie das Cursor
-Objekt, um die Werte in den Ergebnissen abzurufen, wie unter Verwenden von cursor zum Abrufen von Werten erläutert.
Ausführen einer asynchronen Abfrage¶
Der Snowflake-Konnektor für Python unterstützt asynchrone Abfragen (d. h. Abfragen, die dem Benutzer die Kontrolle zurückgeben, bevor die Abfrage abgeschlossen ist). Sie können eine asynchrone Abfrage übermitteln und mithilfe von Abrufen (Polling) feststellen, wann die Abfrage abgeschlossen ist. Nachdem die Abfrage abgeschlossen ist, können Sie die Ergebnisse abrufen.
Bemerkung
Um asynchrone Abfragen auszuführen, müssen Sie sicherstellen, dass der Konfigurationsparameter ABORT_DETACHED_QUERY
den Wert FALSE
hat (Standardwert).
Snowflake schließt Verbindungen automatisch nach einer bestimmten Zeit (Standard: 5 Minuten), wodurch alle aktiven Abfragen verwaist sind. Wenn der Wert TRUE
ist, bricht Snowflake diese verwaisten Abfragen ab, was sich auf asynchrone Abfragen auswirken kann.
Mit dieser Funktion können Sie mehrere Abfragen parallel übermitteln, ohne auf den Abschluss jeder Abfrage warten zu müssen. Sie können innerhalb derselben Sitzung auch Kombinationen von synchronen und asynchronen Abfragen ausführen.
Schließlich können Sie eine asynchrone Abfrage über eine Verbindung übermitteln und die Ergebnisse über eine andere Verbindung abrufen. Beispielsweise kann ein Benutzer eine Abfrage mit langer Laufzeit über Ihre Anwendung initiieren, die Anwendung beenden und die Anwendung zu einem späteren Zeitpunkt erneut starten, um die Ergebnisse zu überprüfen.
Übermitteln einer asynchronen Abfrage¶
Um eine asynchrone Abfrage zu übermitteln, rufen Sie die Methode execute_async()
im Cursor
-Objekt auf. Beispiel:
conn = snowflake.connector.connect( ... )
cur = conn.cursor()
# Submit an asynchronous query for execution.
cur.execute_async('select count(*) from table(generator(timeLimit => 25))')
Nach dem Übermitteln der Abfrage:
Um festzustellen, ob die Abfrage noch ausgeführt wird, siehe Überprüfen des Status einer Abfrage.
Um die Ergebnisse der Abfrage abzurufen, siehe Verwenden der Abfrage-ID zum Abrufen von Abfrageergebnissen.
Beispiele für das Ausführen asynchroner Abfragen finden Sie unter Beispiele für asynchrone Abfragen.
Best Practices für asynchrone Abfragen¶
Beachten Sie beim Übermitteln von asynchronen Abfrage die folgenden bewährten Verfahren:
Stellen Sie vor der parallelen Ausführung von Abfragen sicher, dass Sie die Abhängigkeiten der Abfragen von anderen Abfragen genau kennen. Einige Abfragen sind voneinander und von der Ausführungsreihenfolge abhängig und sind daher nicht für die Parallelisierung geeignet. Beispielsweise kann eine INSERT-Anweisung offensichtlich erst beginnen, nachdem die entsprechende CREATE TABLE-Anweisung abgeschlossen wurde.
Stellen Sie sicher, dass die Anzahl der gestarteten Abfragen auf den verfügbaren Arbeitsspeicher abgestimmt ist. Das parallele Ausführen mehrerer Abfragen beansprucht normalerweise mehr Speicher, insbesondere wenn mehrere Ergebnis-Datasets gleichzeitig im Arbeitsspeicher gespeichert sind.
Behandeln Sie beim Abrufen (Polling) die seltenen Fälle, in denen eine Abfrage nicht erfolgreich ist.
Stellen Sie sicher, dass Anweisungen zur Transaktionssteuerung (BEGIN, COMMIT und ROLLBACK) nicht parallel zu anderen Anweisungen ausgeführt werden.
Abrufen der Snowflake-Abfrage-ID¶
Eine Abfrage-ID kennzeichnet jede von Snowflake ausgeführte Abfrage. Wenn Sie den Snowflake-Konnektor für Python zum Ausführen einer Abfrage verwenden, können Sie auf die Abfrage-ID über das Attribut sfqid
im Cursor
-Objekt zugreifen:
# Retrieving a Snowflake Query ID cur = con.cursor() cur.execute("SELECT * FROM testtable") print(cur.sfqid)
Sie können die Abfrage-ID für Folgendes verwenden:
Überprüfen des Status von Abfragen über die Weboberfläche.
Auf der klassischen Weboberfläche werden die Abfrage-IDs auf der Seite History
angezeigt. Siehe Verwenden der Verlaufsseite zum Überwachen von Abfragen.
Programmgesteuertes Prüfen des Status einer Abfrage (z. B. um festzustellen, ob eine asynchrone Abfrage abgeschlossen ist).
Abrufen der Ergebnisse einer asynchronen Abfrage oder einer zuvor übermittelten synchronen Abfrage.
Siehe Verwenden der Abfrage-ID zum Abrufen von Abfrageergebnissen.
Abbrechen einer in Ausführung befindlichen Abfrage.
Überprüfen des Status einer Abfrage¶
So überprüfen Sie den Status einer Abfrage:
Rufen Sie die Abfrage-ID aus dem Feld
sfqid
im ObjektCursor
ab.Übergeben Sie die Abfrage-ID an die Methode
get_query_status()
desConnection
-Objekts, um die EnumerationskonstanteQueryStatus
zurückzugeben, die den Status der Abfrage darstellt.Standardmäßig löst
get_query_status()
keinen Fehler aus, wenn die Abfrage zu einem Fehler geführt hat. Wenn Sie möchten, dass ein Fehler ausgelöst wird, rufen Sie stattdessenget_query_status_throw_if_error()
auf.Verwenden Sie die Enumerationskonstante
QueryStatus
, um den Status der Abfrage zu überprüfen.Um festzustellen, ob die Abfrage noch ausgeführt wird (z. B. wenn es sich um eine asynchrone Abfrage handelt), übergeben Sie die Konstante an die Methode
is_still_running()
desConnection
-Objekts.Um festzustellen, ob ein Fehler aufgetreten ist, übergeben Sie die Konstante an die Methode
is_an_error()
.
Eine vollständige Liste der Enumerationskonstanten erhalten Sie mit
QueryStatus
.
Im folgenden Beispiel wird eine asynchrone Abfrage ausgeführt und der Status der Abfrage überprüft:
import time
...
# Execute a long-running query asynchronously.
cur.execute_async('select count(*) from table(generator(timeLimit => 25))')
...
# Wait for the query to finish running.
query_id = cur.sfqid
while conn.is_still_running(conn.get_query_status(query_id)):
time.sleep(1)
Im folgenden Beispiel wird ein Fehler ausgelöst, wenn die Abfrage zu einem Fehler geführt hat:
from snowflake.connector import ProgrammingError
import time
...
# Wait for the query to finish running and raise an error
# if a problem occurred with the execution of the query.
try:
query_id = cur.sfqid
while conn.is_still_running(conn.get_query_status_throw_if_error(query_id)):
time.sleep(1)
except ProgrammingError as err:
print('Programming Error: {0}'.format(err))
Verwenden der Abfrage-ID zum Abrufen von Abfrageergebnissen¶
Bemerkung
Wenn Sie eine synchrone Abfrage ausgeführt haben, indem Sie die Methode execute()
für ein Cursor
-Objekt aufgerufen haben, ist die Abfrage-ID zum Abrufen der Ergebnisse nicht erforderlich. Sie können einfach die Werte aus den Ergebnissen abrufen, wie unter Verwenden von cursor zum Abrufen von Werten erläutert.
Wenn Sie die Ergebnisse einer asynchronen Abfrage oder einer zuvor übermittelten synchronen Abfrage abrufen möchten, führen Sie die folgenden Schritte aus:
Rufen Sie die Abfrage-ID der Abfrage ab. Siehe Abrufen der Snowflake-Abfrage-ID.
Rufen Sie die Methode
get_results_from_sfqid()
imCursor
-Objekt auf, um die Ergebnisse abzurufen.Verwenden Sie das
Cursor
-Objekt, um die Werte in den Ergebnissen abzurufen, wie unter Verwenden von cursor zum Abrufen von Werten erläutert.
Falls die Abfrage noch ausgeführt wird, müssen Sie beachten, dass die Fetch-Methoden (fetchone()
, fetchmany()
, fetchall()
usw.) auf den Abschluss der Abfrage warten.
Beispiel:
# Get the results from a query.
cur.get_results_from_sfqid(query_id)
results = cur.fetchall()
print(f'{results[0]}')
Verwenden von cursor
zum Abrufen von Werten¶
Rufen Sie Werte aus einer Tabelle mit der Cursorobjektiterator-Methode ab.
Um beispielsweise Spalten mit dem Namen „col1“ und „col2“ aus der zuvor mit testtable
erstellten Tabelle (siehe Erstellen von Tabellen und Einfügen von Daten) abzurufen, verwenden Sie Code, der dem folgenden ähnelt:
cur = conn.cursor() try: cur.execute("SELECT col1, col2 FROM test_table ORDER BY col1") for (col1, col2) in cur: print('{0}, {1}'.format(col1, col2)) finally: cur.close()
Alternativ bietet der Snowflake-Konnektor für Python ein abgekürztes Verfahren:
for (col1, col2) in con.cursor().execute("SELECT col1, col2 FROM testtable"): print('{0}, {1}'.format(col1, col2))
Wenn Sie ein einzelnes Ergebnis (d. h. eine einzelnen Zeile) erhalten möchten, verwenden Sie die Methode fetchone
:
col1, col2 = con.cursor().execute("SELECT col1, col2 FROM testtable").fetchone() print('{0}, {1}'.format(col1, col2))
Wenn Sie die angegebene Anzahl von Zeilen auf einmal erhalten möchten, verwenden Sie die Methode fetchmany
mit der Anzahl der Zeilen:
cur = con.cursor().execute("SELECT col1, col2 FROM testtable") ret = cur.fetchmany(3) print(ret) while len(ret) > 0: ret = cur.fetchmany(3) print(ret)Bemerkung
Verwenden Sie
fetchone
oderfetchmany
, wenn das Resultset zu groß für den verfügbaren Speicher ist.
Wenn Sie alle Ergebnisse auf einmal erhalten müssen:
results = con.cursor().execute("SELECT col1, col2 FROM testtable").fetchall() for rec in results: print('%s, %s' % (rec[0], rec[1]))
Um ein Zeitlimit (Timeout) für eine Abfrage festzulegen, führen Sie einen „begin“-Befehl aus, und fügen Sie einen Timeout-Parameter in die Abfrage ein. Wenn die Abfrage die Zeitdauer des Parameterwertes überschreitet, wird ein Fehler generiert und ein Rollback durchgeführt.
Im folgenden Code bedeutet der Fehler 604, dass die Abfrage abgebrochen wurde. Der Timeout-Parameter startet Timer()
und bricht ab, wenn die Abfrage nicht innerhalb der angegebenen Zeit beendet wird.
conn.cursor().execute("create or replace table testtbl(a int, b string)") conn.cursor().execute("begin") try: conn.cursor().execute("insert into testtbl(a,b) values(3, 'test3'), (4,'test4')", timeout=10) # long query except ProgrammingError as e: if e.errno == 604: print("timeout") conn.cursor().execute("rollback") else: raise e else: conn.cursor().execute("commit")
Verwenden von DictCursor
zum Abrufen von Werten nach Spaltenname¶
Wenn Sie einen Wert anhand des Spaltennamens abrufen möchten, erstellen Sie ein cursor
-Objekt vom Typ DictCursor
.
Beispiel:
# Querying data by DictCursor from snowflake.connector import DictCursor cur = con.cursor(DictCursor) try: cur.execute("SELECT col1, col2 FROM testtable") for rec in cur: print('{0}, {1}'.format(rec['COL1'], rec['COL2'])) finally: cur.close()
Beispiele für asynchrone Abfragen¶
Im Folgenden finden Sie ein einfaches Beispiel für eine asynchrone Abfrage:
from snowflake.connector import ProgrammingError
import time
conn = snowflake.connector.connect( ... )
cur = conn.cursor()
# Submit an asynchronous query for execution.
cur.execute_async('select count(*) from table(generator(timeLimit => 25))')
# Retrieve the results.
cur.get_results_from_sfqid(query_id)
results = cur.fetchall()
print(f'{results[0]}')
Im nächsten Beispiel wird eine asynchrone Abfrage von einer Verbindung aus übermittelt und das Ergebnis von einer anderen Verbindung aus abgerufen:
from snowflake.connector import ProgrammingError
import time
conn = snowflake.connector.connect( ... )
cur = conn.cursor()
# Submit an asynchronous query for execution.
cur.execute_async('select count(*) from table(generator(timeLimit => 25))')
# Get the query ID for the asynchronous query.
query_id = cur.sfqid
# Close the cursor and the connection.
cur.close()
conn.close()
# Open a new connection.
new_conn = snowflake.connector.connect( ... )
# Create a new cursor.
new_cur = new_conn.cursor()
# Retrieve the results.
new_cur.get_results_from_sfqid(query_id)
results = new_cur.fetchall()
print(f'{results[0]}')
Verwenden der Abfrage-ID zum Abbrechen einer Abfrage¶
So brechen Sie eine Abfrage über die Abfrage-ID ab:
cur = cn.cursor() try: cur.execute(r"SELECT SYSTEM$CANCEL_QUERY('queryID')") result = cur.fetchall() print(len(result)) print(result[0]) finally: cur.close()
Ersetzen Sie die Zeichenfolge „queryID“ durch die tatsächliche Abfrage-ID. Um die ID für eine Abfrage zu erhalten, siehe Abrufen der Snowflake-Abfrage-ID.
Verbessern der Abfrageleistung durch Umgehen der Datenkonvertierung¶
Verwenden Sie zur Verbesserung der Abfrageleistung die Klasse SnowflakeNoConverterToPython
im Modul snowflake.connector.converter_null
, um die Datenkonvertierung vom internen Snowflake-Datentyp in den systemeigenen Python-Datentyp zu umgehen, z. B.:
from snowflake.connector.converter_null import SnowflakeNoConverterToPython con = snowflake.connector.connect( ... converter_class=SnowflakeNoConverterToPython ) for rec in con.cursor().execute("SELECT * FROM large_table"): # rec includes raw Snowflake data
Im Ergebnis werden alle Daten als Zeichenfolgen dargestellt, sodass die Anwendung für die Konvertierung in die systemeigenen Python-Datentypen verantwortlich ist. Beispielsweise sind TIMESTAMP_NTZ
- und TIMESTAMP_LTZ
-Daten die als Zeichenfolgen dargestellte Epochenzeit, und TIMESTAMP_TZ
-Daten sind die Epochenzeit, gefolgt von einem Leerzeichen, gefolgt vom Offset zu UTC in Minuten, dargestellt als Zeichenfolge.
Das Binden der Daten wird nicht beeinflusst. Python-native Daten können weiterhin für Updates gebunden werden.
Binden von Daten¶
Um Werte anzugeben, die in einer SQL-Anweisung verwendet werden sollen, können Sie Literale in die Anweisung einschließen oder Variablen binden. Wenn Sie Variablen binden, fügen Sie einen oder mehrere Platzhalter in den Text der SQL-Anweisung ein, und geben Sie dann für jeden Platzhalter die Variable (den zu verwendenden Wert) an.
Im folgenden Beispiel wird die Verwendung von Literalen und die Bindung gegenübergestellt:
Literale:
con.cursor().execute("INSERT INTO testtable(col1, col2) VALUES(789, 'test string3')")Bindung:
con.cursor().execute( "INSERT INTO testtable(col1, col2) " "VALUES(%s, %s)", ( 789, 'test string3' ))
Bemerkung
Es gibt eine Obergrenze für die Datengröße, die Sie binden oder in einem Batch kombinieren können. Weitere Details dazu finden Sie unter Begrenzung der Abfragetextgröße.
Snowflake unterstützt die folgenden Bindungstypen:
pyformat
undformat
, die Daten auf dem Client binden.qmark
undnumeric
, die Daten auf dem Server binden.
Jeder Typ wird unten erklärt.
pyformat
- oder format
-Bindung¶
Sowohl die pyformat
-Bindung als auch die format
-Bindung binden Daten auf der Clientseite und nicht auf der Serverseite.
Standardmäßig unterstützt der Snowflake-Konnektor für Python sowohl pyformat
als auch format
, sodass Sie %(name)s
oder %s
als Platzhalter verwenden können. Beispiel:
Verwenden von
%(name)s
als Platzhalter:conn.cursor().execute( "INSERT INTO test_table(col1, col2) " "VALUES(%(col1)s, %(col2)s)", { 'col1': 789, 'col2': 'test string3', })
Verwenden von
%s
als Platzhalter:con.cursor().execute( "INSERT INTO testtable(col1, col2) " "VALUES(%s, %s)", ( 789, 'test string3' ))
Mit pyformat
und format
können Sie auch ein Listenobjekt verwenden, um Daten für den IN-Operator zu binden:
# Binding data for IN operator con.cursor().execute( "SELECT col1, col2 FROM testtable" " WHERE col2 IN (%s)", ( ['test string1', 'test string3'], ))
Das Prozentzeichen („%“) ist sowohl ein Platzhalterzeichen für SQL LIKE als auch ein Formatbindungszeichen für Python. Wenn Sie die Formatbindung verwenden und Ihr SQL-Befehl das Prozentzeichen enthält, müssen Sie möglicherweise das Prozentzeichen maskieren. Wenn Ihre SQL-Anweisung beispielsweise wie folgt lautet:
SELECT col1, col2 FROM test_table WHERE col2 ILIKE '%York' LIMIT 1; -- Find York, New York, etc.
Dann sollte Ihr Python-Code wie folgt aussehen (beachten Sie das zusätzliche Prozentzeichen, um das ursprüngliche Prozentzeichen zu umgehen):
sql_command = "select col1, col2 from test_table " sql_command += " where col2 like '%%York' limit %(lim)s" parameter_dictionary = {'lim': 1 } cur.execute(sql_command, parameter_dictionary)
qmark
- oder numeric
-Bindung¶
Sowohl die qmark
-Bindung als auch die numeric
-Bindung binden Daten eher auf der Serverseite als auf der Clientseite:
Verwenden Sie für die
qmark
-Bindung ein Fragezeichenzeichen (?
), um anzugeben, an welcher Stelle in der Zeichenfolge der Wert einer Variablen eingefügt werden soll.Verwenden Sie für die
numeric
-Bindung einen Doppelpunkt (:
), gefolgt von einer Zahl, mit der die Position der Variablen angegeben wird, die an dieser Position ersetzt werden soll. So gibt beispielsweise:2
die zweite Variable an.Verwenden Sie die numerische Bindung, um denselben Wert mehr als einmal in derselben Abfrage zu binden. Wenn Sie z. B. einen langen VARCHAR- oder BINARY- oder semistrukturierten Wert haben, den Sie mehr als einmal verwenden möchten, dann können Sie mit der
numeric
-Bindung den Wert einmal an den Server senden und ihn mehrfach verwenden.
In den nächsten Abschnitten wird erklärt, wie Sie qmark
- und numeric
-Bindung verwenden:
Verwenden von qmark
- oder numeric
-Bindung¶
Um die Stilbindung qmark
oder numeric
zu verwenden, führen Sie einen der folgenden Schritte aus:
snowflake.connector.paramstyle='qmark'
snowflake.connector.paramstyle='numeric'
Wichtig
Sie müssen das paramstyle
-Attribut festlegen, bevor Sie die connect()
-Methode aufrufen.
Wenn Sie paramstyle
auf qmark
oder numeric
setzen, müssen Sie ?
oder :N
(wobei N
durch eine Zahl ersetzt wird) als Platzhalter verwenden.
Beispiel:
Verwenden von
?
als Platzhalter:import snowflake.connector snowflake.connector.paramstyle='qmark' con = snowflake.connector.connect(...) con.cursor().execute( "INSERT INTO testtable(col1, col2) " "VALUES(?, ?)", ( 789, 'test string3' ))
Verwenden von
:N
als Platzhalter:import snowflake.connector snowflake.connector.paramstyle='numeric' con = snowflake.connector.connect(...) con.cursor().execute( "INSERT INTO testtable(col1, col2) " "VALUES(:1, :2)", ( 789, 'test string3' ))
Die folgende Abfrage zeigt die Verwendung der
numeric
-Bindung zur Wiederverwendung einer Variablen:con.cursor().execute( "INSERT INTO testtable(complete_video, short_sample_of_video) " "VALUES(:1, SUBSTRING(:1, :2, :3))", ( binary_value_that_stores_video, # variable :1 starting_offset_in_bytes_of_video_clip, # variable :2 length_in_bytes_of_video_clip # variable :3 ))
Verwenden von qmark
- oder numeric
-Bindung mit datetime
-Objekten¶
Wenn Sie die Bindung qmark
oder numeric
verwenden, um Daten an einen Snowflake-Datentyp TIMESTAMP zu binden, setzen Sie die Bindungsvariable auf ein Tupel, das den Datentyp des Snowflake-Zeitstempels (TIMESTAMP_LTZ
oder TIMESTAMP_TZ
) und den Wert angibt. Beispiel:
import snowflake.connector snowflake.connector.paramstyle='qmark' con = snowflake.connector.connect(...) con.cursor().execute( "CREATE OR REPLACE TABLE testtable2 (" " col1 int, " " col2 string, " " col3 timestamp_ltz" ")" ) con.cursor().execute( "INSERT INTO testtable2(col1,col2,col3) " "VALUES(?,?,?)", ( 987, 'test string4', ("TIMESTAMP_LTZ", datetime.now()) ) )
Im Gegensatz zur clientseitigen Bindung benötigt die serverseitige Bindung den Snowflake-Datentyp für die Spalte. Die meisten gängigen Python-Datentypen weisen bereits implizite Zuordnungen zu Snowflake-Datentypen auf (z. B. int
ist FIXED
zugeordnet). Da die Python-datetime
-Daten jedoch an einen von mehreren Snowflake-Datentypen (TIMESTAMP_NTZ
, TIMESTAMP_LTZ
oder TIMESTAMP_TZ
) gebunden sein können und die Standardzuordnung TIMESTAMP_NTZ
ist, müssen Sie den zu verwendenden Snowflake-Datentyp angeben.
Verwenden von Bindungsvariablen mit dem IN-Operator¶
qmark
und numeric
(serverseitige Bindung) unterstützen nicht die Verwendung von Bindungsvariablen mit dem IN-Operator.
Wenn Sie Bindungsvariablen mit dem IN-Operator verwenden müssen, verwenden Sie clientseitige Bindung (pyformat
oder format
).
Binden von Parametern an Variablen für Batcheinfügungen¶
In Ihrem Anwendungscode können Sie mehrere Zeilen in einen einzelnen Batch einfügen. Verwenden Sie dazu Parameter für Werte in einer INSERT-Anweisung. In der folgenden Anweisung werden z. B. Platzhalter für qmark
-Bindungen in einer INSERT-Anweisung verwendet:
insert into grocery (item, quantity) values (?, ?)
Zum Spezifizieren der einzufügenden Daten definieren Sie dann eine Variable, die eine Sequenz von Sequenzen ist (z. B. eine Liste von Tupeln):
rows_to_insert = [('milk', 2), ('apple', 3), ('egg', 2)]
Wie im obigen Beispiel gezeigt, ist jedes Element in der Liste ein Tupel, das die Spaltenwerte für eine einzufügende Zeile enthält.
Um die Bindung durchzuführen, rufen Sie die Methode executemany()
auf und übergeben die Variable als zweites Argument. Beispiel:
conn = snowflake.connector.connect( ... ) rows_to_insert = [('milk', 2), ('apple', 3), ('egg', 2)] conn.cursor().executemany( "insert into grocery (item, quantity) values (?, ?)", rows_to_insert)
Durch Binden von Daten auf dem Server (d. h. mittels qmark
- oder numeric
-Bindung) kann der Konnektor die Leistung von Batcheinfügungen optimieren.
Wenn Sie dieses Verfahren verwenden, um eine große Anzahl von Werten einzufügen, kann die Treiberleistung verbessert werden, indem die Daten (ohne Erstellen von Dateien auf dem lokalen Computer) an einen temporären Stagingbereich gestreamt werden. Der Treiber führt dies automatisch durch, wenn die Anzahl der Werte einen Schwellenwert überschreitet.
Außerdem müssen die aktuelle Datenbank und das aktuelle Schema für die Sitzung festgelegt sein. Wenn diese nicht festgelegt sind, kann der vom Treiber ausgeführte CREATE TEMPORARY STAGE-Befehl folgenden Fehler generieren:
CREATE TEMPORARY STAGE SYSTEM$BIND file_format=(type=csv field_optionally_enclosed_by='"')
Cannot perform CREATE STAGE. This session does not have a current schema. Call 'USE SCHEMA', or use a qualified name.
Bemerkung
Alternative Möglichkeiten zum Laden von Daten in die Snowflake-Datenbank (einschließlich Massenladen mit dem COPY-Befehl) finden Sie unter Laden von Daten in Snowflake.
Angriffe durch Einschleusung von SQL-Befehlen verhindern¶
Binden Sie Daten nicht mit der Formatierungsfunktion von Python, da Sie eine Einschleusung von SQL-Befehlen riskieren. Beispiel:
# Binding data (UNSAFE EXAMPLE) con.cursor().execute( "INSERT INTO testtable(col1, col2) " "VALUES(%(col1)d, '%(col2)s')" % { 'col1': 789, 'col2': 'test string3' })# Binding data (UNSAFE EXAMPLE) con.cursor().execute( "INSERT INTO testtable(col1, col2) " "VALUES(%d, '%s')" % ( 789, 'test string3' ))# Binding data (UNSAFE EXAMPLE) con.cursor().execute( "INSERT INTO testtable(col1, col2) " "VALUES({col1}, '{col2}')".format( col1=789, col2='test string3') )
Speichern Sie stattdessen die Werte in Variablen, und binden Sie diese Variablen dann mit qmark oder einem numerischen Bindungsformat ein.
Abrufen von Spaltenmetadaten¶
Für das Abrufen der Metadaten jeder Spalte im Resultset abzurufen (z. B. Name, Typ, Genauigkeit, Skala usw. jeder Spalte) gibt es folgende Möglichkeiten:
Nach dem Aufruf der Methode
execute()
zur Ausführung der Abfrage können Sie mit dem Attributdescription
desCursor
-Objekts auf die Metadaten zuzugreifen.Wenn Sie auf die Metadaten zugreifen möchten, ohne eine Abfrage ausführen zu müssen, rufen Sie die Methode
describe()
auf.Die Methode
describe
ist im Snowflake-Konnektor für Python ab Version 2.4.6 verfügbar.
Das Attribut description
wird auf einen der folgenden Werte gesetzt:
Version 2.4.5 und früher: Eine Liste von Tupeln.
Versionen 2.4.6 und später: Eine Liste von ResultMetadata-Objekten. (Die Methode
describe
gibt ebenfalls diese Liste zurück.)
Jedes Tupel- und ResultMetadata
-Objekt enthält die Metadaten zu einer Spalte (Spaltenname, Datentyp usw.). Sie können auf die Metadaten über den Index zugreifen oder ab Version 2.4.6 auch über das Attribut ResultMetadata
.
Die folgenden Beispiele demonstrieren, wie Sie auf die Metadaten der zurückgegebenen Tupel und ResultMetadata
-Objekte zugreifen können.
Beispiel: Abrufen der Spaltennamen-Metadaten nach Index (ab Version 2.4.5):
Im folgenden Beispiel wird das Attribut description
verwendet, um die Liste der Spaltennamen nach der Ausführung einer Abfrage abzurufen. Das Attribut ist eine Liste von Tupeln. Das Beispiel greift auf den Spaltennamen des ersten Wertes in jedem Tupel zu.
cur = conn.cursor() cur.execute("SELECT * FROM test_table") print(','.join([col[0] for col in cur.description]))
Beispiel: Abrufen der Spaltennamen-Metadaten nach Attribut (ab Version 2.4.6):
Im folgenden Beispiel wird das Attribut description
verwendet, um die Liste der Spaltennamen nach der Ausführung einer Abfrage abzurufen. Das Attribut ist eine Liste von ResultMetaData-Objekten. Das Beispiel greift auf den Spaltennamen aus dem Attribut name
jedes ResultMetadata
-Objekts zu.
cur = conn.cursor() cur.execute("SELECT * FROM test_table") print(','.join([col.name for col in cur.description]))
Beispiel: Abrufen der Spaltennamen-Metadaten ohne Ausführen der Abfrage (ab Version 2.4.6):
Das folgende Beispiel verwendet die Methode describe
, um die Liste der Spaltennamen abzurufen, ohne eine Abfrage ausführen zu müssen. Die Methode describe()
gibt eine Liste von ResultMetaData-Objekten zurück. Das Beispiel greift auf den Spaltennamen aus dem Attribut name
jedes ResultMetadata
-Objekts zu.
cur = conn.cursor() result_metadata_list = cur.describe("SELECT * FROM test_table") print(','.join([col.name for col in result_metadata_list]))
Fehlerbehandlung¶
Die Anwendung muss auf Ausnahmen, die vom Snowflake-Konnektor ausgelöst werden, korrekt reagieren und entscheiden, ob die Codeausführung fortgesetzt oder abgebrochen wird.
# Catching the syntax error cur = con.cursor() try: cur.execute("SELECT * FROM testtable") except snowflake.connector.errors.ProgrammingError as e: # default error message print(e) # customer error message print('Error {0} ({1}): {2} ({3})'.format(e.errno, e.sqlstate, e.msg, e.sfqid)) finally: cur.close()
Verwenden von execute_stream
zum Ausführen von SQL-Skripten¶
Mit der Funktion execute_stream
können Sie ein oder mehrere SQL-Skripte in einem Stream ausführen:
from codecs import open with open(sqlfile, 'r', encoding='utf-8') as f: for cur in con.execute_stream(f): for ret in cur: print(ret)
Schließen der Verbindung¶
Als Best Practice schließen Sie die Verbindung, indem Sie die Methode close
aufrufen:
connection.close()
Dadurch wird sichergestellt, dass die gesammelten Clientmetriken an den Server übermittelt werden und die Sitzung gelöscht wird. Außerdem helfen try-finally
Blöcke sicherzustellen, dass die Verbindung geschlossen wird, auch wenn mittendrin eine Ausnahme ausgelöst wird:
# Connecting to Snowflake con = snowflake.connector.connect(...) try: # Running queries con.cursor().execute(...) ... finally: # Closing the connection con.close()
Verwenden eines Kontextmanagers zum Verbinden und Steuern von Transaktionen¶
Der Snowflake-Konnektor für Python unterstützt einen Kontextmanager, der bei Bedarf Ressourcen zuweist und freigibt. Der Kontextmanager ist nützlich, um für Transaktionen auf Basis des Anweisungsstatus ein Commit oder ein Rollback auszuführen, wenn autocommit
deaktiviert ist.
# Connecting to Snowflake using the context manager with snowflake.connector.connect( user=USER, password=PASSWORD, account=ACCOUNT, autocommit=False, ) as con: con.cursor().execute("INSERT INTO a VALUES(1, 'test1')") con.cursor().execute("INSERT INTO a VALUES(2, 'test2')") con.cursor().execute("INSERT INTO a VALUES(not numeric value, 'test3')") # fail
Im obigen Beispiel führt der Kontextmanager nach Fehlschlagen der dritten Anweisung ein Rollback der Änderungen in der Transaktion aus und schließt die Verbindung. Wenn alle Anweisungen erfolgreich waren, würde der Kontextmanager die Änderungen committen und die Verbindung schließen.
Der äquivalente Code mit try
- und except
-Blöcken ist wie folgt:
# Connecting to Snowflake using try and except blocks con = snowflake.connector.connect( user=USER, password=PASSWORD, account=ACCOUNT, autocommit=False) try: con.cursor().execute("INSERT INTO a VALUES(1, 'test1')") con.cursor().execute("INSERT INTO a VALUES(2, 'test2')") con.cursor().execute("INSERT INTO a VALUES(not numeric value, 'test3')") # fail con.commit() except Exception as e: con.rollback() raise e finally: con.close()
Protokollieren¶
Der Snowflake-Konnektor für Python nutzt das logging
-Standardmodul von Python, um den Status in regelmäßigen Abständen zu protokollieren, sodass die Anwendung ihre Aktivitäten im Hintergrund verfolgen kann. Der einfachste Weg, die Protokollierung zu aktivieren, ist der Aufruf von logging.basicConfig()
am Anfang der Anwendung.
Im folgenden Beispiel wird der Protokolliergrad auf INFO
eingestellt, und die Protokolle werden in einer Datei namens /tmp/snowflake_python_connector.log
gespeichert.
logging.basicConfig( filename=file_name, level=logging.INFO)
Eine umfassendere Protokollierung kann durch Einstellen des Protokolliergrads auf DEBUG
wie folgt aktiviert werden:
# Logging including the timestamp, thread and the source code location import logging for logger_name in ['snowflake.connector', 'botocore', 'boto3']: logger = logging.getLogger(logger_name) logger.setLevel(logging.DEBUG) ch = logging.FileHandler('/tmp/python_connector.log') ch.setLevel(logging.DEBUG) ch.setFormatter(logging.Formatter('%(asctime)s - %(threadName)s %(filename)s:%(lineno)d - %(funcName)s() - %(levelname)s - %(message)s')) logger.addHandler(ch)Die optionale, aber empfohlene Formatierungsklasse SecretDetector stellt sicher, dass ein bestimmter Satz bekannter vertraulicher Informationen maskiert wird, bevor diese in die Protokolldateien des Python-Konnektors von Snowflake geschrieben werden. Verwenden Sie für SecretDetector folgenden Code:
# Logging including the timestamp, thread and the source code location import logging from snowflake.connector.secret_detector import SecretDetector for logger_name in ['snowflake.connector', 'botocore', 'boto3']: logger = logging.getLogger(logger_name) logger.setLevel(logging.DEBUG) ch = logging.FileHandler('/tmp/python_connector.log') ch.setLevel(logging.DEBUG) ch.setFormatter(SecretDetector('%(asctime)s - %(threadName)s %(filename)s:%(lineno)d - %(funcName)s() - %(levelname)s - %(message)s')) logger.addHandler(ch)Bemerkung
botocore
undboto3
sind über das AWS (Amazon Web Services) SDK für Python verfügbar.
Beispielprogramm¶
Der folgende Beispielcode kombiniert die meisten der in den vorangegangenen Abschnitten beschriebenen Beispiele zu einem funktionierenden Python-Programm: Dieses Beispiel enthält zwei Teile:
Eine übergeordnete Klasse („python_veritas_base“) enthält den Code für viele gängige Operationen, z. B. das Herstellen einer Verbindung zum Server.
Eine untergeordnete Klasse („python_connector_example“) repräsentiert die kundenspezifischen Teile eines bestimmten Clients, z. B. zum Abfragen einer Tabelle.
Dieser Beispielcode wird direkt aus einem unserer Tests importiert, um sicherzustellen, dass er in einem neueren Build des Produkts ausgeführt wurde.
Da dies aus einem Test stammt, enthält es eine kleine Menge Code, um einen alternativen Port und ein alternatives Protokoll festzulegen, die in einigen Tests verwendet werden. Benutzer sollten Protokoll oder Portnummer nicht festlegen. Lassen Sie diese Werte stattdessen aus, und verwenden Sie die Standardeinstellungen.
Dies enthält auch einige Abschnittsmarkierungen (manchmal als „Snippet-Tags“ bezeichnet), um Code zu identifizieren, der unabhängig in die Dokumentation importiert werden kann. Abschnittsmarkierungen sehen normalerweise ähnlich aus wie:
# -- (> ---------------------- SECTION=import_connector ---------------------
...
# -- <) ---------------------------- END_SECTION ----------------------------
Diese Abschnittsmarkierungen sind im Benutzercode nicht erforderlich.
Der erste Teil des Codebeispiels enthält die allgemeinen Unterroutinen für:
Lesen Sie Befehlszeilenargumente (z. B. „–warehouse MyWarehouse“), die Verbindungsinformationen enthalten.
Stellen Sie eine Verbindung zum Server her.
Erstellen und verwenden Sie ein Warehouse, eine Datenbank und ein Schema.
Löschen Sie das Schema, die Datenbank und das Warehouse, wenn Sie damit fertig sind.
import logging
import os
import sys
# -- (> ---------------------- SECTION=import_connector ---------------------
import snowflake.connector
# -- <) ---------------------------- END_SECTION ----------------------------
class python_veritas_base:
"""
PURPOSE:
This is the Base/Parent class for programs that use the Snowflake
Connector for Python.
This class is intended primarily for:
* Sample programs, e.g. in the documentation.
* Tests.
"""
def __init__(self, p_log_file_name = None):
"""
PURPOSE:
This does any required initialization steps, which in this class is
basically just turning on logging.
"""
file_name = p_log_file_name
if file_name is None:
file_name = '/tmp/snowflake_python_connector.log'
# -- (> ---------- SECTION=begin_logging -----------------------------
logging.basicConfig(
filename=file_name,
level=logging.INFO)
# -- <) ---------- END_SECTION ---------------------------------------
# -- (> ---------------------------- SECTION=main ------------------------
def main(self, argv):
"""
PURPOSE:
Most tests follow the same basic pattern in this main() method:
* Create a connection.
* Set up, e.g. use (or create and use) the warehouse, database,
and schema.
* Run the queries (or do the other tasks, e.g. load data).
* Clean up. In this test/demo, we drop the warehouse, database,
and schema. In a customer scenario, you'd typically clean up
temporary tables, etc., but wouldn't drop your database.
* Close the connection.
"""
# Read the connection parameters (e.g. user ID) from the command line
# and environment variables, then connect to Snowflake.
connection = self.create_connection(argv)
# Set up anything we need (e.g. a separate schema for the test/demo).
self.set_up(connection)
# Do the "real work", for example, create a table, insert rows, SELECT
# from the table, etc.
self.do_the_real_work(connection)
# Clean up. In this case, we drop the temporary warehouse, database, and
# schema.
self.clean_up(connection)
print("\nClosing connection...")
# -- (> ------------------- SECTION=close_connection -----------------
connection.close()
# -- <) ---------------------------- END_SECTION ---------------------
# -- <) ---------------------------- END_SECTION=main --------------------
def args_to_properties(self, args):
"""
PURPOSE:
Read the command-line arguments and store them in a dictionary.
Command-line arguments should come in pairs, e.g.:
"--user MyUser"
INPUTS:
The command line arguments (sys.argv).
RETURNS:
Returns the dictionary.
DESIRABLE ENHANCEMENTS:
Improve error detection and handling.
"""
connection_parameters = {}
i = 1
while i < len(args) - 1:
property_name = args[i]
# Strip off the leading "--" from the tag, e.g. from "--user".
property_name = property_name[2:]
property_value = args[i + 1]
connection_parameters[property_name] = property_value
i += 2
return connection_parameters
def create_connection(self, argv):
"""
PURPOSE:
This gets account identifier and login information from the
environment variables and command-line parameters, connects to the
server, and returns the connection object.
INPUTS:
argv: This is usually sys.argv, which contains the command-line
parameters. It could be an equivalent substitute if you get
the parameter information from another source.
RETURNS:
A connection.
"""
# Get account identifier and login information from environment variables and command-line parameters.
# For information about account identifiers, see
# https://docs.snowflake.com/en/user-guide/admin-account-identifier.html .
# -- (> ----------------------- SECTION=set_login_info ---------------
# Get the password from an appropriate environment variable, if
# available.
PASSWORD = os.getenv('SNOWSQL_PWD')
# Get the other login info etc. from the command line.
if len(argv) < 11:
msg = "ERROR: Please pass the following command-line parameters:\n"
msg += "--warehouse <warehouse> --database <db> --schema <schema> "
msg += "--user <user> --account <account_identifier> "
print(msg)
sys.exit(-1)
else:
connection_parameters = self.args_to_properties(argv)
USER = connection_parameters["user"]
ACCOUNT = connection_parameters["account"]
WAREHOUSE = connection_parameters["warehouse"]
DATABASE = connection_parameters["database"]
SCHEMA = connection_parameters["schema"]
# Optional: for internal testing only.
try:
PORT = connection_parameters["port"]
except:
PORT = ""
try:
PROTOCOL = connection_parameters["protocol"]
except:
PROTOCOL = ""
# If the password is set by both command line and env var, the
# command-line value takes precedence over (is written over) the
# env var value.
# If the password wasn't set either in the environment var or on
# the command line...
if PASSWORD is None or PASSWORD == '':
print("ERROR: Set password, e.g. with SNOWSQL_PWD environment variable")
sys.exit(-2)
# -- <) ---------------------------- END_SECTION ---------------------
# Optional diagnostic:
#print("USER:", USER)
#print("ACCOUNT:", ACCOUNT)
#print("WAREHOUSE:", WAREHOUSE)
#print("DATABASE:", DATABASE)
#print("SCHEMA:", SCHEMA)
#print("PASSWORD:", PASSWORD)
#print("PROTOCOL:" "'" + PROTOCOL + "'")
#print("PORT:" + "'" + PORT + "'")
print("Connecting...")
# If the PORT is set but the protocol is not, we ignore the PORT (bug!!).
if PROTOCOL is None or PROTOCOL == "" or PORT is None or PORT == "":
# -- (> ------------------- SECTION=connect_to_snowflake ---------
conn = snowflake.connector.connect(
user=USER,
password=PASSWORD,
account=ACCOUNT,
warehouse=WAREHOUSE,
database=DATABASE,
schema=SCHEMA
)
# -- <) ---------------------------- END_SECTION -----------------
else:
conn = snowflake.connector.connect(
user=USER,
password=PASSWORD,
account=ACCOUNT,
warehouse=WAREHOUSE,
database=DATABASE,
schema=SCHEMA,
# Optional: for internal testing only.
protocol=PROTOCOL,
port=PORT
)
return conn
def set_up(self, connection):
"""
PURPOSE:
Set up to run a test. You can override this method with one
appropriate to your test/demo.
"""
# Create a temporary warehouse, database, and schema.
self.create_warehouse_database_and_schema(connection)
def do_the_real_work(self, conn):
"""
PURPOSE:
Your sub-class should override this to include the code required for
your documentation sample or your test case.
This default method does a very simple self-test that shows that the
connection was successful.
"""
# Create a cursor for this connection.
cursor1 = conn.cursor()
# This is an example of an SQL statement we might want to run.
command = "SELECT PI()"
# Run the statement.
cursor1.execute(command)
# Get the results (should be only one):
for row in cursor1:
print(row[0])
# Close this cursor.
cursor1.close()
def clean_up(self, connection):
"""
PURPOSE:
Clean up after a test. You can override this method with one
appropriate to your test/demo.
"""
# Create a temporary warehouse, database, and schema.
self.drop_warehouse_database_and_schema(connection)
def create_warehouse_database_and_schema(self, conn):
"""
PURPOSE:
Create the temporary schema, database, and warehouse that we use
for most tests/demos.
"""
# Create a database, schema, and warehouse if they don't already exist.
print("\nCreating warehouse, database, schema...")
# -- (> ------------- SECTION=create_warehouse_database_schema -------
conn.cursor().execute("CREATE WAREHOUSE IF NOT EXISTS tiny_warehouse_mg")
conn.cursor().execute("CREATE DATABASE IF NOT EXISTS testdb_mg")
conn.cursor().execute("USE DATABASE testdb_mg")
conn.cursor().execute("CREATE SCHEMA IF NOT EXISTS testschema_mg")
# -- <) ---------------------------- END_SECTION ---------------------
# -- (> --------------- SECTION=use_warehouse_database_schema --------
conn.cursor().execute("USE WAREHOUSE tiny_warehouse_mg")
conn.cursor().execute("USE DATABASE testdb_mg")
conn.cursor().execute("USE SCHEMA testdb_mg.testschema_mg")
# -- <) ---------------------------- END_SECTION ---------------------
def drop_warehouse_database_and_schema(self, conn):
"""
PURPOSE:
Drop the temporary schema, database, and warehouse that we create
for most tests/demos.
"""
# -- (> ------------- SECTION=drop_warehouse_database_schema ---------
conn.cursor().execute("DROP SCHEMA IF EXISTS testschema_mg")
conn.cursor().execute("DROP DATABASE IF EXISTS testdb_mg")
conn.cursor().execute("DROP WAREHOUSE IF EXISTS tiny_warehouse_mg")
# -- <) ---------------------------- END_SECTION ---------------------
# ----------------------------------------------------------------------------
if __name__ == '__main__':
pvb = python_veritas_base()
pvb.main(sys.argv)
Der zweite Teil des Codebeispiels erstellt eine Tabelle, fügt Zeilen ein usw.:
import sys
# -- (> ---------------------- SECTION=import_connector ---------------------
import snowflake.connector
# -- <) ---------------------------- END_SECTION ----------------------------
# Import the base class that contains methods used in many tests and code
# examples.
from python_veritas_base import python_veritas_base
class python_connector_example (python_veritas_base):
"""
PURPOSE:
This is a simple example program that shows how to use the Snowflake
Python Connector to create and query a table.
"""
def __init__(self):
pass
def do_the_real_work(self, conn):
"""
INPUTS:
conn is a Connection object returned from snowflake.connector.connect().
"""
print("\nCreating table test_table...")
# -- (> ----------------------- SECTION=create_table ---------------------
conn.cursor().execute(
"CREATE OR REPLACE TABLE "
"test_table(col1 integer, col2 string)")
conn.cursor().execute(
"INSERT INTO test_table(col1, col2) VALUES " +
" (123, 'test string1'), " +
" (456, 'test string2')")
# -- <) ---------------------------- END_SECTION -------------------------
print("\nSelecting from test_table...")
# -- (> ----------------------- SECTION=querying_data --------------------
cur = conn.cursor()
try:
cur.execute("SELECT col1, col2 FROM test_table ORDER BY col1")
for (col1, col2) in cur:
print('{0}, {1}'.format(col1, col2))
finally:
cur.close()
# -- <) ---------------------------- END_SECTION -------------------------
# ============================================================================
if __name__ == '__main__':
test_case = python_connector_example()
test_case.main(sys.argv)
Gehen Sie folgendermaßen vor, um dieses Beispiel auszuführen:
Kopieren Sie den ersten Teil des Codes in eine Datei mit dem Namen „python_veritas_base.py“.
Kopieren Sie den zweiten Teil des Codes in eine Datei mit dem Namen „python_connector_example.py“.
Setzen Sie die Umgebungsvariable SNOWSQL_PWD auf Ihr Kennwort, zum Beispiel:
export SNOWSQL_PWD='MyPassword'Führen Sie das Programm über eine Befehlszeile ähnlich der folgenden aus (ersetzen Sie die Benutzer- und Kontoinformationen natürlich durch Ihre eigenen Benutzer- und Kontoinformationen).
Warnung
Dies löscht das Warehouse, die Datenbank und das Schema am Ende des Programms! Verwenden Sie nicht den Namen einer vorhandenen Datenbank, da Sie diese verlieren werden!
python3 python_connector_example.py --warehouse <unique_warehouse_name> --database <new_warehouse_zzz_test> --schema <new_schema_zzz_test> --account myorganization-myaccount --user MyUserName
Und hier ist die Ausgabe:
Connecting...
Creating warehouse, database, schema...
Creating table test_table...
Selecting from test_table...
123, test string1
456, test string2
Closing connection...
Hier ist ein längeres Beispiel:
Bemerkung
Achten Sie darauf, dass Sie in dem Abschnitt, in dem Sie Ihre Konto- und Anmeldeinformationen festlegen, die Variablen bei Bedarf so ersetzen, dass sie Ihren Snowflake-Anmeldeinformationen (Name, Kennwort usw.) entsprechen.
In diesem Beispiel wird die Anweisung mit der Funktion format() zusammengestellt. Wenn in Ihrer Umgebung das Risiko von Angriffen durch Einschleusung von SQL-Befehlen besteht, sollten Sie möglicherweise das Binden von Werten vorziehen, anstatt format() zu verwenden.
#!/usr/bin/env python
#
# Snowflake Connector for Python Sample Program
#
# Logging
import logging
logging.basicConfig(
filename='/tmp/snowflake_python_connector.log',
level=logging.INFO)
import snowflake.connector
# Set your account and login information (replace the variables with
# the necessary values).
ACCOUNT = '<account_identifier>'
USER = '<login_name>'
PASSWORD = '<password>'
import os
# Only required if you copy data from your S3 bucket
AWS_ACCESS_KEY_ID = os.getenv('AWS_ACCESS_KEY_ID')
AWS_SECRET_ACCESS_KEY = os.getenv('AWS_SECRET_ACCESS_KEY')
# Connecting to Snowflake
con = snowflake.connector.connect(
user=USER,
password=PASSWORD,
account=ACCOUNT,
)
# Creating a database, schema, and warehouse if none exists
con.cursor().execute("CREATE WAREHOUSE IF NOT EXISTS tiny_warehouse")
con.cursor().execute("CREATE DATABASE IF NOT EXISTS testdb")
con.cursor().execute("USE DATABASE testdb")
con.cursor().execute("CREATE SCHEMA IF NOT EXISTS testschema")
# Using the database, schema and warehouse
con.cursor().execute("USE WAREHOUSE tiny_warehouse")
con.cursor().execute("USE SCHEMA testdb.testschema")
# Creating a table and inserting data
con.cursor().execute(
"CREATE OR REPLACE TABLE "
"testtable(col1 integer, col2 string)")
con.cursor().execute(
"INSERT INTO testtable(col1, col2) "
"VALUES(123, 'test string1'),(456, 'test string2')")
# Copying data from internal stage (for testtable table)
con.cursor().execute("PUT file:///tmp/data0/file* @%testtable")
con.cursor().execute("COPY INTO testtable")
# Copying data from external stage (S3 bucket -
# replace <s3_bucket> with the name of your bucket)
con.cursor().execute("""
COPY INTO testtable FROM s3://<s3_bucket>/data/
STORAGE_INTEGRATION = myint
FILE_FORMAT=(field_delimiter=',')
""".format(
aws_access_key_id=AWS_ACCESS_KEY_ID,
aws_secret_access_key=AWS_SECRET_ACCESS_KEY))
# Querying data
cur = con.cursor()
try:
cur.execute("SELECT col1, col2 FROM testtable")
for (col1, col2) in cur:
print('{0}, {1}'.format(col1, col2))
finally:
cur.close()
# Binding data
con.cursor().execute(
"INSERT INTO testtable(col1, col2) "
"VALUES(%(col1)s, %(col2)s)", {
'col1': 789,
'col2': 'test string3',
})
# Retrieving column names
cur = con.cursor()
cur.execute("SELECT * FROM testtable")
print(','.join([col[0] for col in cur.description]))
# Catching syntax errors
cur = con.cursor()
try:
cur.execute("SELECT * FROM testtable")
except snowflake.connector.errors.ProgrammingError as e:
# default error message
print(e)
# user error message
print('Error {0} ({1}): {2} ({3})'.format(e.errno, e.sqlstate, e.msg, e.sfqid))
finally:
cur.close()
# Retrieving the Snowflake query ID
cur = con.cursor()
cur.execute("SELECT * FROM testtable")
print(cur.sfqid)
# Closing the connection
con.close()