Verwenden des Snowflake SQLAlchemy-Toolkits mit dem Python-Konnektor

Snowflake SQLAlchemy wird auf dem Snowflake-Konnektor für Python als Dialekt ausgeführt, um eine Snowflake-Datenbank und SQLAlchemy-Anwendungen zu verbinden.

Unter diesem Thema:

Voraussetzungen

Snowflake-Konnektor für Python

Die einzige Voraussetzung für Snowflake SQLAlchemy ist der Snowflake-Konnektor für Python. Dieser Konnektor muss jedoch nicht installiert werden, da durch die Installation von Snowflake SQLAlchemy der Konnektor automatisch installiert wird.

Datenanalyse und Webanwendungsframeworks (optional)

Snowflake SQLAlchemy kann mit den Anwendungsframeworks Pandas, Jupyter und Pyramid verwendet werden, die den höheren Ansprüchen von Datenanalysen und Webanwendungen gerecht werden. Der Aufbau einer Arbeitsumgebung von Grund auf ist jedoch keine einfache Aufgabe, insbesondere für unerfahrene Benutzer. Für die Installation der Frameworks sind C-Compiler und -Tools erforderlich. Die Auswahl der passenden Tools und Versionen ist eine Hürde, die Benutzer möglicherweise von der Verwendung von Python-Anwendungen abhalten kann.

Eine einfachere Methode zum Aufbau einer Umgebung bietet Anaconda. Dieses Framework stellt einen vollständigen, vorkompilierten Technologiestack für alle Benutzer bereit, einschließlich Nicht-Python-Experten wie Datenanalysten und Studenten. Eine Installationsanleitung für Anaconda finden Sie in der Anaconda-Installationsdokumentation. Das Snowflake SQLAlchemy-Paket kann dann mit pip auf Anaconda installiert werden.

Installieren von Snowflake SQLAlchemy

Das Snowflake SQLAlchemy-Paket kann mit pip aus dem öffentlichen PyPI-Repository installiert werden:

pip install --upgrade snowflake-sqlalchemy

pip installiert automatisch alle erforderlichen Module, einschließlich des Snowflake-Konnektors für Python.

Beachten Sie, dass die Entwicklerhinweise mit dem Quellcode auf GitHub gehostet werden.

Überprüfen der Installation

  1. Erstellen Sie eine Datei (z. B. validate.py), die den folgenden Python-Beispielcode enthält, der eine Verbindung zu Snowflake herstellt und die Snowflake-Version anzeigt:

    #!/usr/bin/env python
    from sqlalchemy import create_engine
    
    engine = create_engine(
        'snowflake://{user}:{password}@{account}/'.format(
            user='<your_user_login_name>',
            password='<your_password>',
            account='<your_account_name>',
        )
    )
    try:
        connection = engine.connect()
        results = connection.execute('select current_version()').fetchone()
        print(results[0])
    finally:
        connection.close()
        engine.dispose()
    
  2. Stellen Sie sicher, dass Sie <Ihren_Benutzeranmeldenamen>, <Ihr_Kennwort> und <Ihren_Kontonamen> durch die entsprechenden Werte für Ihr Snowflake-Konto und Ihren Snowflake-Benutzer ersetzen. Weitere Details dazu finden Sie unter Verbindungsparameter (unter diesem Thema).

  3. Führen Sie den Beispielcode aus. Wenn Sie beispielsweise eine Datei mit dem Namen validate.py erstellt haben:

    python validate.py
    

Die Snowflake-Version (z. B. 1.48.0) sollte angezeigt werden.

Snowflake-spezifische Parameter und Verhalten

Snowflake SQLAlchemy bietet so viele kompatible Funktionen für SQLAlchemy-Anwendungen wie möglich. Weitere Informationen zur Verwendung von SQLAlchemy finden Sie in der SQLAlchemy-Dokumentation.

Snowflake SQLAlchemy bietet jedoch auch Snowflake-spezifische Parameter und Verhaltensweisen, die in den folgenden Abschnitten beschrieben werden.

Verbindungsparameter

Erforderliche Parameter

Snowflake SQLAlchemy verwendet die folgende Syntax für die Verbindungszeichenfolge, mit der eine Verbindung zu Snowflake hergestellt und eine Sitzung initiiert wird:

'snowflake://<user_login_name>:<password>@<account_name>'

Wobei:

  • <Benutzeranmeldename> ist der Anmeldename für Ihren Snowflake-Benutzer.

  • <Kennwort> ist das Kennwort für Ihren Snowflake-Benutzer.

  • <Kontoname> ist der volle Name Ihres Kontos (bereitgestellt von Snowflake).

    Beachten Sie, dass Ihr vollständiger Kontoname möglicherweise zusätzliche Segmente enthält, die die Region und die Cloudplattform angeben, wo Ihr Konto gehostet wird.

    Beispiele für Kontonamen nach Region

    Wenn Ihr Kontoname beispielsweise xy12345 ist:

    Cloudplattform/Region

    Vollständiger Kontoname

    AWS

    US West (Oregon)

    xy12345

    US East (Ohio)

    xy12345.us-east-2.aws

    US East (N. Virginia)

    xy12345.us-east-1

    US East (Commercial Gov - N. Virginia)

    xy12345.us-east-1-gov.aws

    Canada (Central)

    xy12345.ca-central-1.aws

    EU (Irland)

    xy12345.eu-west-1

    EU (Frankfurt)

    xy12345.eu-central-1

    Asia Pacific (Tokio)

    xy12345.ap-northeast-1.aws

    Asia Pacific (Mumbai)

    xy12345.ap-south-1.aws

    Asia Pacific (Singapur)

    xy12345.ap-southeast-1

    Asia Pacific (Sydney)

    xy12345.ap-southeast-2

    GCP

    US Central1 (Iowa)

    xy12345.us-central1.gcp

    Europe West2 (London)

    xy12345.europe-west2.gcp

    Europe West4 (Niederlande)

    xy12345.europe-west4.gcp

    Azure

    West US 2 (Washington)

    xy12345.west-us-2.azure

    East US 2 (Virginia)

    xy12345.east-us-2.azure

    US Gov Virginia

    xy12345.us-gov-virginia.azure

    Canada Central (Toronto)

    xy12345.canada-central.azure

    West Europe (Niederlande)

    xy12345.west-europe.azure

    Switzerland North (Zürich)

    xy12345.switzerland-north.azure

    Southeast Asia (Singapur)

    xy12345.southeast-asia.azure

    Australia East (New South Wales)

    xy12345.australia-east.azure

    Wichtig

    Wenn eine der folgenden Bedingungen zutrifft, unterscheidet sich Ihr Kontoname von der oben beschriebenen Struktur:

    • Wenn Ihre Snowflake Edition VPS ist, wenden Sie sich an den Snowflake-Support, um Details zu Ihrem Kontonamen zu erhalten.

    • Wenn für Ihr Konto AWS PrivateLink aktiviert ist, muss der Kontoname ein zusätzliches privatelink-Segment enthalten. Weitere Details dazu finden Sie unter AWS PrivateLink & Snowflake.

    Bemerkung

    Fügen Sie nicht den Snowflake-Domänennamen (snowflakecomputing.com) als Teil Ihres Kontonamens ein. Snowflake hängt den Domänennamen automatisch an Ihren Kontonamen an, um die angeforderte Verbindung herzustellen.

Zusätzliche Verbindungsparameter

Sie können optional die folgenden zusätzlichen Informationen am Ende der Verbindungszeichenfolge (nach <Kontoname>) angeben:

'snowflake://<user_login_name>:<password>@<account_name>/<database_name>/<schema_name>?warehouse=<warehouse_name>&role=<role_name>'

Wobei:

  • <Datenbankname> und <Schemaname> sind die anfängliche Datenbank und das erste Schema für die Snowflake-Sitzung, getrennt durch Schrägstriche (/).

  • warehouse=<Warehouse-Name> und role=<Rollenname>' sind das anfängliche Warehouse und die erste Rolle für die Sitzung. Sie werden als Parameterzeichenfolgen angegeben und durch Fragezeichen (?) getrennt.

Bemerkung

Nach der Anmeldung können die in der Verbindungszeichenfolge angegebenen anfänglichen Werte für Datenbank, Schema, Warehouse und Rolle jederzeit für die Sitzung geändert werden.

Konfiguration des Proxyserver

Proxyserver-Parameter werden nicht unterstützt. Verwenden Sie die unterstützten Umgebungsvariablen, um einen Proxyserver zu konfigurieren. Weitere Informationen dazu finden Sie unter Verwenden eines Proxyservers.

Beispiele für Verbindungszeichenfolgen

Im folgenden Beispiel wird die Methode create_engine mit dem Benutzernamen testuser1, dem Kennwort 0123456, dem Kontonamen xy12345.us-east-1, der Datenbank testdb, dem Schema public, dem Warehouse testwh und der Rolle myrole aufgerufen.

from sqlalchemy import create_engine
engine = create_engine(
    'snowflake://testuser1:0123456@xy12345.us-east-1/testdb/public?warehouse=testwh&role=myrole'
)

Zur Vereinfachung können Sie die Methode snowflake.sqlalchemy.URL verwenden, um die Verbindungszeichenfolge zu erstellen und eine Verbindung zur Datenbank herzustellen. Im folgenden Beispiel wird dieselbe Verbindungszeichenfolge aus dem vorherigen Beispiel erstellt:

from snowflake.sqlalchemy import URL
from sqlalchemy import create_engine

engine = create_engine(URL(
    account = 'xy12345.us-east-1',
    user = 'testuser1',
    password = '0123456',
    database = 'testdb',
    schema = 'public',
    warehouse = 'testwh',
    role='myrole',
))

Öffnen und Schließen einer Verbindung

Öffnen Sie eine Verbindung, indem Sie engine.connect() ausführen. Vermeiden Sie die Verwendung von engine.execute().

# Avoid this.
engine = create_engine(...)
engine.execute(<SQL>)
engine.dispose()

# Do this.
engine = create_engine(...)
connection = engine.connect()
try:
    connection.execute(<SQL>)
finally:
    connection.close()
    engine.dispose()

Bemerkung

Stellen Sie sicher, dass Sie die Verbindung schließen, indem Sie erst connection.close() und dann engine.dispose() ausführen. Andernfalls entfernt der Garbage Collector von Python die für die Kommunikation mit Snowflake erforderlichen Ressourcen, sodass der Python-Konnektor die Sitzung dann nicht mehr ordnungsgemäß schließen kann.

Automatische Inkrementierung

Für das automatische Inkrementieren eines Werts ist das Objekt Sequence erforderlich. Fügen Sie das Sequence-Objekt in die Primärschlüsselspalte ein, damit der Wert beim Einfügen eines neuen Datensatzes automatisch erhöht wird. Beispiel:

t = Table('mytable', metadata,
    Column('id', Integer, Sequence('id_seq'), primary_key=True),
    Column(...), ...
)

Behandlung von Groß-/Kleinschreibung bei Objektnamen

Snowflake speichert Objektnamen, bei denen die Groß-/Kleinschreibung nicht relevant ist, in Großbuchstaben. Im Gegensatz dazu wird in SQLAlchemy davon ausgegangen, dass die Groß-/Kleinschreibung bei allen Objektnamen in Kleinbuchstaben nicht relevant ist. Snowflake SQLAlchemy konvertiert den Objektnamen während der Kommunikation auf Schemaebene, d. h. während der Tabellen- und Indexreflexion. Wenn Sie Objektnamen in Großbuchstaben verwenden, geht SQLAlchemy davon aus, dass die Groß-/Kleinschreibung relevant ist, und setzt die Namen in Anführungszeichen. Dieses Verhalten führt zu Konflikten mit Daten aus Datenwörterbüchern, die von Snowflake empfangen werden. Wenn also Bezeichnernamen nicht schon ursprünglich zur Berücksichtigung der Groß-/Kleinschreibung mit Anführungszeichen erstellt wurden (z. B. "TestDb"), sollten alle Namen in Kleinbuchstaben auf SQLAlchemy-Seite verwendet werden.

Index-Unterstützung

Snowflake verwendet keine Indizes, daher auch verwendet auch Snowflake SQLAlchemy keine.

Unterstützung von Datentyp NumPy

Snowflake SQLAlchemy unterstützt das Binden und Abrufen von NumPy-Datentypen. Das Binden wird immer unterstützt. Um das Abrufen von NumPy-Datentypen zu aktivieren, fügen Sie numpy=True zu den Verbindungsparametern hinzu.

Die folgenden NumPy-Datentypen werden unterstützt:

  • numpy.int64

  • numpy.float64

  • numpy.datetime64

Das folgende Beispiel zeigt den Roundtrip von numpy.datetime64-Daten:

import numpy as np
import pandas as pd
engine = create_engine(URL(
    account = 'xy12345',
    user = 'testuser1',
    password = 'pass',
    database = 'db',
    schema = 'public',
    warehouse = 'testwh',
    role='myrole',
    numpy=True,
))

specific_date = np.datetime64('2016-03-04T12:03:05.123456789Z')

connection = engine.connect()
connection.execute(
    "CREATE OR REPLACE TABLE ts_tbl(c1 TIMESTAMP_NTZ)")
connection.execute(
    "INSERT INTO ts_tbl(c1) values(%s)", (specific_date,)
)
df = pd.read_sql_query("SELECT * FROM ts_tbl", engine)
assert df.c1.values[0] == specific_date

Zwischenspeichern von Spaltenmetadaten

SQLAlchemy stellt die API zur Laufzeitprüfung bereit, über die Sie Laufzeitinformationen zu den verschiedenen Objekten abrufen können. Ein häufiger Anwendungsfall ist das Abrufen aller Tabellen und ihrer Spaltenmetadaten in einem Schema, um einen Schemakatalog zu erstellen. Beispielsweise verwaltet Alembic über SQLAlchemy die Migration von Datenbankschemas. Ein Pseudocodefluss lautet wie folgt:

inspector = inspect(engine)
schema = inspector.default_schema_name
for table_name in inspector.get_table_names(schema):
    column_metadata = inspector.get_columns(table_name, schema)
    primary_keys = inspector.get_primary_keys(table_name, schema)
    foreign_keys = inspector.get_foreign_keys(table_name, schema)
    ...

Bei diesem Codefluss besteht ein mögliches Problem darin, dass die Ausführung einige Zeit dauern kann, denn die Abfragen werden erfolgen auf jeder einzelnen Tabelle. Die Ergebnisse werden zwar zwischengespeichert, aber das Abrufen von Spaltenmetadaten wird teuer.

Um das Problem abzumildern, verwendet Snowflake SQLAlchemy das Flag cache_column_metadata=True, sodass beim Aufruf von get_table_names alle Spaltenmetadaten für alle Tabellen zwischengespeichert werden und der Rest von get_columns, get_primary_keys und get_foreign_keys dann vom Cache profitiert.

engine = create_engine(URL(
    account = 'xy12345',
    user = 'testuser1',
    password = 'pass',
    database = 'db',
    schema = 'public',
    warehouse = 'testwh',
    role='myrole',
    cache_column_metadata=True,
))

Bemerkung

Beachten Sie, dass die Speichernutzung steigt, da alle Spaltenmetadaten zwischengespeichert und mit dem Inspector-Objekt verknüpft sind. Verwenden Sie das Flag nur, wenn Sie alle Spaltenmetadaten abrufen möchten.

Unterstützung von VARIANT, ARRAY und OBJECT

Snowflake SQLAlchemy unterstützt das Abrufen der Datentypen VARIANT, ARRAY und OBJECT. Alle Datentypen werden in Python in str konvertiert, sodass Sie sie mit json.loads in native Datentypen konvertieren können.

Im folgenden Beispiel wird gezeigt, wie eine Tabelle erstellt wird, die Spalten mit den Datentypen VARIANT, ARRAY und OBJECT enthält.

from snowflake.sqlalchemy import (VARIANT, ARRAY, OBJECT)
...
t = Table('my_semi_structured_datatype_table', metadata,
    Column('va', VARIANT),
    Column('ob', OBJECT),
    Column('ar', ARRAY))
metdata.create_all(engine)

Um die Spalten mit den Datentypen VARIANT, ARRAY und OBJECT abzurufen und in die systemeigenen Python-Datentypen zu konvertieren, rufen Sie die Daten ab, und rufen Sie dann die Methode json.loads wie folgt auf:

import json
connection = engine.connect()
results = connection.execute(select([t]))
row = results.fetchone()
data_variant = json.loads(row[0])
data_object  = json.loads(row[1])
data_array   = json.loads(row[2])

Unterstützung von CLUSTER BY

Snowflake SQLAchemy unterstützt den Parameter CLUSTER BY für Tabellen. Weitere Informationen zu diesem Parameter finden Sie unter CREATE TABLE.

Im folgenden Beispiel wird gezeigt, wie Sie eine Tabelle mit zwei Spalten, id und name, als Gruppierungsschlüssel erstellen:

t = Table('myuser', metadata,
    Column('id', Integer, primary_key=True),
    Column('name', String),
    snowflake_clusterby=['id', 'name'], ...
)
metadata.create_all(engine)

Unterstützung von Alembic

Alembic ist ein Datenbankmigrationstool für SQLAlchemy. Snowflake SQLAlchemy fügt den folgenden Code zu alembic/env.py hinzu, damit Snowflake SQLAlchemy von Alembic erkannt werden kann.

from alembic.ddl.impl import DefaultImpl

class SnowflakeImpl(DefaultImpl):
    __dialect__ = 'snowflake'

Allgemeine Informationen zur Verwendung finden Sie in der Alembic-Dokumentation.