Utilisation de l’API JDBC de Snowpark Python

Avec l’API JDBC de Snowpark Python, les utilisateurs de Snowpark Python peuvent extraire par programmation des données de bases de données externes dans Snowflake. Cela vous permet de vous connecter à des bases de données externes à l’aide de pilotes JDBC.

Avec ces APIs, vous pouvez facilement extraire des données dans des tables Snowflake et les transformer à l’aide de DataFramesSnowpark pour des analyses avancées.

L’API `JDBC-<https://docs.snowflake.com/en/developer-guide/snowpark/reference/python/latest/snowpark/api/snowflake.snowpark.DataFrameReader.jdbc> `_peut être utilisée de la même manière que l’API JDBC de Spark. La plupart des paramètres sont conçus pour être identiques ou similaires pour une meilleure parité. Pour plus d’informations comparant l’API JDBC de Snowpark Python avec l’API JDBC de Spark, consultez le tableau suivant :

Paramètres JDBC de Snowpark

Paramètre

JDBC Snowpark Python

url

Chaîne de connexion utilisée pour se connecter à la source de données externe via le pilote JDBC

udtf_configs

Un dictionnaire contenant les configurations nécessaires pour la création de l’UDTF

properties

Un dictionnaire contenant la paire clé-valeur nécessaire lors de l’établissement de la connexion de JDBC

table

Table dans la base de données source

query

Requête SQL englobée en tant que sous-requête pour la lecture des données.

column

Colonne de partitionnement pour les lectures parallèles.

lower_bound

Limite inférieure du partitionnement.

upper_bound

Limite supérieure du partitionnement.

num_partitions

Nombre de partitions pour le parallélisme.

query_timeout

La durée d’expiration pour SQL exécution, mesurée en secondes.

fetch_size

Nombre de lignes extraites par aller-retour.

custom_schema

Schéma personnalisé pour l’extraction de données dans des bases de données externes.

predicates

Liste des conditions pour les partitions de clause WHERE.

session_init_statement

Exécute une instruction SQL ou PL/SQL lors de l’initialisation de la session.

Comprendre le parallélisme

L’API JDBC de Snowpark Python dispose actuellement d’une forme de mécanisme d’ingestion sous-jacent :

Ingestion UDTF

Toutes les charges de travail s’exécutent sur le serveur Snowflake. Snowpark crée une UDTF Java et l’invoque en parallèle pour ingérer des données dans une table temporaire Snowflake. Ainsi, le paramètre udtf_configs est nécessaire pour cette fonction.

L’API JDBC de Snowpark Python dispose également de deux moyens de paralléliser et d’accélérer l’ingestion.

Colonne de partition

Cette méthode divise les données sources en un certain nombre de partitions basées sur quatre paramètres lorsque les utilisateurs appellent jdbc() :

  • column

  • lower_bound

  • upper_bound

  • num_partitions

Ces quatre paramètres doivent être activés en même temps et column doit être un type numérique ou de date.

Prédicats

Cette méthode divise les données sources en partitions basées sur des prédicats de paramètres, qui sont une liste d’expressions pouvant être incluses dans des clauses WHERE, où chaque expression définit une partition. Les prédicats fournissent un moyen plus flexible de diviser des partitions ; par exemple, vous pouvez diviser des partitions sur des colonnes booléennes ou non numériques.

L’API JDBC de Snowpark Python permet également d’ajuster le niveau de parallélisme au sein d’une partition.

Fetch_size

Dans une partition, l” API extrait les lignes dans des morceaux définis par fetch_size. Ces lignes sont écrites dans Snowflake en parallèle au fur et à mesure qu’elles sont extraites, ce qui permet à la lecture et à l’écriture d’avoir lieu en même temps et d’optimiser le débit.

Utiliser l’API JDBC pour ingérer des données à partir d’une source de données externe

Utiliser l’API JDBC pour ingérer les données d’un client Snowpark

  1. Charger le fichier jar du pilote JDBC vers une zone de préparation Snowflake utilisant Snowpark ou Snowsight

    • Téléchargez en utilisant Snowpark.

      Dans Snowpark, après avoir créé une session, exécutez le code suivant :

      session.file.put("<your directory>/<your file name>", "@<your stage name>/<stage path>")
      
      Copy
    • Chargez en utilisant Snowsight comme décrit dans les étapes suivantes.

      1. Dans Snowsight, cliquez sur Catalog -> Database Explorer.

      2. Dans la barre de recherche gauche des bases de données, cliquez sur [your database name] -> [your schema name] -> stages -> [your stage name].

      3. Cliquez sur le bouton “+File” dans le coin supérieur droit de la page de la zone de préparation.

  2. Configurez le secret, la règle réseau et l’intégration d’accès externe.

    -- Configure a secret to allow egress to the source endpoint
    CREATE OR REPLACE SECRET <your secret>
        TYPE = PASSWORD
        USERNAME = '<your username>'
        PASSWORD = '<your password>';
    
    -- Configure a network rule to allow egress to the source endpoint
    CREATE OR REPLACE NETWORK RULE <your network rule>
      TYPE = HOST_PORT
      MODE = EGRESS
      VALUE_LIST = ('<your host>:<your port>');
    
    -- Configure an external access integration
    CREATE OR REPLACE EXTERNAL ACCESS INTEGRATION <your integration>
      ALLOWED_NETWORK_RULES = (<your network rule>)
      ALLOWED_AUTHENTICATION_SECRETS = (<your secret>)
      ENABLED = true;
    
    Copy
  3. Extraire les données de la cible à l’aide de l’API JDBC de Snowpark à partir d’un client Snowpark.

    connection_str=f"jdbc:<your dbms>://<your host>:<your port>/<your db>"
    udtf_configs = {
        "external_access_integration": "<your integration>",
        "secret": "<your secret>",
        "imports": ["<your stage path to jdbc jar file>"]
    }
    
    # Call jdbc to pull data from target table
    df_table = session.read.jdbc(
            url=connection_str,
            udtf_configs=udtf_configs,
            table="<your table>",
        )
    
    # Call jdbc to pull data from target query
    df_query = session.read.jdbc(
            url=connection_str,
            udtf_configs=udtf_configs,
            query="select * from <your table>",
        )
    
    # Pull data from target table with parallelism using partition column
    df_table_partition_column = session.read.jdbc(
            url=connection_str,
            udtf_configs=udtf_configs,
            table="<your table>",
            fetch_size=100000,
            num_partitions=4,
            column="ID",
            upper_bound=10000,
            lower_bound=0
        )
    
    # Pull data from target table with parallelism using predicates
    df_table_predicates = session.read.jdbc(
            url=connection_str,
            udtf_configs=udtf_configs,
            table="<your table>",
            fetch_size=100000,
            predicates = [
                "ID < 3",
                "ID >= 3"
            ]
        )
    
    Copy

Utiliser l’API JDBC pour ingérer des données à partir d’une procédure stockée

  1. Chargez le fichier jar du pilote JDBC dans la zone de préparation Snowflake en utilisant Snowsight

    • Dans Snowsight, cliquez sur Catalog -> Database Explorer

    • Dans la barre de recherche gauche des bases de données, cliquez sur [your database name] -> [your schema name] -> stages -> [your stage name].

    • Cliquez sur le bouton “+File” dans le coin supérieur droit de la page de la zone de préparation.

  2. Configurez le secret, la règle réseau et l’intégration d’accès externe.

    -- Configure a secret to allow egress to the source endpoint
    CREATE OR REPLACE SECRET <your secret>
        TYPE = PASSWORD
        USERNAME = '<your username>'
        PASSWORD = '<your password>';
    
    -- Configure a network rule to allow egress to the source endpoint
    CREATE OR REPLACE NETWORK RULE <your network rule>
      TYPE = HOST_PORT
      MODE = EGRESS
      VALUE_LIST = ('<your host>:<your port>');
    
    -- Configure an external access integration
    CREATE OR REPLACE EXTERNAL ACCESS INTEGRATION <your integration>
      ALLOWED_NETWORK_RULES = (<your network rule>)
      ALLOWED_AUTHENTICATION_SECRETS = (<your secret>)
      ENABLED = true;
    
    Copy
  3. Extraire les données d’une cible à l’aide de l’API JDBC de Snowpark à partir d’une procédure stockée.

    CREATE OR REPLACE PROCEDURE sp_jdbc()
    RETURNS STRING
    LANGUAGE PYTHON
    RUNTIME_VERSION = '3.10'
    PACKAGES = ('snowflake-snowpark-python')
    HANDLER = 'run'
    AS
    $$
    import time
    def run(session):
        connection_str=f"jdbc:<your dbms>://<your host>:<your port>/<your db>"
        udtf_configs = {
            "external_access_integration": "<your integration>",
            "secret": "<your secret>",
            "imports": ["<your stage path to jdbc jar file>"]
        }
    
        # Call jdbc to pull data from target table
        df_table = session.read.jdbc(
                url=connection_str,
                udtf_configs=udtf_configs,
                table="<your table>",
            )
    
        # Call jdbc to pull data from target query
        df_query = session.read.jdbc(
                url=connection_str,
                udtf_configs=udtf_configs,
                query="select * from <your table>",
            )
    
        # Pull data from target table with parallelism using partition column
        df_table_partition_column = session.read.jdbc(
                url=connection_str,
                udtf_configs=udtf_configs,
                table="<your table>",
                fetch_size=100000,
                num_partitions=4,
                column="ID",
                upper_bound=10000,
                lower_bound=0
            )
    
        # Pull data from target table with parallelism using predicates
        df_table_predicates = session.read.jdbc(
                url=connection_str,
                udtf_configs=udtf_configs,
                table="<your table>",
                fetch_size=100000,
                predicates = [
                    "ID < 3",
                    "ID >= 3"
                ]
            )
        df_table.write.save_as_table("snowflake_table", mode="overwrite")
        return f"success"
    
    $$
    ;
    
    call sp_jdbc();
    select * from snowflake_table ;
    
    Copy

Utiliser l’API JDBC pour ingérer des données à partir d’un notebook Snowflake

  1. Chargez le fichier jar du pilote JDBC dans la zone de préparation Snowflake en utilisant Snowsight

    • Dans Snowsight, cliquez sur Catalog -> Database Explorer

    • Dans la barre de recherche gauche des bases de données, cliquez sur [your database name] -> [your schema name] -> stages -> [your stage name].

    • Cliquez sur le bouton “+File” dans le coin supérieur droit de la page de la zone de préparation.

  2. Configurez le secret, la règle réseau et l’intégration d’accès externe.

    -- Configure a secret to allow egress to the source endpoint
    CREATE OR REPLACE SECRET <your secret>
        TYPE = PASSWORD
        USERNAME = '<your username>'
        PASSWORD = '<your password>';
    
    -- Configure a network rule to allow egress to the source endpoint
    CREATE OR REPLACE NETWORK RULE <your network rule>
      TYPE = HOST_PORT
      MODE = EGRESS
      VALUE_LIST = ('<your host>:<your port>');
    
    -- Configure an external access integration
    CREATE OR REPLACE EXTERNAL ACCESS INTEGRATION <your integration>
      ALLOWED_NETWORK_RULES = (<your network rule>)
      ALLOWED_AUTHENTICATION_SECRETS = (<your secret>)
      ENABLED = true;
    
    Copy
  3. Extraire les données d’une cible à l’aide de l’API JDBC Snowpark à partir d’un notebook Snowflake.

    import snowflake.snowpark.context
    session = snowflake.snowpark.context.get_active_session()
    connection_str=f"jdbc:<your dbms>://<your host>:<your port>/<your db>"
    udtf_configs = {
            "external_access_integration": "<your integration>",
            "secret": "<your secret>",
            "imports": ["<your stage path to jdbc jar file>"]
        }
    
    # Call jdbc to pull data from target table
    df_table = session.read.jdbc(
            url=connection_str,
            udtf_configs=udtf_configs,
            table="<your table>",
        )
    
    # Call jdbc to pull data from target query
    df_query = session.read.jdbc(
            url=connection_str,
            udtf_configs=udtf_configs,
            query="select * from <your table>",
        )
    
    # Pull data from target table with parallelism using partition column
    df_table_partition_column = session.read.jdbc(
            url=connection_str,
            udtf_configs=udtf_configs,
            table="<your table>",
            fetch_size=100000,
            num_partitions=4,
            column="ID",
            upper_bound=10000,
            lower_bound=0
        )
    
    # Pull data from target table with parallelism using predicates
    df_table_predicates = session.read.jdbc(
            url=connection_str,
            udtf_configs=udtf_configs,
            table="<your table>",
            fetch_size=100000,
            predicates = [
                "ID < 3",
                "ID >= 3"
            ]
        )
    
    Copy

Traçage source

Traçage source lors de l’utilisation de l’API JDBC de Snowpark pour se connecter à MySQL

  1. Incluez une balise de Snowpark dans votre fonction de création de connexion :

    connection_str="jdbc:mysql://<your host>:<your port>/<your db>?applicationName=snowflake-snowpark-python"
    udtf_configs = {
        "external_access_integration": "<your integration>",
        "secret": "<your secret>",
        "imports": ["<your stage path to jdbc jar file>"]
    }
    
    # Call dbapi to pull data from target table
    df_table = session.read.jdbc(
            url=connection_str,
            udtf_configs=udtf_configs,
            table="<your table>",
        )
    
    Copy
  2. Exécuter le SQL suivant dans votre source de données pour capturer les requêtes Snowpark encore actives :

    SELECT *
    FROM performance_schema.events_statements_history_long
    WHERE THREAD_ID = (
      SELECT THREAD_ID, NAME FROM performance_schema.threads WHERE NAME LIKE '%snowflake-snowpark-python%';
    )
    
    Copy

Traçage source lors de l’utilisation de l’API JDBC de Snowpark pour se connecter au serveur SQL

  1. Incluez une balise de Snowpark dans votre fonction de création de connexion :

    connection_str="jdbc:mssql://<your host>:<your port>/<your db>?applicationName=snowflake-snowpark-python"
    udtf_configs = {
    "external_access_integration": "<your integration>",
    "secret": "<your secret>",
    "imports": ["<your stage path to jdbc jar file>"]
    }
    # Call dbapi to pull data from target table
    df_table = session.read.jdbc(
          url=connection_str,
          udtf_configs=udtf_configs,
          table="<your table>",
      )
    
    Copy
  2. Exécuter le SQL suivant dans votre source de données pour capturer les requêtes Snowpark encore actives :

    SELECT
      s.session_id,
      s.program_name,
      r.status,
      t.text AS sql_text
    FROM sys.dm_exec_sessions s
    JOIN sys.dm_exec_requests r ON s.session_id = r.session_id
    CROSS APPLY sys.dm_exec_sql_text(r.sql_handle) AS t
    WHERE s.program_name = 'snowflake-snowpark-python';
    
    Copy

Traçage source lors de l’utilisation de l’API JDBC de Snowpark pour se connecter à PostgresSQL

  1. Incluez une balise de Snowpark dans votre fonction de création de connexion :

    connection_str="jdbc:postgres://<your host>:<your port>/<your db>?applicationName=snowflake-snowpark-python"
    udtf_configs = {
    "external_access_integration": "<your integration>",
    "secret": "<your secret>",
    "imports": ["<your stage path to jdbc jar file>"]
    }
    
    # Call dbapi to pull data from target table
    df_table = session.read.jdbc(
            url=connection_str,
            udtf_configs=udtf_configs,
            table="<your table>",
        )
    
    Copy
  2. Exécuter le SQL suivant dans votre source de données pour capturer les requêtes Snowpark encore actives :

    SELECT
      pid,
      usename AS username,
      datname AS database,
      application_name,
      client_addr,
      state,
      query_start,
      query
    FROM
      pg_stat_activity
    WHERE
      application_name = 'snowflake-snowpark-python';
    
    Copy

Traçage source lors de l’utilisation de l’API JDBC de Snowpark pour se connecter à Oracle

  1. Incluez une balise de Snowpark dans votre fonction de création de connexion :

    connection_str="jdbc:oracle://<your host>:<your port>/<your db>?applicationName=snowflake-snowpark-python"
    udtf_configs = {
    "external_access_integration": "<your integration>",
    "secret": "<your secret>",
    "imports": ["<your stage path to jdbc jar file>"]
    }
    # Call dbapi to pull data from target table
    df_table = session.read.jdbc(
            url=connection_str,
            udtf_configs=udtf_configs,
            table="<your table>",
        )
    
    Copy
  2. Exécuter le SQL suivant dans votre source de données pour capturer les requêtes Snowpark encore actives :

    SELECT
      sid,
      serial#,
      username,
      program,
      module,
      action,
      client_identifier,
      client_info,
      osuser,
      machine
    FROM v$session
    WHERE program = 'snowflake-snowpark-python';
    
    Copy

DBMS commun et prise en charge des types

Ce qui suit est une liste certifiée de types de données de différents systèmes DBMS. Si vos données sources impliquent d’autres types de données, l’API JDBC de Snowpark Python tentera de les mapper aux types de données Snowflake au mieux, ou reviendra aux chaînes.

Oracle

  • INTEGER

  • NUMBER

  • BINARY_FLOAT

  • BINARY_DOUBLE

  • VARCHAR2

  • CHAR

  • CLOB

  • NCHAR

  • NVARCHAR2

  • NCLOB

  • DATE

  • TIMESTAMP

  • TIMESTAMP WITH TIME ZONE

  • TIMESTAMP WITH LOCAL TIME ZONE

  • RAW

PostgresSQL

  • BIGINT

  • BIGSERIAL

  • BIT

  • BIT VARYING

  • BOOLEAN

  • BOX

  • BYTEA

  • CHAR

  • VARCHAR

  • CIDR

  • CIRCLE

  • DATE

  • DOUBLE PRECISION

  • INET

  • INTEGER

  • INTERVAL

  • JSON

  • JSONB

  • LINE

  • LSEG

  • MACADDR

  • POINT

  • POLYGON

  • REAL

  • SMALLINT

  • SMALLSERIAL

  • SERIAL

  • TEXT

  • TIME

  • TIMESTAMP

  • TIMESTAMPTZ

  • TSQUERY

  • TSVECTOR

  • TXID_SNAPSHOT

  • UUID

  • XML

MySQL

  • INT

  • DECIMAL

  • INT

  • TINYINT

  • SMALLINT

  • MEDIUMINT

  • BIGINT

  • YEAR

  • FLOAT

  • DOUBLE

  • CHAR

  • VARCHAR

  • TINYTEXT

  • TEXT

  • MEDIUMTEXT

  • LONGTEXT

  • ENUM

  • SET

  • BIT

  • BINARY

  • VARBINARY

  • TINYBLOB

  • BLOB

  • MEDIUMBLOB

  • LONGBLOB

  • DATE

  • DATETIME

  • TIMESTAMP

  • TIME

  • JSON

SQL Server

  • INT

  • BIGINT

  • INT

  • SMALLINT

  • TINYINT

  • BIT

  • DECIMAL

  • NUMERIC

  • MONEY

  • SMALLMONEY

  • FLOAT

  • REAL

  • DATE

  • TIME

  • DATETIME

  • DATETIME2

  • SMALLDATETIME

  • CHAR

  • VARCHAR

  • VARCHAR(MAX)

  • TEXT

  • NCHAR

  • NVARCHAR

  • NVARCHAR(MAX)

  • NTEXT

  • BINARY

  • VARBINARY

  • VARBINARY(MAX)

  • IMAGE

  • UNIQUEIDENTIFIER

  • TIMESTAMP

Databricks

La connexion à Databricks à l’aide de l’API JDBC de Snowpark Python n’est actuellement pas pris en charge.