Verwenden des Snowflake SQLAlchemy-Toolkits mit dem Python-Konnektor¶
Snowflake SQLAlchemy wird auf dem Snowflake-Konnektor für Python als Dialekt ausgeführt, um eine Snowflake-Datenbank und SQLAlchemy-Anwendungen zu verbinden.
Unter diesem Thema:
Voraussetzungen¶
Snowflake-Konnektor für Python¶
Die einzige Voraussetzung für Snowflake SQLAlchemy ist der Snowflake-Konnektor für Python. Dieser Konnektor muss jedoch nicht installiert werden, da durch die Installation von Snowflake SQLAlchemy der Konnektor automatisch installiert wird.
Datenanalyse und Webanwendungs-Frameworks (optional)¶
Snowflake SQLAlchemy kann mit den Anwendungsframeworks pandas, Jupyter und Pyramid verwendet werden, die den höheren Ansprüchen von Datenanalysen und Webanwendungen gerecht werden. Der Aufbau einer Arbeitsumgebung von Grund auf ist jedoch keine einfache Aufgabe, insbesondere für unerfahrene Benutzer. Für die Installation der Frameworks sind C-Compiler und Tools erforderlich. Die Auswahl der passenden Tools und Versionen ist eine Hürde, die Benutzer möglicherweise von der Verwendung von Python-Anwendungen abhalten kann.
Eine einfachere Methode zum Aufbau einer Umgebung bietet Anaconda. Dieses Framework stellt einen vollständigen, vorkompilierten Technologiestack für alle Benutzer bereit, einschließlich Nicht-Python-Experten wie Datenanalysten und Studenten. Eine Installationsanleitung für Anaconda finden Sie in der Anaconda-Installationsdokumentation. Das Snowflake SQLAlchemy-Paket kann dann mit pip auf Anaconda installiert werden.
Installieren von Snowflake SQLAlchemy¶
Das Snowflake SQLAlchemy-Paket kann mit pip
aus dem öffentlichen PyPI-Repository installiert werden:
pip install --upgrade snowflake-sqlalchemy
pip
installiert automatisch alle erforderlichen Module, einschließlich des Snowflake-Konnektors für Python.
Beachten Sie, dass die Entwicklerhinweise mit dem Quellcode auf GitHub gehostet werden.
Überprüfen der Installation¶
Erstellen Sie eine Datei (z. B.
validate.py
), die den folgenden Python-Beispielcode enthält, der eine Verbindung zu Snowflake herstellt und die Snowflake-Version anzeigt:#!/usr/bin/env python from sqlalchemy import create_engine engine = create_engine( 'snowflake://{user}:{password}@{account_identifier}/'.format( user='<user_login_name>', password='<password>', account_identifier='<account_identifier>', ) ) try: connection = engine.connect() results = connection.execute('select current_version()').fetchone() print(results[0]) finally: connection.close() engine.dispose()
Stellen Sie sicher, dass Sie
<Benutzeranmeldename>
,<Kennwort>
und<Kontobezeichner>
durch die entsprechenden Werte für Ihr Snowflake-Konto und Ihren Snowflake-Benutzer ersetzen. Weitere Details dazu finden Sie unter Verbindungsparameter (unter diesem Thema).Führen Sie den Beispielcode aus. Wenn Sie beispielsweise eine Datei mit dem Namen
validate.py
erstellt haben:python validate.py
Die Snowflake-Version (z. B. 1.48.0
) sollte angezeigt werden.
Snowflake-spezifische Parameter und Verhalten¶
Snowflake SQLAlchemy bietet so viele kompatible Funktionen für SQLAlchemy-Anwendungen wie möglich. Weitere Informationen zur Verwendung von SQLAlchemy finden Sie in der SQLAlchemy-Dokumentation.
Snowflake SQLAlchemy bietet jedoch auch Snowflake-spezifische Parameter und Verhaltensweisen, die in den folgenden Abschnitten beschrieben werden.
Verbindungsparameter¶
Erforderliche Parameter¶
Snowflake SQLAlchemy verwendet die folgende Syntax für die Verbindungszeichenfolge, mit der eine Verbindung zu Snowflake hergestellt und eine Sitzung initiiert wird:
'snowflake://<user_login_name>:<password>@<account_identifier>'
Wobei:
<Benutzeranmeldename>
ist der Anmeldename für Ihren Snowflake-Benutzer.<Kennwort>
ist das Kennwort für Ihren Snowflake-Benutzer.<Kontobezeichner>
ist Ihr Kontobezeichner. Siehe Kontobezeichner.Bemerkung
Fügen Sie nicht den Domänennamen
snowflakecomputing.com
Teil Ihres Kontobezeichners ein. Snowflake hängt den Domänennamen automatisch an Ihren Kontobezeichner an, um die angeforderte Verbindung herzustellen.
Zusätzliche Verbindungsparameter¶
Sie können optional die folgenden zusätzlichen Informationen am Ende der Verbindungszeichenfolge (nach <Kontoname>
) angeben:
'snowflake://<user_login_name>:<password>@<account_identifier>/<database_name>/<schema_name>?warehouse=<warehouse_name>&role=<role_name>'
Wobei:
<Datenbankname>
und<Schemaname>
sind die anfängliche Datenbank und das erste Schema für die Snowflake-Sitzung, getrennt durch Schrägstriche (/
).warehouse=<Warehouse-Name>
undrole=<Rollenname>'
sind das anfängliche Warehouse und die erste Rolle für die Sitzung. Sie werden als Parameterzeichenfolgen angegeben und durch Fragezeichen (?
) getrennt.
Bemerkung
Nach der Anmeldung können die in der Verbindungszeichenfolge angegebenen anfänglichen Werte für Datenbank, Schema, Warehouse und Rolle jederzeit für die Sitzung geändert werden.
Konfiguration des Proxyservers¶
Proxyserver-Parameter werden nicht unterstützt. Verwenden Sie die unterstützten Umgebungsvariablen, um einen Proxyserver zu konfigurieren. Weitere Informationen dazu finden Sie unter Verwenden eines Proxyservers.
Beispiele für Verbindungszeichenfolgen¶
Im folgenden Beispiel wird die Methode create_engine
mit dem Benutzernamen testuser1
, dem Kennwort 0123456
, dem Kontobezeichner myorganization-myaccount
, der Datenbank testdb
, dem Schema public
, dem Warehouse testwh
und der Rolle myrole
aufgerufen.
from sqlalchemy import create_engine engine = create_engine( 'snowflake://testuser1:0123456@myorganization-myaccount/testdb/public?warehouse=testwh&role=myrole' )
Zur Vereinfachung können Sie die Methode snowflake.sqlalchemy.URL
verwenden, um die Verbindungszeichenfolge zu erstellen und eine Verbindung zur Datenbank herzustellen. Im folgenden Beispiel wird dieselbe Verbindungszeichenfolge aus dem vorherigen Beispiel erstellt:
from snowflake.sqlalchemy import URL from sqlalchemy import create_engine engine = create_engine(URL( account = 'myorganization-myaccount', user = 'testuser1', password = '0123456', database = 'testdb', schema = 'public', warehouse = 'testwh', role='myrole', ))
Öffnen und Schließen einer Verbindung¶
Öffnen Sie eine Verbindung, indem Sie engine.connect()
ausführen. Vermeiden Sie die Verwendung von engine.execute()
.
# Avoid this. engine = create_engine(...) engine.execute(<SQL>) engine.dispose() # Do this. engine = create_engine(...) connection = engine.connect() try: connection.execute(<SQL>) finally: connection.close() engine.dispose()
Bemerkung
Stellen Sie sicher, dass Sie die Verbindung schließen, indem Sie erst connection.close()
und dann engine.dispose()
ausführen. Andernfalls entfernt der Garbage Collector von Python die für die Kommunikation mit Snowflake erforderlichen Ressourcen, sodass der Python-Konnektor die Sitzung dann nicht mehr ordnungsgemäß schließen kann.
Wenn Sie explizite Transaktionen verwenden möchten, müssen Sie die AUTOCOMMIT-Ausführungsoption in SQLAlchemy deaktivieren.
In der Standardeinstellung aktiviert SQLAlchemy diese Option. Wenn diese Option aktiviert ist, werden INSERT-, UPDATE- und DELETE-Anweisungen bei der Ausführung automatisch committed, auch wenn diese Anweisungen innerhalb einer expliziten Transaktion ausgeführt werden.
Um AUTOCOMMIT zu deaktivieren, übergeben Sie autocommit=False
an die Methode Connection.execution_options()
. Beispiel:
# Disable AUTOCOMMIT if you need to use an explicit transaction.
with engine.connect().execution_options(autocommit=False) as connection:
try:
connection.execute("BEGIN")
connection.execute("INSERT INTO test_table VALUES (88888, 'X', 434354)")
connection.execute("INSERT INTO test_table VALUES (99999, 'Y', 453654654)")
connection.execute("COMMIT")
except Exception as e:
connection.execute("ROLLBACK")
finally:
connection.close()
engine.dispose()
Automatische Inkrementierung¶
Für das automatische Inkrementieren eines Werts ist das Objekt Sequence
erforderlich. Fügen Sie das Sequence
-Objekt in die Primärschlüsselspalte ein, damit der Wert beim Einfügen eines neuen Datensatzes automatisch erhöht wird. Beispiel:
t = Table('mytable', metadata, Column('id', Integer, Sequence('id_seq'), primary_key=True), Column(...), ... )
Behandlung von Groß-/Kleinschreibung bei Objektnamen¶
Snowflake speichert Objektnamen, bei denen die Groß-/Kleinschreibung nicht relevant ist, in Großbuchstaben. Im Gegensatz dazu wird in SQLAlchemy davon ausgegangen, dass bei allen Objektnamen in Kleinbuchstaben die Groß-/Kleinschreibung nicht relevant ist. Snowflake SQLAlchemy konvertiert die Schreibung des Objektnamens während der Kommunikation auf Schemaebene (d. h. während der Tabellen- und Indexreflexion). Wenn Sie Objektnamen in Großbuchstaben verwenden, geht SQLAlchemy davon aus, dass die Groß-/Kleinschreibung relevant ist, und setzt die Namen in Anführungszeichen. Dieses Verhalten führt zu Konflikten mit Daten aus Datenwörterbüchern, die von Snowflake empfangen werden. Wenn also Bezeichnernamen nicht schon ursprünglich zur Berücksichtigung der Groß-/Kleinschreibung mit Anführungszeichen erstellt wurden (z. B. "TestDb"
), sollten alle Namen in Kleinbuchstaben auf SQLAlchemy-Seite verwendet werden.
Index-Unterstützung¶
Snowflake verwendet keine Indizes, daher auch verwendet auch Snowflake SQLAlchemy keine.
Unterstützung von Datentyp NumPy¶
Snowflake SQLAlchemy unterstützt das Binden und Abrufen von NumPy
-Datentypen. Das Binden wird immer unterstützt. Um das Abrufen von NumPy
-Datentypen zu aktivieren, fügen Sie numpy=True
zu den Verbindungsparametern hinzu.
Die folgenden NumPy
-Datentypen werden unterstützt:
numpy.int64
numpy.float64
numpy.datetime64
Das folgende Beispiel zeigt den Roundtrip von numpy.datetime64
-Daten:
import numpy as np import pandas as pd engine = create_engine(URL( account = 'myorganization-myaccount', user = 'testuser1', password = 'pass', database = 'db', schema = 'public', warehouse = 'testwh', role='myrole', numpy=True, )) specific_date = np.datetime64('2016-03-04T12:03:05.123456789Z') connection = engine.connect() connection.execute( "CREATE OR REPLACE TABLE ts_tbl(c1 TIMESTAMP_NTZ)") connection.execute( "INSERT INTO ts_tbl(c1) values(%s)", (specific_date,) ) df = pd.read_sql_query("SELECT * FROM ts_tbl", engine) assert df.c1.values[0] == specific_date
Zwischenspeichern von Spaltenmetadaten¶
SQLAlchemy stellt die API zur Laufzeitprüfung bereit, über die Sie Laufzeitinformationen zu den verschiedenen Objekten abrufen können. Ein häufiger Anwendungsfall ist das Abrufen aller Tabellen und ihrer Spaltenmetadaten in einem Schema, um einen Schemakatalog zu erstellen. Beispielsweise verwaltet Alembic über SQLAlchemy die Migration von Datenbankschemas. Ein Pseudocodefluss lautet wie folgt:
inspector = inspect(engine) schema = inspector.default_schema_name for table_name in inspector.get_table_names(schema): column_metadata = inspector.get_columns(table_name, schema) primary_keys = inspector.get_primary_keys(table_name, schema) foreign_keys = inspector.get_foreign_keys(table_name, schema) ...
Bei diesem Codefluss besteht ein mögliches Problem darin, dass die Ausführung einige Zeit dauern kann, denn die Abfragen werden erfolgen auf jeder einzelnen Tabelle. Die Ergebnisse werden zwar zwischengespeichert, aber das Abrufen von Spaltenmetadaten wird teuer.
Um das Problem abzumildern, verwendet Snowflake SQLAlchemy das Flag cache_column_metadata=True
, sodass beim Aufruf von get_table_names
alle Spaltenmetadaten für alle Tabellen zwischengespeichert werden und der Rest von get_columns
, get_primary_keys
und get_foreign_keys
dann vom Cache profitiert.
engine = create_engine(URL( account = 'myorganization-myaccount', user = 'testuser1', password = 'pass', database = 'db', schema = 'public', warehouse = 'testwh', role='myrole', cache_column_metadata=True, ))
Bemerkung
Beachten Sie, dass die Speichernutzung steigt, da alle Spaltenmetadaten zwischengespeichert und mit dem Inspector
-Objekt verknüpft sind. Verwenden Sie das Flag nur, wenn Sie alle Spaltenmetadaten abrufen möchten.
Unterstützung von VARIANT, ARRAY und OBJECT¶
Snowflake SQLAlchemy unterstützt das Abrufen der Datentypen VARIANT
, ARRAY
und OBJECT
. Alle Datentypen werden in Python in str
konvertiert, sodass Sie sie mit json.loads
in native Datentypen konvertieren können.
Im folgenden Beispiel wird gezeigt, wie eine Tabelle erstellt wird, die Spalten mit den Datentypen VARIANT
, ARRAY
und OBJECT
enthält.
from snowflake.sqlalchemy import (VARIANT, ARRAY, OBJECT) ... t = Table('my_semi_structured_datatype_table', metadata, Column('va', VARIANT), Column('ob', OBJECT), Column('ar', ARRAY)) metdata.create_all(engine)
Um die Spalten mit den Datentypen VARIANT
, ARRAY
und OBJECT
abzurufen und in die systemeigenen Python-Datentypen zu konvertieren, rufen Sie die Daten ab, und rufen Sie dann die Methode json.loads
wie folgt auf:
import json connection = engine.connect() results = connection.execute(select([t])) row = results.fetchone() data_variant = json.loads(row[0]) data_object = json.loads(row[1]) data_array = json.loads(row[2])
Unterstützung von CLUSTER BY¶
Snowflake SQLAlchemy unterstützt den Parameter CLUSTER BY
für Tabellen. Weitere Informationen zu diesem Parameter finden Sie unter CREATE TABLE.
Im folgenden Beispiel wird gezeigt, wie Sie eine Tabelle mit zwei Spalten, id
und name
, als Gruppierungsschlüssel erstellen:
t = Table('myuser', metadata, Column('id', Integer, primary_key=True), Column('name', String), snowflake_clusterby=['id', 'name'], ... ) metadata.create_all(engine)
Unterstützung von Alembic¶
Alembic ist ein Datenbankmigrationstool für SQLAlchemy
. Snowflake SQLAlchemy fügt den folgenden Code zu alembic/env.py
hinzu, damit Snowflake SQLAlchemy von Alembic erkannt werden kann.
from alembic.ddl.impl import DefaultImpl class SnowflakeImpl(DefaultImpl): __dialect__ = 'snowflake'
Allgemeine Informationen zur Verwendung finden Sie in der Alembic-Dokumentation.
Unterstützung der Schlüsselpaar-Authentifizierung¶
Snowflake SQLAlchemy unterstützt die Schlüsselpaar-Authentifizierung durch Verwendung der Funktionalität des Snowflake-Konnektors für Python. Eine Anleitung zum Erstellen von privaten und öffentlichen Schlüsseln finden Sie unter Verwenden von Schlüsselpaar-Authentifizierung und Schlüsselpaar-Rotation.
Der Parameter für den privaten Schlüssel wird durch connect_args
wie folgt übergeben:
... from snowflake.sqlalchemy import URL from sqlalchemy import create_engine from cryptography.hazmat.backends import default_backend from cryptography.hazmat.primitives.asymmetric import rsa from cryptography.hazmat.primitives.asymmetric import dsa from cryptography.hazmat.primitives import serialization with open("rsa_key.p8", "rb") as key: p_key= serialization.load_pem_private_key( key.read(), password=os.environ['PRIVATE_KEY_PASSPHRASE'].encode(), backend=default_backend() ) pkb = p_key.private_bytes( encoding=serialization.Encoding.DER, format=serialization.PrivateFormat.PKCS8, encryption_algorithm=serialization.NoEncryption()) engine = create_engine(URL( account='abc123', user='testuser1', ), connect_args={ 'private_key': pkb, }, )
Dabei ist PRIVATE_KEY_PASSPHRASE
eine Passphrase zum Entschlüsseln der Datei mit dem privaten Schlüssel rsa_key.p8
.
Die Methode snowflake.sqlalchemy.URL
unterstützt keine Parameter für private Schlüssel.
Unterstützung von Merge-Befehlen¶
Snowflake SQLAlchemy unterstützt das Ausführen eines Upsert mit seinem benutzerdefinierten MergeInto
-Ausdruck. Eine vollständige Dokumentation dazu finden Sie unter MERGE.
Verwenden Sie den Ausdruck wie folgt:
from sqlalchemy.orm import sessionmaker from sqlalchemy import MetaData, create_engine from snowflake.sqlalchemy import MergeInto engine = create_engine(db.url, echo=False) session = sessionmaker(bind=engine)() connection = engine.connect() meta = MetaData() meta.reflect(bind=session.bind) t1 = meta.tables['t1'] t2 = meta.tables['t2'] merge = MergeInto(target=t1, source=t2, on=t1.c.t1key == t2.c.t2key) merge.when_matched_then_delete().where(t2.c.marked == 1) merge.when_matched_then_update().where(t2.c.isnewstatus == 1).values(val = t2.c.newval, status=t2.c.newstatus) merge.when_matched_then_update().values(val=t2.c.newval) merge.when_not_matched_then_insert().values(val=t2.c.newval, status=t2.c.newstatus) connection.execute(merge)
Unterstützung von CopyIntoStorage¶
Snowflake SQLAlchemy unterstützt das Speichern von Tabellen und Abfrageergebnissen in unterschiedlichen Snowflake-Stagingbereichen, Azure Containern und AWS-Buckets mit seinem benutzerdefinierten CopyIntoStorage
-Ausdruck. Eine vollständige Dokumentation dazu finden Sie unter COPY INTO <Speicherort>.
Verwenden Sie den Ausdruck wie folgt:
from sqlalchemy.orm import sessionmaker from sqlalchemy import MetaData, create_engine from snowflake.sqlalchemy import CopyIntoStorage, AWSBucket, CSVFormatter engine = create_engine(db.url, echo=False) session = sessionmaker(bind=engine)() connection = engine.connect() meta = MetaData() meta.reflect(bind=session.bind) users = meta.tables['users'] copy_into = CopyIntoStorage(from_=users, into=AWSBucket.from_uri('s3://my_private_backup').encryption_aws_sse_kms('1234abcd-12ab-34cd-56ef-1234567890ab'), formatter=CSVFormatter().null_if(['null', 'Null'])) connection.execute(copy_into)