Utilisation du kit de ressources SQLAlchemy Snowflake avec le connecteur Python¶
Snowflake SQLAlchemy s’exécute sur le connecteur Snowflake pour Python en tant que dialecte pour relier une base de données Snowflake et des applications SQLAlchemy.
Dans ce chapitre :
Conditions préalables¶
Connecteur Snowflake pour Python¶
La seule exigence pour Snowflake SQLAlchemy est le connecteur Snowflake pour Python. Cependant, le connecteur n’a pas besoin d’être installé, car installer Snowflake SQLAlchemy installe automatiquement le connecteur.
Frameworks d’analyse de données et d’application Web (facultatif)¶
Snowflake SQLAlchemy peut être utilisé avec pandas, Jupyter et Pyramid qui fournissent des niveaux plus élevés de frameworks d’application pour l’analyse de données et les applications Web. Cependant, la création d’un environnement de travail à partir de zéro n’est pas une tâche simple, en particulier pour les utilisateurs débutants. L’installation des frameworks nécessite des compilateurs et des outils C, et choisir les versions et les outils adéquats peut s’avérer périlleux, ce qui peut décourager les utilisateurs d’utiliser des applications Python.
Un moyen plus simple de construire un environnement est Anaconda qui fournit une pile technologique complète et précompilée pour tous les utilisateurs, y compris pour les experts non-Python, comme les analystes de données et les étudiants. Pour obtenir les instructions d’installation d’Anaconda, voir la documentation d’installation d’Anaconda. Le pack Snowflake SQLAlchemy peut alors être installé sur Anaconda en utilisant pip.
Installation de Snowflake SQLAlchemy¶
Le pack Snowflake SQLAlchemy peut être installé à partir du dépôt public PyPI en utilisant la commande pip
:
pip install --upgrade snowflake-sqlalchemy
pip
installe automatiquement tous les modules requis, dont le connecteur Snowflake pour Python.
Les notes du développeur sont hébergées avec le code source sur GitHub.
Vérification de votre installation¶
Créez un fichier (par exemple
validate.py
) qui contient le code exemple Python suivant qui se connecte à Snowflake et affiche la version Snowflake :#!/usr/bin/env python from sqlalchemy import create_engine engine = create_engine( 'snowflake://{user}:{password}@{account_identifier}/'.format( user='<user_login_name>', password='<password>', account_identifier='<account_identifier>', ) ) try: connection = engine.connect() results = connection.execute('select current_version()').fetchone() print(results[0]) finally: connection.close() engine.dispose()
Remplacez
<nom_connexion_utilisateur>
,<mot_de_passe>
et<identificateur_de_compte>
par les valeurs correspondantes pour votre compte et utilisateur Snowflake. Pour plus de détails, voir Paramètres de connexion (dans ce chapitre).Exécutez l’exemple de code. Par exemple, si vous avez créé un fichier nommé
validate.py
:python validate.py
La version Snowflake (par ex. 1.48.0
) doit être affichée.
Paramètres et comportements spécifiques à Snowflake¶
Dans la mesure du possible, Snowflake SQLAlchemy fournit des fonctionnalités compatibles pour les applications SQLAlchemy. Pour plus d’informations sur l’utilisation de SQLAlchemy, voir la documentation SQLAlchemy.
Cependant, Snowflake SQLAlchemy fournit également des paramètres et un comportement spécifiques à Snowflake qui sont décrits dans les sections suivantes.
Paramètres de connexion¶
Paramètres requis¶
Snowflake SQLAlchemy utilise la syntaxe de chaîne de connexion suivante pour se connecter à Snowflake et lancer une session :
'snowflake://<user_login_name>:<password>@<account_identifier>'
Où :
<nom_connexion_utilisateur>
est le nom d’utilisateur de votre utilisateur Snowflake.<mot_de_passe>
est le mot de passe de votre utilisateur Snowflake.<identificateur_de_compte>
est votre identificateur de compte . Voir Identificateurs de compte.Note
N’incluez pas le nom de domaine Snowflake
snowflakecomputing.com
dans l’identificateur de votre compte. Snowflake ajoute automatiquement le nom de domaine à votre identificateur de compte pour créer la connexion requise.
Paramètres de connexion supplémentaires¶
Vous pouvez éventuellement inclure les informations suivantes à la fin de la chaîne de connexion (après <nom_compte>
) :
'snowflake://<user_login_name>:<password>@<account_identifier>/<database_name>/<schema_name>?warehouse=<warehouse_name>&role=<role_name>'
Où :
<nom_base_de_données>
et<nom_schéma>
sont la base de données initiale et le schéma de la session Snowflake, séparés par des barres obliques (/
).warehouse=<nom_entrepôt>
etrole=<nom_rôle>'
sont l’entrepôt et le rôle initiaux de la session, spécifiés sous forme de chaînes de paramètres, séparés par des points d’interrogation (?
).
Note
Après la connexion, la base de données initiale, le schéma, l’entrepôt et le rôle spécifiés dans la chaîne de connexion peuvent toujours être modifiés pour la session.
Configuration du serveur proxy¶
Les paramètres du serveur proxy ne sont pas pris en charge. Utilisez plutôt des variables d’environnement prises en charge pour configurer un serveur proxy. Pour plus d’informations, voir Utilisation d’un serveur proxy.
Exemples de chaîne de connexion¶
L’exemple suivant appelle la méthode create_engine
avec le nom d’utilisateur testuser1
, le mot de passe 0123456
, l’identificateur de compte myorganization-myaccount
, la base de données testdb
, le schéma public
, l’entrepôt testwh
et le rôle myrole
:
from sqlalchemy import create_engine engine = create_engine( 'snowflake://testuser1:0123456@myorganization-myaccount/testdb/public?warehouse=testwh&role=myrole' )
Par souci de facilité, vous pouvez utiliser la méthode snowflake.sqlalchemy.URL
pour construire la chaîne de connexion et vous connecter à la base de données. L’exemple suivant construit la même chaîne de connexion que l’exemple précédent :
from snowflake.sqlalchemy import URL from sqlalchemy import create_engine engine = create_engine(URL( account = 'myorganization-myaccount', user = 'testuser1', password = '0123456', database = 'testdb', schema = 'public', warehouse = 'testwh', role='myrole', ))
Ouverture et fermeture de la connexion¶
Ouvrez une connexion en exécutant engine.connect()
; évitez d’utiliser engine.execute()
.
# Avoid this. engine = create_engine(...) engine.execute(<SQL>) engine.dispose() # Do this. engine = create_engine(...) connection = engine.connect() try: connection.execute(<SQL>) finally: connection.close() engine.dispose()
Note
Assurez-vous de fermer la connexion en exécutant connection.close()
avant engine.dispose()
; sinon, le collecteur Python Garbage supprime les ressources nécessaires pour communiquer avec Snowflake, empêchant le connecteur Python de fermer correctement la session.
Si vous prévoyez d’utiliser des transactions explicites, vous devez désactiver l’option d’exécution AUTOCOMMIT dans SQLAlchemy.
Par défaut, SQLAlchemy active cette option. Lorsque cette option est activée, les instructions INSERT, UPDATE et DELETE sont automatiquement validées lors de leur exécution, même si ces instructions sont exécutées dans une transaction explicite.
Pour désactiver AUTOCOMMIT, transmettez autocommit=False
à la méthode Connection.execution_options()
. Par exemple :
# Disable AUTOCOMMIT if you need to use an explicit transaction.
with engine.connect().execution_options(autocommit=False) as connection:
try:
connection.execute("BEGIN")
connection.execute("INSERT INTO test_table VALUES (88888, 'X', 434354)")
connection.execute("INSERT INTO test_table VALUES (99999, 'Y', 453654654)")
connection.execute("COMMIT")
except Exception as e:
connection.execute("ROLLBACK")
finally:
connection.close()
engine.dispose()
Comportement d’auto-incrémentation¶
L’auto-incrémentation d’une valeur nécessite l’objet Sequence
. Inclure l’objet Sequence
dans la colonne de clé primaire pour incrémenter automatiquement la valeur à chaque nouvel enregistrement inséré. Par exemple :
t = Table('mytable', metadata, Column('id', Integer, Sequence('id_seq'), primary_key=True), Column(...), ... )
Traitement de la casse de nom d’objet¶
Snowflake stocke tous les noms d’objets insensibles à la casse en majuscules. En revanche, SQLAlchemy considère que tous les noms d’objet en minuscules sont insensibles à la casse. Snowflake SQLAlchemy convertit la casse du nom de l’objet pendant la communication au niveau du schéma, (c’est-à-dire pendant la réflexion de la table et de l’index). Si vous utilisez des noms d’objet en majuscules, SQLAlchemy suppose qu’ils sont sensibles à la casse et qu’ils contiennent des guillemets. Ce comportement causera des discordances avec les données du dictionnaire de données reçu de Snowflake. À moins que les noms des identificateurs n’aient vraiment été créés comme étant sensibles à la casse en utilisant des guillemets, (par exemple "TestDb"
), tous les noms en minuscules doivent être utilisés du côté SQLAlchemy.
Prise en charge de l’index¶
Snowflake n’utilise pas d’index. Par conséquent, Snowflake SQLAlchemy n’en utilise pas non plus.
Prise en charge du type de données NumPy¶
Snowflake SQLAlchemy prend en change la liaison et la collecte de types de données NumPy
. La liaison est toujours prise en charge. Pour activer la récupération des types de données NumPy
, ajoutez numpy=True
aux paramètres de connexion.
Les types de données NumPy
suivants sont pris en charge :
numpy.int64
numpy.float64
numpy.datetime64
L’exemple suivant montre l’aller-retour de données numpy.datetime64
:
import numpy as np import pandas as pd engine = create_engine(URL( account = 'myorganization-myaccount', user = 'testuser1', password = 'pass', database = 'db', schema = 'public', warehouse = 'testwh', role='myrole', numpy=True, )) specific_date = np.datetime64('2016-03-04T12:03:05.123456789Z') connection = engine.connect() connection.execute( "CREATE OR REPLACE TABLE ts_tbl(c1 TIMESTAMP_NTZ)") connection.execute( "INSERT INTO ts_tbl(c1) values(%s)", (specific_date,) ) df = pd.read_sql_query("SELECT * FROM ts_tbl", engine) assert df.c1.values[0] == specific_date
Métadonnées de colonne de cache¶
SQLAlchemy fournit l’API d’inspection d’exécution pour obtenir les informations d’exécution sur les différents objets. L’un des cas d’utilisation classiques est la récupération de toutes les tables et de leurs métadonnées de colonne dans un schéma afin de construire un catalogue de schémas. Par exemple, alembic, en plus de SQLAlchemy, gère les migrations de schéma de base de données. Un flux de pseudo-code se présente comme suit :
inspector = inspect(engine) schema = inspector.default_schema_name for table_name in inspector.get_table_names(schema): column_metadata = inspector.get_columns(table_name, schema) primary_keys = inspector.get_primary_keys(table_name, schema) foreign_keys = inspector.get_foreign_keys(table_name, schema) ...
Dans ce flux, cela peut prendre un certain temps car les requêtes s’exécutent sur chaque table. Les résultats sont mis en cache, mais l’obtention des métadonnées de colonne est onéreuse.
Pour atténuer le problème, Snowflake SQLAlchemy prend un indicateur cache_column_metadata=True
tel que toutes les métadonnées de colonne pour toutes les tables sont mises en cache lorsque get_table_names
est appelé. Le reste de get_columns
, get_primary_keys
et get_foreign_keys
peut exploiter le cache.
engine = create_engine(URL( account = 'myorganization-myaccount', user = 'testuser1', password = 'pass', database = 'db', schema = 'public', warehouse = 'testwh', role='myrole', cache_column_metadata=True, ))
Note
L’utilisation de la mémoire augmentera à mesure que toutes les métadonnées des colonnes associées à l’objet Inspector
seront mises en cache. N’utilisez l’indicateur que si vous avez besoin d’obtenir toutes les métadonnées de colonne.
Prise en charge de VARIANT, ARRAY et OBJECT¶
Snowflake SQLAlchemy prend en charge la récupération des types de données VARIANT
, ARRAY
et OBJECT
. Tous les types sont convertis en str
dans Python afin que vous puissiez les convertir en types de données natifs en utilisant json.loads
.
Cet exemple montre comment créer une table incluant les colonnes de type de données VARIANT
, ARRAY
et OBJECT
.
from snowflake.sqlalchemy import (VARIANT, ARRAY, OBJECT) ... t = Table('my_semi_structured_datatype_table', metadata, Column('va', VARIANT), Column('ob', OBJECT), Column('ar', ARRAY)) metdata.create_all(engine)
Pour récupérer les colonnes de type de données VARIANT
, ARRAY
et OBJECT
et les convertir en types de données Python natifs, récupérez les données et appelez la méthode json.loads
comme suit :
import json connection = engine.connect() results = connection.execute(select([t])) row = results.fetchone() data_variant = json.loads(row[0]) data_object = json.loads(row[1]) data_array = json.loads(row[2])
Prise en charge de CLUSTER BY¶
Snowflake SQLAlchemy prend en charge le paramètre CLUSTER BY
pour les tables. Pour plus d’informations sur ce paramètre, voir CREATE TABLE.
Cet exemple montre comment créer une table avec deux colonnes, id
et name
, comme clé de clustering :
t = Table('myuser', metadata, Column('id', Integer, primary_key=True), Column('name', String), snowflake_clusterby=['id', 'name'], ... ) metadata.create_all(engine)
Prise en charge d’Alembic¶
Alembic est un outil de migration de base de données en plus de SQLAlchemy
. Snowflake SQLAlchemy fonctionne en ajoutant le code suivant à alembic/env.py
pour qu’Alembic puisse reconnaître Snowflake SQLAlchemy.
from alembic.ddl.impl import DefaultImpl class SnowflakeImpl(DefaultImpl): __dialect__ = 'snowflake'
Voir la documentation d’Alembic pour des informations sur l’utilisation générale.
Prise en charge de l’authentification par paire de clés¶
Snowflake SQLAlchemy prend en charge l’authentification par paire de clés en tirant parti de la fonctionnalité du connecteur Snowflake pour Python. Voir Utilisation de l’authentification par paire de clés et rotation de paires de clés pour connaître les étapes de création de clés privées et publiques.
Le paramètre de clé privée est transmis dans connect_args
comme suit :
... from snowflake.sqlalchemy import URL from sqlalchemy import create_engine from cryptography.hazmat.backends import default_backend from cryptography.hazmat.primitives.asymmetric import rsa from cryptography.hazmat.primitives.asymmetric import dsa from cryptography.hazmat.primitives import serialization with open("rsa_key.p8", "rb") as key: p_key= serialization.load_pem_private_key( key.read(), password=os.environ['PRIVATE_KEY_PASSPHRASE'].encode(), backend=default_backend() ) pkb = p_key.private_bytes( encoding=serialization.Encoding.DER, format=serialization.PrivateFormat.PKCS8, encryption_algorithm=serialization.NoEncryption()) engine = create_engine(URL( account='abc123', user='testuser1', ), connect_args={ 'private_key': pkb, }, )
Où PRIVATE_KEY_PASSPHRASE
est une phrase secrète pour décrypter le fichier de clé privée, rsa_key.p8
.
La méthode snowflake.sqlalchemy.URL
ne prend pas en charge les paramètres de clé privée.
Prise en charge de la commande de fusion¶
Snowflake SQLAlchemy prend en charge l’exécution d’un upsert avec son expression personnalisée MergeInto
. Voir MERGE pour une documentation complète.
Utilisez cela comme suit :
from sqlalchemy.orm import sessionmaker from sqlalchemy import MetaData, create_engine from snowflake.sqlalchemy import MergeInto engine = create_engine(db.url, echo=False) session = sessionmaker(bind=engine)() connection = engine.connect() meta = MetaData() meta.reflect(bind=session.bind) t1 = meta.tables['t1'] t2 = meta.tables['t2'] merge = MergeInto(target=t1, source=t2, on=t1.c.t1key == t2.c.t2key) merge.when_matched_then_delete().where(t2.c.marked == 1) merge.when_matched_then_update().where(t2.c.isnewstatus == 1).values(val = t2.c.newval, status=t2.c.newstatus) merge.when_matched_then_update().values(val=t2.c.newval) merge.when_not_matched_then_insert().values(val=t2.c.newval, status=t2.c.newstatus) connection.execute(merge)
Prise en charge de CopyIntoStorage¶
Snowflake SQLAlchemy prend en charge l’enregistrement des tables et des résultats de requêtes dans différentes zones de préparation Snowflake, conteneurs Azure et compartiments AWS avec son expression CopyIntoStorage
personnalisée. Voir COPY INTO <emplacement> pour une documentation complète.
Utilisez cela comme suit :
from sqlalchemy.orm import sessionmaker from sqlalchemy import MetaData, create_engine from snowflake.sqlalchemy import CopyIntoStorage, AWSBucket, CSVFormatter engine = create_engine(db.url, echo=False) session = sessionmaker(bind=engine)() connection = engine.connect() meta = MetaData() meta.reflect(bind=session.bind) users = meta.tables['users'] copy_into = CopyIntoStorage(from_=users, into=AWSBucket.from_uri('s3://my_private_backup').encryption_aws_sse_kms('1234abcd-12ab-34cd-56ef-1234567890ab'), formatter=CSVFormatter().null_if(['null', 'Null'])) connection.execute(copy_into)