Verwenden des Python-Konnektors

Unter diesem Thema werden zahlreiche Beispiele bereitgestellt, die veranschaulichen, wie Sie den Snowflake-Konnektor verwenden können, um Snowflake-Standardoperationen wie Benutzeranmeldung, Datenbank- und Tabellenerstellung, Warehouse-Erstellung, Einfügen/Laden von Daten sowie Abfragen durchzuführen.

Im Beispielcode am Ende dieses Themas werden einzelne Beispiele zu einem einzigen, funktionierenden Python-Programm zusammengeführt.

Bemerkung

Snowflake bietet jetzt erstklassige Python-APIs für die Verwaltung von Snowflake-Kernressourcen wie Datenbanken, Schemas, Tabellen, Aufgaben und Warehouses, ohne SQL zu verwenden. Weitere Informationen dazu finden Sie unter Snowflake-Python-API: Verwalten von Snowflake-Objekten mit Python.

Unter diesem Thema:

Erstellen einer Datenbank, eines Schemas und eines Warehouse

Erstellen Sie nach der Anmeldung mit den Befehlen CREATE DATABASE, CREATE SCHEMA und CREATE WAREHOUSE eine Datenbank, ein Schema und ein Warehouse, falls diese noch nicht vorhanden sind.

Das folgende Beispiel zeigt, wie sich ein Warehouse mit dem Namen tiny_warehouse, eine Datenbank mit dem Namen testdb und ein Schema mit dem Namen testschema erstellen lassen. Beachten Sie, dass Sie beim Erstellen des Schemas entweder den Namen der Datenbank angeben müssen, in der das Schema erstellt werden soll, oder bereits mit der Datenbank verbunden sein müssen, in der das Schema erstellt werden soll. Im folgenden Beispiel wird vor dem Befehl CREATE SCHEMA ein USE DATABASE-Befehl ausgeführt, um sicherzustellen, dass das Schema in der korrekten Datenbank erstellt wird.

conn.cursor().execute("CREATE WAREHOUSE IF NOT EXISTS tiny_warehouse_mg")
conn.cursor().execute("CREATE DATABASE IF NOT EXISTS testdb_mg")
conn.cursor().execute("USE DATABASE testdb_mg")
conn.cursor().execute("CREATE SCHEMA IF NOT EXISTS testschema_mg")
Copy

Verwenden von Datenbank, Schema und Warehouse

Geben Sie Datenbank und Schema an, in denen Sie Tabellen erstellen möchten. Geben Sie auch das Warehouse an, das Ressourcen für die Ausführung von DML-Anweisungen und -Abfragen bereitstellen soll.

Folgendes Beispiel zeigt die Verwendung von Datenbank testdb, Schema testschema und Warehouse tiny_warehouse (zuvor erstellt):

conn.cursor().execute("USE WAREHOUSE tiny_warehouse_mg")
conn.cursor().execute("USE DATABASE testdb_mg")
conn.cursor().execute("USE SCHEMA testdb_mg.testschema_mg")
Copy

Erstellen von Tabellen und Einfügen von Daten

Verwenden Sie den Befehl CREATE TABLE, um Tabellen zu erstellen, und den Befehl INSERT, um die Tabellen mit Daten zu füllen.

Erstellen Sie beispielsweise eine Tabelle mit dem Namen testtable, und fügen Sie zwei Zeilen in die Tabelle ein:

conn.cursor().execute(
    "CREATE OR REPLACE TABLE "
    "test_table(col1 integer, col2 string)")

conn.cursor().execute(
    "INSERT INTO test_table(col1, col2) VALUES " + 
    "    (123, 'test string1'), " + 
    "    (456, 'test string2')")
Copy

Laden von Daten

Anstatt Daten mit einzelnen INSERT-Befehlen in Tabellen einzufügen, können Sie Daten aus Dateien, die entweder an einem internen oder externen Speicherort bereitgestellt werden, per Massenladen hinzufügen.

Kopieren von Daten von einem internen Speicherort

Um Daten aus Dateien auf Ihrem Hostcomputer in eine Tabelle zu laden, verwenden Sie zunächst den Befehl PUT für das Staging der Datei an einem internen Speicherort. Verwenden Sie dann den Befehl COPY INTO <Tabelle>, um die Daten aus den Dateien in die Tabelle zu kopieren.

Beispiel:

# Putting Data
con.cursor().execute("PUT file:///tmp/data/file* @%testtable")
con.cursor().execute("COPY INTO testtable")
Copy

Hier werden die CSV-Daten in einem lokalen Verzeichnis namens /tmp/data in einer Linux- oder macOS-Umgebung gespeichert. Das Verzeichnis enthält dann Dateien mit den Namen file0, file1, … file100.

Kopieren von Daten von einem externen Speicherort

Um Daten aus Dateien, die bereits in einem externen Stagingbereich (z. B. Ihr S3-Bucket) bereitgestellt wurden, in eine Tabelle zu laden, verwenden Sie den Befehl COPY INTO <Tabelle>.

Beispiel:

# Copying Data
con.cursor().execute("""
COPY INTO testtable FROM s3://<s3_bucket>/data/
    STORAGE_INTEGRATION = myint
    FILE_FORMAT=(field_delimiter=',')
""".format(
    aws_access_key_id=AWS_ACCESS_KEY_ID,
    aws_secret_access_key=AWS_SECRET_ACCESS_KEY))
Copy

Wobei:

  • s3://<S3-Bucket>/data/ gibt den Namen Ihres S3-Buckets an.

  • Die Dateien im Bucket erhalten das Präfix data.

  • Der Zugriff auf den Bucket erfolgt über eine Speicherintegration, die mit CREATE STORAGE INTEGRATION von einem Kontoadministrator (d. h. einem Benutzer mit der Rolle ACCOUNTADMIN) oder einer Rolle mit der globalen Berechtigung CREATE INTEGRATION erstellt wurde. Bei Verwendung einer Speicherintegration benötigen Benutzer für den Zugriff auf einen privaten Speicherort keine Anmeldeinformationen mehr.

Bemerkung

In diesem Beispiel wird die Anweisung mit der Funktion format() zusammengestellt. Wenn in Ihrer Umgebung das Risiko von Angriffen durch Einschleusung von SQL-Befehlen besteht, sollten Sie möglicherweise das Binden von Werten vorziehen, anstatt format() zu verwenden.

Abfragen von Daten

Mit dem Snowflake-Konnektor für Python können Sie Folgendes übermitteln:

  • Eine synchrone Abfrage, die die Kontrolle an Ihre Anwendung zurückgibt, nachdem die Abfrage abgeschlossen ist.

  • Eine asynchrone Abfrage, die die Kontrolle an Ihre Anwendung zurückgibt, bevor die Abfrage abgeschlossen ist.

Nachdem die Abfrage abgeschlossen wurde, verwenden Sie das Cursor-Objekt, um die Werte in den Ergebnissen abzurufen. Standardmäßig konvertiert der Snowflake-Konnektor für Python die Werte von Snowflake-Datentypen in native Python-Datentypen. (Beachten Sie, dass Sie die Werte auch als Zeichenfolgen zurückgeben und die Typkonvertierungen in Ihrer Anwendung durchführen können. Siehe Verbessern der Abfrageleistung durch Umgehen der Datenkonvertierung.)

Bemerkung

Standardmäßig werden die Werte aus NUMBER-Spalten als Gleitkommawerten mit doppelter Genauigkeit (float64) zurückgegeben. Um diese als Dezimalwerte (decimal.Decimal) in den Methoden fetch_pandas_all() und fetch_pandas_batches() zurückzugeben, setzen Sie den Parameter arrow_number_to_decimal in der Methode connect() auf True.

Ausführen einer synchronen Abfrage

Um eine synchrone Abfrage auszuführen, rufen Sie die Methode execute() im Cursor-Objekt auf. Beispiel:

conn = snowflake.connector.connect( ... )
cur = conn.cursor()
cur.execute('select * from products')
Copy

Verwenden Sie das Cursor-Objekt, um die Werte in den Ergebnissen abzurufen, wie unter Verwenden von cursor zum Abrufen von Werten erläutert.

Ausführen einer asynchronen Abfrage

Der Snowflake-Konnektor für Python unterstützt asynchrone Abfragen (d. h. Abfragen, die dem Benutzer die Kontrolle zurückgeben, bevor die Abfrage abgeschlossen ist). Sie können eine asynchrone Abfrage übermitteln und mithilfe von Abrufen (Polling) feststellen, wann die Abfrage abgeschlossen ist. Nachdem die Abfrage abgeschlossen ist, können Sie die Ergebnisse abrufen.

Bemerkung

Um asynchrone Abfragen auszuführen, müssen Sie sicherstellen, dass der Konfigurationsparameter ABORT_DETACHED_QUERY den Wert FALSE hat (Standardwert).

Snowflake schließt Verbindungen automatisch nach einer bestimmten Zeit (Standard: 5 Minuten), wodurch alle aktiven Abfragen verwaist sind. Wenn der Wert TRUE ist, bricht Snowflake diese verwaisten Abfragen ab, was sich auf asynchrone Abfragen auswirken kann.

Mit dieser Funktion können Sie mehrere Abfragen parallel übermitteln, ohne auf den Abschluss jeder Abfrage warten zu müssen. Sie können innerhalb derselben Sitzung auch Kombinationen von synchronen und asynchronen Abfragen ausführen.

Schließlich können Sie eine asynchrone Abfrage über eine Verbindung übermitteln und die Ergebnisse über eine andere Verbindung abrufen. Beispielsweise kann ein Benutzer eine Abfrage mit langer Laufzeit über Ihre Anwendung initiieren, die Anwendung beenden und die Anwendung zu einem späteren Zeitpunkt erneut starten, um die Ergebnisse zu überprüfen.

Übermitteln einer asynchronen Abfrage

Um eine asynchrone Abfrage zu übermitteln, rufen Sie die Methode execute_async() im Cursor-Objekt auf. Beispiel:

conn = snowflake.connector.connect( ... )
cur = conn.cursor()
# Submit an asynchronous query for execution.
cur.execute_async('select count(*) from table(generator(timeLimit => 25))')
Copy

Nach dem Übermitteln der Abfrage:

Beispiele für das Ausführen asynchroner Abfragen finden Sie unter Beispiele für asynchrone Abfragen.

Best Practices für asynchrone Abfragen

Beachten Sie beim Übermitteln von asynchronen Abfrage die folgenden bewährten Verfahren:

  • Stellen Sie vor der parallelen Ausführung von Abfragen sicher, dass Sie die Abhängigkeiten der Abfragen von anderen Abfragen genau kennen. Einige Abfragen sind voneinander und von der Ausführungsreihenfolge abhängig und sind daher nicht für die Parallelisierung geeignet. Beispielsweise kann eine INSERT-Anweisung offensichtlich erst beginnen, nachdem die entsprechende CREATE TABLE-Anweisung abgeschlossen wurde.

  • Stellen Sie sicher, dass die Anzahl der gestarteten Abfragen auf den verfügbaren Arbeitsspeicher abgestimmt ist. Das parallele Ausführen mehrerer Abfragen beansprucht normalerweise mehr Speicher, insbesondere wenn mehrere Ergebnis-Datasets gleichzeitig im Arbeitsspeicher gespeichert sind.

  • Behandeln Sie beim Abrufen (Polling) die seltenen Fälle, in denen eine Abfrage nicht erfolgreich ist.

  • Stellen Sie sicher, dass Anweisungen zur Transaktionssteuerung (BEGIN, COMMIT und ROLLBACK) nicht parallel zu anderen Anweisungen ausgeführt werden.

Abrufen der Snowflake-Abfrage-ID

Eine Abfrage-ID kennzeichnet jede von Snowflake ausgeführte Abfrage. Wenn Sie den Snowflake-Konnektor für Python zum Ausführen einer Abfrage verwenden, können Sie auf die Abfrage-ID über das Attribut sfqid im Cursor-Objekt zugreifen:

# Retrieving a Snowflake Query ID
cur = con.cursor()
cur.execute("SELECT * FROM testtable")
print(cur.sfqid)
Copy

Sie können die Abfrage-ID für Folgendes verwenden:

Überprüfen des Status einer Abfrage

So überprüfen Sie den Status einer Abfrage:

  1. Rufen Sie die Abfrage-ID aus dem Feld sfqid im Objekt Cursor ab.

  2. Übergeben Sie die Abfrage-ID an die Methode get_query_status() des Connection-Objekts, um die Enumerationskonstante QueryStatus zurückzugeben, die den Status der Abfrage darstellt.

    Standardmäßig löst get_query_status() keinen Fehler aus, wenn die Abfrage zu einem Fehler geführt hat. Wenn Sie möchten, dass ein Fehler ausgelöst wird, rufen Sie stattdessen get_query_status_throw_if_error() auf.

  3. Verwenden Sie die Enumerationskonstante QueryStatus, um den Status der Abfrage zu überprüfen.

    • Um festzustellen, ob die Abfrage noch ausgeführt wird (z. B. wenn es sich um eine asynchrone Abfrage handelt), übergeben Sie die Konstante an die Methode is_still_running() des Connection-Objekts.

    • Um festzustellen, ob ein Fehler aufgetreten ist, übergeben Sie die Konstante an die Methode is_an_error().

    Eine vollständige Liste der Enumerationskonstanten erhalten Sie mit QueryStatus.

Im folgenden Beispiel wird eine asynchrone Abfrage ausgeführt und der Status der Abfrage überprüft:

import time
...
# Execute a long-running query asynchronously.
cur.execute_async('select count(*) from table(generator(timeLimit => 25))')
...
# Wait for the query to finish running.
query_id = cur.sfqid
while conn.is_still_running(conn.get_query_status(query_id)):
  time.sleep(1)
Copy

Im folgenden Beispiel wird ein Fehler ausgelöst, wenn die Abfrage zu einem Fehler geführt hat:

from snowflake.connector import ProgrammingError
import time
...
# Wait for the query to finish running and raise an error
# if a problem occurred with the execution of the query.
try:
  query_id = cur.sfqid
  while conn.is_still_running(conn.get_query_status_throw_if_error(query_id)):
    time.sleep(1)
except ProgrammingError as err:
  print('Programming Error: {0}'.format(err))
Copy

Verwenden der Abfrage-ID zum Abrufen von Abfrageergebnissen

Bemerkung

Wenn Sie eine synchrone Abfrage ausgeführt haben, indem Sie die Methode execute() für ein Cursor-Objekt aufgerufen haben, ist die Abfrage-ID zum Abrufen der Ergebnisse nicht erforderlich. Sie können einfach die Werte aus den Ergebnissen abrufen, wie unter Verwenden von cursor zum Abrufen von Werten erläutert.

Wenn Sie die Ergebnisse einer asynchronen Abfrage oder einer zuvor übermittelten synchronen Abfrage abrufen möchten, führen Sie die folgenden Schritte aus:

  1. Rufen Sie die Abfrage-ID der Abfrage ab. Siehe Abrufen der Snowflake-Abfrage-ID.

  2. Rufen Sie die Methode get_results_from_sfqid() im Cursor-Objekt auf, um die Ergebnisse abzurufen.

  3. Verwenden Sie das Cursor-Objekt, um die Werte in den Ergebnissen abzurufen, wie unter Verwenden von cursor zum Abrufen von Werten erläutert.

Falls die Abfrage noch ausgeführt wird, müssen Sie beachten, dass die Fetch-Methoden (fetchone(), fetchmany(), fetchall() usw.) auf den Abschluss der Abfrage warten.

Beispiel:

# Get the results from a query.
cur.get_results_from_sfqid(query_id)
results = cur.fetchall()
print(f'{results[0]}')
Copy

Verwenden von cursor zum Abrufen von Werten

Rufen Sie Werte aus einer Tabelle mit der Cursorobjektiterator-Methode ab.

Um beispielsweise Spalten mit dem Namen „col1“ und „col2“ aus der zuvor mit testtable erstellten Tabelle (siehe Erstellen von Tabellen und Einfügen von Daten) abzurufen, verwenden Sie Code, der dem folgenden ähnelt:

cur = conn.cursor()
try:
    cur.execute("SELECT col1, col2 FROM test_table ORDER BY col1")
    for (col1, col2) in cur:
        print('{0}, {1}'.format(col1, col2))
finally:
    cur.close()
Copy

Alternativ bietet der Snowflake-Konnektor für Python ein abgekürztes Verfahren:

for (col1, col2) in con.cursor().execute("SELECT col1, col2 FROM testtable"):
    print('{0}, {1}'.format(col1, col2))
Copy

Wenn Sie ein einzelnes Ergebnis (d. h. eine einzelnen Zeile) erhalten möchten, verwenden Sie die Methode fetchone:

col1, col2 = con.cursor().execute("SELECT col1, col2 FROM testtable").fetchone()
print('{0}, {1}'.format(col1, col2))
Copy

Wenn Sie die angegebene Anzahl von Zeilen auf einmal erhalten möchten, verwenden Sie die Methode fetchmany mit der Anzahl der Zeilen:

cur = con.cursor().execute("SELECT col1, col2 FROM testtable")
ret = cur.fetchmany(3)
print(ret)
while len(ret) > 0:
    ret = cur.fetchmany(3)
    print(ret)
Copy

Bemerkung

Verwenden Sie fetchone oder fetchmany, wenn das Resultset zu groß für den verfügbaren Speicher ist.

Wenn Sie alle Ergebnisse auf einmal erhalten müssen:

results = con.cursor().execute("SELECT col1, col2 FROM testtable").fetchall()
for rec in results:
    print('%s, %s' % (rec[0], rec[1]))
Copy

Um ein Zeitlimit (Timeout) für eine Abfrage festzulegen, führen Sie einen „begin“-Befehl aus, und fügen Sie einen Timeout-Parameter in die Abfrage ein. Wenn die Abfrage die Zeitdauer des Parameterwertes überschreitet, wird ein Fehler generiert und ein Rollback durchgeführt.

Im folgenden Code bedeutet der Fehler 604, dass die Abfrage abgebrochen wurde. Der Timeout-Parameter startet Timer() und bricht ab, wenn die Abfrage nicht innerhalb der angegebenen Zeit beendet wird.

conn.cursor().execute("create or replace table testtbl(a int, b string)")

conn.cursor().execute("begin")
try:
   conn.cursor().execute("insert into testtbl(a,b) values(3, 'test3'), (4,'test4')", timeout=10) # long query

except ProgrammingError as e:
   if e.errno == 604:
      print("timeout")
      conn.cursor().execute("rollback")
   else:
      raise e
else:
   conn.cursor().execute("commit")
Copy

Verwenden von DictCursor zum Abrufen von Werten nach Spaltenname

Wenn Sie einen Wert anhand des Spaltennamens abrufen möchten, erstellen Sie ein cursor-Objekt vom Typ DictCursor.

Beispiel:

# Querying data by DictCursor
from snowflake.connector import DictCursor
cur = con.cursor(DictCursor)
try:
    cur.execute("SELECT col1, col2 FROM testtable")
    for rec in cur:
        print('{0}, {1}'.format(rec['COL1'], rec['COL2']))
finally:
    cur.close()
Copy

Beispiele für asynchrone Abfragen

Im Folgenden finden Sie ein einfaches Beispiel für eine asynchrone Abfrage:

from snowflake.connector import ProgrammingError
import time

conn = snowflake.connector.connect( ... )
cur = conn.cursor()

# Submit an asynchronous query for execution.
cur.execute_async('select count(*) from table(generator(timeLimit => 25))')

# Retrieve the results.
cur.get_results_from_sfqid(query_id)
results = cur.fetchall()
print(f'{results[0]}')
Copy

Im nächsten Beispiel wird eine asynchrone Abfrage von einer Verbindung aus übermittelt und das Ergebnis von einer anderen Verbindung aus abgerufen:

from snowflake.connector import ProgrammingError
import time

conn = snowflake.connector.connect( ... )
cur = conn.cursor()

# Submit an asynchronous query for execution.
cur.execute_async('select count(*) from table(generator(timeLimit => 25))')

# Get the query ID for the asynchronous query.
query_id = cur.sfqid

# Close the cursor and the connection.
cur.close()
conn.close()

# Open a new connection.
new_conn = snowflake.connector.connect( ... )

# Create a new cursor.
new_cur = new_conn.cursor()

# Retrieve the results.
new_cur.get_results_from_sfqid(query_id)
results = new_cur.fetchall()
print(f'{results[0]}')
Copy

Abbrechen einer Abfrage über die Abfrage-ID

So brechen Sie eine Abfrage über die Abfrage-ID ab:

cur = cn.cursor()

try:
  cur.execute(r"SELECT SYSTEM$CANCEL_QUERY('queryID')")
  result = cur.fetchall()
  print(len(result))
  print(result[0])
finally:
  cur.close()
Copy

Ersetzen Sie die Zeichenfolge „queryID“ durch die tatsächliche Abfrage-ID. Um die ID für eine Abfrage zu erhalten, siehe Abrufen der Snowflake-Abfrage-ID.

Verbessern der Abfrageleistung durch Umgehen der Datenkonvertierung

Verwenden Sie zur Verbesserung der Abfrageleistung die Klasse SnowflakeNoConverterToPython im Modul snowflake.connector.converter_null, um die Datenkonvertierung vom internen Snowflake-Datentyp in den systemeigenen Python-Datentyp zu umgehen, z. B.:

from snowflake.connector.converter_null import SnowflakeNoConverterToPython

con = snowflake.connector.connect(
    ...
    converter_class=SnowflakeNoConverterToPython
)
for rec in con.cursor().execute("SELECT * FROM large_table"):
    # rec includes raw Snowflake data
Copy

Im Ergebnis werden alle Daten als Zeichenfolgen dargestellt, sodass die Anwendung für die Konvertierung in die systemeigenen Python-Datentypen verantwortlich ist. Beispielsweise sind TIMESTAMP_NTZ- und TIMESTAMP_LTZ-Daten die als Zeichenfolgen dargestellte Epochenzeit, und TIMESTAMP_TZ-Daten sind die Epochenzeit, gefolgt von einem Leerzeichen, gefolgt vom Offset zu UTC in Minuten, dargestellt als Zeichenfolge.

Das Binden der Daten wird nicht beeinflusst. Python-native Daten können weiterhin für Updates gebunden werden.

Binden von Daten

Um Werte anzugeben, die in einer SQL-Anweisung verwendet werden sollen, können Sie Literale in die Anweisung einschließen oder Variablen binden. Wenn Sie Variablen binden, fügen Sie einen oder mehrere Platzhalter in den Text der SQL-Anweisung ein, und geben Sie dann für jeden Platzhalter die Variable (den zu verwendenden Wert) an.

Im folgenden Beispiel wird die Verwendung von Literalen und die Bindung gegenübergestellt:

Literale:

con.cursor().execute("INSERT INTO testtable(col1, col2) VALUES(789, 'test string3')")
Copy

Bindung:

con.cursor().execute(
    "INSERT INTO testtable(col1, col2) "
    "VALUES(%s, %s)", (
        789,
        'test string3'
    ))
Copy

Bemerkung

Es gibt eine Obergrenze für die Datengröße, die Sie binden oder in einem Batch kombinieren können. Weitere Details dazu finden Sie unter Begrenzung der Abfragetextgröße.

Snowflake unterstützt die folgenden Bindungstypen:

Jeder Typ wird unten erklärt.

pyformat- oder format-Bindung

Sowohl die pyformat-Bindung als auch die format-Bindung binden Daten auf der Clientseite und nicht auf der Serverseite.

Standardmäßig unterstützt der Snowflake-Konnektor für Python sowohl pyformat als auch format, sodass Sie %(name)s oder %s als Platzhalter verwenden können. Beispiel:

  • Verwenden von %(name)s als Platzhalter:

    conn.cursor().execute(
        "INSERT INTO test_table(col1, col2) "
        "VALUES(%(col1)s, %(col2)s)", {
            'col1': 789,
            'col2': 'test string3',
            })
    
    Copy
  • Verwenden von %s als Platzhalter:

    con.cursor().execute(
        "INSERT INTO testtable(col1, col2) "
        "VALUES(%s, %s)", (
            789,
            'test string3'
        ))
    
    Copy

Mit pyformat und format können Sie auch ein Listenobjekt verwenden, um Daten für den IN-Operator zu binden:

# Binding data for IN operator
con.cursor().execute(
    "SELECT col1, col2 FROM testtable"
    " WHERE col2 IN (%s)", (
        ['test string1', 'test string3'],
    ))
Copy

Das Prozentzeichen („%“) ist sowohl ein Platzhalterzeichen für SQL LIKE als auch ein Formatbindungszeichen für Python. Wenn Sie die Formatbindung verwenden und Ihr SQL-Befehl das Prozentzeichen enthält, müssen Sie möglicherweise das Prozentzeichen maskieren. Wenn Ihre SQL-Anweisung beispielsweise wie folgt lautet:

SELECT col1, col2
    FROM test_table
    WHERE col2 ILIKE '%York' LIMIT 1;  -- Find York, New York, etc.
Copy

Dann sollte Ihr Python-Code wie folgt aussehen (beachten Sie das zusätzliche Prozentzeichen, um das ursprüngliche Prozentzeichen zu umgehen):

sql_command = "select col1, col2 from test_table "
sql_command += " where col2 like '%%York' limit %(lim)s"
parameter_dictionary = {'lim': 1 }
cur.execute(sql_command, parameter_dictionary)
Copy

qmark- oder numeric-Bindung

Sowohl die qmark-Bindung als auch die numeric-Bindung binden Daten eher auf der Serverseite als auf der Clientseite:

  • Verwenden Sie für die qmark-Bindung ein Fragezeichenzeichen (?), um anzugeben, an welcher Stelle in der Zeichenfolge der Wert einer Variablen eingefügt werden soll.

  • Verwenden Sie für die numeric-Bindung einen Doppelpunkt (:), gefolgt von einer Zahl, mit der die Position der Variablen angegeben wird, die an dieser Position ersetzt werden soll. So gibt beispielsweise :2 die zweite Variable an.

    Verwenden Sie die numerische Bindung, um denselben Wert mehr als einmal in derselben Abfrage zu binden. Wenn Sie z. B. einen langen VARCHAR- oder BINARY- oder semistrukturierten Wert haben, den Sie mehr als einmal verwenden möchten, dann können Sie mit der numeric-Bindung den Wert einmal an den Server senden und ihn mehrfach verwenden.

In den nächsten Abschnitten wird erklärt, wie Sie qmark- und numeric-Bindung verwenden:

Verwenden von qmark- oder numeric-Bindung

Um die Stilbindung qmark oder numeric zu verwenden, führen Sie einen der folgenden Schritte aus:

  • snowflake.connector.paramstyle='qmark'

  • snowflake.connector.paramstyle='numeric'

Wichtig

Sie müssen das paramstyle-Attribut festlegen, bevor Sie die connect()-Methode aufrufen.

Wenn Sie paramstyle auf qmark oder numeric setzen, müssen Sie ? oder :N (wobei N durch eine Zahl ersetzt wird) als Platzhalter verwenden.

Beispiel:

  • Verwenden von ? als Platzhalter:

    import snowflake.connector
    
    snowflake.connector.paramstyle='qmark'
    
    con = snowflake.connector.connect(...)
    
    con.cursor().execute(
        "INSERT INTO testtable(col1, col2) "
        "VALUES(?, ?)", (
            789,
            'test string3'
        ))
    
    Copy
  • Verwenden von :N als Platzhalter:

    import snowflake.connector
    
    snowflake.connector.paramstyle='numeric'
    
    con = snowflake.connector.connect(...)
    
    con.cursor().execute(
        "INSERT INTO testtable(col1, col2) "
        "VALUES(:1, :2)", (
            789,
            'test string3'
        ))
    
    Copy

    Die folgende Abfrage zeigt die Verwendung der numeric-Bindung zur Wiederverwendung einer Variablen:

    con.cursor().execute(
        "INSERT INTO testtable(complete_video, short_sample_of_video) "
        "VALUES(:1, SUBSTRING(:1, :2, :3))", (
            binary_value_that_stores_video,          # variable :1
            starting_offset_in_bytes_of_video_clip,  # variable :2
            length_in_bytes_of_video_clip            # variable :3
        ))
    
    Copy

Verwenden von qmark- oder numeric-Bindung mit datetime-Objekten

Wenn Sie die Bindung qmark oder numeric verwenden, um Daten an einen Snowflake-Datentyp TIMESTAMP zu binden, setzen Sie die Bindungsvariable auf ein Tupel, das den Datentyp des Snowflake-Zeitstempels (TIMESTAMP_LTZ oder TIMESTAMP_TZ) und den Wert angibt. Beispiel:

import snowflake.connector

snowflake.connector.paramstyle='qmark'

con = snowflake.connector.connect(...)

con.cursor().execute(
    "CREATE OR REPLACE TABLE testtable2 ("
    "   col1 int, "
    "   col2 string, "
    "   col3 timestamp_ltz"
    ")"
)

con.cursor().execute(
    "INSERT INTO testtable2(col1,col2,col3) "
    "VALUES(?,?,?)", (
        987,
        'test string4',
        ("TIMESTAMP_LTZ", datetime.now())
    )
 )
Copy

Im Gegensatz zur clientseitigen Bindung benötigt die serverseitige Bindung den Snowflake-Datentyp für die Spalte. Die meisten gängigen Python-Datentypen weisen bereits implizite Zuordnungen zu Snowflake-Datentypen auf (z. B. int ist FIXED zugeordnet). Da die Python-datetime-Daten jedoch an einen von mehreren Snowflake-Datentypen (TIMESTAMP_NTZ, TIMESTAMP_LTZ oder TIMESTAMP_TZ) gebunden sein können und die Standardzuordnung TIMESTAMP_NTZ ist, müssen Sie den zu verwendenden Snowflake-Datentyp angeben.

Verwenden von Bindungsvariablen mit dem IN-Operator

qmark und numeric (serverseitige Bindung) unterstützen nicht die Verwendung von Bindungsvariablen mit dem IN-Operator.

Wenn Sie Bindungsvariablen mit dem IN-Operator verwenden müssen, verwenden Sie clientseitige Bindung (pyformat oder format).

Binden von Parametern an Variablen für Batcheinfügungen

In Ihrem Anwendungscode können Sie mehrere Zeilen in einen einzelnen Batch einfügen. Verwenden Sie dazu Parameter für Werte in einer INSERT-Anweisung. In der folgenden Anweisung werden z. B. Platzhalter für qmark-Bindungen in einer INSERT-Anweisung verwendet:

insert into grocery (item, quantity) values (?, ?)
Copy

Zum Spezifizieren der einzufügenden Daten definieren Sie dann eine Variable, die eine Sequenz von Sequenzen ist (z. B. eine Liste von Tupeln):

rows_to_insert = [('milk', 2), ('apple', 3), ('egg', 2)]
Copy

Wie im obigen Beispiel gezeigt, ist jedes Element in der Liste ein Tupel, das die Spaltenwerte für eine einzufügende Zeile enthält.

Um die Bindung durchzuführen, rufen Sie die Methode executemany() auf und übergeben die Variable als zweites Argument. Beispiel:

conn = snowflake.connector.connect( ... )
rows_to_insert = [('milk', 2), ('apple', 3), ('egg', 2)]
conn.cursor().executemany(
    "insert into grocery (item, quantity) values (?, ?)",
    rows_to_insert)
Copy

Durch Binden von Daten auf dem Server (d. h. mittels qmark- oder numeric-Bindung) kann der Konnektor die Leistung von Batcheinfügungen optimieren.

Wenn Sie dieses Verfahren verwenden, um eine große Anzahl von Werten einzufügen, kann die Treiberleistung verbessert werden, indem die Daten (ohne Erstellen von Dateien auf dem lokalen Computer) an einen temporären Stagingbereich gestreamt werden. Der Treiber führt dies automatisch durch, wenn die Anzahl der Werte einen Schwellenwert überschreitet.

Außerdem müssen die aktuelle Datenbank und das aktuelle Schema für die Sitzung festgelegt sein. Wenn diese nicht festgelegt sind, kann der vom Treiber ausgeführte CREATE TEMPORARY STAGE-Befehl folgenden Fehler generieren:

CREATE TEMPORARY STAGE SYSTEM$BIND file_format=(type=csv field_optionally_enclosed_by='"')
Cannot perform CREATE STAGE. This session does not have a current schema. Call 'USE SCHEMA', or use a qualified name.
Copy

Bemerkung

Alternative Möglichkeiten zum Laden von Daten in die Snowflake-Datenbank (einschließlich Massenladen mit dem COPY-Befehl) finden Sie unter Daten in Snowflake laden.

Angriffe durch Einschleusung von SQL-Befehlen verhindern

Binden Sie Daten nicht mit der Formatierungsfunktion von Python, da Sie eine Einschleusung von SQL-Befehlen riskieren. Beispiel:

# Binding data (UNSAFE EXAMPLE)
con.cursor().execute(
    "INSERT INTO testtable(col1, col2) "
    "VALUES(%(col1)d, '%(col2)s')" % {
        'col1': 789,
        'col2': 'test string3'
    })
Copy
# Binding data (UNSAFE EXAMPLE)
con.cursor().execute(
    "INSERT INTO testtable(col1, col2) "
    "VALUES(%d, '%s')" % (
        789,
        'test string3'
    ))
Copy
# Binding data (UNSAFE EXAMPLE)
con.cursor().execute(
    "INSERT INTO testtable(col1, col2) "
    "VALUES({col1}, '{col2}')".format(
        col1=789,
        col2='test string3')
    )
Copy

Speichern Sie stattdessen die Werte in Variablen, und binden Sie diese Variablen dann mit qmark oder einem numerischen Bindungsformat ein.

Abrufen von Spaltenmetadaten

Für das Abrufen der Metadaten jeder Spalte im Resultset abzurufen (z. B. Name, Typ, Genauigkeit, Dezimalstellenzahl usw. jeder Spalte) gibt es folgende Möglichkeiten:

  • Nach dem Aufruf der Methode execute() zur Ausführung der Abfrage können Sie mit dem Attribut description des Cursor-Objekts auf die Metadaten zuzugreifen.

  • Wenn Sie auf die Metadaten zugreifen möchten, ohne eine Abfrage ausführen zu müssen, rufen Sie die Methode describe() auf.

    Die Methode describe ist im Snowflake-Konnektor für Python ab Version 2.4.6 verfügbar.

Das Attribut description wird auf einen der folgenden Werte gesetzt:

  • Version 2.4.5 und früher: Eine Liste von Tupeln.

  • Versionen 2.4.6 und später: Eine Liste von ResultMetadata-Objekten. (Die Methode describe gibt ebenfalls diese Liste zurück.)

Jedes Tupel- und ResultMetadata-Objekt enthält die Metadaten zu einer Spalte (Spaltenname, Datentyp usw.). Sie können auf die Metadaten über den Index zugreifen oder ab Version 2.4.6 auch über das Attribut ResultMetadata.

Die folgenden Beispiele demonstrieren, wie Sie auf die Metadaten der zurückgegebenen Tupel und ResultMetadata-Objekte zugreifen können.

Beispiel: Abrufen der Spaltennamen-Metadaten nach Index (ab Version 2.4.5):

Im folgenden Beispiel wird das Attribut description verwendet, um die Liste der Spaltennamen nach der Ausführung einer Abfrage abzurufen. Das Attribut ist eine Liste von Tupeln. Das Beispiel greift auf den Spaltennamen des ersten Wertes in jedem Tupel zu.

cur = conn.cursor()
cur.execute("SELECT * FROM test_table")
print(','.join([col[0] for col in cur.description]))
Copy

Beispiel: Abrufen der Spaltennamen-Metadaten nach Attribut (ab Version 2.4.6):

Im folgenden Beispiel wird das Attribut description verwendet, um die Liste der Spaltennamen nach der Ausführung einer Abfrage abzurufen. Das Attribut ist eine Liste von ResultMetaData-Objekten. Das Beispiel greift auf den Spaltennamen aus dem Attribut name jedes ResultMetadata-Objekts zu.

cur = conn.cursor()
cur.execute("SELECT * FROM test_table")
print(','.join([col.name for col in cur.description]))
Copy

Beispiel: Abrufen der Spaltennamen-Metadaten ohne Ausführen der Abfrage (ab Version 2.4.6):

Das folgende Beispiel verwendet die Methode describe, um die Liste der Spaltennamen abzurufen, ohne eine Abfrage ausführen zu müssen. Die Methode describe() gibt eine Liste von ResultMetaData-Objekten zurück. Das Beispiel greift auf den Spaltennamen aus dem Attribut name jedes ResultMetadata-Objekts zu.

cur = conn.cursor()
result_metadata_list = cur.describe("SELECT * FROM test_table")
print(','.join([col.name for col in result_metadata_list]))
Copy

Fehlerbehandlung

Die Anwendung muss auf Ausnahmen, die vom Snowflake-Konnektor ausgelöst werden, korrekt reagieren und entscheiden, ob die Codeausführung fortgesetzt oder abgebrochen wird.

# Catching the syntax error
cur = con.cursor()
try:
    cur.execute("SELECT * FROM testtable")
except snowflake.connector.errors.ProgrammingError as e:
    # default error message
    print(e)
    # customer error message
    print('Error {0} ({1}): {2} ({3})'.format(e.errno, e.sqlstate, e.msg, e.sfqid))
finally:
    cur.close()
Copy

Verwenden von execute_stream zum Ausführen von SQL-Skripten

Mit der Funktion execute_stream können Sie ein oder mehrere SQL-Skripte in einem Stream ausführen:

from codecs import open
with open(sqlfile, 'r', encoding='utf-8') as f:
    for cur in con.execute_stream(f):
        for ret in cur:
            print(ret)
Copy

Schließen der Verbindung

Als Best Practice schließen Sie die Verbindung, indem Sie die Methode close aufrufen:

connection.close()
Copy

Dadurch wird sichergestellt, dass die gesammelten Clientmetriken an den Server übermittelt werden und die Sitzung gelöscht wird. Außerdem helfen try-finally Blöcke sicherzustellen, dass die Verbindung geschlossen wird, auch wenn mittendrin eine Ausnahme ausgelöst wird:

# Connecting to Snowflake
con = snowflake.connector.connect(...)
try:
    # Running queries
    con.cursor().execute(...)
    ...
finally:
    # Closing the connection
    con.close()
Copy

Verwenden eines Kontextmanagers zum Verbinden und Steuern von Transaktionen

Der Snowflake-Konnektor für Python unterstützt einen Kontextmanager, der bei Bedarf Ressourcen zuweist und freigibt. Der Kontextmanager ist nützlich, um für Transaktionen auf Basis des Anweisungsstatus ein Commit oder ein Rollback auszuführen, wenn autocommit deaktiviert ist.

# Connecting to Snowflake using the context manager
with snowflake.connector.connect(
  user=USER,
  password=PASSWORD,
  account=ACCOUNT,
  autocommit=False,
) as con:
    con.cursor().execute("INSERT INTO a VALUES(1, 'test1')")
    con.cursor().execute("INSERT INTO a VALUES(2, 'test2')")
    con.cursor().execute("INSERT INTO a VALUES(not numeric value, 'test3')") # fail
Copy

Im obigen Beispiel führt der Kontextmanager nach Fehlschlagen der dritten Anweisung ein Rollback der Änderungen in der Transaktion aus und schließt die Verbindung. Wenn alle Anweisungen erfolgreich waren, würde der Kontextmanager die Änderungen committen und die Verbindung schließen.

Der äquivalente Code mit try- und except-Blöcken ist wie folgt:

# Connecting to Snowflake using try and except blocks
con = snowflake.connector.connect(
  user=USER,
  password=PASSWORD,
  account=ACCOUNT,
  autocommit=False)
try:
    con.cursor().execute("INSERT INTO a VALUES(1, 'test1')")
    con.cursor().execute("INSERT INTO a VALUES(2, 'test2')")
    con.cursor().execute("INSERT INTO a VALUES(not numeric value, 'test3')") # fail
    con.commit()
except Exception as e:
    con.rollback()
    raise e
finally:
    con.close()
Copy

Protokollieren

Der Snowflake-Konnektor für Python nutzt das logging-Standardmodul von Python, um den Status in regelmäßigen Abständen zu protokollieren, sodass die Anwendung ihre Aktivitäten im Hintergrund verfolgen kann. Der einfachste Weg, die Protokollierung zu aktivieren, ist der Aufruf von logging.basicConfig() am Anfang der Anwendung.

Im folgenden Beispiel wird der Protokolliergrad auf INFO eingestellt, und die Protokolle werden in einer Datei namens /tmp/snowflake_python_connector.log gespeichert.

logging.basicConfig(
    filename=file_name,
    level=logging.INFO)
Copy

Eine umfassendere Protokollierung kann durch Einstellen des Protokolliergrads auf DEBUG wie folgt aktiviert werden:

# Logging including the timestamp, thread and the source code location
import logging
for logger_name in ['snowflake.connector', 'botocore', 'boto3']:
    logger = logging.getLogger(logger_name)
    logger.setLevel(logging.DEBUG)
    ch = logging.FileHandler('/tmp/python_connector.log')
    ch.setLevel(logging.DEBUG)
    ch.setFormatter(logging.Formatter('%(asctime)s - %(threadName)s %(filename)s:%(lineno)d - %(funcName)s() - %(levelname)s - %(message)s'))
    logger.addHandler(ch)
Copy

Die optionale, aber empfohlene Formatierungsklasse SecretDetector stellt sicher, dass ein bestimmter Satz bekannter vertraulicher Informationen maskiert wird, bevor diese in die Protokolldateien des Python-Konnektors von Snowflake geschrieben werden. Verwenden Sie für SecretDetector folgenden Code:

# Logging including the timestamp, thread and the source code location
import logging
from snowflake.connector.secret_detector import SecretDetector
for logger_name in ['snowflake.connector', 'botocore', 'boto3']:
    logger = logging.getLogger(logger_name)
    logger.setLevel(logging.DEBUG)
    ch = logging.FileHandler('/tmp/python_connector.log')
    ch.setLevel(logging.DEBUG)
    ch.setFormatter(SecretDetector('%(asctime)s - %(threadName)s %(filename)s:%(lineno)d - %(funcName)s() - %(levelname)s - %(message)s'))
    logger.addHandler(ch)
Copy

Bemerkung

botocore und boto3 sind über das AWS (Amazon Web Services) SDK für Python verfügbar.

Beispielprogramm

Der folgende Beispielcode kombiniert die meisten der in den vorangegangenen Abschnitten beschriebenen Beispiele zu einem funktionierenden Python-Programm: Dieses Beispiel enthält zwei Teile:

  • Eine übergeordnete Klasse („python_veritas_base“) enthält den Code für viele gängige Operationen, z. B. das Herstellen einer Verbindung zum Server.

  • Eine untergeordnete Klasse („python_connector_example“) repräsentiert die kundenspezifischen Teile eines bestimmten Clients, z. B. zum Abfragen einer Tabelle.

Dieser Beispielcode wird direkt aus einem unserer Tests importiert, um sicherzustellen, dass er in einem neueren Build des Produkts ausgeführt wurde.

Da dies aus einem Test stammt, enthält es eine kleine Menge Code, um einen alternativen Port und ein alternatives Protokoll festzulegen, die in einigen Tests verwendet werden. Benutzer sollten Protokoll oder Portnummer nicht festlegen. Lassen Sie diese Werte stattdessen aus, und verwenden Sie die Standardeinstellungen.

Dies enthält auch einige Abschnittsmarkierungen (manchmal als „Snippet-Tags“ bezeichnet), um Code zu identifizieren, der unabhängig in die Dokumentation importiert werden kann. Abschnittsmarkierungen sehen normalerweise ähnlich aus wie:

# -- (> ---------------------- SECTION=import_connector ---------------------
...
# -- <) ---------------------------- END_SECTION ----------------------------
Copy

Diese Abschnittsmarkierungen sind im Benutzercode nicht erforderlich.

Der erste Teil des Codebeispiels enthält die allgemeinen Unterroutinen für:

  • Lesen Sie Befehlszeilenargumente (z. B. „–warehouse MyWarehouse“), die Verbindungsinformationen enthalten.

  • Stellen Sie eine Verbindung zum Server her.

  • Erstellen und verwenden Sie ein Warehouse, eine Datenbank und ein Schema.

  • Löschen Sie das Schema, die Datenbank und das Warehouse, wenn Sie damit fertig sind.


import logging
import os
import sys


# -- (> ---------------------- SECTION=import_connector ---------------------
import snowflake.connector
# -- <) ---------------------------- END_SECTION ----------------------------


class python_veritas_base:

    """
    PURPOSE:
        This is the Base/Parent class for programs that use the Snowflake
        Connector for Python.
        This class is intended primarily for:
            * Sample programs, e.g. in the documentation.
            * Tests.
    """


    def __init__(self, p_log_file_name = None):

        """
        PURPOSE:
            This does any required initialization steps, which in this class is
            basically just turning on logging.
        """

        file_name = p_log_file_name
        if file_name is None:
            file_name = '/tmp/snowflake_python_connector.log'

        # -- (> ---------- SECTION=begin_logging -----------------------------
        logging.basicConfig(
            filename=file_name,
            level=logging.INFO)
        # -- <) ---------- END_SECTION ---------------------------------------


    # -- (> ---------------------------- SECTION=main ------------------------
    def main(self, argv):

        """
        PURPOSE:
            Most tests follow the same basic pattern in this main() method:
               * Create a connection.
               * Set up, e.g. use (or create and use) the warehouse, database,
                 and schema.
               * Run the queries (or do the other tasks, e.g. load data).
               * Clean up. In this test/demo, we drop the warehouse, database,
                 and schema. In a customer scenario, you'd typically clean up
                 temporary tables, etc., but wouldn't drop your database.
               * Close the connection.
        """

        # Read the connection parameters (e.g. user ID) from the command line
        # and environment variables, then connect to Snowflake.
        connection = self.create_connection(argv)

        # Set up anything we need (e.g. a separate schema for the test/demo).
        self.set_up(connection)

        # Do the "real work", for example, create a table, insert rows, SELECT
        # from the table, etc.
        self.do_the_real_work(connection)

        # Clean up. In this case, we drop the temporary warehouse, database, and
        # schema.
        self.clean_up(connection)

        print("\nClosing connection...")
        # -- (> ------------------- SECTION=close_connection -----------------
        connection.close()
        # -- <) ---------------------------- END_SECTION ---------------------

    # -- <) ---------------------------- END_SECTION=main --------------------


    def args_to_properties(self, args):

        """
        PURPOSE:
            Read the command-line arguments and store them in a dictionary.
            Command-line arguments should come in pairs, e.g.:
                "--user MyUser"
        INPUTS:
            The command line arguments (sys.argv).
        RETURNS:
            Returns the dictionary.
        DESIRABLE ENHANCEMENTS:
            Improve error detection and handling.
        """

        connection_parameters = {}

        i = 1
        while i < len(args) - 1:
            property_name = args[i]
            # Strip off the leading "--" from the tag, e.g. from "--user".
            property_name = property_name[2:]
            property_value = args[i + 1]
            connection_parameters[property_name] = property_value
            i += 2

        return connection_parameters


    def create_connection(self, argv):

        """
        PURPOSE:
            This gets account identifier and login information from the
            environment variables and command-line parameters, connects to the
            server, and returns the connection object.
        INPUTS:
            argv: This is usually sys.argv, which contains the command-line
                  parameters. It could be an equivalent substitute if you get
                  the parameter information from another source.
        RETURNS:
            A connection.
        """

        # Get account identifier and login information from environment variables and command-line parameters.
        # For information about account identifiers, see
        # https://docs.snowflake.com/en/user-guide/admin-account-identifier.html .
        # -- (> ----------------------- SECTION=set_login_info ---------------

        # Get the password from an appropriate environment variable, if
        # available.
        PASSWORD = os.getenv('SNOWSQL_PWD')

        # Get the other login info etc. from the command line.
        if len(argv) < 11:
            msg = "ERROR: Please pass the following command-line parameters:\n"
            msg += "--warehouse <warehouse> --database <db> --schema <schema> "
            msg += "--user <user> --account <account_identifier> "
            print(msg)
            sys.exit(-1)
        else:
            connection_parameters = self.args_to_properties(argv)
            USER = connection_parameters["user"]
            ACCOUNT = connection_parameters["account"]
            WAREHOUSE = connection_parameters["warehouse"]
            DATABASE = connection_parameters["database"]
            SCHEMA = connection_parameters["schema"]
            # Optional: for internal testing only.
            try:
                PORT = connection_parameters["port"]
            except:
                PORT = ""
            try:
                PROTOCOL = connection_parameters["protocol"]
            except:
                PROTOCOL = ""

        # If the password is set by both command line and env var, the
        # command-line value takes precedence over (is written over) the
        # env var value.

        # If the password wasn't set either in the environment var or on
        # the command line...
        if PASSWORD is None or PASSWORD == '':
            print("ERROR: Set password, e.g. with SNOWSQL_PWD environment variable")
            sys.exit(-2)
        # -- <) ---------------------------- END_SECTION ---------------------

        # Optional diagnostic:
        #print("USER:", USER)
        #print("ACCOUNT:", ACCOUNT)
        #print("WAREHOUSE:", WAREHOUSE)
        #print("DATABASE:", DATABASE)
        #print("SCHEMA:", SCHEMA)
        #print("PASSWORD:", PASSWORD)
        #print("PROTOCOL:" "'" + PROTOCOL + "'")
        #print("PORT:" + "'" + PORT + "'")

        print("Connecting...")
        # If the PORT is set but the protocol is not, we ignore the PORT (bug!!).
        if PROTOCOL is None or PROTOCOL == "" or PORT is None or PORT == "":
            # -- (> ------------------- SECTION=connect_to_snowflake ---------
            conn = snowflake.connector.connect(
                user=USER,
                password=PASSWORD,
                account=ACCOUNT,
                warehouse=WAREHOUSE,
                database=DATABASE,
                schema=SCHEMA
                )
            # -- <) ---------------------------- END_SECTION -----------------
        else:

            conn = snowflake.connector.connect(
                user=USER,
                password=PASSWORD,
                account=ACCOUNT,
                warehouse=WAREHOUSE,
                database=DATABASE,
                schema=SCHEMA,
                # Optional: for internal testing only.
                protocol=PROTOCOL,
                port=PORT
                )

        return conn


    def set_up(self, connection):

        """
        PURPOSE:
            Set up to run a test. You can override this method with one
            appropriate to your test/demo.
        """

        # Create a temporary warehouse, database, and schema.
        self.create_warehouse_database_and_schema(connection)


    def do_the_real_work(self, conn):

        """
        PURPOSE:
            Your sub-class should override this to include the code required for
            your documentation sample or your test case.
            This default method does a very simple self-test that shows that the
            connection was successful.
        """

        # Create a cursor for this connection.
        cursor1 = conn.cursor()
        # This is an example of an SQL statement we might want to run.
        command = "SELECT PI()"
        # Run the statement.
        cursor1.execute(command)
        # Get the results (should be only one):
        for row in cursor1:
            print(row[0])
        # Close this cursor.
        cursor1.close()


    def clean_up(self, connection):

        """
        PURPOSE:
            Clean up after a test. You can override this method with one
            appropriate to your test/demo.
        """

        # Create a temporary warehouse, database, and schema.
        self.drop_warehouse_database_and_schema(connection)


    def create_warehouse_database_and_schema(self, conn):

        """
        PURPOSE:
            Create the temporary schema, database, and warehouse that we use
            for most tests/demos.
        """

        # Create a database, schema, and warehouse if they don't already exist.
        print("\nCreating warehouse, database, schema...")
        # -- (> ------------- SECTION=create_warehouse_database_schema -------
        conn.cursor().execute("CREATE WAREHOUSE IF NOT EXISTS tiny_warehouse_mg")
        conn.cursor().execute("CREATE DATABASE IF NOT EXISTS testdb_mg")
        conn.cursor().execute("USE DATABASE testdb_mg")
        conn.cursor().execute("CREATE SCHEMA IF NOT EXISTS testschema_mg")
        # -- <) ---------------------------- END_SECTION ---------------------

        # -- (> --------------- SECTION=use_warehouse_database_schema --------
        conn.cursor().execute("USE WAREHOUSE tiny_warehouse_mg")
        conn.cursor().execute("USE DATABASE testdb_mg")
        conn.cursor().execute("USE SCHEMA testdb_mg.testschema_mg")
        # -- <) ---------------------------- END_SECTION ---------------------


    def drop_warehouse_database_and_schema(self, conn):

        """
        PURPOSE:
            Drop the temporary schema, database, and warehouse that we create
            for most tests/demos.
        """

        # -- (> ------------- SECTION=drop_warehouse_database_schema ---------
        conn.cursor().execute("DROP SCHEMA IF EXISTS testschema_mg")
        conn.cursor().execute("DROP DATABASE IF EXISTS testdb_mg")
        conn.cursor().execute("DROP WAREHOUSE IF EXISTS tiny_warehouse_mg")
        # -- <) ---------------------------- END_SECTION ---------------------


# ----------------------------------------------------------------------------

if __name__ == '__main__':
    pvb = python_veritas_base()
    pvb.main(sys.argv)


Copy

Der zweite Teil des Codebeispiels erstellt eine Tabelle, fügt Zeilen ein usw.:


import sys

# -- (> ---------------------- SECTION=import_connector ---------------------
import snowflake.connector
# -- <) ---------------------------- END_SECTION ----------------------------


# Import the base class that contains methods used in many tests and code 
# examples.
from python_veritas_base import python_veritas_base


class python_connector_example (python_veritas_base):

  """
  PURPOSE:
      This is a simple example program that shows how to use the Snowflake 
      Python Connector to create and query a table.
  """

  def __init__(self):
    pass


  def do_the_real_work(self, conn):

    """
    INPUTS:
        conn is a Connection object returned from snowflake.connector.connect().
    """

    print("\nCreating table test_table...")
    # -- (> ----------------------- SECTION=create_table ---------------------
    conn.cursor().execute(
        "CREATE OR REPLACE TABLE "
        "test_table(col1 integer, col2 string)")

    conn.cursor().execute(
        "INSERT INTO test_table(col1, col2) VALUES " + 
        "    (123, 'test string1'), " + 
        "    (456, 'test string2')")
    # -- <) ---------------------------- END_SECTION -------------------------


    print("\nSelecting from test_table...")
    # -- (> ----------------------- SECTION=querying_data --------------------
    cur = conn.cursor()
    try:
        cur.execute("SELECT col1, col2 FROM test_table ORDER BY col1")
        for (col1, col2) in cur:
            print('{0}, {1}'.format(col1, col2))
    finally:
        cur.close()
    # -- <) ---------------------------- END_SECTION -------------------------




# ============================================================================

if __name__ == '__main__':

    test_case = python_connector_example()
    test_case.main(sys.argv)

Copy

Gehen Sie folgendermaßen vor, um dieses Beispiel auszuführen:

  1. Kopieren Sie den ersten Teil des Codes in eine Datei mit dem Namen „python_veritas_base.py“.

  2. Kopieren Sie den zweiten Teil des Codes in eine Datei mit dem Namen „python_connector_example.py“.

  3. Setzen Sie die Umgebungsvariable SNOWSQL_PWD auf Ihr Kennwort, zum Beispiel:

    export SNOWSQL_PWD='MyPassword'
    
    Copy
  4. Führen Sie das Programm über eine Befehlszeile ähnlich der folgenden aus (ersetzen Sie die Benutzer- und Kontoinformationen natürlich durch Ihre eigenen Benutzer- und Kontoinformationen).

    Warnung

    Dies löscht das Warehouse, die Datenbank und das Schema am Ende des Programms! Verwenden Sie nicht den Namen einer vorhandenen Datenbank, da Sie diese verlieren werden!

    python3 python_connector_example.py --warehouse <unique_warehouse_name> --database <new_warehouse_zzz_test> --schema <new_schema_zzz_test> --account myorganization-myaccount --user MyUserName
    
    Copy

Und hier ist die Ausgabe:

Connecting...

Creating warehouse, database, schema...

Creating table test_table...

Selecting from test_table...
123, test string1
456, test string2

Closing connection...
Copy

Hier ist ein längeres Beispiel:

Bemerkung

Achten Sie darauf, dass Sie in dem Abschnitt, in dem Sie Ihre Konto- und Anmeldeinformationen festlegen, die Variablen bei Bedarf so ersetzen, dass sie Ihren Snowflake-Anmeldeinformationen (Name, Kennwort usw.) entsprechen.

In diesem Beispiel wird die Anweisung mit der Funktion format() zusammengestellt. Wenn in Ihrer Umgebung das Risiko von Angriffen durch Einschleusung von SQL-Befehlen besteht, sollten Sie möglicherweise das Binden von Werten vorziehen, anstatt format() zu verwenden.

#!/usr/bin/env python
#
# Snowflake Connector for Python Sample Program
#

# Logging
import logging
logging.basicConfig(
    filename='/tmp/snowflake_python_connector.log',
    level=logging.INFO)

import snowflake.connector

# Set your account and login information (replace the variables with
# the necessary values).
ACCOUNT = '<account_identifier>'
USER = '<login_name>'
PASSWORD = '<password>'

import os

# Only required if you copy data from your S3 bucket
AWS_ACCESS_KEY_ID = os.getenv('AWS_ACCESS_KEY_ID')
AWS_SECRET_ACCESS_KEY = os.getenv('AWS_SECRET_ACCESS_KEY')

# Connecting to Snowflake
con = snowflake.connector.connect(
  user=USER,
  password=PASSWORD,
  account=ACCOUNT,
)

# Creating a database, schema, and warehouse if none exists
con.cursor().execute("CREATE WAREHOUSE IF NOT EXISTS tiny_warehouse")
con.cursor().execute("CREATE DATABASE IF NOT EXISTS testdb")
con.cursor().execute("USE DATABASE testdb")
con.cursor().execute("CREATE SCHEMA IF NOT EXISTS testschema")

# Using the database, schema and warehouse
con.cursor().execute("USE WAREHOUSE tiny_warehouse")
con.cursor().execute("USE SCHEMA testdb.testschema")

# Creating a table and inserting data
con.cursor().execute(
    "CREATE OR REPLACE TABLE "
    "testtable(col1 integer, col2 string)")
con.cursor().execute(
    "INSERT INTO testtable(col1, col2) "
    "VALUES(123, 'test string1'),(456, 'test string2')")

# Copying data from internal stage (for testtable table)
con.cursor().execute("PUT file:///tmp/data0/file* @%testtable")
con.cursor().execute("COPY INTO testtable")

# Copying data from external stage (S3 bucket -
# replace <s3_bucket> with the name of your bucket)
con.cursor().execute("""
COPY INTO testtable FROM s3://<s3_bucket>/data/
     STORAGE_INTEGRATION = myint
     FILE_FORMAT=(field_delimiter=',')
""".format(
    aws_access_key_id=AWS_ACCESS_KEY_ID,
    aws_secret_access_key=AWS_SECRET_ACCESS_KEY))

# Querying data
cur = con.cursor()
try:
    cur.execute("SELECT col1, col2 FROM testtable")
    for (col1, col2) in cur:
        print('{0}, {1}'.format(col1, col2))
finally:
    cur.close()

# Binding data
con.cursor().execute(
    "INSERT INTO testtable(col1, col2) "
    "VALUES(%(col1)s, %(col2)s)", {
        'col1': 789,
        'col2': 'test string3',
        })

# Retrieving column names
cur = con.cursor()
cur.execute("SELECT * FROM testtable")
print(','.join([col[0] for col in cur.description]))

# Catching syntax errors
cur = con.cursor()
try:
    cur.execute("SELECT * FROM testtable")
except snowflake.connector.errors.ProgrammingError as e:
    # default error message
    print(e)
    # user error message
    print('Error {0} ({1}): {2} ({3})'.format(e.errno, e.sqlstate, e.msg, e.sfqid))
finally:
    cur.close()

# Retrieving the Snowflake query ID
cur = con.cursor()
cur.execute("SELECT * FROM testtable")
print(cur.sfqid)

# Closing the connection
con.close()
Copy