Verwenden des Snowflake SQLAlchemy-Toolkits mit dem Python-Konnektor

Snowflake SQLAlchemy wird auf dem Snowflake-Konnektor für Python als Dialekt ausgeführt, um eine Snowflake-Datenbank und SQLAlchemy-Anwendungen zu verbinden.

Unter diesem Thema:

Voraussetzungen

Snowflake-Konnektor für Python

Die einzige Voraussetzung für Snowflake SQLAlchemy ist der Snowflake-Konnektor für Python. Dieser Konnektor muss jedoch nicht installiert werden, da durch die Installation von Snowflake SQLAlchemy der Konnektor automatisch installiert wird.

Datenanalyse und Webanwendungs-Frameworks (optional)

Snowflake SQLAlchemy kann mit den Anwendungsframeworks pandas, Jupyter und Pyramid verwendet werden, die den höheren Ansprüchen von Datenanalysen und Webanwendungen gerecht werden. Der Aufbau einer Arbeitsumgebung von Grund auf ist jedoch keine einfache Aufgabe, insbesondere für unerfahrene Benutzer. Für die Installation der Frameworks sind C-Compiler und Tools erforderlich. Die Auswahl der passenden Tools und Versionen ist eine Hürde, die Benutzer möglicherweise von der Verwendung von Python-Anwendungen abhalten kann.

Eine einfachere Methode zum Aufbau einer Umgebung bietet Anaconda. Dieses Framework stellt einen vollständigen, vorkompilierten Technologiestack für alle Benutzer bereit, einschließlich Nicht-Python-Experten wie Datenanalysten und Studenten. Eine Installationsanleitung für Anaconda finden Sie in der Anaconda-Installationsdokumentation. Das Snowflake SQLAlchemy-Paket kann dann mit pip auf Anaconda installiert werden.

Installieren von Snowflake SQLAlchemy

Das Snowflake SQLAlchemy-Paket kann mit pip aus dem öffentlichen PyPI-Repository installiert werden:

pip install --upgrade snowflake-sqlalchemy
Copy

pip installiert automatisch alle erforderlichen Module, einschließlich des Snowflake-Konnektors für Python.

Beachten Sie, dass die Entwicklerhinweise mit dem Quellcode auf GitHub gehostet werden.

Überprüfen der Installation

  1. Erstellen Sie eine Datei (z. B. validate.py), die den folgenden Python-Beispielcode enthält, der eine Verbindung zu Snowflake herstellt und die Snowflake-Version anzeigt:

    #!/usr/bin/env python
    from sqlalchemy import create_engine
    
    engine = create_engine(
        'snowflake://{user}:{password}@{account_identifier}/'.format(
            user='<user_login_name>',
            password='<password>',
            account_identifier='<account_identifier>',
        )
    )
    try:
        connection = engine.connect()
        results = connection.execute('select current_version()').fetchone()
        print(results[0])
    finally:
        connection.close()
        engine.dispose()
    
    Copy
  2. Stellen Sie sicher, dass Sie <Benutzeranmeldename>, <Kennwort> und <Kontobezeichner> durch die entsprechenden Werte für Ihr Snowflake-Konto und Ihren Snowflake-Benutzer ersetzen. Weitere Details dazu finden Sie unter Verbindungsparameter (unter diesem Thema).

  3. Führen Sie den Beispielcode aus. Wenn Sie beispielsweise eine Datei mit dem Namen validate.py erstellt haben:

    python validate.py
    
    Copy

Die Snowflake-Version (z. B. 1.48.0) sollte angezeigt werden.

Snowflake-spezifische Parameter und Verhalten

Snowflake SQLAlchemy bietet so viele kompatible Funktionen für SQLAlchemy-Anwendungen wie möglich. Weitere Informationen zur Verwendung von SQLAlchemy finden Sie in der SQLAlchemy-Dokumentation.

Snowflake SQLAlchemy bietet jedoch auch Snowflake-spezifische Parameter und Verhaltensweisen, die in den folgenden Abschnitten beschrieben werden.

Verbindungsparameter

Erforderliche Parameter

Snowflake SQLAlchemy verwendet die folgende Syntax für die Verbindungszeichenfolge, mit der eine Verbindung zu Snowflake hergestellt und eine Sitzung initiiert wird:

'snowflake://<user_login_name>:<password>@<account_identifier>'
Copy

Wobei:

  • <Benutzeranmeldename> ist der Anmeldename für Ihren Snowflake-Benutzer.

  • <Kennwort> ist das Kennwort für Ihren Snowflake-Benutzer.

  • <Kontobezeichner> ist Ihr Kontobezeichner. Siehe Kontobezeichner.

    Bemerkung

    Fügen Sie nicht den Domänennamen snowflakecomputing.com Teil Ihres Kontobezeichners ein. Snowflake hängt den Domänennamen automatisch an Ihren Kontobezeichner an, um die angeforderte Verbindung herzustellen.

Zusätzliche Verbindungsparameter

Sie können optional die folgenden zusätzlichen Informationen am Ende der Verbindungszeichenfolge (nach <Kontoname>) angeben:

'snowflake://<user_login_name>:<password>@<account_identifier>/<database_name>/<schema_name>?warehouse=<warehouse_name>&role=<role_name>'
Copy

Wobei:

  • <Datenbankname> und <Schemaname> sind die anfängliche Datenbank und das erste Schema für die Snowflake-Sitzung, getrennt durch Schrägstriche (/).

  • warehouse=<Warehouse-Name> und role=<Rollenname>' sind das anfängliche Warehouse und die erste Rolle für die Sitzung. Sie werden als Parameterzeichenfolgen angegeben und durch Fragezeichen (?) getrennt.

Bemerkung

Nach der Anmeldung können die in der Verbindungszeichenfolge angegebenen anfänglichen Werte für Datenbank, Schema, Warehouse und Rolle jederzeit für die Sitzung geändert werden.

Konfiguration des Proxyservers

Proxyserver-Parameter werden nicht unterstützt. Verwenden Sie die unterstützten Umgebungsvariablen, um einen Proxyserver zu konfigurieren. Weitere Informationen dazu finden Sie unter Verwenden eines Proxyservers.

Beispiele für Verbindungszeichenfolgen

Im folgenden Beispiel wird die Methode create_engine mit dem Benutzernamen testuser1, dem Kennwort 0123456, dem Kontobezeichner myorganization-myaccount, der Datenbank testdb, dem Schema public, dem Warehouse testwh und der Rolle myrole aufgerufen.

from sqlalchemy import create_engine
engine = create_engine(
    'snowflake://testuser1:0123456@myorganization-myaccount/testdb/public?warehouse=testwh&role=myrole'
)
Copy

Zur Vereinfachung können Sie die Methode snowflake.sqlalchemy.URL verwenden, um die Verbindungszeichenfolge zu erstellen und eine Verbindung zur Datenbank herzustellen. Im folgenden Beispiel wird dieselbe Verbindungszeichenfolge aus dem vorherigen Beispiel erstellt:

from snowflake.sqlalchemy import URL
from sqlalchemy import create_engine

engine = create_engine(URL(
    account = 'myorganization-myaccount',
    user = 'testuser1',
    password = '0123456',
    database = 'testdb',
    schema = 'public',
    warehouse = 'testwh',
    role='myrole',
))
Copy

Öffnen und Schließen einer Verbindung

Öffnen Sie eine Verbindung, indem Sie engine.connect() ausführen. Vermeiden Sie die Verwendung von engine.execute().

# Avoid this.
engine = create_engine(...)
engine.execute(<SQL>)
engine.dispose()

# Do this.
engine = create_engine(...)
connection = engine.connect()
try:
    connection.execute(<SQL>)
finally:
    connection.close()
    engine.dispose()
Copy

Bemerkung

Stellen Sie sicher, dass Sie die Verbindung schließen, indem Sie erst connection.close() und dann engine.dispose() ausführen. Andernfalls entfernt der Garbage Collector von Python die für die Kommunikation mit Snowflake erforderlichen Ressourcen, sodass der Python-Konnektor die Sitzung dann nicht mehr ordnungsgemäß schließen kann.

Wenn Sie explizite Transaktionen verwenden möchten, müssen Sie die AUTOCOMMIT-Ausführungsoption in SQLAlchemy deaktivieren.

In der Standardeinstellung aktiviert SQLAlchemy diese Option. Wenn diese Option aktiviert ist, werden INSERT-, UPDATE- und DELETE-Anweisungen bei der Ausführung automatisch committed, auch wenn diese Anweisungen innerhalb einer expliziten Transaktion ausgeführt werden.

Um AUTOCOMMIT zu deaktivieren, übergeben Sie autocommit=False an die Methode Connection.execution_options(). Beispiel:

# Disable AUTOCOMMIT if you need to use an explicit transaction.
with engine.connect().execution_options(autocommit=False) as connection:

  try:
    connection.execute("BEGIN")
    connection.execute("INSERT INTO test_table VALUES (88888, 'X', 434354)")
    connection.execute("INSERT INTO test_table VALUES (99999, 'Y', 453654654)")
    connection.execute("COMMIT")
  except Exception as e:
    connection.execute("ROLLBACK")
  finally:
    connection.close()

engine.dispose()
Copy

Automatische Inkrementierung

Für das automatische Inkrementieren eines Werts ist das Objekt Sequence erforderlich. Fügen Sie das Sequence-Objekt in die Primärschlüsselspalte ein, damit der Wert beim Einfügen eines neuen Datensatzes automatisch erhöht wird. Beispiel:

t = Table('mytable', metadata,
    Column('id', Integer, Sequence('id_seq'), primary_key=True),
    Column(...), ...
)
Copy

Behandlung von Groß-/Kleinschreibung bei Objektnamen

Snowflake speichert Objektnamen, bei denen die Groß-/Kleinschreibung nicht relevant ist, in Großbuchstaben. Im Gegensatz dazu wird in SQLAlchemy davon ausgegangen, dass bei allen Objektnamen in Kleinbuchstaben die Groß-/Kleinschreibung nicht relevant ist. Snowflake SQLAlchemy konvertiert die Schreibung des Objektnamens während der Kommunikation auf Schemaebene (d. h. während der Tabellen- und Indexreflexion). Wenn Sie Objektnamen in Großbuchstaben verwenden, geht SQLAlchemy davon aus, dass die Groß-/Kleinschreibung relevant ist, und setzt die Namen in Anführungszeichen. Dieses Verhalten führt zu Konflikten mit Daten aus Datenwörterbüchern, die von Snowflake empfangen werden. Wenn also Bezeichnernamen nicht schon ursprünglich zur Berücksichtigung der Groß-/Kleinschreibung mit Anführungszeichen erstellt wurden (z. B. "TestDb"), sollten alle Namen in Kleinbuchstaben auf SQLAlchemy-Seite verwendet werden.

Index-Unterstützung

Snowflake verwendet keine Indizes, daher auch verwendet auch Snowflake SQLAlchemy keine.

Unterstützung von Datentyp NumPy

Snowflake SQLAlchemy unterstützt das Binden und Abrufen von NumPy-Datentypen. Das Binden wird immer unterstützt. Um das Abrufen von NumPy-Datentypen zu aktivieren, fügen Sie numpy=True zu den Verbindungsparametern hinzu.

Die folgenden NumPy-Datentypen werden unterstützt:

  • numpy.int64

  • numpy.float64

  • numpy.datetime64

Das folgende Beispiel zeigt den Roundtrip von numpy.datetime64-Daten:

import numpy as np
import pandas as pd
engine = create_engine(URL(
    account = 'myorganization-myaccount',
    user = 'testuser1',
    password = 'pass',
    database = 'db',
    schema = 'public',
    warehouse = 'testwh',
    role='myrole',
    numpy=True,
))

specific_date = np.datetime64('2016-03-04T12:03:05.123456789Z')

connection = engine.connect()
connection.execute(
    "CREATE OR REPLACE TABLE ts_tbl(c1 TIMESTAMP_NTZ)")
connection.execute(
    "INSERT INTO ts_tbl(c1) values(%s)", (specific_date,)
)
df = pd.read_sql_query("SELECT * FROM ts_tbl", engine)
assert df.c1.values[0] == specific_date
Copy

Zwischenspeichern von Spaltenmetadaten

SQLAlchemy stellt die API zur Laufzeitprüfung bereit, über die Sie Laufzeitinformationen zu den verschiedenen Objekten abrufen können. Ein häufiger Anwendungsfall ist das Abrufen aller Tabellen und ihrer Spaltenmetadaten in einem Schema, um einen Schemakatalog zu erstellen. Beispielsweise verwaltet Alembic über SQLAlchemy die Migration von Datenbankschemas. Ein Pseudocodefluss lautet wie folgt:

inspector = inspect(engine)
schema = inspector.default_schema_name
for table_name in inspector.get_table_names(schema):
    column_metadata = inspector.get_columns(table_name, schema)
    primary_keys = inspector.get_primary_keys(table_name, schema)
    foreign_keys = inspector.get_foreign_keys(table_name, schema)
    ...
Copy

Bei diesem Codefluss besteht ein mögliches Problem darin, dass die Ausführung einige Zeit dauern kann, denn die Abfragen werden erfolgen auf jeder einzelnen Tabelle. Die Ergebnisse werden zwar zwischengespeichert, aber das Abrufen von Spaltenmetadaten wird teuer.

Um das Problem abzumildern, verwendet Snowflake SQLAlchemy das Flag cache_column_metadata=True, sodass beim Aufruf von get_table_names alle Spaltenmetadaten für alle Tabellen zwischengespeichert werden und der Rest von get_columns, get_primary_keys und get_foreign_keys dann vom Cache profitiert.

engine = create_engine(URL(
    account = 'myorganization-myaccount',
    user = 'testuser1',
    password = 'pass',
    database = 'db',
    schema = 'public',
    warehouse = 'testwh',
    role='myrole',
    cache_column_metadata=True,
))
Copy

Bemerkung

Beachten Sie, dass die Speichernutzung steigt, da alle Spaltenmetadaten zwischengespeichert und mit dem Inspector-Objekt verknüpft sind. Verwenden Sie das Flag nur, wenn Sie alle Spaltenmetadaten abrufen möchten.

Unterstützung von VARIANT, ARRAY und OBJECT

Snowflake SQLAlchemy unterstützt das Abrufen der Datentypen VARIANT, ARRAY und OBJECT. Alle Datentypen werden in Python in str konvertiert, sodass Sie sie mit json.loads in native Datentypen konvertieren können.

Im folgenden Beispiel wird gezeigt, wie eine Tabelle erstellt wird, die Spalten mit den Datentypen VARIANT, ARRAY und OBJECT enthält.

from snowflake.sqlalchemy import (VARIANT, ARRAY, OBJECT)
...
t = Table('my_semi_structured_datatype_table', metadata,
    Column('va', VARIANT),
    Column('ob', OBJECT),
    Column('ar', ARRAY))
metdata.create_all(engine)
Copy

Um die Spalten mit den Datentypen VARIANT, ARRAY und OBJECT abzurufen und in die systemeigenen Python-Datentypen zu konvertieren, rufen Sie die Daten ab, und rufen Sie dann die Methode json.loads wie folgt auf:

import json
connection = engine.connect()
results = connection.execute(select([t]))
row = results.fetchone()
data_variant = json.loads(row[0])
data_object  = json.loads(row[1])
data_array   = json.loads(row[2])
Copy

Unterstützung von CLUSTER BY

Snowflake SQLAlchemy unterstützt den Parameter CLUSTER BY für Tabellen. Weitere Informationen zu diesem Parameter finden Sie unter CREATE TABLE.

Im folgenden Beispiel wird gezeigt, wie Sie eine Tabelle mit zwei Spalten, id und name, als Gruppierungsschlüssel erstellen:

t = Table('myuser', metadata,
    Column('id', Integer, primary_key=True),
    Column('name', String),
    snowflake_clusterby=['id', 'name'], ...
)
metadata.create_all(engine)
Copy

Unterstützung von Alembic

Alembic ist ein Datenbankmigrationstool für SQLAlchemy. Snowflake SQLAlchemy fügt den folgenden Code zu alembic/env.py hinzu, damit Snowflake SQLAlchemy von Alembic erkannt werden kann.

from alembic.ddl.impl import DefaultImpl

class SnowflakeImpl(DefaultImpl):
    __dialect__ = 'snowflake'
Copy

Allgemeine Informationen zur Verwendung finden Sie in der Alembic-Dokumentation.

Unterstützung der Schlüsselpaar-Authentifizierung

Snowflake SQLAlchemy unterstützt die Schlüsselpaar-Authentifizierung durch Verwendung der Funktionalität des Snowflake-Konnektors für Python. Eine Anleitung zum Erstellen von privaten und öffentlichen Schlüsseln finden Sie unter Verwenden von Schlüsselpaar-Authentifizierung und Schlüsselpaar-Rotation.

Der Parameter für den privaten Schlüssel wird durch connect_args wie folgt übergeben:

...
from snowflake.sqlalchemy import URL
from sqlalchemy import create_engine

from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives.asymmetric import rsa
from cryptography.hazmat.primitives.asymmetric import dsa
from cryptography.hazmat.primitives import serialization

with open("rsa_key.p8", "rb") as key:
    p_key= serialization.load_pem_private_key(
        key.read(),
        password=os.environ['PRIVATE_KEY_PASSPHRASE'].encode(),
        backend=default_backend()
    )

pkb = p_key.private_bytes(
    encoding=serialization.Encoding.DER,
    format=serialization.PrivateFormat.PKCS8,
    encryption_algorithm=serialization.NoEncryption())

engine = create_engine(URL(
    account='abc123',
    user='testuser1',
    ),
    connect_args={
        'private_key': pkb,
        },
    )
Copy

Dabei ist PRIVATE_KEY_PASSPHRASE eine Passphrase zum Entschlüsseln der Datei mit dem privaten Schlüssel rsa_key.p8.

Die Methode snowflake.sqlalchemy.URL unterstützt keine Parameter für private Schlüssel.

Unterstützung von Merge-Befehlen

Snowflake SQLAlchemy unterstützt das Ausführen eines Upsert mit seinem benutzerdefinierten MergeInto-Ausdruck. Eine vollständige Dokumentation dazu finden Sie unter MERGE.

Verwenden Sie den Ausdruck wie folgt:

from sqlalchemy.orm import sessionmaker
from sqlalchemy import MetaData, create_engine
from snowflake.sqlalchemy import MergeInto

engine = create_engine(db.url, echo=False)
session = sessionmaker(bind=engine)()
connection = engine.connect()

meta = MetaData()
meta.reflect(bind=session.bind)
t1 = meta.tables['t1']
t2 = meta.tables['t2']

merge = MergeInto(target=t1, source=t2, on=t1.c.t1key == t2.c.t2key)
merge.when_matched_then_delete().where(t2.c.marked == 1)
merge.when_matched_then_update().where(t2.c.isnewstatus == 1).values(val = t2.c.newval, status=t2.c.newstatus)
merge.when_matched_then_update().values(val=t2.c.newval)
merge.when_not_matched_then_insert().values(val=t2.c.newval, status=t2.c.newstatus)
connection.execute(merge)
Copy

Unterstützung von CopyIntoStorage

Snowflake SQLAlchemy unterstützt das Speichern von Tabellen und Abfrageergebnissen in unterschiedlichen Snowflake-Stagingbereichen, Azure Containern und AWS-Buckets mit seinem benutzerdefinierten CopyIntoStorage-Ausdruck. Eine vollständige Dokumentation dazu finden Sie unter COPY INTO <Speicherort>.

Verwenden Sie den Ausdruck wie folgt:

from sqlalchemy.orm import sessionmaker
from sqlalchemy import MetaData, create_engine
from snowflake.sqlalchemy import CopyIntoStorage, AWSBucket, CSVFormatter

engine = create_engine(db.url, echo=False)
session = sessionmaker(bind=engine)()
connection = engine.connect()

meta = MetaData()
meta.reflect(bind=session.bind)
users = meta.tables['users']

copy_into = CopyIntoStorage(from_=users,
                            into=AWSBucket.from_uri('s3://my_private_backup').encryption_aws_sse_kms('1234abcd-12ab-34cd-56ef-1234567890ab'),
                            formatter=CSVFormatter().null_if(['null', 'Null']))
connection.execute(copy_into)
Copy