Utilisation du kit de ressources SQLAlchemy Snowflake avec le connecteur Python

Snowflake SQLAlchemy s’exécute sur le connecteur Snowflake pour Python en tant que dialecte pour relier une base de données Snowflake et des applications SQLAlchemy.

Dans ce chapitre :

Conditions préalables

Connecteur Snowflake pour Python

La seule exigence pour Snowflake SQLAlchemy est le connecteur Snowflake pour Python. Cependant, le connecteur n’a pas besoin d’être installé, car installer Snowflake SQLAlchemy installe automatiquement le connecteur.

Frameworks d’analyse de données et d’application Web (facultatif)

Snowflake SQLAlchemy peut être utilisé avec pandas, Jupyter et Pyramid qui fournissent des niveaux plus élevés de frameworks d’application pour l’analyse de données et les applications Web. Cependant, la création d’un environnement de travail à partir de zéro n’est pas une tâche simple, en particulier pour les utilisateurs débutants. L’installation des frameworks nécessite des compilateurs et des outils C, et choisir les versions et les outils adéquats peut s’avérer périlleux, ce qui peut décourager les utilisateurs d’utiliser des applications Python.

Un moyen plus simple de construire un environnement est Anaconda qui fournit une pile technologique complète et précompilée pour tous les utilisateurs, y compris pour les experts non-Python, comme les analystes de données et les étudiants. Pour obtenir les instructions d’installation d’Anaconda, voir la documentation d’installation d’Anaconda. Le pack Snowflake SQLAlchemy peut alors être installé sur Anaconda en utilisant pip.

Installation de Snowflake SQLAlchemy

Le pack Snowflake SQLAlchemy peut être installé à partir du dépôt public PyPI en utilisant la commande pip :

pip install --upgrade snowflake-sqlalchemy
Copy

pip installe automatiquement tous les modules requis, dont le connecteur Snowflake pour Python.

Les notes du développeur sont hébergées avec le code source sur GitHub.

Vérification de votre installation

  1. Créez un fichier (par exemple validate.py) qui contient le code exemple Python suivant qui se connecte à Snowflake et affiche la version Snowflake :

    #!/usr/bin/env python
    from sqlalchemy import create_engine
    
    engine = create_engine(
        'snowflake://{user}:{password}@{account_identifier}/'.format(
            user='<user_login_name>',
            password='<password>',
            account_identifier='<account_identifier>',
        )
    )
    try:
        connection = engine.connect()
        results = connection.execute('select current_version()').fetchone()
        print(results[0])
    finally:
        connection.close()
        engine.dispose()
    
    Copy
  2. Remplacez <nom_connexion_utilisateur>, <mot_de_passe> et <identificateur_de_compte> par les valeurs correspondantes pour votre compte et utilisateur Snowflake. Pour plus de détails, voir Paramètres de connexion (dans ce chapitre).

  3. Exécutez l’exemple de code. Par exemple, si vous avez créé un fichier nommé validate.py :

    python validate.py
    
    Copy

La version Snowflake (par ex. 1.48.0) doit être affichée.

Paramètres et comportements spécifiques à Snowflake

Dans la mesure du possible, Snowflake SQLAlchemy fournit des fonctionnalités compatibles pour les applications SQLAlchemy. Pour plus d’informations sur l’utilisation de SQLAlchemy, voir la documentation SQLAlchemy.

Cependant, Snowflake SQLAlchemy fournit également des paramètres et un comportement spécifiques à Snowflake qui sont décrits dans les sections suivantes.

Paramètres de connexion

Paramètres requis

Snowflake SQLAlchemy utilise la syntaxe de chaîne de connexion suivante pour se connecter à Snowflake et lancer une session :

'snowflake://<user_login_name>:<password>@<account_identifier>'
Copy

Où :

  • <nom_connexion_utilisateur> est le nom d’utilisateur de votre utilisateur Snowflake.

  • <mot_de_passe> est le mot de passe de votre utilisateur Snowflake.

  • <identificateur_de_compte> est votre identificateur de compte . Voir Identificateurs de compte.

    Note

    N’incluez pas le nom de domaine Snowflake snowflakecomputing.com dans l’identificateur de votre compte. Snowflake ajoute automatiquement le nom de domaine à votre identificateur de compte pour créer la connexion requise.

Paramètres de connexion supplémentaires

Vous pouvez éventuellement inclure les informations suivantes à la fin de la chaîne de connexion (après <nom_compte>) :

'snowflake://<user_login_name>:<password>@<account_identifier>/<database_name>/<schema_name>?warehouse=<warehouse_name>&role=<role_name>'
Copy

Où :

  • <nom_base_de_données> et <nom_schéma> sont la base de données initiale et le schéma de la session Snowflake, séparés par des barres obliques (/).

  • warehouse=<nom_entrepôt> et role=<nom_rôle>' sont l’entrepôt et le rôle initiaux de la session, spécifiés sous forme de chaînes de paramètres, séparés par des points d’interrogation (?).

Note

Après la connexion, la base de données initiale, le schéma, l’entrepôt et le rôle spécifiés dans la chaîne de connexion peuvent toujours être modifiés pour la session.

Configuration du serveur proxy

Les paramètres du serveur proxy ne sont pas pris en charge. Utilisez plutôt des variables d’environnement prises en charge pour configurer un serveur proxy. Pour plus d’informations, voir Utilisation d’un serveur proxy.

Exemples de chaîne de connexion

L’exemple suivant appelle la méthode create_engine avec le nom d’utilisateur testuser1, le mot de passe 0123456, l’identificateur de compte myorganization-myaccount, la base de données testdb, le schéma public, l’entrepôt testwh et le rôle myrole :

from sqlalchemy import create_engine
engine = create_engine(
    'snowflake://testuser1:0123456@myorganization-myaccount/testdb/public?warehouse=testwh&role=myrole'
)
Copy

Par souci de facilité, vous pouvez utiliser la méthode snowflake.sqlalchemy.URL pour construire la chaîne de connexion et vous connecter à la base de données. L’exemple suivant construit la même chaîne de connexion que l’exemple précédent :

from snowflake.sqlalchemy import URL
from sqlalchemy import create_engine

engine = create_engine(URL(
    account = 'myorganization-myaccount',
    user = 'testuser1',
    password = '0123456',
    database = 'testdb',
    schema = 'public',
    warehouse = 'testwh',
    role='myrole',
))
Copy

Ouverture et fermeture de la connexion

Ouvrez une connexion en exécutant engine.connect() ; évitez d’utiliser engine.execute().

# Avoid this.
engine = create_engine(...)
engine.execute(<SQL>)
engine.dispose()

# Do this.
engine = create_engine(...)
connection = engine.connect()
try:
    connection.execute(<SQL>)
finally:
    connection.close()
    engine.dispose()
Copy

Note

Assurez-vous de fermer la connexion en exécutant connection.close() avant engine.dispose() ; sinon, le collecteur Python Garbage supprime les ressources nécessaires pour communiquer avec Snowflake, empêchant le connecteur Python de fermer correctement la session.

Si vous prévoyez d’utiliser des transactions explicites, vous devez désactiver l’option d’exécution AUTOCOMMIT dans SQLAlchemy.

Par défaut, SQLAlchemy active cette option. Lorsque cette option est activée, les instructions INSERT, UPDATE et DELETE sont automatiquement validées lors de leur exécution, même si ces instructions sont exécutées dans une transaction explicite.

Pour désactiver AUTOCOMMIT, transmettez autocommit=False à la méthode Connection.execution_options() . Par exemple :

# Disable AUTOCOMMIT if you need to use an explicit transaction.
with engine.connect().execution_options(autocommit=False) as connection:

  try:
    connection.execute("BEGIN")
    connection.execute("INSERT INTO test_table VALUES (88888, 'X', 434354)")
    connection.execute("INSERT INTO test_table VALUES (99999, 'Y', 453654654)")
    connection.execute("COMMIT")
  except Exception as e:
    connection.execute("ROLLBACK")
  finally:
    connection.close()

engine.dispose()
Copy

Comportement d’auto-incrémentation

L’auto-incrémentation d’une valeur nécessite l’objet Sequence . Inclure l’objet Sequence dans la colonne de clé primaire pour incrémenter automatiquement la valeur à chaque nouvel enregistrement inséré. Par exemple :

t = Table('mytable', metadata,
    Column('id', Integer, Sequence('id_seq'), primary_key=True),
    Column(...), ...
)
Copy

Traitement de la casse de nom d’objet

Snowflake stocke tous les noms d’objets insensibles à la casse en majuscules. En revanche, SQLAlchemy considère que tous les noms d’objet en minuscules sont insensibles à la casse. Snowflake SQLAlchemy convertit la casse du nom de l’objet pendant la communication au niveau du schéma, (c’est-à-dire pendant la réflexion de la table et de l’index). Si vous utilisez des noms d’objet en majuscules, SQLAlchemy suppose qu’ils sont sensibles à la casse et qu’ils contiennent des guillemets. Ce comportement causera des discordances avec les données du dictionnaire de données reçu de Snowflake. À moins que les noms des identificateurs n’aient vraiment été créés comme étant sensibles à la casse en utilisant des guillemets, (par exemple "TestDb"), tous les noms en minuscules doivent être utilisés du côté SQLAlchemy.

Prise en charge de l’index

Snowflake n’utilise pas d’index. Par conséquent, Snowflake SQLAlchemy n’en utilise pas non plus.

Prise en charge du type de données NumPy

Snowflake SQLAlchemy prend en change la liaison et la collecte de types de données NumPy. La liaison est toujours prise en charge. Pour activer la récupération des types de données NumPy , ajoutez numpy=True aux paramètres de connexion.

Les types de données NumPy suivants sont pris en charge :

  • numpy.int64

  • numpy.float64

  • numpy.datetime64

L’exemple suivant montre l’aller-retour de données numpy.datetime64 :

import numpy as np
import pandas as pd
engine = create_engine(URL(
    account = 'myorganization-myaccount',
    user = 'testuser1',
    password = 'pass',
    database = 'db',
    schema = 'public',
    warehouse = 'testwh',
    role='myrole',
    numpy=True,
))

specific_date = np.datetime64('2016-03-04T12:03:05.123456789Z')

connection = engine.connect()
connection.execute(
    "CREATE OR REPLACE TABLE ts_tbl(c1 TIMESTAMP_NTZ)")
connection.execute(
    "INSERT INTO ts_tbl(c1) values(%s)", (specific_date,)
)
df = pd.read_sql_query("SELECT * FROM ts_tbl", engine)
assert df.c1.values[0] == specific_date
Copy

Métadonnées de colonne de cache

SQLAlchemy fournit l’API d’inspection d’exécution pour obtenir les informations d’exécution sur les différents objets. L’un des cas d’utilisation classiques est la récupération de toutes les tables et de leurs métadonnées de colonne dans un schéma afin de construire un catalogue de schémas. Par exemple, alembic, en plus de SQLAlchemy, gère les migrations de schéma de base de données. Un flux de pseudo-code se présente comme suit :

inspector = inspect(engine)
schema = inspector.default_schema_name
for table_name in inspector.get_table_names(schema):
    column_metadata = inspector.get_columns(table_name, schema)
    primary_keys = inspector.get_primary_keys(table_name, schema)
    foreign_keys = inspector.get_foreign_keys(table_name, schema)
    ...
Copy

Dans ce flux, cela peut prendre un certain temps car les requêtes s’exécutent sur chaque table. Les résultats sont mis en cache, mais l’obtention des métadonnées de colonne est onéreuse.

Pour atténuer le problème, Snowflake SQLAlchemy prend un indicateur cache_column_metadata=True tel que toutes les métadonnées de colonne pour toutes les tables sont mises en cache lorsque get_table_names est appelé. Le reste de get_columns, get_primary_keys et get_foreign_keys peut exploiter le cache.

engine = create_engine(URL(
    account = 'myorganization-myaccount',
    user = 'testuser1',
    password = 'pass',
    database = 'db',
    schema = 'public',
    warehouse = 'testwh',
    role='myrole',
    cache_column_metadata=True,
))
Copy

Note

L’utilisation de la mémoire augmentera à mesure que toutes les métadonnées des colonnes associées à l’objet Inspector seront mises en cache. N’utilisez l’indicateur que si vous avez besoin d’obtenir toutes les métadonnées de colonne.

Prise en charge de VARIANT, ARRAY et OBJECT

Snowflake SQLAlchemy prend en charge la récupération des types de données VARIANT, ARRAY et OBJECT. Tous les types sont convertis en str dans Python afin que vous puissiez les convertir en types de données natifs en utilisant json.loads.

Cet exemple montre comment créer une table incluant les colonnes de type de données VARIANT, ARRAY et OBJECT.

from snowflake.sqlalchemy import (VARIANT, ARRAY, OBJECT)
...
t = Table('my_semi_structured_datatype_table', metadata,
    Column('va', VARIANT),
    Column('ob', OBJECT),
    Column('ar', ARRAY))
metdata.create_all(engine)
Copy

Pour récupérer les colonnes de type de données VARIANT, ARRAY et OBJECT et les convertir en types de données Python natifs, récupérez les données et appelez la méthode json.loads comme suit :

import json
connection = engine.connect()
results = connection.execute(select([t]))
row = results.fetchone()
data_variant = json.loads(row[0])
data_object  = json.loads(row[1])
data_array   = json.loads(row[2])
Copy

Prise en charge de CLUSTER BY

Snowflake SQLAlchemy prend en charge le paramètre CLUSTER BY pour les tables. Pour plus d’informations sur ce paramètre, voir CREATE TABLE.

Cet exemple montre comment créer une table avec deux colonnes, id et name, comme clé de clustering :

t = Table('myuser', metadata,
    Column('id', Integer, primary_key=True),
    Column('name', String),
    snowflake_clusterby=['id', 'name'], ...
)
metadata.create_all(engine)
Copy

Prise en charge d’Alembic

Alembic est un outil de migration de base de données en plus de SQLAlchemy. Snowflake SQLAlchemy fonctionne en ajoutant le code suivant à alembic/env.py pour qu’Alembic puisse reconnaître Snowflake SQLAlchemy.

from alembic.ddl.impl import DefaultImpl

class SnowflakeImpl(DefaultImpl):
    __dialect__ = 'snowflake'
Copy

Voir la documentation d’Alembic pour des informations sur l’utilisation générale.

Prise en charge de l’authentification par paire de clés

Snowflake SQLAlchemy prend en charge l’authentification par paire de clés en tirant parti de la fonctionnalité du connecteur Snowflake pour Python. Voir Utilisation de l’authentification par paire de clés et rotation de paires de clés pour connaître les étapes de création de clés privées et publiques.

Le paramètre de clé privée est transmis dans connect_args comme suit :

...
from snowflake.sqlalchemy import URL
from sqlalchemy import create_engine

from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives.asymmetric import rsa
from cryptography.hazmat.primitives.asymmetric import dsa
from cryptography.hazmat.primitives import serialization

with open("rsa_key.p8", "rb") as key:
    p_key= serialization.load_pem_private_key(
        key.read(),
        password=os.environ['PRIVATE_KEY_PASSPHRASE'].encode(),
        backend=default_backend()
    )

pkb = p_key.private_bytes(
    encoding=serialization.Encoding.DER,
    format=serialization.PrivateFormat.PKCS8,
    encryption_algorithm=serialization.NoEncryption())

engine = create_engine(URL(
    account='abc123',
    user='testuser1',
    ),
    connect_args={
        'private_key': pkb,
        },
    )
Copy

PRIVATE_KEY_PASSPHRASE est une phrase secrète pour décrypter le fichier de clé privée, rsa_key.p8.

La méthode snowflake.sqlalchemy.URL ne prend pas en charge les paramètres de clé privée.

Prise en charge de la commande de fusion

Snowflake SQLAlchemy prend en charge l’exécution d’un upsert avec son expression personnalisée MergeInto. Voir MERGE pour une documentation complète.

Utilisez cela comme suit :

from sqlalchemy.orm import sessionmaker
from sqlalchemy import MetaData, create_engine
from snowflake.sqlalchemy import MergeInto

engine = create_engine(db.url, echo=False)
session = sessionmaker(bind=engine)()
connection = engine.connect()

meta = MetaData()
meta.reflect(bind=session.bind)
t1 = meta.tables['t1']
t2 = meta.tables['t2']

merge = MergeInto(target=t1, source=t2, on=t1.c.t1key == t2.c.t2key)
merge.when_matched_then_delete().where(t2.c.marked == 1)
merge.when_matched_then_update().where(t2.c.isnewstatus == 1).values(val = t2.c.newval, status=t2.c.newstatus)
merge.when_matched_then_update().values(val=t2.c.newval)
merge.when_not_matched_then_insert().values(val=t2.c.newval, status=t2.c.newstatus)
connection.execute(merge)
Copy

Prise en charge de CopyIntoStorage

Snowflake SQLAlchemy prend en charge l’enregistrement des tables et des résultats de requêtes dans différentes zones de préparation Snowflake, conteneurs Azure et compartiments AWS avec son expression CopyIntoStorage personnalisée. Voir COPY INTO <emplacement> pour une documentation complète.

Utilisez cela comme suit :

from sqlalchemy.orm import sessionmaker
from sqlalchemy import MetaData, create_engine
from snowflake.sqlalchemy import CopyIntoStorage, AWSBucket, CSVFormatter

engine = create_engine(db.url, echo=False)
session = sessionmaker(bind=engine)()
connection = engine.connect()

meta = MetaData()
meta.reflect(bind=session.bind)
users = meta.tables['users']

copy_into = CopyIntoStorage(from_=users,
                            into=AWSBucket.from_uri('s3://my_private_backup').encryption_aws_sse_kms('1234abcd-12ab-34cd-56ef-1234567890ab'),
                            formatter=CSVFormatter().null_if(['null', 'Null']))
connection.execute(copy_into)
Copy