Utilisation de l’API JDBC de Snowpark Python¶
Avec l’API JDBC de Snowpark Python, les utilisateurs de Snowpark Python peuvent extraire par programmation des données de bases de données externes dans Snowflake. Cela vous permet de vous connecter à des bases de données externes à l’aide de pilotes JDBC.
Avec ces APIs, vous pouvez facilement extraire des données dans des tables Snowflake et les transformer à l’aide de DataFramesSnowpark pour des analyses avancées.
L’API `JDBC-<https://docs.snowflake.com/en/developer-guide/snowpark/reference/python/latest/snowpark/api/snowflake.snowpark.DataFrameReader.jdbc> `_peut être utilisée de la même manière que l’API JDBC de Spark. La plupart des paramètres sont conçus pour être identiques ou similaires pour une meilleure parité. Pour plus d’informations comparant l’API JDBC de Snowpark Python avec l’API JDBC de Spark, consultez le tableau suivant :
Paramètres JDBC de Snowpark¶
Paramètre |
JDBC Snowpark Python |
|---|---|
|
Chaîne de connexion utilisée pour se connecter à la source de données externe via le pilote JDBC |
|
Un dictionnaire contenant les configurations nécessaires pour la création de l’UDTF |
|
Un dictionnaire contenant la paire clé-valeur nécessaire lors de l’établissement de la connexion de JDBC |
|
Table dans la base de données source |
|
Requête SQL englobée en tant que sous-requête pour la lecture des données. |
|
Colonne de partitionnement pour les lectures parallèles. |
|
Limite inférieure du partitionnement. |
|
Limite supérieure du partitionnement. |
|
Nombre de partitions pour le parallélisme. |
|
La durée d’expiration pour SQL exécution, mesurée en secondes. |
|
Nombre de lignes extraites par aller-retour. |
|
Schéma personnalisé pour l’extraction de données dans des bases de données externes. |
|
Liste des conditions pour les partitions de clause WHERE. |
|
Exécute une instruction SQL ou PL/SQL lors de l’initialisation de la session. |
Comprendre le parallélisme¶
L’API JDBC de Snowpark Python dispose actuellement d’une forme de mécanisme d’ingestion sous-jacent :
- Ingestion UDTF
Toutes les charges de travail s’exécutent sur le serveur Snowflake. Snowpark crée une UDTF Java et l’invoque en parallèle pour ingérer des données dans une table temporaire Snowflake. Ainsi, le paramètre
udtf_configsest nécessaire pour cette fonction.
L’API JDBC de Snowpark Python dispose également de deux moyens de paralléliser et d’accélérer l’ingestion.
- Colonne de partition
Cette méthode divise les données sources en un certain nombre de partitions basées sur quatre paramètres lorsque les utilisateurs appellent
jdbc():columnlower_boundupper_boundnum_partitions
Ces quatre paramètres doivent être activés en même temps et
columndoit être un type numérique ou de date.- Prédicats
Cette méthode divise les données sources en partitions basées sur des prédicats de paramètres, qui sont une liste d’expressions pouvant être incluses dans des clauses
WHERE, où chaque expression définit une partition. Les prédicats fournissent un moyen plus flexible de diviser des partitions ; par exemple, vous pouvez diviser des partitions sur des colonnes booléennes ou non numériques.
L’API JDBC de Snowpark Python permet également d’ajuster le niveau de parallélisme au sein d’une partition.
- Fetch_size
Dans une partition, l” API extrait les lignes dans des morceaux définis par
fetch_size. Ces lignes sont écrites dans Snowflake en parallèle au fur et à mesure qu’elles sont extraites, ce qui permet à la lecture et à l’écriture d’avoir lieu en même temps et d’optimiser le débit.
Utiliser l’API JDBC pour ingérer des données à partir d’une source de données externe¶
Utiliser l’API JDBC pour ingérer les données d’un client Snowpark¶
Charger le fichier jar du pilote JDBC vers une zone de préparation Snowflake utilisant Snowpark ou Snowsight
Téléchargez en utilisant Snowpark.
Dans Snowpark, après avoir créé une session, exécutez le code suivant :
session.file.put("<your directory>/<your file name>", "@<your stage name>/<stage path>")
Chargez en utilisant Snowsight comme décrit dans les étapes suivantes.
Dans Snowsight, cliquez sur Catalog -> Database Explorer.
Dans la barre de recherche gauche des bases de données, cliquez sur [your database name] -> [your schema name] -> stages -> [your stage name].
Cliquez sur le bouton “+File” dans le coin supérieur droit de la page de la zone de préparation.
Configurez le secret, la règle réseau et l’intégration d’accès externe.
-- Configure a secret to allow egress to the source endpoint CREATE OR REPLACE SECRET <your secret> TYPE = PASSWORD USERNAME = '<your username>' PASSWORD = '<your password>'; -- Configure a network rule to allow egress to the source endpoint CREATE OR REPLACE NETWORK RULE <your network rule> TYPE = HOST_PORT MODE = EGRESS VALUE_LIST = ('<your host>:<your port>'); -- Configure an external access integration CREATE OR REPLACE EXTERNAL ACCESS INTEGRATION <your integration> ALLOWED_NETWORK_RULES = (<your network rule>) ALLOWED_AUTHENTICATION_SECRETS = (<your secret>) ENABLED = true;
Extraire les données de la cible à l’aide de l’API JDBC de Snowpark à partir d’un client Snowpark.
connection_str=f"jdbc:<your dbms>://<your host>:<your port>/<your db>" udtf_configs = { "external_access_integration": "<your integration>", "secret": "<your secret>", "imports": ["<your stage path to jdbc jar file>"] } # Call jdbc to pull data from target table df_table = session.read.jdbc( url=connection_str, udtf_configs=udtf_configs, table="<your table>", ) # Call jdbc to pull data from target query df_query = session.read.jdbc( url=connection_str, udtf_configs=udtf_configs, query="select * from <your table>", ) # Pull data from target table with parallelism using partition column df_table_partition_column = session.read.jdbc( url=connection_str, udtf_configs=udtf_configs, table="<your table>", fetch_size=100000, num_partitions=4, column="ID", upper_bound=10000, lower_bound=0 ) # Pull data from target table with parallelism using predicates df_table_predicates = session.read.jdbc( url=connection_str, udtf_configs=udtf_configs, table="<your table>", fetch_size=100000, predicates = [ "ID < 3", "ID >= 3" ] )
Utiliser l’API JDBC pour ingérer des données à partir d’une procédure stockée¶
Chargez le fichier jar du pilote JDBC dans la zone de préparation Snowflake en utilisant Snowsight
Dans Snowsight, cliquez sur Catalog -> Database Explorer
Dans la barre de recherche gauche des bases de données, cliquez sur [your database name] -> [your schema name] -> stages -> [your stage name].
Cliquez sur le bouton “+File” dans le coin supérieur droit de la page de la zone de préparation.
Configurez le secret, la règle réseau et l’intégration d’accès externe.
-- Configure a secret to allow egress to the source endpoint CREATE OR REPLACE SECRET <your secret> TYPE = PASSWORD USERNAME = '<your username>' PASSWORD = '<your password>'; -- Configure a network rule to allow egress to the source endpoint CREATE OR REPLACE NETWORK RULE <your network rule> TYPE = HOST_PORT MODE = EGRESS VALUE_LIST = ('<your host>:<your port>'); -- Configure an external access integration CREATE OR REPLACE EXTERNAL ACCESS INTEGRATION <your integration> ALLOWED_NETWORK_RULES = (<your network rule>) ALLOWED_AUTHENTICATION_SECRETS = (<your secret>) ENABLED = true;
Extraire les données d’une cible à l’aide de l’API JDBC de Snowpark à partir d’une procédure stockée.
CREATE OR REPLACE PROCEDURE sp_jdbc() RETURNS STRING LANGUAGE PYTHON RUNTIME_VERSION = '3.10' PACKAGES = ('snowflake-snowpark-python') HANDLER = 'run' AS $$ import time def run(session): connection_str=f"jdbc:<your dbms>://<your host>:<your port>/<your db>" udtf_configs = { "external_access_integration": "<your integration>", "secret": "<your secret>", "imports": ["<your stage path to jdbc jar file>"] } # Call jdbc to pull data from target table df_table = session.read.jdbc( url=connection_str, udtf_configs=udtf_configs, table="<your table>", ) # Call jdbc to pull data from target query df_query = session.read.jdbc( url=connection_str, udtf_configs=udtf_configs, query="select * from <your table>", ) # Pull data from target table with parallelism using partition column df_table_partition_column = session.read.jdbc( url=connection_str, udtf_configs=udtf_configs, table="<your table>", fetch_size=100000, num_partitions=4, column="ID", upper_bound=10000, lower_bound=0 ) # Pull data from target table with parallelism using predicates df_table_predicates = session.read.jdbc( url=connection_str, udtf_configs=udtf_configs, table="<your table>", fetch_size=100000, predicates = [ "ID < 3", "ID >= 3" ] ) df_table.write.save_as_table("snowflake_table", mode="overwrite") return f"success" $$ ; call sp_jdbc(); select * from snowflake_table ;
Utiliser l’API JDBC pour ingérer des données à partir d’un notebook Snowflake¶
Chargez le fichier jar du pilote JDBC dans la zone de préparation Snowflake en utilisant Snowsight
Dans Snowsight, cliquez sur Catalog -> Database Explorer
Dans la barre de recherche gauche des bases de données, cliquez sur [your database name] -> [your schema name] -> stages -> [your stage name].
Cliquez sur le bouton “+File” dans le coin supérieur droit de la page de la zone de préparation.
Configurez le secret, la règle réseau et l’intégration d’accès externe.
-- Configure a secret to allow egress to the source endpoint CREATE OR REPLACE SECRET <your secret> TYPE = PASSWORD USERNAME = '<your username>' PASSWORD = '<your password>'; -- Configure a network rule to allow egress to the source endpoint CREATE OR REPLACE NETWORK RULE <your network rule> TYPE = HOST_PORT MODE = EGRESS VALUE_LIST = ('<your host>:<your port>'); -- Configure an external access integration CREATE OR REPLACE EXTERNAL ACCESS INTEGRATION <your integration> ALLOWED_NETWORK_RULES = (<your network rule>) ALLOWED_AUTHENTICATION_SECRETS = (<your secret>) ENABLED = true;
Extraire les données d’une cible à l’aide de l’API JDBC Snowpark à partir d’un notebook Snowflake.
import snowflake.snowpark.context session = snowflake.snowpark.context.get_active_session() connection_str=f"jdbc:<your dbms>://<your host>:<your port>/<your db>" udtf_configs = { "external_access_integration": "<your integration>", "secret": "<your secret>", "imports": ["<your stage path to jdbc jar file>"] } # Call jdbc to pull data from target table df_table = session.read.jdbc( url=connection_str, udtf_configs=udtf_configs, table="<your table>", ) # Call jdbc to pull data from target query df_query = session.read.jdbc( url=connection_str, udtf_configs=udtf_configs, query="select * from <your table>", ) # Pull data from target table with parallelism using partition column df_table_partition_column = session.read.jdbc( url=connection_str, udtf_configs=udtf_configs, table="<your table>", fetch_size=100000, num_partitions=4, column="ID", upper_bound=10000, lower_bound=0 ) # Pull data from target table with parallelism using predicates df_table_predicates = session.read.jdbc( url=connection_str, udtf_configs=udtf_configs, table="<your table>", fetch_size=100000, predicates = [ "ID < 3", "ID >= 3" ] )
Traçage source¶
Traçage source lors de l’utilisation de l’API JDBC de Snowpark pour se connecter à MySQL¶
Incluez une balise de Snowpark dans votre fonction de création de connexion :
connection_str="jdbc:mysql://<your host>:<your port>/<your db>?applicationName=snowflake-snowpark-python" udtf_configs = { "external_access_integration": "<your integration>", "secret": "<your secret>", "imports": ["<your stage path to jdbc jar file>"] } # Call dbapi to pull data from target table df_table = session.read.jdbc( url=connection_str, udtf_configs=udtf_configs, table="<your table>", )
Exécuter le SQL suivant dans votre source de données pour capturer les requêtes Snowpark encore actives :
SELECT * FROM performance_schema.events_statements_history_long WHERE THREAD_ID = ( SELECT THREAD_ID, NAME FROM performance_schema.threads WHERE NAME LIKE '%snowflake-snowpark-python%'; )
Traçage source lors de l’utilisation de l’API JDBC de Snowpark pour se connecter au serveur SQL¶
Incluez une balise de Snowpark dans votre fonction de création de connexion :
connection_str="jdbc:mssql://<your host>:<your port>/<your db>?applicationName=snowflake-snowpark-python" udtf_configs = { "external_access_integration": "<your integration>", "secret": "<your secret>", "imports": ["<your stage path to jdbc jar file>"] } # Call dbapi to pull data from target table df_table = session.read.jdbc( url=connection_str, udtf_configs=udtf_configs, table="<your table>", )
Exécuter le SQL suivant dans votre source de données pour capturer les requêtes Snowpark encore actives :
SELECT s.session_id, s.program_name, r.status, t.text AS sql_text FROM sys.dm_exec_sessions s JOIN sys.dm_exec_requests r ON s.session_id = r.session_id CROSS APPLY sys.dm_exec_sql_text(r.sql_handle) AS t WHERE s.program_name = 'snowflake-snowpark-python';
Traçage source lors de l’utilisation de l’API JDBC de Snowpark pour se connecter à PostgresSQL¶
Incluez une balise de Snowpark dans votre fonction de création de connexion :
connection_str="jdbc:postgres://<your host>:<your port>/<your db>?applicationName=snowflake-snowpark-python" udtf_configs = { "external_access_integration": "<your integration>", "secret": "<your secret>", "imports": ["<your stage path to jdbc jar file>"] } # Call dbapi to pull data from target table df_table = session.read.jdbc( url=connection_str, udtf_configs=udtf_configs, table="<your table>", )
Exécuter le SQL suivant dans votre source de données pour capturer les requêtes Snowpark encore actives :
SELECT pid, usename AS username, datname AS database, application_name, client_addr, state, query_start, query FROM pg_stat_activity WHERE application_name = 'snowflake-snowpark-python';
Traçage source lors de l’utilisation de l’API JDBC de Snowpark pour se connecter à Oracle¶
Incluez une balise de Snowpark dans votre fonction de création de connexion :
connection_str="jdbc:oracle://<your host>:<your port>/<your db>?applicationName=snowflake-snowpark-python" udtf_configs = { "external_access_integration": "<your integration>", "secret": "<your secret>", "imports": ["<your stage path to jdbc jar file>"] } # Call dbapi to pull data from target table df_table = session.read.jdbc( url=connection_str, udtf_configs=udtf_configs, table="<your table>", )
Exécuter le SQL suivant dans votre source de données pour capturer les requêtes Snowpark encore actives :
SELECT sid, serial#, username, program, module, action, client_identifier, client_info, osuser, machine FROM v$session WHERE program = 'snowflake-snowpark-python';
DBMS commun et prise en charge des types¶
Ce qui suit est une liste certifiée de types de données de différents systèmes DBMS. Si vos données sources impliquent d’autres types de données, l’API JDBC de Snowpark Python tentera de les mapper aux types de données Snowflake au mieux, ou reviendra aux chaînes.
Oracle¶
INTEGER
NUMBER
BINARY_FLOAT
BINARY_DOUBLE
VARCHAR2
CHAR
CLOB
NCHAR
NVARCHAR2
NCLOB
DATE
TIMESTAMP
TIMESTAMP WITH TIME ZONE
TIMESTAMP WITH LOCAL TIME ZONE
RAW
PostgresSQL¶
BIGINT
BIGSERIAL
BIT
BIT VARYING
BOOLEAN
BOX
BYTEA
CHAR
VARCHAR
CIDR
CIRCLE
DATE
DOUBLE PRECISION
INET
INTEGER
INTERVAL
JSON
JSONB
LINE
LSEG
MACADDR
POINT
POLYGON
REAL
SMALLINT
SMALLSERIAL
SERIAL
TEXT
TIME
TIMESTAMP
TIMESTAMPTZ
TSQUERY
TSVECTOR
TXID_SNAPSHOT
UUID
XML
MySQL¶
INT
DECIMAL
INT
TINYINT
SMALLINT
MEDIUMINT
BIGINT
YEAR
FLOAT
DOUBLE
CHAR
VARCHAR
TINYTEXT
TEXT
MEDIUMTEXT
LONGTEXT
ENUM
SET
BIT
BINARY
VARBINARY
TINYBLOB
BLOB
MEDIUMBLOB
LONGBLOB
DATE
DATETIME
TIMESTAMP
TIME
JSON
SQL Server¶
INT
BIGINT
INT
SMALLINT
TINYINT
BIT
DECIMAL
NUMERIC
MONEY
SMALLMONEY
FLOAT
REAL
DATE
TIME
DATETIME
DATETIME2
SMALLDATETIME
CHAR
VARCHAR
VARCHAR(MAX)
TEXT
NCHAR
NVARCHAR
NVARCHAR(MAX)
NTEXT
BINARY
VARBINARY
VARBINARY(MAX)
IMAGE
UNIQUEIDENTIFIER
TIMESTAMP
Databricks¶
La connexion à Databricks à l’aide de l’API JDBC de Snowpark Python n’est actuellement pas pris en charge.