Utilisation du connecteur Python

Ce chapitre fournit une série d’exemples illustrant comment utiliser le connecteur Snowflake pour effectuer des opérations Snowflake standard, telles que la connexion d’utilisateurs, la création de bases de données et de tables, la création d’entrepôts, l’insertion/le chargement de données et l’envoi de requêtes.

L’exemple de code à la fin de ce chapitre combine les exemples en un seul programme Python fonctionnel.

Dans ce chapitre :

Création d’une base de données, d’un schéma et d’un entrepôt

Après la connexion, créez une base de données, un schéma et un entrepôt, s’ils n’existent pas encore, en utilisant les commandes CREATE DATABASE, CREATE SCHEMA et CREATE WAREHOUSE.

L’exemple ci-dessous montre comment créer un entrepôt nommé tiny_warehouse, une base de données nommée testdb et un schéma nommé testschema. Notez que lorsque vous créez le schéma, vous devez spécifier le nom de la base de données dans laquelle créer le schéma ou vous devez déjà être connecté à la base de données dans laquelle créer le schéma. L’exemple ci-dessous exécute une commande USE DATABASE avant la commande CREATE SCHEMA pour s’assurer que le schéma est créé dans la base de données appropriée.

conn.cursor().execute("CREATE WAREHOUSE IF NOT EXISTS tiny_warehouse_mg")
conn.cursor().execute("CREATE DATABASE IF NOT EXISTS testdb_mg")
conn.cursor().execute("USE DATABASE testdb_mg")
conn.cursor().execute("CREATE SCHEMA IF NOT EXISTS testschema_mg")
Copy

Utilisation de la base de données, du schéma et de l’entrepôt

Spécifiez la base de données et le schéma dans lesquels vous souhaitez créer les tables. Spécifiez également l’entrepôt qui fournira les ressources nécessaires à l’exécution des instructions et des requêtes DML.

Par exemple, pour utiliser la base de données testdb, le schéma testschema et l’entrepôt tiny_warehouse (créé dans la section précédente) :

conn.cursor().execute("USE WAREHOUSE tiny_warehouse_mg")
conn.cursor().execute("USE DATABASE testdb_mg")
conn.cursor().execute("USE SCHEMA testdb_mg.testschema_mg")
Copy

Création de tables et insertion de données

Utilisez la commande CREATE TABLE pour créer des tables et la commande INSERT pour remplir les tables avec des données.

Par exemple, créez une table nommée testtable et insérez deux lignes dans la table :

conn.cursor().execute(
    "CREATE OR REPLACE TABLE "
    "test_table(col1 integer, col2 string)")

conn.cursor().execute(
    "INSERT INTO test_table(col1, col2) VALUES " + 
    "    (123, 'test string1'), " + 
    "    (456, 'test string2')")
Copy

Chargement des données

Au lieu d’insérer des données dans les tables à l’aide de commandes INSERT individuelles, vous pouvez charger en masse des données à partir de fichiers préparés dans un emplacement interne ou externe.

Copie de données à partir d’un emplacement interne

Pour charger des données de fichiers de votre machine hôte vers une table, utilisez d’abord la commande PUT pour placer le fichier dans un emplacement interne, puis utilisez la commande COPY INTO <table> pour copier les données des fichiers vers la table.

Par exemple :

# Putting Data
con.cursor().execute("PUT file:///tmp/data/file* @%testtable")
con.cursor().execute("COPY INTO testtable")
Copy

Où vos données CSV sont stockées dans un répertoire local nommé /tmp/data dans un environnement Linux ou macOS, et le répertoire contient des fichiers nommés file0, file1, … file100.

Copie de données à partir d’un emplacement externe

Pour charger des données à partir de fichiers préparés dans un emplacement externe (c’est-à-dire votre propre compartiment S3) vers une table, utilisez la commande COPY INTO <table> .

Par exemple :

# Copying Data
con.cursor().execute("""
COPY INTO testtable FROM s3://<s3_bucket>/data/
    STORAGE_INTEGRATION = myint
    FILE_FORMAT=(field_delimiter=',')
""".format(
    aws_access_key_id=AWS_ACCESS_KEY_ID,
    aws_secret_access_key=AWS_SECRET_ACCESS_KEY))
Copy

Où :

  • s3://<compartiment_s3>/data/ spécifie le nom de votre compartiment S3

  • Les fichiers dans le compartiment sont préfixés avec data.

  • Le compartiment est accessible à l’aide d’une intégration de stockage créée à l’aide de CREATE STORAGE INTEGRATION par un administrateur de compte (c’est-à-dire un utilisateur avec le rôle ACCOUNTADMIN) ou un rôle avec le privilège global CREATE INTEGRATION. Une intégration de stockage permet aux utilisateurs d’éviter de fournir des informations d’identification pour accéder à un emplacement de stockage privé.

Note

Cet exemple utilise la fonction format() pour composer l’instruction. Si votre environnement présente un risque d’attaques par injection SQL, vous pouvez, si vous le souhaitez, lier des valeurs plutôt que d’utiliser format().

Interrogation de données

Avec le connecteur Snowflake pour Python, vous pouvez soumettre :

  • une requête synchrone, qui renvoie le contrôle à votre application une fois la requête terminée.

  • une requête asynchrone, qui renvoie le contrôle à votre application avant la fin de la requête.

Une fois la requête terminée, vous utilisez l’objet Cursor pour récupérer les valeurs dans les résultats. Par défaut, le connecteur Snowflake pour Python convertit les valeurs des types de données Snowflake en types de données Python natifs. (Notez que vous pouvez choisir de renvoyer les valeurs sous forme de chaînes de caractères et d’effectuer les conversions de type dans votre application. Voir Amélioration des performances des requêtes en contournant la conversion des données.)

Note

Par défaut, les valeurs des colonnes NUMBER sont renvoyées sous forme de valeurs flottantes en double précision (float64). Pour les retourner sous forme de valeurs décimales (decimal.Decimal) dans les méthodes fetch_pandas_all() et fetch_pandas_batches() définissez le paramètre arrow_number_to_decimal de la méthode connect() sur True.

Exécution d’une requête synchrone

Pour effectuer une requête synchrone, appelez la méthode execute() dans l’objet Cursor. Par exemple :

conn = snowflake.connector.connect( ... )
cur = conn.cursor()
cur.execute('select * from products')
Copy

Utilisez l’objet Cursor pour récupérer les valeurs dans les résultats, comme expliqué dans Utilisation de cursor pour récupérer des valeurs.

Exécution d’une requête asynchrone

Le connecteur Snowflake pour Python prend en charge les requêtes asynchrones (c’est-à-dire les requêtes qui renvoient le contrôle à l’utilisateur avant la fin de la requête). Vous pouvez soumettre une requête asynchrone et utiliser l’interrogation pour déterminer quand la requête est terminée. Une fois la requête terminée, vous pouvez obtenir les résultats.

Note

Pour effectuer des requêtes asynchrones, vous devez vous assurer que le paramètre de configuration ABORT_DETACHED_QUERY est FALSE (valeur par défaut).

Snowflake ferme automatiquement les connexions après un certain temps (par défaut : 5 minutes), ce qui rend orphelines toutes les requêtes actives. Si la valeur est TRUE, Snowflake met fin à ces requêtes orphelines, ce qui peut avoir un impact sur les requêtes asynchrones.

Grâce à cette fonction, vous pouvez soumettre plusieurs requêtes en parallèle sans attendre que chaque requête soit terminée. Vous pouvez également exécuter une combinaison de requêtes synchrones et asynchrones au cours de la même session.

Enfin, vous pouvez soumettre une requête asynchrone à partir d’une connexion et vérifier les résultats à partir d’une autre connexion. Par exemple, un utilisateur peut lancer une requête de longue durée à partir de votre application, quitter l’application et la relancer ultérieurement pour vérifier les résultats.

Soumission d’une requête asynchrone

Pour soumettre une requête asynchrone, appelez la méthode execute_async() dans l’objet Cursor. Par exemple :

conn = snowflake.connector.connect( ... )
cur = conn.cursor()
# Submit an asynchronous query for execution.
cur.execute_async('select count(*) from table(generator(timeLimit => 25))')
Copy

Après avoir soumis la requête :

Pour des exemples d’exécution de requêtes asynchrones, voir Exemples de requêtes asynchrones.

Meilleures pratiques pour les requêtes asynchrones

Lorsque vous soumettez une requête asynchrone, suivez ces bonnes pratiques :

  • Assurez-vous de savoir quelles requêtes dépendent d’autres requêtes avant d’exécuter des requêtes en parallèle. Certaines requêtes sont interdépendantes et sensibles à l’ordre, et ne conviennent donc pas à la parallélisation. Par exemple, une instruction INSERT ne doit évidemment pas démarrer avant la fin de l’instruction CREATE TABLE correspondante.

  • Assurez-vous de ne pas exécuter trop de requêtes par rapport à la mémoire dont vous disposez. L’exécution de plusieurs requêtes en parallèle consomme généralement plus de mémoire, en particulier si plusieurs ensembles de résultats sont stockés en mémoire en même temps.

  • Lors de l’interrogation, gérez les rares cas où une requête échoue.

  • Assurez-vous que les instructions de contrôle des transactions (BEGIN, COMMIT et ROLLBACK) ne sont pas exécutées en parallèle avec d’autres instructions.

Récupération des ID de requête Snowflake

Un ID de requête identifie chaque requête exécutée par Snowflake. Lorsque vous utilisez le connecteur Snowflake pour Python afin d’exécuter une requête, vous pouvez accéder à l’ID de requête grâce à l’attribut sfqid de l’objet Cursor :

# Retrieving a Snowflake Query ID
cur = con.cursor()
cur.execute("SELECT * FROM testtable")
print(cur.sfqid)
Copy

Vous pouvez utiliser l’ID de requête pour :

Vérification du statut d’une requête

Pour vérifier le statut d’une requête :

  1. Obtenez l’ID de la requête à partir du champ sfqid de l’objet Cursor.

  2. Transmettez l’ID de requête à la méthode get_query_status() de l’objet Connection pour renvoyer la constante enum QueryStatus qui représente le statut de la requête.

    Par défaut, get_query_status() ne signale pas d’erreur si la requête a abouti à une erreur. Si vous voulez qu’une erreur soit signalée, appelez plutôt get_query_status_throw_if_error().

  3. Utilisez la constante enum QueryStatus pour vérifier le statut de la requête.

    • Pour déterminer si la requête est toujours en cours d’exécution (par exemple, s’il s’agit d’une requête asynchrone), transmettez la constante à la méthode is_still_running() de l’objet Connection.

    • Pour déterminer si une erreur s’est produite, transmettez la constante à la méthode is_an_error().

    Pour la liste complète des constantes enum, voir QueryStatus.

L’exemple suivant exécute une requête asynchrone et vérifie le statut de la requête :

import time
...
# Execute a long-running query asynchronously.
cur.execute_async('select count(*) from table(generator(timeLimit => 25))')
...
# Wait for the query to finish running.
query_id = cur.sfqid
while conn.is_still_running(conn.get_query_status(query_id)):
  time.sleep(1)
Copy

L’exemple suivant signale une erreur si la requête a abouti à une erreur :

from snowflake.connector import ProgrammingError
import time
...
# Wait for the query to finish running and raise an error
# if a problem occurred with the execution of the query.
try:
  query_id = cur.sfqid
  while conn.is_still_running(conn.get_query_status_throw_if_error(query_id)):
    time.sleep(1)
except ProgrammingError as err:
  print('Programming Error: {0}'.format(err))
Copy

Utilisation de l’ID de requête pour récupérer les résultats d’une requête

Note

Si vous avez exécuté une requête synchrone en appelant la méthode execute() sur un objet Cursor vous n’avez pas besoin d’utiliser l’ID de requête pour récupérer les résultats. Vous pouvez simplement récupérer les valeurs à partir des résultats, comme expliqué dans Utilisation de cursor pour récupérer des valeurs.

Si vous souhaitez récupérer les résultats d’une requête asynchrone ou d’une requête synchrone précédemment soumise, suivez ces étapes :

  1. Obtenez l’ID de la requête. Voir Récupération des ID de requête Snowflake.

  2. Appelez la méthode get_results_from_sfqid() dans l’objet Cursor pour récupérer les résultats.

  3. Utilisez l’objet Cursor pour récupérer les valeurs dans les résultats, comme expliqué dans Utilisation de cursor pour récupérer des valeurs.

Notez que si la requête est toujours en cours d’exécution, les méthodes de récupération (fetchone(), fetchmany(), fetchall(), etc.) attendront que la requête soit terminée.

Par exemple :

# Get the results from a query.
cur.get_results_from_sfqid(query_id)
results = cur.fetchall()
print(f'{results[0]}')
Copy

Utilisation de cursor pour récupérer des valeurs

Récupérez les valeurs d’une table à l’aide de la méthode d’itérateur d’objet curseur.

Par exemple, pour extraire des colonnes nommées « col1 » et « col2 » de la table nommée testtable créée précédemment (dans Création de tables et insertion de données), utilisez un code similaire à celui-ci :

cur = conn.cursor()
try:
    cur.execute("SELECT col1, col2 FROM test_table ORDER BY col1")
    for (col1, col2) in cur:
        print('{0}, {1}'.format(col1, col2))
finally:
    cur.close()
Copy

Le connecteur Snowflake pour Python fournit un raccourci utile :

for (col1, col2) in con.cursor().execute("SELECT col1, col2 FROM testtable"):
    print('{0}, {1}'.format(col1, col2))
Copy

Si vous avez besoin d’obtenir un seul résultat (c’est-à-dire une seule ligne), utilisez la méthode fetchone :

col1, col2 = con.cursor().execute("SELECT col1, col2 FROM testtable").fetchone()
print('{0}, {1}'.format(col1, col2))
Copy

Si vous avez besoin d’obtenir le nombre spécifié de lignes à un moment donné, utilisez la méthode fetchmany avec le nombre de lignes :

cur = con.cursor().execute("SELECT col1, col2 FROM testtable")
ret = cur.fetchmany(3)
print(ret)
while len(ret) > 0:
    ret = cur.fetchmany(3)
    print(ret)
Copy

Note

Utilisez fetchone ou fetchmany si le nombre de résultats est trop grand pour la mémoire.

Si vous avez besoin de tous les résultats en même temps :

results = con.cursor().execute("SELECT col1, col2 FROM testtable").fetchall()
for rec in results:
    print('%s, %s' % (rec[0], rec[1]))
Copy

Pour définir un délai d’attente pour une requête, exécutez une commande « begin » et incluez un paramètre de délai d’attente dans la requête. Si la requête dépasse la longueur de la valeur du paramètre, une erreur se produit et un retour en arrière se produit.

Dans le code suivant, l’erreur 604 signifie que la requête a été annulée. Le paramètre de délai d’attente démarre Timer() et s’annule si la requête ne se termine pas dans le délai spécifié.

conn.cursor().execute("create or replace table testtbl(a int, b string)")

conn.cursor().execute("begin")
try:
   conn.cursor().execute("insert into testtbl(a,b) values(3, 'test3'), (4,'test4')", timeout=10) # long query

except ProgrammingError as e:
   if e.errno == 604:
      print("timeout")
      conn.cursor().execute("rollback")
   else:
      raise e
else:
   conn.cursor().execute("commit")
Copy

Utilisation de DictCursor pour récupérer des valeurs par nom de colonne

Si vous voulez récupérer une valeur par nom de colonne, créez un objet cursor de type DictCursor.

Par exemple :

# Querying data by DictCursor
from snowflake.connector import DictCursor
cur = con.cursor(DictCursor)
try:
    cur.execute("SELECT col1, col2 FROM testtable")
    for rec in cur:
        print('{0}, {1}'.format(rec['COL1'], rec['COL2']))
finally:
    cur.close()
Copy

Exemples de requêtes asynchrones

Voici un exemple simple d’une requête asynchrone :

from snowflake.connector import ProgrammingError
import time

conn = snowflake.connector.connect( ... )
cur = conn.cursor()

# Submit an asynchronous query for execution.
cur.execute_async('select count(*) from table(generator(timeLimit => 25))')

# Retrieve the results.
cur.get_results_from_sfqid(query_id)
results = cur.fetchall()
print(f'{results[0]}')
Copy

L’exemple suivant soumet une requête asynchrone à partir d’une connexion et récupère les résultats à partir d’une autre connexion :

from snowflake.connector import ProgrammingError
import time

conn = snowflake.connector.connect( ... )
cur = conn.cursor()

# Submit an asynchronous query for execution.
cur.execute_async('select count(*) from table(generator(timeLimit => 25))')

# Get the query ID for the asynchronous query.
query_id = cur.sfqid

# Close the cursor and the connection.
cur.close()
conn.close()

# Open a new connection.
new_conn = snowflake.connector.connect( ... )

# Create a new cursor.
new_cur = new_conn.cursor()

# Retrieve the results.
new_cur.get_results_from_sfqid(query_id)
results = new_cur.fetchall()
print(f'{results[0]}')
Copy

Annulation d’une requête par ID de requête

Annulation d’une requête par ID de requête

cur = cn.cursor()

try:
  cur.execute(r"SELECT SYSTEM$CANCEL_QUERY('queryID')")
  result = cur.fetchall()
  print(len(result))
  print(result[0])
finally:
  cur.close()
Copy

Remplacez la chaîne « queryID » par l’ID de la requête. Pour obtenir l’ID d’une requête, voir Récupération des ID de requête Snowflake.

Amélioration des performances des requêtes en contournant la conversion des données

Pour améliorer les performances des requêtes, utilisez la classe SnowflakeNoConverterToPython du module snowflake.connector.converter_null pour contourner les conversions de données du type de données interne Snowflake vers le type de données Python natif. Exemple :

from snowflake.connector.converter_null import SnowflakeNoConverterToPython

con = snowflake.connector.connect(
    ...
    converter_class=SnowflakeNoConverterToPython
)
for rec in con.cursor().execute("SELECT * FROM large_table"):
    # rec includes raw Snowflake data
Copy

En conséquence, toutes les données sont représentées sous forme de chaîne, de sorte que l’application soit chargée de les convertir en types de données Python natifs. Par exemple, les données TIMESTAMP_NTZ et TIMESTAMP_LTZ sont l’heure d’époque représentée sous forme de chaîne, et les données TIMESTAMP_TZ correspondent à l’heure suivie d’un espace suivi du décalage par rapport à UTC en minutes, représentés sous forme de chaîne.

Aucun impact n’est fait sur les données de liaison ; les données natives Python peuvent toujours être liées aux mises à jour.

Données de liaison

Pour spécifier les valeurs à utiliser dans une instruction SQL, vous pouvez inclure des littéraux dans l’instruction ou lier des variables. Lorsque vous liez des variables, vous insérez un ou plusieurs espaces réservés dans le texte de l’instruction SQL, puis vous spécifiez la variable (la valeur à utiliser) pour chaque caractère de remplacement.

L’exemple suivant compare l’utilisation des littéraux et de la liaison :

Littéraux :

con.cursor().execute("INSERT INTO testtable(col1, col2) VALUES(789, 'test string3')")
Copy

Liaison :

con.cursor().execute(
    "INSERT INTO testtable(col1, col2) "
    "VALUES(%s, %s)", (
        789,
        'test string3'
    ))
Copy

Note

Il existe une limite supérieure à la taille des données que vous pouvez lier ou que vous pouvez combiner dans un lot. Pour plus de détails, voir Limites de la taille du texte de requête.

Snowflake prend en charge les types de liaisons suivants :

Chacun de ceux-ci est expliqué ci-dessous.

Liaison pyformat ou format

La liaison pyformat et la liaison format lient les données de liaison côté client plutôt que côté serveur.

Par défaut, le connecteur Snowflake pour Python prend en charge pyformat et format pour que vous puissiez utiliser %(name)s ou %s comme caractère de remplacement. Par exemple :

  • Utilisation de %(name)s comme caractère de remplacement :

    conn.cursor().execute(
        "INSERT INTO test_table(col1, col2) "
        "VALUES(%(col1)s, %(col2)s)", {
            'col1': 789,
            'col2': 'test string3',
            })
    
    Copy
  • Utilisation de %s comme caractère de remplacement :

    con.cursor().execute(
        "INSERT INTO testtable(col1, col2) "
        "VALUES(%s, %s)", (
            789,
            'test string3'
        ))
    
    Copy

Avec pyformat et format, vous pouvez également utiliser un objet de liste pour lier des données de l’opérateur IN :

# Binding data for IN operator
con.cursor().execute(
    "SELECT col1, col2 FROM testtable"
    " WHERE col2 IN (%s)", (
        ['test string1', 'test string3'],
    ))
Copy

Le caractère pourcentage (« % ») est à la fois un caractère générique pour SQL LIKE et un caractère de liaison de format pour Python. Si vous utilisez la liaison de format et si votre commande SQL contient le caractère pourcentage, vous devrez peut-être échapper le caractère pourcentage. Par exemple, si votre instruction SQL est :

SELECT col1, col2
    FROM test_table
    WHERE col2 ILIKE '%York' LIMIT 1;  -- Find York, New York, etc.
Copy

votre code Python devrait ressembler à ceci (notez le signe de pourcentage supplémentaire pour échapper le signe de pourcentage d’origine) :

sql_command = "select col1, col2 from test_table "
sql_command += " where col2 like '%%York' limit %(lim)s"
parameter_dictionary = {'lim': 1 }
cur.execute(sql_command, parameter_dictionary)
Copy

Liaison qmark ou numeric

La liaison qmark et la liaison numeric lient les données de liaison côté serveur plutôt que côté client :

  • Pour la liaison qmark utilisez un point d’interrogation (?) pour indiquer à quel emplacement dans la chaîne vous voulez insérer la valeur d’une variable.

  • Pour la liaison numeric utilisez le signe deux-points (:) suivi d’un nombre pour indiquer la position de la variable que vous voulez substituer à cette position. Par exemple, :2 indique la deuxième variable.

    Utilisez la liaison numérique pour lier la même valeur plus d’une fois dans la même requête. Par exemple, si vous avez une longue valeur VARCHAR ou BINARY ou semi-structurée que vous souhaitez utiliser plusieurs fois, alors la liaison numeric vous permet d’envoyer la valeur au serveur une fois et de l’utiliser plusieurs fois.

Les sections suivantes expliquent comment utiliser les liaisons qmark et numeric :

Utilisation de liaisons qmark ou numeric

Pour utiliser la liaison de style qmark ou numeric , exécutez l’une des opérations suivantes :

  • snowflake.connector.paramstyle='qmark'

  • snowflake.connector.paramstyle='numeric'

Important

Vous devez définir l’attribut paramstyle avant d’appeler la méthode connect().

Si vous définissez paramstyle sur qmark ou numeric, vous devez utiliser ? ou :N (où N est remplacé par un nombre) comme caractères de remplacement, respectivement.

Par exemple :

  • Utilisation de ? comme caractère de remplacement :

    import snowflake.connector
    
    snowflake.connector.paramstyle='qmark'
    
    con = snowflake.connector.connect(...)
    
    con.cursor().execute(
        "INSERT INTO testtable(col1, col2) "
        "VALUES(?, ?)", (
            789,
            'test string3'
        ))
    
    Copy
  • Utilisation de :N comme caractère de remplacement :

    import snowflake.connector
    
    snowflake.connector.paramstyle='numeric'
    
    con = snowflake.connector.connect(...)
    
    con.cursor().execute(
        "INSERT INTO testtable(col1, col2) "
        "VALUES(:1, :2)", (
            789,
            'test string3'
        ))
    
    Copy

    La requête suivante montre comment utiliser la liaison numeric pour réutiliser une variable :

    con.cursor().execute(
        "INSERT INTO testtable(complete_video, short_sample_of_video) "
        "VALUES(:1, SUBSTRING(:1, :2, :3))", (
            binary_value_that_stores_video,          # variable :1
            starting_offset_in_bytes_of_video_clip,  # variable :2
            length_in_bytes_of_video_clip            # variable :3
        ))
    
    Copy

Utilisation de liaisons qmark ou numeric avec des objets datetime

Lorsque vous utilisez la liaison qmark ou numeric pour lier des données à un type de données Snowflake TIMESTAMP, définissez la variable bind sur un tuple qui spécifie le type de données Snowflake timestamp (TIMESTAMP_LTZ ou TIMESTAMP_TZ) et la valeur. Par exemple :

import snowflake.connector

snowflake.connector.paramstyle='qmark'

con = snowflake.connector.connect(...)

con.cursor().execute(
    "CREATE OR REPLACE TABLE testtable2 ("
    "   col1 int, "
    "   col2 string, "
    "   col3 timestamp_ltz"
    ")"
)

con.cursor().execute(
    "INSERT INTO testtable2(col1,col2,col3) "
    "VALUES(?,?,?)", (
        987,
        'test string4',
        ("TIMESTAMP_LTZ", datetime.now())
    )
 )
Copy

Contrairement à la liaison côté client, la liaison côté serveur nécessite le type de données Snowflake pour la colonne. La plupart des types de données Python classiques font déjà l’objet de mappages implicites aux types de données Snowflake (par exemple, int est mappé à FIXED). Cependant, comme les données Python datetime peuvent être liées à l’un des multiples types de données Snowflake (TIMESTAMP_NTZ, TIMESTAMP_LTZ ou TIMESTAMP_TZ) et que le mappage par défaut est TIMESTAMP_NTZ, vous devez spécifier le type de données Snowflake à utiliser.

Utilisation de variables de liaison avec l’opérateur IN

qmark et numeric (liaison côté serveur) ne prennent pas en charge l’utilisation de variables de liaison avec l’opérateur IN.

Si vous devez utiliser des variables de liaison avec l’opérateur IN, utilisez la liaison côté client (pyformat ou format).

Liaison de paramètres aux variables pour les insertions par lots

Dans le code de votre application, vous pouvez insérer plusieurs lignes dans un seul lot. Pour ce faire, utilisez les paramètres comme valeurs dans une instruction INSERT. Par exemple, l’instruction suivante utilise des caractères de remplacement pour la qmark liaison dans une instruction INSERT :

insert into grocery (item, quantity) values (?, ?)
Copy

Ensuite, pour spécifier les données qui doivent être insérées, définissez une variable qui est une séquence de séquences (par exemple, une liste de tuples) :

rows_to_insert = [('milk', 2), ('apple', 3), ('egg', 2)]
Copy

Comme le montre l’exemple ci-dessus, chaque élément de la liste est un tuple qui contient les valeurs des colonnes d’une ligne à insérer.

Pour effectuer la liaison, appelez la méthode executemany(), en transmettant la variable comme deuxième argument. Par exemple :

conn = snowflake.connector.connect( ... )
rows_to_insert = [('milk', 2), ('apple', 3), ('egg', 2)]
conn.cursor().executemany(
    "insert into grocery (item, quantity) values (?, ?)",
    rows_to_insert)
Copy

Si vous liez des données sur le serveur (c’est-à-dire en utilisant la liaison qmark ou numeric), le connecteur peut optimiser les performances des insertions par lots grâce à la liaison.

Lorsque vous utilisez cette technique pour insérer un grand nombre de valeurs, le pilote peut améliorer les performances en diffusant les données (sans créer de fichiers sur la machine locale) vers une zone de préparation temporaire pour l’acquisition. Le pilote le fait automatiquement lorsque le nombre de valeurs dépasse un seuil.

En outre, la base de données et le schéma actuels de la session doivent être définis. Si ceux-ci ne sont pas définis, la commande CREATE TEMPORARY STAGE exécutée par le pilote peut échouer avec l’erreur suivante :

CREATE TEMPORARY STAGE SYSTEM$BIND file_format=(type=csv field_optionally_enclosed_by='"')
Cannot perform CREATE STAGE. This session does not have a current schema. Call 'USE SCHEMA', or use a qualified name.
Copy

Note

Pour découvrir d’autres moyens de charger des données dans la base de données Snowflake (y compris le chargement en masse à l’aide de la commande COPY), consultez Chargement de données dans Snowflake.

Eviter les attaques par injection SQL

Évitez la liaison de données à l’aide de la fonction de formatage Python, car vous générez un risque d’injection SQL. Par exemple :

# Binding data (UNSAFE EXAMPLE)
con.cursor().execute(
    "INSERT INTO testtable(col1, col2) "
    "VALUES(%(col1)d, '%(col2)s')" % {
        'col1': 789,
        'col2': 'test string3'
    })
Copy
# Binding data (UNSAFE EXAMPLE)
con.cursor().execute(
    "INSERT INTO testtable(col1, col2) "
    "VALUES(%d, '%s')" % (
        789,
        'test string3'
    ))
Copy
# Binding data (UNSAFE EXAMPLE)
con.cursor().execute(
    "INSERT INTO testtable(col1, col2) "
    "VALUES({col1}, '{col2}')".format(
        col1=789,
        col2='test string3')
    )
Copy

Au lieu de cela, stockez les valeurs dans des variables, puis liez ces variables en utilisant le style de liaison qmark ou numérique.

Récupération de métadonnées de colonne

Pour récupérer les métadonnées de chaque colonne dans le jeu de résultats (par exemple, le nom, le type, la précision, l’échelle, etc. de chaque colonne), utilisez l’une des approches suivantes :

  • Pour accéder aux métadonnées après avoir appelé la méthode execute() pour exécuter la requête, utilisez l’attribut description de l’objet Cursor.

  • Pour accéder aux métadonnées sans avoir à exécuter la requête, appelez la méthode describe().

    La méthode describe est disponible dans le connecteur Snowflake pour Python 2.4.6 et les versions plus récentes.

L’attribut description prend l’une des valeurs suivantes :

  • Version 2.4.5 et antérieure : une liste de tuples.

  • Version 2.4.6 et ultérieure : une liste d’objets ResultMetadata. (La méthode describe renvoie également cette liste).

Chaque tuple et objet ResultMetadata contient les métadonnées d’une colonne (le nom de la colonne, le type de données, etc.). Vous pouvez accéder aux métadonnées par l’index ou, dans 2.4.6 et les versions ultérieures, par l’attribut ResultMetadata.

Les exemples suivants montrent comment accéder aux métadonnées à partir des tuples et des objets ResultMetadata retournés.

Exemple : récupérer les métadonnées du nom de la colonne par index (versions 2.4.5 et ultérieures) :

L’exemple suivant utilise l’attribut description pour récupérer la liste des noms de colonnes après avoir exécuté une requête. L’attribut est une liste de tuples, et l’exemple accède au nom de la colonne à partir de la première valeur de chaque tuple.

cur = conn.cursor()
cur.execute("SELECT * FROM test_table")
print(','.join([col[0] for col in cur.description]))
Copy

Exemple : récupérer les métadonnées du nom de la colonne par attribut (versions 2.4.6 et ultérieures) :

L’exemple suivant utilise l’attribut description pour récupérer la liste des noms de colonnes après avoir exécuté une requête. L’attribut est une liste d’objets ResultMetaData, et l’exemple accède au nom de la colonne à partir de l’attribut name de chaque objet ResultMetadata.

cur = conn.cursor()
cur.execute("SELECT * FROM test_table")
print(','.join([col.name for col in cur.description]))
Copy

Exemple : récupérer les métadonnées du nom de la colonne sans exécuter de requête (versions 2.4.6 et ultérieures) :

L’exemple suivant utilise la méthode describe pour récupérer la liste des noms de colonnes sans exécuter de requête. La méthode describe() renvoie une liste d’objets ResultMetaData, et l’exemple accède au nom de la colonne à partir de l’attribut name de chaque objet ResultMetadata.

cur = conn.cursor()
result_metadata_list = cur.describe("SELECT * FROM test_table")
print(','.join([col.name for col in result_metadata_list]))
Copy

Gestion des erreurs

L’application doit traiter correctement les exceptions soulevées par le connecteur Snowflake, et décider de continuer ou d’arrêter l’exécution du code.

# Catching the syntax error
cur = con.cursor()
try:
    cur.execute("SELECT * FROM testtable")
except snowflake.connector.errors.ProgrammingError as e:
    # default error message
    print(e)
    # customer error message
    print('Error {0} ({1}): {2} ({3})'.format(e.errno, e.sqlstate, e.msg, e.sfqid))
finally:
    cur.close()
Copy

Utilisation de execute_stream pour exécuter des scripts SQL

La fonction execute_stream vous permet d’exécuter un ou plusieurs scripts SQL dans un flux :

from codecs import open
with open(sqlfile, 'r', encoding='utf-8') as f:
    for cur in con.execute_stream(f):
        for ret in cur:
            print(ret)
Copy

Fermeture de la connexion

Nous vous recommandons de fermer la connexion en utilisant la méthode close :

connection.close()
Copy

Ceci permet de s’assurer que les métriques client collectées sont soumises au serveur et que la session est supprimée. De plus, les blocs try-finally permettent de s’assurer que la connexion est fermée même si une exception est levée au milieu :

# Connecting to Snowflake
con = snowflake.connector.connect(...)
try:
    # Running queries
    con.cursor().execute(...)
    ...
finally:
    # Closing the connection
    con.close()
Copy

Utilisation du gestionnaire de contexte pour connecter et contrôler les transactions

Le connecteur Snowflake pour Python prend en charge un gestionnaire de contexte attribuant et libérant des ressources au besoin. Le gestionnaire de contexte est utile pour valider ou annuler des transactions en fonction du statut de l’instruction lorsque autocommit est désactivé.

# Connecting to Snowflake using the context manager
with snowflake.connector.connect(
  user=USER,
  password=PASSWORD,
  account=ACCOUNT,
  autocommit=False,
) as con:
    con.cursor().execute("INSERT INTO a VALUES(1, 'test1')")
    con.cursor().execute("INSERT INTO a VALUES(2, 'test2')")
    con.cursor().execute("INSERT INTO a VALUES(not numeric value, 'test3')") # fail
Copy

Dans l’exemple ci-dessus, lorsque la troisième instruction échoue, le gestionnaire de contexte annule les modifications apportées à la transaction et ferme la connexion. Si toutes les instructions étaient réussies, le gestionnaire de contexte validerait les modifications et fermerait la connexion.

Un code équivalent avec les blocs try et except est le suivant :

# Connecting to Snowflake using try and except blocks
con = snowflake.connector.connect(
  user=USER,
  password=PASSWORD,
  account=ACCOUNT,
  autocommit=False)
try:
    con.cursor().execute("INSERT INTO a VALUES(1, 'test1')")
    con.cursor().execute("INSERT INTO a VALUES(2, 'test2')")
    con.cursor().execute("INSERT INTO a VALUES(not numeric value, 'test3')") # fail
    con.commit()
except Exception as e:
    con.rollback()
    raise e
finally:
    con.close()
Copy

Connexion

Le connecteur Snowflake pour Python utilise le module standard Python logging pour enregistrer le statut à intervalles réguliers afin que l’application puisse suivre son activité en arrière-plan. La façon la plus simple d’activer la journalisation est d’ouvrir logging.basicConfig() au début de l’application.

Par exemple, pour définir le niveau de journalisation sur INFO et stocker les journaux dans un fichier nommé /tmp/snowflake_python_connector.log :

logging.basicConfig(
    filename=file_name,
    level=logging.INFO)
Copy

Une journalisation plus complète peut être activée en réglant le niveau de journalisation sur DEBUG comme suit :

# Logging including the timestamp, thread and the source code location
import logging
for logger_name in ['snowflake.connector', 'botocore', 'boto3']:
    logger = logging.getLogger(logger_name)
    logger.setLevel(logging.DEBUG)
    ch = logging.FileHandler('/tmp/python_connector.log')
    ch.setLevel(logging.DEBUG)
    ch.setFormatter(logging.Formatter('%(asctime)s - %(threadName)s %(filename)s:%(lineno)d - %(funcName)s() - %(levelname)s - %(message)s'))
    logger.addHandler(ch)
Copy

La classe de formateur SecretDetector facultative mais recommandée garantit qu’un certain ensemble d’informations sensibles connues est masqué avant d’être écrit dans les fichiers journaux de Snowflake Python Connector. Pour utiliser SecretDetector, utilisez un code similaire au suivant :

# Logging including the timestamp, thread and the source code location
import logging
from snowflake.connector.secret_detector import SecretDetector
for logger_name in ['snowflake.connector', 'botocore', 'boto3']:
    logger = logging.getLogger(logger_name)
    logger.setLevel(logging.DEBUG)
    ch = logging.FileHandler('/tmp/python_connector.log')
    ch.setLevel(logging.DEBUG)
    ch.setFormatter(SecretDetector('%(asctime)s - %(threadName)s %(filename)s:%(lineno)d - %(funcName)s() - %(levelname)s - %(message)s'))
    logger.addHandler(ch)
Copy

Note

botocore et boto3 sont disponibles via AWS (Amazon Web Services) SDK pour Python.

Exemple de programme

L’exemple de code suivant combine plusieurs exemples décrits dans les chapitres précédents en un programme Python fonctionnel : Cet exemple contient deux parties :

  • Une classe parente (« python_veritas_base ») contient le code de nombreuses opérations courantes, telles que la connexion au serveur.

  • Une classe enfant (« python_connector_example ») représente les parties personnalisées d’un client particulier, par exemple, l’interrogation d’une table.

Cet exemple de code est importé directement à partir de l’un de nos tests pour garantir qu’il a été exécuté sur une version récente du produit.

Étant donné que cela provient d’un test, il inclut une petite quantité de code pour définir un autre port et un autre protocole utilisés dans certains tests. Les utilisateurs ne doivent pas définir le protocole ou le numéro de port ; au lieu de cela, omettez-les et utilisez les valeurs par défaut.

Il contient également des marqueurs de section (parfois appelés « balises d’extrait ») pour identifier le code qui peut être importé indépendamment dans la documentation. Les marqueurs de section ressemblent généralement à :

# -- (> ---------------------- SECTION=import_connector ---------------------
...
# -- <) ---------------------------- END_SECTION ----------------------------
Copy

Ces marqueurs de section ne sont pas requis dans le code utilisateur.

La première partie de l’exemple de code contient les sous-routines courantes pour :

  • Lisez les arguments de ligne de commande (par exemple, « –warehouse MyWarehouse ») qui contiennent des informations de connexion.

  • Connectez-vous au serveur.

  • Créez et utilisez un entrepôt, une base de données et un schéma.

  • Détruisez le schéma, la base de données et l’entrepôt lorsque vous n’en avez plus besoin.


import logging
import os
import sys


# -- (> ---------------------- SECTION=import_connector ---------------------
import snowflake.connector
# -- <) ---------------------------- END_SECTION ----------------------------


class python_veritas_base:

    """
    PURPOSE:
        This is the Base/Parent class for programs that use the Snowflake
        Connector for Python.
        This class is intended primarily for:
            * Sample programs, e.g. in the documentation.
            * Tests.
    """


    def __init__(self, p_log_file_name = None):

        """
        PURPOSE:
            This does any required initialization steps, which in this class is
            basically just turning on logging.
        """

        file_name = p_log_file_name
        if file_name is None:
            file_name = '/tmp/snowflake_python_connector.log'

        # -- (> ---------- SECTION=begin_logging -----------------------------
        logging.basicConfig(
            filename=file_name,
            level=logging.INFO)
        # -- <) ---------- END_SECTION ---------------------------------------


    # -- (> ---------------------------- SECTION=main ------------------------
    def main(self, argv):

        """
        PURPOSE:
            Most tests follow the same basic pattern in this main() method:
               * Create a connection.
               * Set up, e.g. use (or create and use) the warehouse, database,
                 and schema.
               * Run the queries (or do the other tasks, e.g. load data).
               * Clean up. In this test/demo, we drop the warehouse, database,
                 and schema. In a customer scenario, you'd typically clean up
                 temporary tables, etc., but wouldn't drop your database.
               * Close the connection.
        """

        # Read the connection parameters (e.g. user ID) from the command line
        # and environment variables, then connect to Snowflake.
        connection = self.create_connection(argv)

        # Set up anything we need (e.g. a separate schema for the test/demo).
        self.set_up(connection)

        # Do the "real work", for example, create a table, insert rows, SELECT
        # from the table, etc.
        self.do_the_real_work(connection)

        # Clean up. In this case, we drop the temporary warehouse, database, and
        # schema.
        self.clean_up(connection)

        print("\nClosing connection...")
        # -- (> ------------------- SECTION=close_connection -----------------
        connection.close()
        # -- <) ---------------------------- END_SECTION ---------------------

    # -- <) ---------------------------- END_SECTION=main --------------------


    def args_to_properties(self, args):

        """
        PURPOSE:
            Read the command-line arguments and store them in a dictionary.
            Command-line arguments should come in pairs, e.g.:
                "--user MyUser"
        INPUTS:
            The command line arguments (sys.argv).
        RETURNS:
            Returns the dictionary.
        DESIRABLE ENHANCEMENTS:
            Improve error detection and handling.
        """

        connection_parameters = {}

        i = 1
        while i < len(args) - 1:
            property_name = args[i]
            # Strip off the leading "--" from the tag, e.g. from "--user".
            property_name = property_name[2:]
            property_value = args[i + 1]
            connection_parameters[property_name] = property_value
            i += 2

        return connection_parameters


    def create_connection(self, argv):

        """
        PURPOSE:
            This gets account identifier and login information from the
            environment variables and command-line parameters, connects to the
            server, and returns the connection object.
        INPUTS:
            argv: This is usually sys.argv, which contains the command-line
                  parameters. It could be an equivalent substitute if you get
                  the parameter information from another source.
        RETURNS:
            A connection.
        """

        # Get account identifier and login information from environment variables and command-line parameters.
        # For information about account identifiers, see
        # https://docs.snowflake.com/en/user-guide/admin-account-identifier.html .
        # -- (> ----------------------- SECTION=set_login_info ---------------

        # Get the password from an appropriate environment variable, if
        # available.
        PASSWORD = os.getenv('SNOWSQL_PWD')

        # Get the other login info etc. from the command line.
        if len(argv) < 11:
            msg = "ERROR: Please pass the following command-line parameters:\n"
            msg += "--warehouse <warehouse> --database <db> --schema <schema> "
            msg += "--user <user> --account <account_identifier> "
            print(msg)
            sys.exit(-1)
        else:
            connection_parameters = self.args_to_properties(argv)
            USER = connection_parameters["user"]
            ACCOUNT = connection_parameters["account"]
            WAREHOUSE = connection_parameters["warehouse"]
            DATABASE = connection_parameters["database"]
            SCHEMA = connection_parameters["schema"]
            # Optional: for internal testing only.
            try:
                PORT = connection_parameters["port"]
            except:
                PORT = ""
            try:
                PROTOCOL = connection_parameters["protocol"]
            except:
                PROTOCOL = ""

        # If the password is set by both command line and env var, the
        # command-line value takes precedence over (is written over) the
        # env var value.

        # If the password wasn't set either in the environment var or on
        # the command line...
        if PASSWORD is None or PASSWORD == '':
            print("ERROR: Set password, e.g. with SNOWSQL_PWD environment variable")
            sys.exit(-2)
        # -- <) ---------------------------- END_SECTION ---------------------

        # Optional diagnostic:
        #print("USER:", USER)
        #print("ACCOUNT:", ACCOUNT)
        #print("WAREHOUSE:", WAREHOUSE)
        #print("DATABASE:", DATABASE)
        #print("SCHEMA:", SCHEMA)
        #print("PASSWORD:", PASSWORD)
        #print("PROTOCOL:" "'" + PROTOCOL + "'")
        #print("PORT:" + "'" + PORT + "'")

        print("Connecting...")
        # If the PORT is set but the protocol is not, we ignore the PORT (bug!!).
        if PROTOCOL is None or PROTOCOL == "" or PORT is None or PORT == "":
            # -- (> ------------------- SECTION=connect_to_snowflake ---------
            conn = snowflake.connector.connect(
                user=USER,
                password=PASSWORD,
                account=ACCOUNT,
                warehouse=WAREHOUSE,
                database=DATABASE,
                schema=SCHEMA
                )
            # -- <) ---------------------------- END_SECTION -----------------
        else:

            conn = snowflake.connector.connect(
                user=USER,
                password=PASSWORD,
                account=ACCOUNT,
                warehouse=WAREHOUSE,
                database=DATABASE,
                schema=SCHEMA,
                # Optional: for internal testing only.
                protocol=PROTOCOL,
                port=PORT
                )

        return conn


    def set_up(self, connection):

        """
        PURPOSE:
            Set up to run a test. You can override this method with one
            appropriate to your test/demo.
        """

        # Create a temporary warehouse, database, and schema.
        self.create_warehouse_database_and_schema(connection)


    def do_the_real_work(self, conn):

        """
        PURPOSE:
            Your sub-class should override this to include the code required for
            your documentation sample or your test case.
            This default method does a very simple self-test that shows that the
            connection was successful.
        """

        # Create a cursor for this connection.
        cursor1 = conn.cursor()
        # This is an example of an SQL statement we might want to run.
        command = "SELECT PI()"
        # Run the statement.
        cursor1.execute(command)
        # Get the results (should be only one):
        for row in cursor1:
            print(row[0])
        # Close this cursor.
        cursor1.close()


    def clean_up(self, connection):

        """
        PURPOSE:
            Clean up after a test. You can override this method with one
            appropriate to your test/demo.
        """

        # Create a temporary warehouse, database, and schema.
        self.drop_warehouse_database_and_schema(connection)


    def create_warehouse_database_and_schema(self, conn):

        """
        PURPOSE:
            Create the temporary schema, database, and warehouse that we use
            for most tests/demos.
        """

        # Create a database, schema, and warehouse if they don't already exist.
        print("\nCreating warehouse, database, schema...")
        # -- (> ------------- SECTION=create_warehouse_database_schema -------
        conn.cursor().execute("CREATE WAREHOUSE IF NOT EXISTS tiny_warehouse_mg")
        conn.cursor().execute("CREATE DATABASE IF NOT EXISTS testdb_mg")
        conn.cursor().execute("USE DATABASE testdb_mg")
        conn.cursor().execute("CREATE SCHEMA IF NOT EXISTS testschema_mg")
        # -- <) ---------------------------- END_SECTION ---------------------

        # -- (> --------------- SECTION=use_warehouse_database_schema --------
        conn.cursor().execute("USE WAREHOUSE tiny_warehouse_mg")
        conn.cursor().execute("USE DATABASE testdb_mg")
        conn.cursor().execute("USE SCHEMA testdb_mg.testschema_mg")
        # -- <) ---------------------------- END_SECTION ---------------------


    def drop_warehouse_database_and_schema(self, conn):

        """
        PURPOSE:
            Drop the temporary schema, database, and warehouse that we create
            for most tests/demos.
        """

        # -- (> ------------- SECTION=drop_warehouse_database_schema ---------
        conn.cursor().execute("DROP SCHEMA IF EXISTS testschema_mg")
        conn.cursor().execute("DROP DATABASE IF EXISTS testdb_mg")
        conn.cursor().execute("DROP WAREHOUSE IF EXISTS tiny_warehouse_mg")
        # -- <) ---------------------------- END_SECTION ---------------------


# ----------------------------------------------------------------------------

if __name__ == '__main__':
    pvb = python_veritas_base()
    pvb.main(sys.argv)


Copy

La deuxième partie de l’exemple de code crée une table, y insère des lignes, etc. :


import sys

# -- (> ---------------------- SECTION=import_connector ---------------------
import snowflake.connector
# -- <) ---------------------------- END_SECTION ----------------------------


# Import the base class that contains methods used in many tests and code 
# examples.
from python_veritas_base import python_veritas_base


class python_connector_example (python_veritas_base):

  """
  PURPOSE:
      This is a simple example program that shows how to use the Snowflake 
      Python Connector to create and query a table.
  """

  def __init__(self):
    pass


  def do_the_real_work(self, conn):

    """
    INPUTS:
        conn is a Connection object returned from snowflake.connector.connect().
    """

    print("\nCreating table test_table...")
    # -- (> ----------------------- SECTION=create_table ---------------------
    conn.cursor().execute(
        "CREATE OR REPLACE TABLE "
        "test_table(col1 integer, col2 string)")

    conn.cursor().execute(
        "INSERT INTO test_table(col1, col2) VALUES " + 
        "    (123, 'test string1'), " + 
        "    (456, 'test string2')")
    # -- <) ---------------------------- END_SECTION -------------------------


    print("\nSelecting from test_table...")
    # -- (> ----------------------- SECTION=querying_data --------------------
    cur = conn.cursor()
    try:
        cur.execute("SELECT col1, col2 FROM test_table ORDER BY col1")
        for (col1, col2) in cur:
            print('{0}, {1}'.format(col1, col2))
    finally:
        cur.close()
    # -- <) ---------------------------- END_SECTION -------------------------




# ============================================================================

if __name__ == '__main__':

    test_case = python_connector_example()
    test_case.main(sys.argv)

Copy

Pour exécuter cet exemple, procédez comme suit :

  1. Copiez le premier morceau de code dans un fichier nommé « python_veritas_base.py ».

  2. Copiez le deuxième morceau de code dans un fichier nommé « python_connector_example.py »

  3. Définissez la variable d’environnement SNOWSQL_PWD sur votre mot de passe, par exemple :

    export SNOWSQL_PWD='MyPassword'
    
    Copy
  4. Exécutez le programme à l’aide d’une ligne de commande similaire à la suivante (remplacez les informations d’utilisateur et de compte par vos propres informations d’utilisateur et de compte, bien sûr).

    Avertissement

    Cela supprime l’entrepôt, la base de données et le schéma à la fin du programme ! N’utilisez pas le nom d’une base de données existante, car vous la perdrez !

    python3 python_connector_example.py --warehouse <unique_warehouse_name> --database <new_warehouse_zzz_test> --schema <new_schema_zzz_test> --account myorganization-myaccount --user MyUserName
    
    Copy

Voici la sortie :

Connecting...

Creating warehouse, database, schema...

Creating table test_table...

Selecting from test_table...
123, test string1
456, test string2

Closing connection...
Copy

Voici un exemple plus long :

Note

Dans le chapitre où vous configurez votre compte et vos informations de connexion, assurez-vous de remplacer les variables au besoin pour les faire correspondre à vos informations de connexion Snowflake (nom, mot de passe, etc.).

Cet exemple utilise la fonction format() pour composer l’instruction. Si votre environnement présente un risque d’attaques par injection SQL, vous pouvez, si vous le souhaitez, lier des valeurs plutôt que d’utiliser format().

#!/usr/bin/env python
#
# Snowflake Connector for Python Sample Program
#

# Logging
import logging
logging.basicConfig(
    filename='/tmp/snowflake_python_connector.log',
    level=logging.INFO)

import snowflake.connector

# Set your account and login information (replace the variables with
# the necessary values).
ACCOUNT = '<account_identifier>'
USER = '<login_name>'
PASSWORD = '<password>'

import os

# Only required if you copy data from your S3 bucket
AWS_ACCESS_KEY_ID = os.getenv('AWS_ACCESS_KEY_ID')
AWS_SECRET_ACCESS_KEY = os.getenv('AWS_SECRET_ACCESS_KEY')

# Connecting to Snowflake
con = snowflake.connector.connect(
  user=USER,
  password=PASSWORD,
  account=ACCOUNT,
)

# Creating a database, schema, and warehouse if none exists
con.cursor().execute("CREATE WAREHOUSE IF NOT EXISTS tiny_warehouse")
con.cursor().execute("CREATE DATABASE IF NOT EXISTS testdb")
con.cursor().execute("USE DATABASE testdb")
con.cursor().execute("CREATE SCHEMA IF NOT EXISTS testschema")

# Using the database, schema and warehouse
con.cursor().execute("USE WAREHOUSE tiny_warehouse")
con.cursor().execute("USE SCHEMA testdb.testschema")

# Creating a table and inserting data
con.cursor().execute(
    "CREATE OR REPLACE TABLE "
    "testtable(col1 integer, col2 string)")
con.cursor().execute(
    "INSERT INTO testtable(col1, col2) "
    "VALUES(123, 'test string1'),(456, 'test string2')")

# Copying data from internal stage (for testtable table)
con.cursor().execute("PUT file:///tmp/data0/file* @%testtable")
con.cursor().execute("COPY INTO testtable")

# Copying data from external stage (S3 bucket -
# replace <s3_bucket> with the name of your bucket)
con.cursor().execute("""
COPY INTO testtable FROM s3://<s3_bucket>/data/
     STORAGE_INTEGRATION = myint
     FILE_FORMAT=(field_delimiter=',')
""".format(
    aws_access_key_id=AWS_ACCESS_KEY_ID,
    aws_secret_access_key=AWS_SECRET_ACCESS_KEY))

# Querying data
cur = con.cursor()
try:
    cur.execute("SELECT col1, col2 FROM testtable")
    for (col1, col2) in cur:
        print('{0}, {1}'.format(col1, col2))
finally:
    cur.close()

# Binding data
con.cursor().execute(
    "INSERT INTO testtable(col1, col2) "
    "VALUES(%(col1)s, %(col2)s)", {
        'col1': 789,
        'col2': 'test string3',
        })

# Retrieving column names
cur = con.cursor()
cur.execute("SELECT * FROM testtable")
print(','.join([col[0] for col in cur.description]))

# Catching syntax errors
cur = con.cursor()
try:
    cur.execute("SELECT * FROM testtable")
except snowflake.connector.errors.ProgrammingError as e:
    # default error message
    print(e)
    # user error message
    print('Error {0} ({1}): {2} ({3})'.format(e.errno, e.sqlstate, e.msg, e.sfqid))
finally:
    cur.close()

# Retrieving the Snowflake query ID
cur = con.cursor()
cur.execute("SELECT * FROM testtable")
print(cur.sfqid)

# Closing the connection
con.close()
Copy