Utilisation du connecteur Python

Ce chapitre fournit une série d’exemples illustrant comment utiliser le connecteur Snowflake pour effectuer des opérations Snowflake standard, telles que la connexion d’utilisateurs, la création de bases de données et de tables, la création d’entrepôts, l’insertion/le chargement de données et l’envoi de requêtes.

L’exemple de code à la fin de ce chapitre combine les exemples en un seul programme Python fonctionnel.

Dans ce chapitre :

Vérification de la connexion réseau à Snowflake avec SnowCD

Après avoir configuré votre pilote, vous pouvez évaluer et dépanner votre connectivité réseau à Snowflake en utilisant SnowCD.

Vous pouvez utiliser SnowCD pendant le processus de configuration initiale et à la demande à tout moment pour évaluer et dépanner votre connexion réseau à Snowflake.

Connexion à Snowflake

Importez le module snowflake.connector :

import snowflake.connector

Lisez les informations de connexion à partir des variables d’environnement, de la ligne de commande, d’un fichier de configuration ou d’une autre source appropriée. Par exemple :

PASSWORD = os.getenv('SNOWSQL_PWD')
WAREHOUSE = os.getenv('WAREHOUSE')
...

Le paramètre ACCOUNT peut nécessiter la région et la plate-forme Cloud où se trouve votre compte, sous la forme :

'<your_account_name>.<region_id>.<cloud>' (e.g. 'xy12345.east-us-2.azure').

Note

Pour la description des paramètres de connecteurs disponibles, voir les méthodes snowflake.connector.

Si vous copiez des données depuis votre propre compartiment Amazon S3, vous avez besoin des AWS_ACCESS_KEY_ID et AWS_SECRET_ACCESS_KEY.

import os

AWS_ACCESS_KEY_ID = os.getenv('AWS_ACCESS_KEY_ID')
AWS_SECRET_ACCESS_KEY = os.getenv('AWS_SECRET_ACCESS_KEY')

Note

Si vos données sont stockées dans un conteneur Microsoft Azure, fournissez les informations d’identification directement dans l’instruction COPY.

Après avoir lu les informations de connexion, connectez-vous à l’aide de l’authentificateur par défaut ou de l’authentification fédérée (si activée).

Définition des paramètres de session

Il existe plusieurs façons de définir des paramètres de session, tels que QUERY_TAG, lors de l’utilisation du connecteur Python.

Vous pouvez définir des paramètres au niveau de la session au moment où vous vous connectez à Snowflake. Pour ce faire, passez le paramètre de connexion facultatif nommé « session_parameters », comme indiqué ci-dessous :

con = snowflake.connector.connect(
    user='XXXX',
    password='XXXX',
    account='XXXX',
    session_parameters={
        'QUERY_TAG': 'EndOfMonthFinancials',
    }
)

Le dictionnaire session_parameters transmis à la méthode connect() peut contenir un ou plusieurs paramètres au niveau de la session.

Vous pouvez également définir des paramètres de session en exécutant l’instruction SQL ALTER SESSION SET ... après la connexion :

con.cursor().execute("ALTER SESSION SET QUERY_TAG = 'EndOfMonthFinancials'")

Pour plus d’informations sur les paramètres de session, consultez les descriptions des paramètres individuels sur la page générale Paramètres.

Connexion à l’aide de l’authentificateur par défaut

Connectez-vous à Snowflake en utilisant les paramètres de connexion :

            conn = snowflake.connector.connect(
                user=USER,
                password=PASSWORD,
                account=ACCOUNT,
                warehouse=WAREHOUSE,
                database=DATABASE,
                schema=SCHEMA
                )

Vous devrez peut-être étendre cela à d’autres informations.

Utilisation de la connexion unique (SSO) pour l’authentification

Si vous avez configuré Snowflake pour utiliser la connexion unique (SSO), vous pouvez configurer votre application cliente pour utiliser SSO pour l’authentification. Voir Utilisation de SSO avec des applications clientes qui se connectent à Snowflake pour plus de détails.

Utilisation de l’authentification par paire de clés

Snowflake utilise l’authentification par paire de clés plutôt que l’authentification par nom d’utilisateur/mot de passe typique. Cette méthode d’authentification nécessite une paire de clés de 2048 bits (minimum) RSA. Générez la paire de clés publiques-privées PEM (Privacy Enhanced Mail) via OpenSSL. La clé publique est attribuée à l’utilisateur Snowflake qui utilisera le client Snowflake.

Pour configurer la paire de clés publiques/privées :

  1. Depuis la ligne de commande d’une fenêtre de terminal, générez une clé privée.

    Vous pouvez générer une version chiffrée de la clé privée ou une version non chiffrée de la clé privée.

    Pour générer une version non chiffrée, utilisez la commande suivante :

    $ openssl genrsa 2048 | openssl pkcs8 -topk8 -inform PEM -out rsa_key.p8 -nocrypt
    

    Pour générer une version chiffrée, utilisez la commande suivante (qui omet « -nocrypt ») :

    $ openssl genrsa 2048 | openssl pkcs8 -topk8 -inform PEM -out rsa_key.p8
    

    Il est généralement plus sûr de générer une version chiffrée.

    Si vous utilisez la deuxième commande pour chiffrer la clé privée, OpenSSL vous invite à indiquer une phrase secrète utilisée pour chiffrer le fichier de clé privée. Nous vous recommandons d’utiliser une phrase de chiffrement forte pour protéger la clé privée. Enregistrez cette phrase secrète dans un emplacement sécurisé. Vous l’entrerez lorsque vous vous connecterez à Snowflake. Notez que la phrase de chiffrement n’est utilisée que pour protéger la clé privée et ne sera jamais envoyée à Snowflake.

    Exemple de clé privée PEM

    -----BEGIN ENCRYPTED PRIVATE KEY-----
    MIIE6TAbBgkqhkiG9w0BBQMwDgQILYPyCppzOwECAggABIIEyLiGSpeeGSe3xHP1
    wHLjfCYycUPennlX2bd8yX8xOxGSGfvB+99+PmSlex0FmY9ov1J8H1H9Y3lMWXbL
    ...
    -----END ENCRYPTED PRIVATE KEY-----
    
  2. Depuis la ligne de commande, générez la clé publique en faisant référence à la clé privée :

    En supposant que la clé privée soit contenue dans le fichier nommé « rsa_key.p8 », utilisez la commande suivante :

    $ openssl rsa -in rsa_key.p8 -pubout -out rsa_key.pub
    

    Exemple de clé publique PEM

    -----BEGIN PUBLIC KEY-----
    MIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEAy+Fw2qv4Roud3l6tjPH4
    zxybHjmZ5rhtCz9jppCV8UTWvEXxa88IGRIHbJ/PwKW/mR8LXdfI7l/9vCMXX4mk
    ...
    -----END PUBLIC KEY-----
    
  3. Copiez les fichiers de clés publiques et privées dans un répertoire local en vue de leur stockage. Enregistrez le chemin d’accès aux fichiers. Notez que la clé privée est stockée au format PKCS#8 (Public Key Cryptography Standards) et est chiffrée à l’aide de la phrase de chiffrement que vous avez spécifiée à l’étape précédente ; toutefois, le fichier doit toujours être protégé contre tout accès non autorisé au moyen du mécanisme d’autorisation de fichier fourni par votre système d’exploitation. Il est de votre responsabilité de sécuriser le fichier lorsqu’il n’est pas utilisé.

  4. Attribuez la clé publique à l’utilisateur Snowflake en utilisant ALTER USER. Par exemple :

    ALTER USER jsmith SET RSA_PUBLIC_KEY='MIIBIjANBgkqh...';
    

    Note

    • Seuls les administrateurs de sécurité (c’est-à-dire les utilisateurs ayant le rôle SECURITYADMIN) ou ayant un rôle supérieur peuvent modifier un utilisateur.

    • Excluez l’en-tête et le pied de page de la clé publique dans l’instruction SQL.

    Vérifiez l’empreinte de la clé publique de l’utilisateur en utilisant DESCRIBE USER :

    DESC USER jsmith;
    +-------------------------------+-----------------------------------------------------+---------+-------------------------------------------------------------------------------+
    | property                      | value                                               | default | description                                                                   |
    |-------------------------------+-----------------------------------------------------+---------+-------------------------------------------------------------------------------|
    | NAME                          | JSMITH                                              | null    | Name                                                                          |
    ...
    ...
    | RSA_PUBLIC_KEY_FP             | SHA256:nvnONUsfiuycCLMXIEWG4eTp4FjhVUZQUQbNpbSHXiA= | null    | Fingerprint of user's RSA public key.                                         |
    | RSA_PUBLIC_KEY_2_FP           | null                                                | null    | Fingerprint of user's second RSA public key.                                  |
    +-------------------------------+-----------------------------------------------------+---------+-------------------------------------------------------------------------------+
    

    Note

    La propriété RSA_PUBLIC_KEY_2_FP est décrite dans Rotation de clé (dans ce chapitre).

  5. Modifiez et exécutez l’exemple de code ci-dessous. Le code déchiffre le fichier de la clé privée et le transmet au pilote Snowflake pour créer une connexion :

  • Mettez à jour les paramètres de sécurité :

    • chemin : spécifie le chemin d’accès local au fichier de clé privée que vous avez créé.

  • Mettez à jour les paramètres de connexion :

    • user : spécifie votre nom d’utilisateur Snowflake.

    • account : spécifie le nom complet de votre compte (fourni par Snowflake). Selon la plate-forme Cloud (AWS ou Azure) et la région où votre compte est hébergé, le nom complet du compte peut nécessiter des segments supplémentaires. Pour plus de détails, voir Notes d’utilisation pour le paramètre account (pour la méthode connect).

    • region : Obsolète

      Spécifie l’ID de la région où se trouve votre compte. Notez que ce paramètre est obsolète et n’est documenté que pour des raisons de rétrocompatibilité.

      Le nouveau code doit préciser la région (si disponible) dans account.

Exemple de code

import snowflake.connector
import os
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives.asymmetric import rsa
from cryptography.hazmat.primitives.asymmetric import dsa
from cryptography.hazmat.primitives import serialization
with open("<path>/rsa_key.p8", "rb") as key:
    p_key= serialization.load_pem_private_key(
        key.read(),
        password=os.environ['PRIVATE_KEY_PASSPHRASE'].encode(),
        backend=default_backend()
    )

pkb = p_key.private_bytes(
    encoding=serialization.Encoding.DER,
    format=serialization.PrivateFormat.PKCS8,
    encryption_algorithm=serialization.NoEncryption())

ctx = snowflake.connector.connect(
    user='<user>',
    account='<account>',
    private_key=pkb,
    warehouse=WAREHOUSE,
    database=DATABASE,
    schema=SCHEMA
    )

cs = ctx.cursor()

Rotation de clé

Snowflake accepte plusieurs clés actives pour permettre une rotation ininterrompue. Faites pivoter et remplacez vos clés publiques et privées en fonction du calendrier d’expiration que vous suivez en interne.

Actuellement, vous pouvez utiliser les paramètres RSA_PUBLIC_KEY et RSA_PUBLIC_KEY_2 pour ALTER USER afin d’associer jusqu’à 2 clés publiques à un seul utilisateur.

Pour faire tourner vos clés :

  1. Effectuez les étapes de la section Utilisation de l’authentification par paire de clés pour :

    • Générer un nouvel ensemble de clés privées et publiques.

    • Attribuer la clé publique à l’utilisateur. Définir la valeur de la clé publique sur RSA_PUBLIC_KEY ou RSA_PUBLIC_KEY_2 (la valeur de la clé qui n’est pas actuellement utilisée). Par exemple :

      alter user jsmith set rsa_public_key_2='JERUEHtcve...';
      
  2. Mettez à jour le code pour vous connecter à Snowflake. Spécifiez la nouvelle clé privée.

    Snowflake vérifie la bonne clé publique active pour l’authentification sur la base de la clé privée soumise avec vos informations de connexion.

  3. Retirez l’ancienne clé publique du profil utilisateur. Par exemple :

    alter user jsmith unset rsa_public_key;
    

Utilisation d’un serveur de proxy

Pour utiliser un serveur de proxy, configurez les variables d’environnement suivantes :

  • HTTP_PROXY

  • HTTPS_PROXY

  • NO_PROXY

Note

Les paramètres du proxy, c’est-à-dire proxy_host, proxy_port, proxy_user et proxy_password sont obsolètes. Utilisez plutôt les variables d’environnement.

Par exemple :

Linux ou macOS
export HTTP_PROXY='http://username:password@proxyserver.company.com:80'
export HTTPS_PROXY='http://username:password@proxyserver.company.com:80'
Windows
set HTTP_PROXY=http://username:password@proxyserver.company.com:80
set HTTPS_PROXY=http://username:password@proxyserver.company.com:80

Astuce

Le modèle de sécurité de Snowflake ne permet pas les proxys Secure Sockets Layer (SSL) (utilisant un certificat HTTPS). Votre serveur de proxy doit utiliser une autorité de certification accessible au public (CA), réduisant ainsi les risques potentiels de sécurité tels qu’une attaque MITM (Man In The Middle) via un proxy compromis.

Si vous devez utiliser votre proxy SSL, nous vous recommandons fortement de mettre à jour la politique du serveur pour passer par le certificat Snowflake, afin qu’aucun certificat ne soit modifié au milieu des communications.

En option, NO_PROXY peut être utilisé pour contourner le proxy pour des communications spécifiques. Par exemple, l’accès à Amazon S3 peut contourner le serveur proxy en spécifiant NO_PROXY=".amazonaws.com".

NO_PROXY ne prend pas en charge les caractères génériques. Chaque valeur spécifiée doit être l’une des suivantes :

  • La fin d’un nom d’hôte (ou d’un nom d’hôte complet), par exemple :

    • .amazonaws.com

    • xy12345.snowflakecomputing.com

  • Une adresse IP, par exemple :

    • 192.196.1.15

Si plusieurs valeurs sont spécifiées, elles doivent être séparées par des virgules, par exemple :

localhost,.my_company.com,.snowflakecomputing.com,192.168.1.15,192.168.1.16

Connexion avec OAuth

Pour vous connecter à l’aide de OAuth, la chaîne de connexion doit inclure le jeu de paramètres authenticator à oauth et le jeu de paramètres token à oauth_access_token. Pour plus d’informations, voir OAuth avec les clients, les pilotes et les connecteurs.

ctx = snowflake.connector.connect(
    user="<username>",
    host="<hostname>",
    account="<account_name>",
    authenticator="oauth",
    token="<oauth_access_token>",
    warehouse="test_warehouse",
    database="test_db",
    schema="test_schema"
)

OCSP

Lorsque le pilote se connecte, Snowflake envoie un certificat confirmant que la connexion est établie avec Snowflake plutôt qu’avec un hôte empruntant l’identité de Snowflake. Le pilote envoie ce certificat à un serveur OCSP (protocole de statut de certificat en ligne) pour vérifier que le certificat n’a pas été révoqué.

Si le pilote ne peut pas atteindre le serveur OCSP pour vérifier le certificat, il peut entrainer un comportement « Fail open » ou « Fail closed ».

Choix du mode Fail-Open ou Fail-Close

Les versions du connecteur Snowflake pour Python antérieures à 1.8.0 sont en mode Fail-Close par défaut. Les versions 1.8.0 et ultérieures sont en mode Fail-Open par défaut. Vous pouvez remplacer le comportement par défaut en définissant le paramètre de connexion facultatif ocsp_fail_open lors de l’appel de la méthode connect(). Par exemple :

con = snowflake.connector.connect(
    account=<account>,
    user=<user>,
    ...,
    ocsp_fail_open=False,
    ...);

Vérification de la version du connecteur ou du pilote OCSP

La version du pilote ou du connecteur et sa configuration déterminent le comportement OCSP. Pour plus d’informations sur la version du pilote ou du connecteur, leur configuration et le comportement de OCSP, voir Configuration d’OCSP.

Mise en cache des réponses OCSP

Pour assurer la sécurité de toutes les communications, le connecteur Snowflake pour Python utilise le protocole HTTPS pour se connecter à Snowflake, ainsi que pour se connecter à tous les autres services (par ex. Amazon S3 pour la mise en zone de préparation de fichiers de données et Okta pour l’authentification fédérée). Outre le protocole HTTPS classique, le connecteur vérifie également le statut de révocation du certificat TLS/SSL sur chaque connexion via OCSP (Online Certificate Status Protocol) et interrompt la connexion si le certificat est révoqué ou si le statut OCSP n’est pas fiable.

Comme chaque connexion Snowflake déclenche jusqu’à trois allers-retours avec le serveur OCSP, plusieurs niveaux de cache pour les réponses OCSP ont été introduits pour réduire la surcharge réseau ajoutée à la connexion :

  • La mémoire cache, qui continue d’exister durant la vie du processus.

  • Le cache de fichier, qui continue d’exister jusqu’à ce que le répertoire de cache (par exemple ~/.cache/snowflake) soit purgé.

  • Cache de serveur de réponse OCSP.

La mise en cache traite également les problèmes de disponibilité des serveurs OCSP (c’est-à-dire dans le cas où le serveur OCSP réel est en panne). Tant que le cache est valide, le connecteur peut toujours valider le statut de révocation du certificat.

Si aucune des couches du cache ne contient la réponse OCSP, le client tente de récupérer le statut de validation directement depuis le serveur OCSP du CA.

Modification de l’emplacement du cache de fichier de réponse OCSP

Par défaut, le cache de fichier est activé dans les emplacements suivants, de sorte qu’aucune tâche de configuration supplémentaire n’est nécessaire :

Linux

~/.cache/snowflake/ocsp_response_cache.json

macOS

~/Library/Caches/Snowflake/ocsp_response_cache.json

Windows

%USERPROFILE%\AppData\Local\Snowflake\Caches\ocsp_response_cache.json

Cependant, si vous voulez spécifier un emplacement et/ou un nom de fichier différent pour le fichier de cache de OCSP réponse, la méthode connect accepte le paramètre ocsp_response_cache_filename, qui spécifie le chemin et le nom du fichier de cache OCSP sous la forme d’un URI.

Serveur de cache de réponse OCSP

Note

Le serveur de cache de réponse OCSP est actuellement pris en charge par le connecteur Snowflake pour Python 1.6.0 et ultérieur.

Les types de mémoire et de fichier de cache OCSP fonctionnent bien pour les applications connectées à Snowflake utilisant un des clients que Snowflake fournit, avec un hôte persistant. Cependant, ils ne fonctionnent pas dans des environnements à alimentation dynamique tels que AWS Lambda ou Docker.

Pour remédier à cette situation, Snowflake fournit un troisième niveau de mise en cache : le serveur de cache de réponse OCSP. Le serveur de cache de réponse OCSP récupère les réponses OCSP toutes les heures sur les serveurs OCSP du CA et les stocke pendant 24 heures. Les clients peuvent alors interroger le statut de validation d’un certificat Snowflake donné à partir de ce cache serveur.

Important

Si votre politique de serveur refuse l’accès à la plupart ou à la totalité des adresses IP et sites Web externes, vous devez ajouter l’adresse du serveur de cache à la liste blanche pour permettre le fonctionnement normal du service. L’URL du serveur de cache est ocsp*.snowflakecomputing.com:80.

Si vous avez besoin de désactiver le serveur de cache pour une raison quelconque, réglez la variable d’environnement SF_OCSP_RESPONSE_CACHE_SERVER_ENABLED sur false. Notez que la valeur est sensible à la casse et doit être en minuscules.

Création d’une base de données, d’un schéma et d’un entrepôt

Après la connexion, créez une base de données, un schéma et un entrepôt, s’ils n’existent pas encore, en utilisant les commandes CREATE DATABASE, CREATE SCHEMA et CREATE WAREHOUSE.

L’exemple ci-dessous montre comment créer un entrepôt nommé tiny_warehouse, une base de données nommée testdb et un schéma nommé testschema. Notez que lorsque vous créez le schéma, vous devez spécifier le nom de la base de données dans laquelle créer le schéma ou vous devez déjà être connecté à la base de données dans laquelle créer le schéma. L’exemple ci-dessous exécute une commande USE DATABASE avant la commande CREATE SCHEMA pour s’assurer que le schéma est créé dans la base de données appropriée.

        conn.cursor().execute("CREATE WAREHOUSE IF NOT EXISTS tiny_warehouse_mg")
        conn.cursor().execute("CREATE DATABASE IF NOT EXISTS testdb_mg")
        conn.cursor().execute("USE DATABASE testdb_mg")
        conn.cursor().execute("CREATE SCHEMA IF NOT EXISTS testschema_mg")

Utilisation de la base de données, du schéma et de l’entrepôt

Spécifiez la base de données et le schéma dans lesquels vous souhaitez créer les tables. Spécifiez également l’entrepôt qui fournira les ressources nécessaires à l’exécution des instructions et des requêtes DML.

Par exemple, pour utiliser la base de données testdb, le schéma testschema et l’entrepôt tiny_warehouse (créé dans la section précédente) :

        conn.cursor().execute("USE WAREHOUSE tiny_warehouse_mg")
        conn.cursor().execute("USE DATABASE testdb_mg")
        conn.cursor().execute("USE SCHEMA testdb_mg.testschema_mg")

Création de tables et insertion de données

Utilisez la commande CREATE TABLE pour créer des tables et la commande INSERT pour remplir les tables avec des données.

Par exemple, créez une table nommée testtable et insérez deux lignes dans la table :

    conn.cursor().execute(
        "CREATE OR REPLACE TABLE "
        "test_table(col1 integer, col2 string)")

    conn.cursor().execute(
        "INSERT INTO test_table(col1, col2) VALUES " + 
        "    (123, 'test string1'), " + 
        "    (456, 'test string2')")

Chargement des données

Au lieu d’insérer des données dans les tables à l’aide de commandes INSERT individuelles, vous pouvez charger en masse des données à partir de fichiers préparés dans un emplacement interne ou externe.

Copie de données à partir d’un emplacement interne

Pour charger des données de fichiers de votre machine hôte vers une table, utilisez d’abord la commande PUT pour placer le fichier dans un emplacement interne, puis utilisez la commande COPY INTO <table> pour copier les données des fichiers vers la table.

Par exemple :

# Putting Data
con.cursor().execute("PUT file:///tmp/data/file* @%testtable")
con.cursor().execute("COPY INTO testtable")

Où vos données CSV sont stockées dans un répertoire local nommé /tmp/data dans un environnement Linux ou macOS, et le répertoire contient des fichiers nommés file0, file1, … file100.

Copie de données à partir d’un emplacement externe

Pour charger des données à partir de fichiers préparés dans un emplacement externe (c’est-à-dire votre propre compartiment S3) vers une table, utilisez la commande COPY INTO <table>.

Par exemple :

# Copying Data
con.cursor().execute("""
COPY INTO testtable FROM s3://<your_s3_bucket>/data/
    STORAGE_INTEGRATION = myint
    FILE_FORMAT=(field_delimiter=',')
""".format(
    aws_access_key_id=AWS_ACCESS_KEY_ID,
    aws_secret_access_key=AWS_SECRET_ACCESS_KEY))

Où :

  • s3://<votre_compartiment_s3>/data/ spécifie le nom de votre compartiment S3.

  • Les fichiers dans le compartiment sont préfixés avec data.

  • Le compartiment est accessible à l’aide d’une intégration de stockage créée à l’aide de CREATE STORAGE INTEGRATION par un administrateur de compte (c’est-à-dire un utilisateur avec le rôle ACCOUNTADMIN) ou un rôle avec le privilège global CREATE INTEGRATION. Une intégration de stockage permet aux utilisateurs d’éviter de fournir des informations d’identification pour accéder à un emplacement de stockage privé.

Note

Cet exemple utilise la fonction format() pour composer l’instruction. Si votre environnement présente un risque d’attaques par injection SQL, vous pouvez, si vous le souhaitez, lier des valeurs plutôt que d’utiliser format().

Interrogation de données

Utilisation de cursor pour récupérer des valeurs

Récupérez les valeurs d’une table à l’aide de la méthode d’itérateur d’objet curseur.

Par exemple, pour extraire des colonnes nommées « col1 » et « col2 » de la table nommée testtable créée précédemment (dans Création de tables et insertion de données), utilisez un code similaire à celui-ci :

    cur = conn.cursor()
    try:
        cur.execute("SELECT col1, col2 FROM test_table ORDER BY col1")
        for (col1, col2) in cur:
            print('{0}, {1}'.format(col1, col2))
    finally:
        cur.close()

Le connecteur Snowflake pour Python fournit un raccourci utile :

for (col1, col2) in con.cursor().execute("SELECT col1, col2 FROM testtable"):
    print('{0}, {1}'.format(col1, col2))

Si vous avez besoin d’obtenir un seul résultat (c’est-à-dire une seule ligne), utilisez la méthode fetchone :

col1, col2 = con.cursor().execute("SELECT col1, col2 FROM testtable").fetchone()
print('{0}, {1}'.format(col1, col2))

Si vous avez besoin d’obtenir le nombre spécifié de lignes à un moment donné, utilisez la méthode fetchmany avec le nombre de lignes :

cur = con.cursor().execute("SELECT col1, col2 FROM testtable")
ret = cur.fetchmany(3)
print(ret)
while len(ret) > 0:
    ret = cur.fetchmany(3)
    print(ret)

Note

Utilisez fetchone ou fetchmany si le nombre de résultats est trop grand pour la mémoire.

Si vous avez besoin de tous les résultats en même temps :

results = con.cursor().execute("SELECT col1, col2 FROM testtable").fetchall()
for rec in results:
    print('%s, %s' % (rec[0], rec[1]))

Pour définir un délai d’attente pour une requête, exécutez une commande « begin » et incluez un paramètre de délai d’attente dans la requête. Si la requête dépasse la longueur de la valeur du paramètre, une erreur se produit et un retour en arrière se produit.

Dans le code suivant, l’erreur 604 signifie que la requête a été annulée. Le paramètre de délai d’attente démarre Timer() et s’annule si la requête ne se termine pas dans le délai spécifié.

conn.cursor().execute("create or replace table testtbl(a int, b string)")

conn.cursor().execute("begin")
try:
   conn.cursor().execute("insert into testtbl(a,b) values(3, 'test3'), (4,'test4')", timeout=10) # long query

except ProgrammingError as e:
   if e.errno == 604:
      print("timeout")
      conn.cursor().execute("rollback")
   else:
      raise e
else:
   conn.cursor().execute("commit")

Utilisation de DictCursor pour récupérer des valeurs par nom de colonne

Si vous voulez récupérer une valeur par nom de colonne, créez un objet cursor de type DictCursor.

Par exemple :

# Querying data by DictCursor
from snowflake.connector import DictCursor
cur = con.cursor(DictCursor)
try:
    cur.execute("SELECT col1, col2 FROM testtable")
    for rec in cur:
        print('{0}, {1}'.format(rec['COL1'], rec['COL2']))
finally:
    cur.close()

Annulation d’une requête par ID de requête

Annulation d’une requête par ID de requête

cur = cn.cursor()

try:
  cur.execute(r"SELECT SYSTEM$CANCEL_QUERY('queryID')")
  result = cur.fetchall()
  print(len(result))
  print(result[0])
finally:
  cur.close()

Remplacez la chaîne « queryID » par l’ID de la requête.

Amélioration des performances des requêtes en contournant la conversion des données

Pour améliorer les performances des requêtes, utilisez la classe SnowflakeNoConverterToPython du module snowflake.connector.converter_null pour contourner les conversions de données du type de données interne Snowflake vers le type de données Python natif. Exemple :

from snowflake.connector.converter_null import SnowflakeNoConverterToPython

con = snowflake.connector.connect(
    ...
    converter_class=SnowflakeNoConverterToPython
)
for rec in con.cursor().execute("SELECT * FROM large_table"):
    # rec includes raw Snowflake data

En conséquence, toutes les données sont représentées sous forme de chaîne, de sorte que l’application soit chargée de les convertir en types de données Python natifs. Par exemple, les données TIMESTAMP_NTZ et TIMESTAMP_LTZ sont l’heure d’époque représentée sous forme de chaîne, et les données TIMESTAMP_TZ correspondent à l’heure suivie d’un espace suivi du décalage par rapport à UTC en minutes, représentés sous forme de chaîne.

Aucun impact n’est fait sur les données de liaison ; les données natives Python peuvent toujours être liées aux mises à jour.

Données de liaison

Pour spécifier les valeurs à utiliser dans une instruction SQL, vous pouvez inclure des littéraux dans l’instruction ou lier des variables. Lorsque vous liez des variables, vous insérez un ou plusieurs espaces réservés dans le texte de l’instruction SQL, puis vous spécifiez la variable (la valeur à utiliser) pour chaque caractère de remplacement.

L’exemple suivant compare l’utilisation des littéraux et de la liaison :

Littéraux :

con.cursor().execute("INSERT INTO testtable(col1, col2) VALUES(789, 'test string3')")

Liaison :

con.cursor().execute(
    "INSERT INTO testtable(col1, col2) "
    "VALUES(%s, %s)", (
        789,
        'test string3'
    ))

Note

Il existe une limite supérieure à la taille des données que vous pouvez lier ou que vous pouvez combiner dans un lot. Pour plus de détails, voir Limites de la taille du texte de requête.

Snowflake prend en charge les types de liaisons suivants :

  • pyformat.

  • format.

  • qmark.

  • numeric.

Chacun de ceux-ci est expliqué ci-dessous.

Liaison pyformat ou format

La liaison pyformat et la liaison format lient les données de liaison côté client plutôt que côté serveur.

Par défaut, le connecteur Snowflake pour Python prend en charge pyformat et format pour que vous puissiez utiliser %(name)s ou %s comme caractère de remplacement. Par exemple :

  • Utilisation de %(name)s comme caractère de remplacement :

        conn.cursor().execute(
            "INSERT INTO test_table(col1, col2) "
            "VALUES(%(col1)s, %(col2)s)", {
                'col1': 789,
                'col2': 'test string3',
                })
    
  • Utilisation de %s comme caractère de remplacement :

    con.cursor().execute(
        "INSERT INTO testtable(col1, col2) "
        "VALUES(%s, %s)", (
            789,
            'test string3'
        ))
    

Vous pouvez également utiliser un objet de liste pour lier des données de l’opérateur IN :

# Binding data for IN operator
con.cursor().execute(
    "SELECT col1, col2 FROM testtable"
    " WHERE col2 IN (%s)", (
        ['test string1', 'test string3'],
    ))

Le caractère pourcentage (« % ») est à la fois un caractère générique pour SQL LIKE et un caractère de liaison de format pour Python. Si vous utilisez la liaison de format et si votre commande SQL contient le caractère pourcentage, vous devrez peut-être échapper le caractère pourcentage. Par exemple, si votre instruction SQL est :

SELECT col1, col2
    FROM test_table
    WHERE col2 ILIKE '%York' LIMIT 1;  -- Find York, New York, etc.

votre code Python devrait ressembler à ceci (notez le signe de pourcentage supplémentaire pour échapper le signe de pourcentage d’origine) :

        sql_command = "select col1, col2 from test_table "
        sql_command += " where col2 like '%%York' limit %(lim)s"
        parameter_dictionary = {'lim': 1 }
        cur.execute(sql_command, parameter_dictionary)

Liaison qmark ou numeric

La liaison qmark et la liaison numeric lient les données de liaison côté serveur plutôt que côté client.

Pour utiliser la liaison de type qmark ou numeric, définissez le mode sur « qmark » en exécutant :

snowflake.connector.paramstyle='qmark'

Important

Vous devez définir paramstyle avant d’appeler la méthode connect().

Si paramstyle est spécifié comme qmark ou numeric dans le paramètre de connexion, les variables de liaison doivent être ? ou :N respectivement, et la liaison se produit côté serveur.

Par exemple :

  • Utilisation de ? comme caractère de remplacement :

    import snowflake.connector
    
    snowflake.connector.paramstyle='qmark'
    
    con = snowflake.connector.connect(...)
    
    con.cursor().execute(
        "INSERT INTO testtable(col1, col2) "
        "VALUES(?, ?)", (
            789,
            'test string3'
        ))
    
  • Utilisation de :N comme caractère de remplacement :

    import snowflake.connector
    
    snowflake.connector.paramstyle='qmark'
    
    con = snowflake.connector.connect(...)
    
    con.cursor().execute(
        "INSERT INTO testtable(col1, col2) "
        "VALUES(:1, :2)", (
            789,
            'test string3'
        ))
    
  • Liaison datetime avec TIMESTAMP à l’aide de la liaison qmark :

    Lorsque vous utilisez la liaison qmark ou la liaison numeric pour lier des données à un type de données Snowflake TIMESTAMP, spécifiez le type de données d’horodatage Snowflake, c’est-à-dire TIMESTAMP_LTZ ou TIMESTAMP_TZ, sous la forme d’un tuple.

    import snowflake.connector
    
    snowflake.connector.paramstyle='qmark'
    
    con = snowflake.connector.connect(...)
    
    con.cursor().execute(
        "CREATE OR REPLACE TABLE testtable2 ("
        "   col1 int, "
        "   col2 string, "
        "   col3 timestamp_ltz"
        ")"
    )
    
    con.cursor().execute(
        "INSERT INTO testtable2(col1,col2,col3) "
        "VALUES(?,?,?)", (
            987,
            'test string4',
            ("TIMESTAMP_LTZ", datetime.now())
        )
    )
    

    Contrairement à la liaison côté client, la liaison côté serveur nécessite le type de données Snowflake pour la colonne. La plupart des types de données Python classiques font déjà l’objet de mappages implicites aux types de données Snowflake (par exemple, int est mappé à FIXED). Cependant, étant donné que les données Python datetime peuvent être liées à plusieurs types de données Snowflake (par exemple, TIMESTAMP_NTZ, TIMESTAMP_LTZ ou TIMESTAMP_TZ) et que le mappage par défaut est TIMESTAMP_NTZ, la liaison des données Python datetime nécessite que l’utilisateur spécifie un type de données spécifique (par exemple, TIMESTAMP_LTZ). Par conséquent, le type de données doit être spécifié comme indiqué dans l’exemple ci-dessus.

Eviter les attaques par injection SQL

Évitez la liaison de données à l’aide de la fonction de formatage Python, car vous générez un risque d’injection SQL. Par exemple :

# Binding data (UNSAFE EXAMPLE)
con.cursor().execute(
    "INSERT INTO testtable(col1, col2) "
    "VALUES(%(col1)d, '%(col2)s')" % {
        'col1': 789,
        'col2': 'test string3'
    })
# Binding data (UNSAFE EXAMPLE)
con.cursor().execute(
    "INSERT INTO testtable(col1, col2) "
    "VALUES(%d, '%s')" % (
        789,
        'test string3'
    ))
# Binding data (UNSAFE EXAMPLE)
con.cursor().execute(
    "INSERT INTO testtable(col1, col2) "
    "VALUES({col1}, '{col2}')".format(
        col1=789,
        col2='test string3')
    )

Stockez plutôt les valeurs dans des variables, vérifiez-les (par exemple, en cherchant des points-virgules suspects dans des chaînes), puis liez les paramètres à l’aide de qmark ou d’un style de liaison numérique.

Récupération de métadonnées de colonne

Les métadonnées de colonne sont stockées dans l’objet Cursor dans l’attribut description.

L’exemple simple suivant récupère la liste des noms de colonne :

    cur = conn.cursor()
    cur.execute("SELECT * FROM test_table")
    print(','.join([col[0] for col in cur.description]))

Récupération des IDs de requête Snowflake

Un ID de requête est attribué à chaque requête exécutée par Snowflake. Dans l’interface Web de Snowflake, les IDs de requête sont affichés dans la page History History tab et lors de la vérification du statut d’une requête.

Le connecteur Snowflake pour Python fournit un attribut spécial, sfqid, dans l’objet Cursor pour que vous puissiez l’associer au statut dans l’interface Web. Pour récupérer l’ID de requête Snowflake, exécutez d’abord la requête, puis récupérez-le par l’attribut sfqid :

# Retrieving a Snowflake Query ID
cur = con.cursor()
cur.execute("SELECT * FROM testtable")
print(cur.sfqid)

Gestion des erreurs

L’application doit traiter correctement les exceptions soulevées par le connecteur Snowflake, et décider de continuer ou d’arrêter l’exécution du code.

# Catching the syntax error
cur = con.cursor()
try:
    cur.execute("SELECT * FROM testtable")
except snowflake.connector.errors.ProgrammingError as e:
    # default error message
    print(e)
    # customer error message
    print('Error {0} ({1}): {2} ({3})'.format(e.errno, e.sqlstate, e.msg, e.sfqid))
finally:
    cur.close()

Utilisation de execute_stream pour exécuter des scripts SQL

La fonction execute_stream vous permet d’exécuter un ou plusieurs scripts SQL dans un flux :

from codecs import open
with open(sqlfile, 'r', encoding='utf-8') as f:
    for cur in con.execute_stream(f):
        for ret in cur:
            print(ret)

Fermeture de la connexion

Nous vous recommandons de fermer la connexion en utilisant la méthode close :

        connection.close()

Ceci permet de s’assurer que les métriques client collectées sont soumises au serveur et que la session est supprimée. De plus, les blocs try-finally permettent de s’assurer que la connexion est fermée même si une exception est levée au milieu :

# Connecting to Snowflake
con = snowflake.connector.connect(...)
try:
    # Running queries
    con.cursor().execute(...)
    ...
finally:
    # Closing the connection
    con.close()

Utilisation du gestionnaire de contexte pour connecter et contrôler les transactions

Le connecteur Snowflake pour Python prend en charge un gestionnaire de contexte attribuant et libérant des ressources au besoin. Le gestionnaire de contexte est utile pour valider ou annuler des transactions en fonction du statut de l’instruction lorsque autocommit est désactivé.

# Connecting to Snowflake using the context manager
with snowflake.connector.connect(
  user=USER,
  password=PASSWORD,
  account=ACCOUNT,
  autocommit=False,
) as con:
    con.cursor().execute("INSERT INTO a VALUES(1, 'test1')")
    con.cursor().execute("INSERT INTO a VALUES(2, 'test2')")
    con.cursor().execute("INSERT INTO a VALUES(not numeric value, 'test3')") # fail

Dans l’exemple ci-dessus, lorsque la troisième instruction échoue, le gestionnaire de contexte annule les modifications apportées à la transaction et ferme la connexion. Si toutes les instructions étaient réussies, le gestionnaire de contexte validerait les modifications et fermerait la connexion.

Un code équivalent avec les blocs try et except est le suivant :

# Connecting to Snowflake using try and except blocks
con = snowflake.connector.connect(
  user=USER,
  password=PASSWORD,
  account=ACCOUNT,
  autocommit=False)
try:
    con.cursor().execute("INSERT INTO a VALUES(1, 'test1')")
    con.cursor().execute("INSERT INTO a VALUES(2, 'test2')")
    con.cursor().execute("INSERT INTO a VALUES(not numeric value, 'test3')") # fail
    con.commit()
except Exception as e:
    con.rollback()
    raise e
finally:
    con.close()

Connexion

Le connecteur Snowflake pour Python utilise le module standard Python logging pour enregistrer le statut à intervalles réguliers afin que l’application puisse suivre son activité en arrière-plan. La façon la plus simple d’activer la journalisation est d’ouvrir logging.basicConfig() au début de l’application.

Par exemple, pour définir le niveau de journalisation sur INFO et stocker les journaux dans un fichier nommé /tmp/snowflake_python_connector.log :

        logging.basicConfig(
            filename=file_name,
            level=logging.INFO)

Une journalisation plus complète peut être activée en réglant le niveau de journalisation sur DEBUG comme suit :

# Logging including the timestamp, thread and the source code location
import logging
for logger_name in ['snowflake.connector', 'botocore', 'boto3']:
    logger = logging.getLogger(logger_name)
    logger.setLevel(logging.DEBUG)
    ch = logging.FileHandler('/tmp/python_connector.log')
    ch.setLevel(logging.DEBUG)
    ch.setFormatter(logging.Formatter('%(asctime)s - %(threadName)s %(filename)s:%(lineno)d - %(funcName)s() - %(levelname)s - %(message)s'))
    logger.addHandler(ch)

La classe de formateur SecretDetector facultative mais recommandée garantit qu’un certain ensemble d’informations sensibles connues est masqué avant d’être écrit dans les fichiers journaux de Snowflake Python Connector. Pour utiliser SecretDetector, utilisez un code similaire au suivant :

# Logging including the timestamp, thread and the source code location
import logging
from snowflake.connector.secret_detector import SecretDetector
for logger_name in ['snowflake.connector', 'botocore', 'boto3']:
    logger = logging.getLogger(logger_name)
    logger.setLevel(logging.DEBUG)
    ch = logging.FileHandler('/tmp/python_connector.log')
    ch.setLevel(logging.DEBUG)
    ch.setFormatter(SecretDetector('%(asctime)s - %(threadName)s %(filename)s:%(lineno)d - %(funcName)s() - %(levelname)s - %(message)s'))
    logger.addHandler(ch)

Note

botocore et boto3 sont disponibles via AWS (Amazon Web Services) SDK pour Python.

Exemple de programme

L’exemple de code suivant combine plusieurs exemples décrits dans les chapitres précédents en un programme Python fonctionnel : Cet exemple contient deux parties :

  • Une classe parente (« python_veritas_base ») contient le code de nombreuses opérations courantes, telles que la connexion au serveur.

  • Une classe enfant (« python_connector_example ») représente les parties personnalisées d’un client particulier, par exemple, l’interrogation d’une table.

Cet exemple de code est importé directement à partir de l’un de nos tests pour garantir qu’il a été exécuté sur une version récente du produit.

Étant donné que cela provient d’un test, il inclut une petite quantité de code pour définir un autre port et un autre protocole utilisés dans certains tests. Les clients ne doivent pas définir le protocole ou le numéro de port ; au lieu de cela, omettez-les et utilisez les valeurs par défaut.

Il contient également des marqueurs de section (parfois appelés « balises d’extrait ») pour identifier le code qui peut être importé indépendamment dans la documentation. Les marqueurs de section ressemblent généralement à :

# -- (> ---------------------- SECTION=import_connector ---------------------
...
# -- <) ---------------------------- END_SECTION ----------------------------

Ces marqueurs de section ne sont pas requis dans le code client du client.

La première partie de l’exemple de code contient les sous-routines courantes pour :

  • Lisez les arguments de ligne de commande (par exemple, « –warehouse MyWarehouse ») qui contiennent des informations de connexion.

  • Connectez-vous au serveur.

  • Créez et utilisez un entrepôt, une base de données et un schéma.

  • Détruisez le schéma, la base de données et l’entrepôt lorsque vous n’en avez plus besoin.


import logging
import os
import sys


# -- (> ---------------------- SECTION=import_connector ---------------------
import snowflake.connector
# -- <) ---------------------------- END_SECTION ----------------------------


class python_veritas_base:

    """
    PURPOSE:
        This is the Base/Parent class for programs that use the Snowflake
        Connector for Python.
        This class is intended primarily for:
            * Sample programs, e.g. in the documentation.
            * Tests.
    """


    def __init__(self, p_log_file_name = None):

        """
        PURPOSE:
            This does any required initialization steps, which in this class is
            basically just turning on logging.
        """

        file_name = p_log_file_name
        if file_name is None:
            file_name = '/tmp/snowflake_python_connector.log'

        # -- (> ---------- SECTION=begin_logging -----------------------------
        logging.basicConfig(
            filename=file_name,
            level=logging.INFO)
        # -- <) ---------- END_SECTION ---------------------------------------


    # -- (> ---------------------------- SECTION=main ------------------------
    def main(self, argv):

        """
        PURPOSE:
            Most tests follow the same basic pattern in this main() method:
               * Create a connection.
               * Set up, e.g. use (or create and use) the warehouse, database,
                 and schema.
               * Run the queries (or do the other tasks, e.g. load data).
               * Clean up. In this test/demo, we drop the warehouse, database,
                 and schema. In a customer scenario, you'd typically clean up
                 temporary tables, etc., but wouldn't drop your database.
               * Close the connection.
        """

        # Read the connection parameters (e.g. user ID) from the command line
        # and environment variables, then connect to Snowflake.
        connection = self.create_connection(argv)

        # Set up anything we need (e.g. a separate schema for the test/demo).
        self.set_up(connection)

        # Do the "real work", for example, create a table, insert rows, SELECT
        # from the table, etc.
        self.do_the_real_work(connection)

        # Clean up. In this case, we drop the temporary warehouse, database, and
        # schema.
        self.clean_up(connection)

        print("\nClosing connection...")
        # -- (> ------------------- SECTION=close_connection -----------------
        connection.close()
        # -- <) ---------------------------- END_SECTION ---------------------

    # -- <) ---------------------------- END_SECTION=main --------------------


    def args_to_properties(self, args):

        """
        PURPOSE:
            Read the command-line arguments and store them in a dictionary.
            Command-line arguments should come in pairs, e.g.:
                "--user MyUser"
        INPUTS:
            The command line arguments (sys.argv).
        RETURNS:
            Returns the dictionary.
        DESIRABLE ENHANCEMENTS:
            Improve error detection and handling.
        """

        connection_parameters = {}

        i = 1
        while i < len(args) - 1:
            property_name = args[i]
            # Strip off the leading "--" from the tag, e.g. from "--user".
            property_name = property_name[2:]
            property_value = args[i + 1]
            connection_parameters[property_name] = property_value
            i += 2

        return connection_parameters


    def create_connection(self, argv):

        """
        PURPOSE:
            This connects gets account and login information from the
            environment variables and command-line parameters, connects to the
            server, and returns the connection object.
        INPUTS:
            argv: This is usually sys.argv, which contains the command-line
                  parameters. It could be an equivalent substitute if you get
                  the parameter information from another source.
        RETURNS:
            A connection.
        """

        # Get account and login information from environment variables and
        # command-line parameters.
        # Note that ACCOUNT might require the region and cloud platform where
        # your account is located, in the form of
        #     '<your_account_name>.<region>.<cloud>'
        # for example
        #     'xy12345.us-east-1.azure')
        # -- (> ----------------------- SECTION=set_login_info ---------------

        # Get the password from an appropriate environment variable, if
        # available.
        PASSWORD = os.getenv('SNOWSQL_PWD')

        # Get the other login info etc. from the command line.
        if len(argv) < 11:
            msg = "ERROR: Please pass the following command-line parameters:\n"
            msg += "--warehouse <warehouse> --database <db> --schema <schema> "
            msg += "--user <user> --account <account> "
            print(msg)
            sys.exit(-1)
        else:
            connection_parameters = self.args_to_properties(argv)
            USER = connection_parameters["user"]
            ACCOUNT = connection_parameters["account"]
            WAREHOUSE = connection_parameters["warehouse"]
            DATABASE = connection_parameters["database"]
            SCHEMA = connection_parameters["schema"]
            # Optional: for internal testing only.
            try:
                PORT = connection_parameters["port"]
                PROTOCOL = connection_parameters["protocol"]
            except:
                PORT = ""
                PROTOCOL = ""

        # If the password is set by both command line and env var, the
        # command-line value takes precedence over (is written over) the
        # env var value.

        # If the password wasn't set either in the environment var or on
        # the command line...
        if PASSWORD is None or PASSWORD == '':
            print("ERROR: Set password, e.g. with SNOWSQL_PWD environment variable")
            sys.exit(-2)
        # -- <) ---------------------------- END_SECTION ---------------------

        # Optional diagnostic:
        #print("USER:", USER)
        #print("ACCOUNT:", ACCOUNT)
        #print("WAREHOUSE:", WAREHOUSE)
        #print("DATABASE:", DATABASE)
        #print("SCHEMA:", SCHEMA)
        #print("PASSWORD:", PASSWORD)
        #print("PROTOCOL:" "'" + PROTOCOL + "'")
        #print("PORT:" + "'" + PORT + "'")

        print("Connecting...")
        if PROTOCOL is None or PROTOCOL == "" or PORT is None or PORT == "":
            # -- (> ------------------- SECTION=connect_to_snowflake ---------
            conn = snowflake.connector.connect(
                user=USER,
                password=PASSWORD,
                account=ACCOUNT,
                warehouse=WAREHOUSE,
                database=DATABASE,
                schema=SCHEMA
                )
            # -- <) ---------------------------- END_SECTION -----------------
        else:
            conn = snowflake.connector.connect(
                user=USER,
                password=PASSWORD,
                account=ACCOUNT,
                warehouse=WAREHOUSE,
                database=DATABASE,
                schema=SCHEMA,
                # Optional: for internal testing only.
                protocol=PROTOCOL,
                port=PORT
                )

        return conn


    def set_up(self, connection):

        """
        PURPOSE:
            Set up to run a test. You can override this method with one
            appropriate to your test/demo.
        """

        # Create a temporary warehouse, database, and schema.
        self.create_warehouse_database_and_schema(connection)


    def do_the_real_work(self, conn):

        """
        PURPOSE:
            Your sub-class should override this to include the code required for
            your documentation sample or your test case.
            This default method does a very simple self-test that shows that the
            connection was successful.
        """

        # Create a cursor for this connection.
        cursor1 = conn.cursor()
        # This is an example of an SQL statement we might want to run.
        command = "SELECT PI()"
        # Run the statement.
        cursor1.execute(command)
        # Get the results (should be only one):
        for row in cursor1:
            print(row[0])
        # Close this cursor.
        cursor1.close()


    def clean_up(self, connection):

        """
        PURPOSE:
            Clean up after a test. You can override this method with one
            appropriate to your test/demo.
        """

        # Create a temporary warehouse, database, and schema.
        self.drop_warehouse_database_and_schema(connection)


    def create_warehouse_database_and_schema(self, conn):

        """
        PURPOSE:
            Create the temporary schema, database, and warehouse that we use
            for most tests/demos.
        """

        # Create a database, schema, and warehouse if they don't already exist.
        print("\nCreating warehouse, database, schema...")
        # -- (> ------------- SECTION=create_warehouse_database_schema -------
        conn.cursor().execute("CREATE WAREHOUSE IF NOT EXISTS tiny_warehouse_mg")
        conn.cursor().execute("CREATE DATABASE IF NOT EXISTS testdb_mg")
        conn.cursor().execute("USE DATABASE testdb_mg")
        conn.cursor().execute("CREATE SCHEMA IF NOT EXISTS testschema_mg")
        # -- <) ---------------------------- END_SECTION ---------------------

        # -- (> --------------- SECTION=use_warehouse_database_schema --------
        conn.cursor().execute("USE WAREHOUSE tiny_warehouse_mg")
        conn.cursor().execute("USE DATABASE testdb_mg")
        conn.cursor().execute("USE SCHEMA testdb_mg.testschema_mg")
        # -- <) ---------------------------- END_SECTION ---------------------


    def drop_warehouse_database_and_schema(self, conn):

        """
        PURPOSE:
            Drop the temporary schema, database, and warehouse that we create
            for most tests/demos.
        """

        # -- (> ------------- SECTION=drop_warehouse_database_schema ---------
        conn.cursor().execute("DROP SCHEMA IF EXISTS testschema_mg")
        conn.cursor().execute("DROP DATABASE IF EXISTS testdb_mg")
        conn.cursor().execute("DROP WAREHOUSE IF EXISTS tiny_warehouse_mg")
        # -- <) ---------------------------- END_SECTION ---------------------


# ----------------------------------------------------------------------------

if __name__ == '__main__':
    pvb = python_veritas_base()
    pvb.main(sys.argv)


La deuxième partie de l’exemple de code crée une table, y insère des lignes, etc. :


import sys

# -- (> ---------------------- SECTION=import_connector ---------------------
import snowflake.connector
# -- <) ---------------------------- END_SECTION ----------------------------


# Import the base class that contains methods used in many tests and code 
# examples.
from python_veritas_base import python_veritas_base


class python_connector_example (python_veritas_base):

  """
  PURPOSE:
      This is a simple example program that shows how to use the Snowflake 
      Python Connector to create and query a table.
  """

  def __init__(self):
    pass


  def do_the_real_work(self, conn):

    """
    INPUTS:
        conn is a Connection object returned from snowflake.connector.connect().
    """

    print("\nCreating table test_table...")
    # -- (> ----------------------- SECTION=create_table ---------------------
    conn.cursor().execute(
        "CREATE OR REPLACE TABLE "
        "test_table(col1 integer, col2 string)")

    conn.cursor().execute(
        "INSERT INTO test_table(col1, col2) VALUES " + 
        "    (123, 'test string1'), " + 
        "    (456, 'test string2')")
    # -- <) ---------------------------- END_SECTION -------------------------


    print("\nSelecting from test_table...")
    # -- (> ----------------------- SECTION=querying_data --------------------
    cur = conn.cursor()
    try:
        cur.execute("SELECT col1, col2 FROM test_table ORDER BY col1")
        for (col1, col2) in cur:
            print('{0}, {1}'.format(col1, col2))
    finally:
        cur.close()
    # -- <) ---------------------------- END_SECTION -------------------------




# ============================================================================

if __name__ == '__main__':

    test_case = python_connector_example()
    test_case.main(sys.argv)

Pour exécuter cet exemple, procédez comme suit :

  1. Copiez le premier morceau de code dans un fichier nommé « python_veritas_base.py ».

  2. Copiez le deuxième morceau de code dans un fichier nommé « python_connector_example.py »

  3. Définissez la variable d’environnement SNOWSQL_PWD sur votre mot de passe, par exemple :

    export SNOWSQL_PWD='MyPassword'
    
  4. Exécutez le programme à l’aide d’une ligne de commande similaire à la suivante (remplacez les informations d’utilisateur et de compte par vos propres informations d’utilisateur et de compte, bien sûr).

    Avertissement

    Cela supprime l’entrepôt, la base de données et le schéma à la fin du programme ! N’utilisez pas le nom d’une base de données existante, car vous la perdrez !

    python3 python_connector_example.py --warehouse <unique_warehouse_name> --database <new_warehouse_zzz_test> --schema <new_schema_zzz_test> --account xy98764 --user MyUserName
    

Voici la sortie :

Connecting...

Creating warehouse, database, schema...

Creating table test_table...

Selecting from test_table...
123, test string1
456, test string2

Closing connection...

Voici un exemple plus long :

Note

Dans le chapitre où vous configurez votre compte et vos informations de connexion, assurez-vous de remplacer les variables au besoin pour les faire correspondre à vos informations de connexion Snowflake (nom, mot de passe, etc.).

Cet exemple utilise la fonction format() pour composer l’instruction. Si votre environnement présente un risque d’attaques par injection SQL, vous pouvez, si vous le souhaitez, lier des valeurs plutôt que d’utiliser format().

#!/usr/bin/env python
#
# Snowflake Connector for Python Sample Program
#

# Logging
import logging
logging.basicConfig(
    filename='/tmp/snowflake_python_connector.log',
    level=logging.INFO)

import snowflake.connector

# Set your account and login information (replace the variables with
# the necessary values). Note that ACCOUNT might also require the
# region and cloud platform where your account is located, in the form of
# '<your_account_name>.<region_id>.<cloud_platform>' (e.g. 'xy12345.east-us-2.azure')
ACCOUNT = '<your_account_name>'
USER = '<your_login_name>'
PASSWORD = '<your_password>'

import os

# Only required if you copy data from your own S3 bucket
AWS_ACCESS_KEY_ID = os.getenv('AWS_ACCESS_KEY_ID')
AWS_SECRET_ACCESS_KEY = os.getenv('AWS_SECRET_ACCESS_KEY')

# Connecting to Snowflake
con = snowflake.connector.connect(
  user=USER,
  password=PASSWORD,
  account=ACCOUNT,
)

# Creating a database, schema, and warehouse if none exists
con.cursor().execute("CREATE WAREHOUSE IF NOT EXISTS tiny_warehouse")
con.cursor().execute("CREATE DATABASE IF NOT EXISTS testdb")
con.cursor().execute("USE DATABASE testdb")
con.cursor().execute("CREATE SCHEMA IF NOT EXISTS testschema")

# Using the database, schema and warehouse
con.cursor().execute("USE WAREHOUSE tiny_warehouse")
con.cursor().execute("USE SCHEMA testdb.testschema")

# Creating a table and inserting data
con.cursor().execute(
    "CREATE OR REPLACE TABLE "
    "testtable(col1 integer, col2 string)")
con.cursor().execute(
    "INSERT INTO testtable(col1, col2) "
    "VALUES(123, 'test string1'),(456, 'test string2')")

# Copying data from internal stage (for testtable table)
con.cursor().execute("PUT file:///tmp/data0/file* @%testtable")
con.cursor().execute("COPY INTO testtable")

# Copying data from external stage (S3 bucket -
# replace <your_s3_bucket> with the name of your bucket)
con.cursor().execute("""
COPY INTO testtable FROM s3://<your_s3_bucket>/data/
     STORAGE_INTEGRATION = myint
     FILE_FORMAT=(field_delimiter=',')
""".format(
    aws_access_key_id=AWS_ACCESS_KEY_ID,
    aws_secret_access_key=AWS_SECRET_ACCESS_KEY))

# Querying data
cur = con.cursor()
try:
    cur.execute("SELECT col1, col2 FROM testtable")
    for (col1, col2) in cur:
        print('{0}, {1}'.format(col1, col2))
finally:
    cur.close()

# Binding data
con.cursor().execute(
    "INSERT INTO testtable(col1, col2) "
    "VALUES(%(col1)s, %(col2)s)", {
        'col1': 789,
        'col2': 'test string3',
        })

# Retrieving column names
cur = con.cursor()
cur.execute("SELECT * FROM testtable")
print(','.join([col[0] for col in cur.description]))

# Catching syntax errors
cur = con.cursor()
try:
    cur.execute("SELECT * FROM testtable")
except snowflake.connector.errors.ProgrammingError as e:
    # default error message
    print(e)
    # customer error message
    print('Error {0} ({1}): {2} ({3})'.format(e.errno, e.sqlstate, e.msg, e.sfqid))
finally:
    cur.close()

# Retrieving the Snowflake query ID
cur = con.cursor()
cur.execute("SELECT * FROM testtable")
print(cur.sfqid)

# Closing the connection
con.close()