Using Snowpark Python DB-API

With Snowpark Python DB-API, Snowpark Python users can programmatically pull data from external databases into Snowflake. It includes:

  • Python DB-API support: connect to external databases using Python's standard DB-API 2.0 drivers.

  • Streamlined setup: use pip to install the necessary drivers, with no additional dependencies to manage.

With these APIs, you can seamlessly pull data into Snowflake tables and transform it using Snowpark DataFrames for advanced analytics.

The DB-API can be used in much the same way as the Spark JDBC API (https://spark.apache.org/docs/3.5.4/sql-data-sources-jdbc.html). Most parameters are designed to be identical or similar for better parity. In addition, Snowpark emphasizes a Python-first design with intuitive naming conventions that avoid JDBC-specific configurations, which gives Python developers a familiar experience. For more information that compares the Snowpark Python DB-API with the Spark JDBC API, see the following table:

DB-API parameters

Parameter                 Snowpark Python DB-API

create_connection         Function that creates a Python DB-API connection.
table                     Specifies the table in the source database.
query                     SQL query wrapped as a subquery for reading data.
column                    Partitioning column for parallel reads.
lower_bound               Lower bound for partitioning.
upper_bound               Upper bound for partitioning.
num_partitions            Number of partitions for parallelism.
query_timeout             Timeout for SQL execution, in seconds.
fetch_size                Number of rows fetched per round trip.
custom_schema             Custom schema for pulling data from external databases.
max_workers               Number of workers for parallel fetching and pulling of data from external databases.
predicates                List of conditions for WHERE clause partitions.
session_init_statement    Executes a SQL or PL/SQL statement when the session is initialized.
udtf_configs              Runs the workload using a Snowflake UDTF for better performance.
fetch_merge_count         Number of fetched batches to merge into a single Parquet file before uploading it.
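To make the create_connection and query parameters more concrete, here is a minimal sketch of a zero-argument connection factory and of a query being wrapped as a subquery. It uses the standard-library sqlite3 module purely as a runnable stand-in for a real DB-API 2.0 driver. The read_partition helper, the src alias, and the sample rows are hypothetical and only illustrate the idea; this is not Snowpark's internal implementation.

```python
import sqlite3


def create_connection():
    """Zero-argument factory that returns a new DB-API 2.0 connection.

    sqlite3 is only a stand-in here so that the sketch runs anywhere.
    """
    conn = sqlite3.connect(":memory:")
    conn.execute("CREATE TABLE target_table (id INTEGER, name TEXT)")
    conn.executemany(
        "INSERT INTO target_table VALUES (?, ?)",
        [(1, "a"), (2, "b"), (3, "c")],
    )
    conn.commit()
    return conn


def read_partition(connection_factory, query, predicate):
    """Hypothetical helper: wrap `query` as a subquery and filter one partition."""
    conn = connection_factory()
    try:
        cursor = conn.cursor()
        cursor.execute(f"SELECT * FROM ({query}) AS src WHERE {predicate}")
        return cursor.fetchall()
    finally:
        conn.close()


rows = read_partition(create_connection, "select * from target_table", "id >= 2")
```

Passing a factory rather than an open connection lets each reader open its own connection, which is what makes parallel reads possible.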

Understanding parallelism

Snowpark Python DB-API has two underlying ingestion mechanisms.

Local ingestion

In local ingestion, Snowpark first fetches data from external sources to your local environment where the dbapi() function is called and converts them to Parquet files. Next, Snowpark uploads these Parquet files to a temporary Snowflake stage and copies them into a temporary table from the stage.

UDTF ingestion

In UDTF ingestion, all workloads run on the Snowflake server. Snowpark first creates a UDTF and executes it, and the UDTF directly ingests data into Snowflake and stores it in a temporary table.

Snowpark Python DB-API also has two ways to parallelize and accelerate ingestion.

Partition column

This method divides source data into a number of partitions based on four parameters when users call dbapi():

  • column

  • lower_bound

  • upper_bound

  • num_partitions

These four parameters must be set together, and column must be a numeric or date type.
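As an illustration of how the four parameters can describe a set of partitions, the following sketch splits a numeric range into WHERE-clause ranges. It is modeled on the stride scheme that Spark JDBC uses, where the first partition also collects NULLs and the bounds only shape the ranges rather than filter rows. Whether Snowpark generates exactly these ranges is an assumption, and the function name partition_predicates is hypothetical.

```python
def partition_predicates(column, lower_bound, upper_bound, num_partitions):
    """Split [lower_bound, upper_bound) into num_partitions WHERE-clause ranges.

    Illustrative only: follows the Spark JDBC stride scheme, not a confirmed
    description of Snowpark internals.
    """
    if num_partitions < 1:
        raise ValueError("num_partitions must be at least 1")
    if lower_bound >= upper_bound:
        raise ValueError("lower_bound must be smaller than upper_bound")
    if num_partitions == 1:
        return []  # a single partition needs no filter

    stride = max((upper_bound - lower_bound) // num_partitions, 1)
    predicates = []
    for i in range(num_partitions):
        start = lower_bound + i * stride
        end = start + stride
        if i == 0:
            # First partition is open-ended below and also picks up NULLs.
            predicates.append(f"({column} < {end} OR {column} IS NULL)")
        elif i == num_partitions - 1:
            # Last partition is open-ended above.
            predicates.append(f"{column} >= {start}")
        else:
            predicates.append(f"{column} >= {start} AND {column} < {end}")
    return predicates


ranges = partition_predicates("ID", lower_bound=0, upper_bound=10000, num_partitions=4)
```

With the values used in the examples on this page (0, 10000, 4), each range spans 2500 values of ID.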

Predicates

This method divides source data into partitions based on the predicates parameter, which is a list of expressions suitable for inclusion in WHERE clauses, where each expression defines one partition. Predicates provide a more flexible way of dividing partitions; for example, you can partition on boolean or non-numeric columns.
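For a non-numeric column, one way to build such a list is one predicate per known value plus a catch-all partition. The sketch below assumes that each predicate is applied as an independent WHERE filter, so the predicates should not overlap (rows would be read more than once) and should together cover every row (rows would otherwise be skipped). The helper name predicates_for_values and the REGION column are hypothetical.

```python
def predicates_for_values(column, values):
    """Build one predicate per value, plus a catch-all for everything else."""
    if not values:
        raise ValueError("values must not be empty")

    def quote(value):
        # Escape single quotes by doubling them, as in standard SQL literals.
        return "'" + str(value).replace("'", "''") + "'"

    quoted = [quote(value) for value in values]
    predicates = [f"{column} = {q}" for q in quoted]
    # Catch-all partition: unlisted values and NULLs.
    predicates.append(
        f"({column} NOT IN ({', '.join(quoted)}) OR {column} IS NULL)"
    )
    return predicates


region_predicates = predicates_for_values("REGION", ["EMEA", "APAC"])
```

The resulting list can be passed as the predicates argument in the same way as the "ID < 3" and "ID >= 3" pair used in the examples on this page.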

Snowpark Python DB-API also lets you adjust the level of parallelism within a partition.

fetch_size

Within a partition, the API fetches rows in chunks defined by fetch_size. These rows are written to Snowflake in parallel as they are fetched, allowing reading and writing to overlap and maximize throughput.
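The chunked fetching described above corresponds to the DB-API 2.0 cursor method fetchmany. The sketch below shows only the reading side, using the standard-library sqlite3 module as a stand-in driver; the overlapping write to Snowflake is not shown, and the iter_batches helper is hypothetical.

```python
import sqlite3


def iter_batches(cursor, fetch_size):
    """Yield lists of at most fetch_size rows until the cursor is exhausted."""
    while True:
        rows = cursor.fetchmany(fetch_size)
        if not rows:
            break
        yield rows


# Stand-in source table with ten rows.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE target_table (ID INTEGER)")
conn.executemany("INSERT INTO target_table VALUES (?)", [(i,) for i in range(10)])

cursor = conn.cursor()
cursor.execute("SELECT ID FROM target_table ORDER BY ID")
batch_sizes = [len(batch) for batch in iter_batches(cursor, fetch_size=4)]
conn.close()
```

A larger fetch_size means fewer round trips to the source but more rows held in memory per batch.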

Combining the two ingestion mechanisms with the two partitioning methods gives four ways to ingest data:

  • Local ingestion with partition column

    df_local_par_column = session.read.dbapi(
        create_connection,
        table="target_table",
        fetch_size=100000,
        num_partitions=4,
        column="ID",  # Swap with the column you want your partition based on
        upper_bound=10000,
        lower_bound=0
    )
    
  • Local ingestion with predicates

    df_local_predicates = session.read.dbapi(
        create_connection,
        table="target_table",
        fetch_size=100000,
        predicates=[
            "ID < 3",
            "ID >= 3"
        ]
    )
    
  • UDTF ingestion with partition column

    udtf_configs = {
        "external_access_integration": "<your external access integration>"
    }
    df_udtf_par_column = session.read.dbapi(
        create_connection,
        table="target_table",
        udtf_configs=udtf_configs,
        fetch_size=100000,
        num_partitions=4,
        column="ID",  # Swap with the column you want your partition based on
        upper_bound=10000,
        lower_bound=0
    )
    
  • UDTF ingestion with predicates

    udtf_configs = {
        "external_access_integration": "<your external access integration>"
    }
    
    df_udtf_predicates = session.read.dbapi(
        create_connection,
        table="target_table",
        udtf_configs=udtf_configs,
        fetch_size=100000,
        predicates=[
            "ID < 3",
            "ID >= 3"
        ]
    )
    
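The local, predicate-based variant can be pictured as several workers that each open their own connection and read one partition. The following sketch illustrates that idea with a thread pool and the standard-library sqlite3 module as a stand-in source. It is a simplified model under those assumptions, not Snowpark's implementation, and read_partition, read_all, and the temporary database file are hypothetical.

```python
import os
import sqlite3
import tempfile
from concurrent.futures import ThreadPoolExecutor

# A file-backed database so that every worker connection sees the same data.
DB_PATH = os.path.join(tempfile.mkdtemp(), "source.db")


def create_connection():
    """Zero-argument connection factory; each worker calls it once."""
    return sqlite3.connect(DB_PATH)


def seed_source():
    """Create a small stand-in source table."""
    conn = create_connection()
    with conn:  # commits on success
        conn.execute("CREATE TABLE target_table (ID INTEGER)")
        conn.executemany(
            "INSERT INTO target_table VALUES (?)", [(i,) for i in range(6)]
        )
    conn.close()


def read_partition(predicate):
    """Read the rows of one partition over a dedicated connection."""
    conn = create_connection()
    try:
        cursor = conn.cursor()
        cursor.execute(f"SELECT ID FROM target_table WHERE {predicate}")
        return cursor.fetchall()
    finally:
        conn.close()


def read_all(predicates, max_workers=2):
    """Read every partition in parallel and concatenate the results."""
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        partitions = list(pool.map(read_partition, predicates))
    return [row for partition in partitions for row in partition]


seed_source()
rows = read_all(["ID < 3", "ID >= 3"])
```

Because the two predicates are disjoint and cover all values of ID, every row is read exactly once.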

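SQL Server

To connect to SQL Server from Snowpark, you need the following three packages:

  • snowflake-snowpark-python[pandas]

  • pyodbc

  • The Microsoft ODBC driver for SQL Server (msodbcsql18 in the client and notebook examples, msodbcsql in the stored procedure example)

The following code examples show how to connect to SQL Server from a Snowpark client, a stored procedure, and a Snowflake notebook.

Using DB-API to connect to SQL Server from a Snowpark client

  1. Install the SQL Server ODBC driver (the commands below use Homebrew on macOS):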

    /bin/bash -c "$(curl -fsSL https://raw.githubusercontent.com/Homebrew/install/master/install.sh)"
    brew tap microsoft/mssql-release https://github.com/Microsoft/homebrew-mssql-release
    brew update
    HOMEBREW_ACCEPT_EULA=Y brew install msodbcsql18 mssql-tools18
    
  2. Install snowflake-snowpark-python[pandas] and pyodbc:

    pip install snowflake-snowpark-python[pandas]
    pip install pyodbc
    
  3. Define the factory method for creating a connection to SQL Server:

    def create_sql_server_connection():
        import pyodbc
        SERVER = "<your host name>"
        PORT = <your port>
        UID = "<your user name>"
        PWD = "<your password>"
        DATABASE = "<your database name>"
        connection_str = (
            f"DRIVER={{ODBC Driver 18 for SQL Server}};"
            # SQL Server ODBC separates host and port with a comma
            f"SERVER={SERVER},{PORT};"
            f"UID={UID};"
            f"PWD={PWD};"
            f"DATABASE={DATABASE};"
            "TrustServerCertificate=yes;"
            "Encrypt=yes;"
            # Optional to identify source of queries
            "APP=snowflake-snowpark-python;"
        )
        connection = pyodbc.connect(connection_str)
        return connection
    
    # Feel free to combine local/udtf ingestion and partition column/predicates as
    # stated in the understanding parallelism section
    
    # Call dbapi to pull data from target table
    
    df = session.read.dbapi(
        create_sql_server_connection,
        table="target_table"
    )
    
    # Call dbapi to pull data from target query
    
    df_query = session.read.dbapi(
        create_sql_server_connection,
        query="select * from target_table"
    )
    
    # Pull data from target table with parallelism using partition column
    
    df_local_par_column = session.read.dbapi(
        create_sql_server_connection,
        table="target_table",
        fetch_size=100000,
        num_partitions=4,
        column="ID",  # Swap with the column you want your partition based on
        upper_bound=10000,
        lower_bound=0
    )
    
    udtf_configs = {
        "external_access_integration": "<your external access integration>"
    }
    
    # Pull data from target table with udtf ingestion with parallelism using predicates
    
    df_udtf_predicates = session.read.dbapi(
        create_sql_server_connection,
        table="target_table",
        udtf_configs=udtf_configs,
        fetch_size=100000,
        predicates=[
            "ID < 3",
            "ID >= 3"
        ]
    )
    
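Because the connection string is assembled from adjacent string literals, a missing ";" silently fuses two attributes into one. One way to avoid that is to build the string from a dictionary, as in the sketch below. The helper name odbc_connection_string and the sample values are hypothetical; the brace wrapping for values that contain ";" follows the ODBC convention that a braced value escapes "}" as "}}".

```python
def odbc_connection_string(params):
    """Join attributes as 'KEY=value;' so every pair gets its own terminator."""
    parts = []
    for key, value in params.items():
        text = str(value)
        already_braced = text.startswith("{") and text.endswith("}")
        if ";" in text and not already_braced:
            # Wrap values containing ';' in braces, escaping '}' as '}}'.
            text = "{" + text.replace("}", "}}") + "}"
        parts.append(f"{key}={text};")
    return "".join(parts)


connection_str = odbc_connection_string(
    {
        "DRIVER": "{ODBC Driver 18 for SQL Server}",
        "SERVER": "myhost.example.com,1433",  # host,port (placeholder values)
        "UID": "my_user",
        "PWD": "p;ss",
        "DATABASE": "my_db",
        "TrustServerCertificate": "yes",
        "Encrypt": "yes",
        "APP": "snowflake-snowpark-python",
    }
)
```

The resulting string can be passed to pyodbc.connect inside the connection factory in place of the hand-written literal.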

Using DB-API to connect to SQL Server from a stored procedure

  1. Configure the external access integration (EAI), which is required to allow Snowflake to connect to the source endpoint.

    Note

    PrivateLink is recommended for secure data transfer, especially when you're dealing with sensitive information. Ensure that your Snowflake account has the necessary PrivateLink privileges enabled, and that the PrivateLink feature is configured and active in your Snowflake Notebook environment.

  2. Configure the secret, a network rule to allow egress to the source endpoint, and the EAI:

    -- Configure a secret that stores the source credentials
    
    CREATE OR REPLACE SECRET mssql_secret
    TYPE = PASSWORD
    USERNAME = 'mssql_username'
    PASSWORD = 'mssql_password';
    
    -- Configure a network rule to allow egress to the source endpoint
    
    CREATE OR REPLACE NETWORK RULE mssql_network_rule
    MODE = EGRESS
    TYPE = HOST_PORT
    VALUE_LIST = ('mssql_host:mssql_port');
    
    -- Configure an external access integration
    
    CREATE OR REPLACE EXTERNAL ACCESS INTEGRATION mssql_access_integration
    ALLOWED_NETWORK_RULES = (mssql_network_rule)
    ALLOWED_AUTHENTICATION_SECRETS = (mssql_secret)
    ENABLED = true;
    
  3. Use the DB-API to pull data from SQL Server in a Python stored procedure:

    CREATE OR REPLACE PROCEDURE sp_mssql_dbapi()
        RETURNS TABLE()
        LANGUAGE PYTHON
        RUNTIME_VERSION='3.11'
        HANDLER='run'
        PACKAGES=('snowflake-snowpark-python', 'pyodbc', 'msodbcsql')
        EXTERNAL_ACCESS_INTEGRATIONS = (mssql_access_integration)
        SECRETS = ('cred' = mssql_secret )
    AS $$
    
    # Get user name and password from mssql_secret
    
    import _snowflake
    username_password_object = _snowflake.get_username_password('cred')
    USER = username_password_object.username
    PASSWORD = username_password_object.password
    
    # Define a method to connect to SQL Server
    from snowflake.snowpark import Session
    def create_sql_server_connection():
        import pyodbc
    
        host = "<your host>"
        port = <your port>
        username = USER
        password = PASSWORD
        database = "<your database name>"
        connection_str = (
            f"DRIVER={{ODBC Driver 18 for SQL Server}};"
            f"SERVER={host},{port};"
            f"DATABASE={database};"
            f"UID={username};"
            f"PWD={password};"
            "TrustServerCertificate=yes;"
            "Encrypt=yes;"
            # Optional to identify source of queries
            "APP=snowflake-snowpark-python;"
        )
    
        connection = pyodbc.connect(connection_str)
        return connection
    
    def run(session: Session):
        # Feel free to combine local/udtf ingestion and partition column/predicates
        # as stated in the understanding parallelism section
    
        # Call dbapi to pull data from target table
    
        df = session.read.dbapi(
            create_sql_server_connection,
            table="target_table"
        )
    
        # Call dbapi to pull data from target query
    
        df_query = session.read.dbapi(
            create_sql_server_connection,
            query="select * from target_table"
        )
    
        # Pull data from target table with parallelism using partition column
    
        df_local_par_column = session.read.dbapi(
            create_sql_server_connection,
            table="target_table",
            fetch_size=100000,
            num_partitions=4,
            column="ID",  # swap with the column you want your partition based on
            upper_bound=10000,
            lower_bound=0
        )
    
        udtf_configs = {
            "external_access_integration": "<your external access integration>"
        }
    
        # Pull data from target table with udtf ingestion with parallelism using predicates
    
        df_udtf_predicates = session.read.dbapi(
            create_sql_server_connection,
            table="target_table",
            udtf_configs=udtf_configs,
            fetch_size=100000,
            predicates=[
                "ID < 3",
                "ID >= 3"
            ]
        )
    
        return df
    $$;
    
    CALL sp_mssql_dbapi();
    
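The network rule in step 2 and the connection string in step 3 must point at the same endpoint, but they format it differently: the network rule's VALUE_LIST uses host:port, while the ODBC SERVER attribute uses host,port. A small value object, sketched below, can keep the two in sync. The class name SqlServerEndpoint and the sample host are hypothetical.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class SqlServerEndpoint:
    """Single source of truth for the SQL Server host and port."""

    host: str
    port: int

    def network_rule_value(self):
        """Format for the network rule's VALUE_LIST entry (host:port)."""
        return f"{self.host}:{self.port}"

    def odbc_server(self):
        """Format for the ODBC SERVER attribute (host,port)."""
        return f"{self.host},{self.port}"


endpoint = SqlServerEndpoint(host="mssql.example.com", port=1433)
rule_value = endpoint.network_rule_value()
server_attr = endpoint.odbc_server()
```

If the two values drift apart, the procedure typically fails at connection time because egress to the endpoint that is actually used has not been allowed.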

Using DB-API to connect to SQL Server from a Snowflake notebook

  1. From Snowflake Notebook packages, select snowflake-snowpark-python and pyodbc.

  2. In the files tab on the left side, open the file environment.yml and add the following line of code after other entries under dependencies:

    - msodbcsql18
    
  3. Configure the secret, a network rule to allow egress to the source endpoint, and external access integration:

    -- Configure a secret that stores the source credentials
    
    CREATE OR REPLACE SECRET mssql_secret
    TYPE = PASSWORD
    USERNAME = 'mssql_username'
    PASSWORD = 'mssql_password';
    
    ALTER NOTEBOOK mynotebook SET SECRETS = ('snowflake-secret-object' = mssql_secret);
    
    -- Configure a network rule to allow egress to the source endpoint
    
    CREATE OR REPLACE NETWORK RULE mssql_network_rule
    MODE = EGRESS
    TYPE = HOST_PORT
    VALUE_LIST = ('mssql_host:mssql_port');
    
    -- Configure an external access integration
    
    CREATE OR REPLACE EXTERNAL ACCESS INTEGRATION mssql_access_integration
    ALLOWED_NETWORK_RULES = (mssql_network_rule)
    ALLOWED_AUTHENTICATION_SECRETS = (mssql_secret)
    ENABLED = true;
    
  4. Set up external access for Snowflake Notebooks, and then restart the notebook session.

  5. Use the DB-API to pull data from SQL Server in a Python cell of a Snowflake notebook:

    # Get user name and password from mssql_secret
    
    import _snowflake
    username_password_object = _snowflake.get_username_password('snowflake-secret-object')
    USER = username_password_object.username
    PASSWORD = username_password_object.password
    
    import snowflake.snowpark.context
    session = snowflake.snowpark.context.get_active_session()
    
    def create_sql_server_connection():
        import pyodbc
        SERVER = "<your host name>,<your port>"
        # Credentials come from the secret read above
        UID = USER
        PWD = PASSWORD
        DATABASE = "<your database name>"
        connection_str = (
            f"DRIVER={{ODBC Driver 18 for SQL Server}};"
            f"SERVER={SERVER};"
            f"UID={UID};"
            f"PWD={PWD};"
            f"DATABASE={DATABASE};"
            "TrustServerCertificate=yes;"
            "Encrypt=yes;"
            # Optional to identify source of queries
            "APP=snowflake-snowpark-python;"
        )
        connection = pyodbc.connect(connection_str)
        return connection
    
    # Feel free to combine local/udtf ingestion and partition column/predicates as
    # stated in the understanding parallelism section
    
    # Call dbapi to pull data from target table
    
    df = session.read.dbapi(
        create_sql_server_connection,
        table="target_table"
    )
    
    # Call dbapi to pull data from target query
    
    df_query = session.read.dbapi(
        create_sql_server_connection,
        query="select * from target_table"
    )
    
    # Pull data from target table with parallelism using partition column
    
    df_local_par_column = session.read.dbapi(
        create_sql_server_connection,
        table="target_table",
        fetch_size=100000,
        num_partitions=4,
        column="ID",  # swap with the column you want your partition based on
        upper_bound=10000,
        lower_bound=0
    )
    
    udtf_configs = {
        "external_access_integration": "<your external access integration>"
    }
    
    # Pull data from target table with udtf ingestion with parallelism using predicates
    
    df_udtf_predicates = session.read.dbapi(
        create_sql_server_connection,
        table="target_table",
        udtf_configs=udtf_configs,
        fetch_size=100000,
        predicates=[
            "ID < 3",
            "ID >= 3"
        ]
    )
    
    # Save data into sf_table
    df.write.mode("overwrite").save_as_table('sf_table')
    

Source tracing when using DB-API to connect to SQL Server

  1. Include a Snowpark tag in your connection factory function:

    def create_sql_server_connection():
        import pyodbc
        SERVER = "<your host name>"
        PORT = <your port>
        UID = "<your user name>"
        PWD = "<your password>"
        DATABASE = "<your database name>"
        connection_str = (
            f"DRIVER={{ODBC Driver 18 for SQL Server}};"
            # SQL Server ODBC separates host and port with a comma
            f"SERVER={SERVER},{PORT};"
            f"UID={UID};"
            f"PWD={PWD};"
            f"DATABASE={DATABASE};"
            "TrustServerCertificate=yes;"
            "Encrypt=yes;"
            # include this parameter for source tracing
            "APP=snowflake-snowpark-python;"
        )
        connection = pyodbc.connect(connection_str)
        return connection
    
  2. Run the following SQL in your data source to capture queries from Snowpark that are still live:

    SELECT
        s.session_id,
        s.program_name,
        r.status,
        t.text AS sql_text
    FROM sys.dm_exec_sessions s
    JOIN sys.dm_exec_requests r ON s.session_id = r.session_id
    CROSS APPLY sys.dm_exec_sql_text(r.sql_handle) AS t
    WHERE s.program_name = 'snowflake-snowpark-python';
    

Oracle

To connect to Oracle from Snowpark, you need the following two packages:

  • snowflake-snowpark-python[pandas]

  • oracledb

The following code examples show how to connect to Oracle from a Snowpark client, stored procedures, and a Snowflake notebook.

Using DB-API to connect to Oracle from a Snowpark client

  1. Install snowflake-snowpark-python[pandas] and oracledb:

    pip install snowflake-snowpark-python[pandas]
    pip install oracledb
    
  2. Define the factory method for creating a connection to Oracle, and use the DB-API to pull data from Oracle:

    def create_oracle_db_connection():
        import oracledb
        HOST = "<your host>"
        PORT = <your port>
        SERVICE_NAME = "<your service name>"
        USER = "<your user name>"
        PASSWORD = "<your password>"
        DSN = f"{HOST}:{PORT}/{SERVICE_NAME}"
        connection = oracledb.connect(
            user=USER,
            password=PASSWORD,
            dsn=DSN
        )
        # Optional: include this parameter for source tracing
        connection.clientinfo = "snowflake-snowpark-python"
        return connection
    
    # Feel free to combine local/udtf ingestion and partition column/predicates as
    # stated in the understanding parallelism section
    
    # Call dbapi to pull data from target table
    
    df = session.read.dbapi(
        create_oracle_db_connection,
        table="target_table"
    )
    
    # Call dbapi to pull data from target query
    
    df_query = session.read.dbapi(
        create_oracle_db_connection,
        query="select * from target_table"
    )
    
    # Pull data from target table with parallelism using partition column
    
    df_local_par_column = session.read.dbapi(
        create_oracle_db_connection,
        table="target_table",
        fetch_size=100000,
        num_partitions=4,
        column="ID",  # swap with the column you want your partition based on
        upper_bound=10000,
        lower_bound=0
    )
    
    udtf_configs = {
        "external_access_integration": "<your external access integration>"
    }
    
    # Pull data from target table with udtf ingestion with parallelism using predicates
    
    df_udtf_predicates = session.read.dbapi(
        create_oracle_db_connection,
        table="target_table",
        udtf_configs=udtf_configs,
        fetch_size=100000,
        predicates=[
            "ID < 3",
            "ID >= 3"
        ]
    )
    
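The DSN in the factory above uses the host:port/service_name form. As a small sketch, the helper below builds that string and fails early when a placeholder such as "<your port>" was left unreplaced. The function name oracle_dsn and the sample values are hypothetical.

```python
def oracle_dsn(host, port, service_name):
    """Build a host:port/service_name DSN, validating the pieces first."""
    if not host or not service_name:
        raise ValueError("host and service_name must not be empty")
    port_number = int(port)  # raises ValueError for non-numeric placeholders
    if not 0 < port_number < 65536:
        raise ValueError(f"port out of range: {port_number}")
    return f"{host}:{port_number}/{service_name}"


dsn = oracle_dsn("ora.example.com", 1521, "ORCLPDB1")
```

The returned string can be passed as the dsn argument of oracledb.connect, as in the factory method above.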

Using DB-API to connect to Oracle from a stored procedure

  1. Configure the external access integration (EAI), which is required to allow Snowflake to connect to the source endpoint.

    Note

    PrivateLink is recommended for secure data transfer, especially when you're dealing with sensitive information. Ensure that your Snowflake account has the necessary PrivateLink privileges enabled, and that the PrivateLink feature is configured and active in your Snowflake Notebook environment.

  2. Configure the secret, a network rule to allow egress to the source endpoint, and the EAI:

    -- Configure a secret that stores the source credentials
    
    CREATE OR REPLACE SECRET ora_secret
    TYPE = PASSWORD
    USERNAME = 'ora_username'
    PASSWORD = 'ora_password';
    
    -- configure a network rule to allow egress to the source endpoint
    
    CREATE OR REPLACE NETWORK RULE ora_network_rule
    MODE = EGRESS
    TYPE = HOST_PORT
    VALUE_LIST = ('ora_host:ora_port');
    
    -- configure an external access integration
    
    CREATE OR REPLACE EXTERNAL ACCESS INTEGRATION ora_access_integration
    ALLOWED_NETWORK_RULES = (ora_network_rule)
    ALLOWED_AUTHENTICATION_SECRETS = (ora_secret)
    ENABLED = true;
    
  3. Use Snowpark Python DB-API to pull data from Oracle in a Python stored procedure:

    CREATE OR REPLACE PROCEDURE sp_ora_dbapi()
        RETURNS TABLE()
        LANGUAGE PYTHON
        RUNTIME_VERSION='3.11'
        HANDLER='run'
        PACKAGES=('snowflake-snowpark-python', 'oracledb')
        EXTERNAL_ACCESS_INTEGRATIONS = (ora_access_integration)
        SECRETS = ('cred' = ora_secret )
    AS $$
    
    # Get user name and password from ora_secret
    import _snowflake
    username_password_object = _snowflake.get_username_password('cred')
    USER = username_password_object.username
    PASSWORD = username_password_object.password
    
    # Define the factory method for creating a connection to Oracle
    
    from snowflake.snowpark import Session
    
    def create_oracle_db_connection():
        import oracledb
        host = "ora_host"
        port = "ora_port"
        service_name = "ora_service"
        user = USER
        password = PASSWORD
        DSN = f"{host}:{port}/{service_name}"
        connection = oracledb.connect(
            user=USER,
            password=PASSWORD,
            dsn=DSN
        )
        # Optional: include this parameter for source tracing
        connection.clientinfo = "snowflake-snowpark-python"
        return connection
    
    def run(session: Session):
        # Feel free to combine local/udtf ingestion and partition column/predicates
        # as stated in the understanding parallelism section
    
        # Call dbapi to pull data from target table
    
        df = session.read.dbapi(
            create_oracle_db_connection,
            table="target_table"
        )
    
        # Call dbapi to pull data from target query
    
        df_query = session.read.dbapi(
            create_oracle_db_connection,
            query="select * from target_table"
        )
    
        # Pull data from target table with parallelism using partition column
    
        df_local_par_column = session.read.dbapi(
            create_oracle_db_connection,
            table="target_table",
            fetch_size=100000,
            num_partitions=4,
            column="ID",  # swap with the column you want your partition based on
            upper_bound=10000,
            lower_bound=0
        )
    
        udtf_configs = {
            "external_access_integration": "<your external access integration>"
        }
    
        # Pull data from target table with udtf ingestion with parallelism using predicates
    
        df_udtf_predicates = session.read.dbapi(
            create_oracle_db_connection,
            table="target_table",
            udtf_configs=udtf_configs,
            fetch_size=100000,
            predicates=[
                "ID < 3",
                "ID >= 3"
            ]
        )
        return df
    $$;
    
    CALL sp_ora_dbapi();
    

Using DB-API to connect to Oracle from a Snowflake notebook

  1. From Snowflake Notebook packages, select snowflake-snowpark-python and oracledb.

  2. Configure the external access integration (EAI), which is required to allow Snowflake to connect to the source endpoint.

    Note

    PrivateLink is recommended for secure data transfer, especially when you're dealing with sensitive information. Ensure that your Snowflake account has the necessary PrivateLink privileges enabled, and that the PrivateLink feature is configured and active in your Snowflake Notebook environment.

  3. Configure the secret, a network rule, and EAI to allow egress to the source endpoint:

    -- Configure a secret that stores the source credentials
    CREATE OR REPLACE SECRET ora_secret
        TYPE = PASSWORD
        USERNAME = 'ora_username'
        PASSWORD = 'ora_password';
    ALTER NOTEBOOK mynotebook SET SECRETS = ('snowflake-secret-object' = ora_secret);
    
    -- Configure a network rule to allow egress to the source endpoint
    
    CREATE OR REPLACE NETWORK RULE ora_network_rule
        MODE = EGRESS
        TYPE = HOST_PORT
        VALUE_LIST = ('ora_host:ora_port');
    
    -- Configure an external access integration
    
    CREATE OR REPLACE EXTERNAL ACCESS INTEGRATION ora_access_integration
        ALLOWED_NETWORK_RULES = (ora_network_rule)
        ALLOWED_AUTHENTICATION_SECRETS = (ora_secret)
        ENABLED = true;
    
  4. Set up external access for Snowflake Notebooks, and then restart the notebook session.

  5. Use the DB-API to pull data from Oracle in a Python cell of a Snowflake notebook:

    # Get user name and password from ora_secret
    
    import _snowflake
    username_password_object = _snowflake.get_username_password('snowflake-secret-object')
    USER = username_password_object.username
    PASSWORD = username_password_object.password
    
    import snowflake.snowpark.context
    session = snowflake.snowpark.context.get_active_session()
    
    # Define the factory method for creating a connection to Oracle
    
    def create_oracle_db_connection():
        import oracledb
        host = "ora_host"
        port = "ora_port"
        service_name = "ora_service"
        user = USER
        password = PASSWORD
        DSN = f"{host}:{port}/{service_name}"
        connection = oracledb.connect(
            user=USER,
            password=PASSWORD,
            dsn=DSN,
        )
        # Optional: include this parameter for source tracing
        connection.clientinfo = "snowflake-snowpark-python"
        return connection
    
    # Feel free to combine local/udtf ingestion and partition column/predicates as
    # stated in the understanding parallelism section
    
    # Call dbapi to pull data from target table
    
    df = session.read.dbapi(
        create_oracle_db_connection,
        table="target_table"
    )
    
    # Call dbapi to pull data from target query
    
    df_query = session.read.dbapi(
        create_oracle_db_connection,
        query="select * from target_table"
    )
    
    # Pull data from target table with parallelism using partition column
    
    df_local_par_column = session.read.dbapi(
        create_oracle_db_connection,
        table="target_table",
        fetch_size=100000,
        num_partitions=4,
        column="ID",  # swap with the column you want your partition based on
        upper_bound=10000,
        lower_bound=0
    )
    
    udtf_configs = {
        "external_access_integration": "<your external access integration>"
    }
    
    # Pull data from target table with udtf ingestion with parallelism using predicates
    
    df_udtf_predicates = session.read.dbapi(
        create_oracle_db_connection,
        table="target_table",
        udtf_configs=udtf_configs,
        fetch_size=100000,
        predicates=[
            "ID < 3",
            "ID >= 3"
        ]
    )
    
    # Save data into sf_table
    
    df.write.mode("overwrite").save_as_table('sf_table')
    

Source tracing when using DB-API to connect to Oracle

  1. Include a Snowpark tag in your connection factory function:

    def create_oracle_db_connection():
        import oracledb
        HOST = "myhost"
        PORT = "myport"
        SERVICE_NAME = "myservice"
        USER = "myuser"
        PASSWORD = "mypassword"
        DSN = f"{HOST}:{PORT}/{SERVICE_NAME}"
        connection = oracledb.connect(
            user=USER,
            password=PASSWORD,
            dsn=DSN,
        )
        # include this parameter for source tracing
        connection.clientinfo = "snowflake-snowpark-python"
        return connection
    
  2. Run the following SQL in your data source to capture queries from Snowpark that are still live:

    SELECT
        s.sid,
        s.serial#,
        s.username,
        s.module,
        q.sql_id,
        q.sql_text,
        q.last_active_time
    FROM
        v$session s
        JOIN v$sql q ON s.sql_id = q.sql_id
    WHERE
        s.client_info = 'snowflake-snowpark-python'
    

PostgreSQL

To connect to PostgreSQL from Snowpark, you need the following two packages:

  • snowflake-snowpark-python

  • psycopg2

The following code examples show how to connect to PostgreSQL from a Snowpark client, stored procedures, and a Snowflake notebook.

Using DB-API to connect to PostgreSQL from a Snowpark client

  1. Install psycopg2:

    pip install psycopg2
    
  2. Define the factory method for creating a connection to PostgreSQL:

    def create_pg_connection():
        import psycopg2
        connection = psycopg2.connect(
            host="pg_host",
            port=pg_port,
            dbname="pg_dbname",
            user="pg_user",
            password="pg_password",
            # Optional: include this parameter for source tracing
            application_name="snowflake-snowpark-python"
        )
        return connection
    
    # Feel free to combine local/udtf ingestion and partition column/predicates as
    # stated in the understanding parallelism section
    
    # Call dbapi to pull data from target table
    
    df = session.read.dbapi(
        create_pg_connection,
        table="target_table"
    )
    
    # Call dbapi to pull data from target query
    
    df_query = session.read.dbapi(
        create_pg_connection,
        query="select * from target_table"
    )
    
    # Pull data from target table with parallelism using partition column
    
    df_local_par_column = session.read.dbapi(
        create_pg_connection,
        table="target_table",
        fetch_size=100000,
        num_partitions=4,
        column="ID",  # Swap with the column you want your partition based on
        upper_bound=10000,
        lower_bound=0
    )
    
    udtf_configs = {
        "external_access_integration": "<your external access integration>"
    }
    
    # Pull data from target table with udtf ingestion with parallelism using predicates
    
    df_udtf_predicates = session.read.dbapi(
        create_pg_connection,
        table="target_table",
        udtf_configs=udtf_configs,
        fetch_size=100000,
        predicates=[
            "ID < 3",
            "ID >= 3"
        ]
    )
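To make the partition-column options above more concrete, the following sketch shows one way that `column`, `lower_bound`, `upper_bound`, and `num_partitions` could be turned into per-partition `WHERE` clauses. It follows the stride approach used by Spark JDBC-style readers and is an assumption for illustration; the actual Snowpark implementation may split ranges differently. The helper name `partition_predicates` is hypothetical.

```python
def partition_predicates(column, lower_bound, upper_bound, num_partitions):
    """Split the range lower_bound..upper_bound into num_partitions WHERE clauses.

    The first and last ranges are left open-ended so that rows outside the
    bounds (and NULLs, in the first range) are still read exactly once.
    This is an illustrative sketch, not Snowpark's actual implementation.
    """
    if num_partitions < 1:
        raise ValueError("num_partitions must be at least 1")
    if lower_bound >= upper_bound:
        raise ValueError("lower_bound must be smaller than upper_bound")
    if num_partitions == 1:
        return ["1 = 1"]

    # Width of each partition; never smaller than 1.
    stride = (upper_bound - lower_bound) // num_partitions or 1
    predicates = []
    for i in range(num_partitions):
        start = lower_bound + i * stride
        end = start + stride
        if i == 0:
            predicates.append(f"{column} < {end} OR {column} IS NULL")
        elif i == num_partitions - 1:
            predicates.append(f"{column} >= {start}")
        else:
            predicates.append(f"{column} >= {start} AND {column} < {end}")
    return predicates


predicates = partition_predicates("ID", lower_bound=0, upper_bound=10000, num_partitions=4)
for predicate in predicates:
    print(predicate)
# ID < 2500 OR ID IS NULL
# ID >= 2500 AND ID < 5000
# ID >= 5000 AND ID < 7500
# ID >= 7500
```

Under this scheme the bounds only decide where the ranges are cut. They do not filter rows, which is why the first and last ranges are open-ended.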
    

Using DB-API to connect to PostgreSQL from a stored procedure

  1. Configure the external access integration (EAI), which is required to allow Snowflake to connect to the source endpoint.

    Note

    PrivateLink is recommended for secure data transfer, especially when you're dealing with sensitive information. Ensure that your Snowflake account has the necessary PrivateLink privileges enabled, and that the PrivateLink feature is configured and active in your Snowflake notebook environment.

  2. Configure the secret, a network rule to allow egress to the source endpoint, and the EAI:

    -- configure a secret
    
    CREATE OR REPLACE SECRET pg_secret
        TYPE = PASSWORD
        USERNAME = 'pg_username'
        PASSWORD = 'pg_password';
    
    -- configure a network rule.
    
    CREATE OR REPLACE NETWORK RULE pg_network_rule
        MODE = EGRESS
        TYPE = HOST_PORT
        VALUE_LIST = ('pg_host:pg_port');
    
    -- configure an external access integration.
    
    CREATE OR REPLACE EXTERNAL ACCESS INTEGRATION pg_access_integration
        ALLOWED_NETWORK_RULES = (pg_network_rule)
        ALLOWED_AUTHENTICATION_SECRETS = (pg_secret)
        ENABLED = true;
    
  3. Use Snowpark Python DB-API to pull data from PostgreSQL in a Python stored procedure:

    CREATE OR REPLACE PROCEDURE sp_pg_dbapi()
        RETURNS TABLE()
        LANGUAGE PYTHON
        RUNTIME_VERSION='3.11'
        HANDLER='run'
        PACKAGES=('snowflake-snowpark-python', 'psycopg2')
        EXTERNAL_ACCESS_INTEGRATIONS = (pg_access_integration)
        SECRETS = ('cred' = pg_secret )
    AS $$
    
    # Get user name and password from pg_secret
    
    import _snowflake
    username_password_object = _snowflake.get_username_password('cred')
    USER = username_password_object.username
    PASSWORD = username_password_object.password
    
    # Define the factory method for creating a connection to PostgreSQL
    
    from snowflake.snowpark import Session
    
    def create_pg_connection():
        import psycopg2
        connection = psycopg2.connect(
            host="pg_host",
            port=pg_port,
            dbname="pg_dbname",
            user=USER,
            password=PASSWORD,
            # Optional: include this parameter for source tracing
            application_name="snowflake-snowpark-python"
        )
        return connection
    
    def run(session: Session):
    
        # Feel free to combine local/udtf ingestion and partition column/predicates
        # as stated in the understanding parallelism section
    
        # Call dbapi to pull data from target table
    
        df = session.read.dbapi(
            create_pg_connection,
            table="target_table"
        )
    
        # Call dbapi to pull data from target query
    
        df_query = session.read.dbapi(
            create_pg_connection,
            query="select * from target_table"
        )
    
        # Pull data from target table with parallelism using partition column
    
        df_local_par_column = session.read.dbapi(
            create_pg_connection,
            table="target_table",
            fetch_size=100000,
            num_partitions=4,
            column="ID",  # swap with the column you want your partition based on
            upper_bound=10000,
            lower_bound=0
        )
    
        udtf_configs = {
            "external_access_integration": "<your external access integration>"
        }
    
        # Pull data from target table with udtf ingestion with parallelism using predicates
    
        df_udtf_predicates = session.read.dbapi(
            create_pg_connection,
            table="target_table",
            udtf_configs=udtf_configs,
            fetch_size=100000,
            predicates=[
                "ID < 3",
                "ID >= 3"
            ]
        )
        return df
    
    $$;
    CALL sp_pg_dbapi();
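The network rule in the steps above expects each `VALUE_LIST` entry in `host:port` form. As a minimal sketch, the hypothetical helpers below validate an endpoint and render the `CREATE NETWORK RULE` statement as text. The hostname `pg.example.com` is a placeholder, and the helpers do not handle IPv6 literals.

```python
def network_rule_value(host, port):
    """Return a 'host:port' entry suitable for a HOST_PORT network rule."""
    port = int(port)
    if not 1 <= port <= 65535:
        raise ValueError(f"port out of range: {port}")
    # Reject URLs and anything that is not a bare hostname.
    if not host or "://" in host or "/" in host or ":" in host:
        raise ValueError(f"expected a bare hostname, got: {host!r}")
    return f"{host}:{port}"


def network_rule_sql(rule_name, endpoints):
    """Render a CREATE NETWORK RULE statement for (host, port) pairs."""
    values = ", ".join(f"'{network_rule_value(h, p)}'" for h, p in endpoints)
    return (
        f"CREATE OR REPLACE NETWORK RULE {rule_name}\n"
        f"    MODE = EGRESS\n"
        f"    TYPE = HOST_PORT\n"
        f"    VALUE_LIST = ({values});"
    )


sql = network_rule_sql("pg_network_rule", [("pg.example.com", 5432)])
print(sql)
```

Validating the endpoint before running the statement catches a common mistake: pasting a full URL where a bare hostname is expected.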
    

Using DB-API to connect to PostgreSQL from a Snowflake notebook

  1. From Snowflake Notebook packages, select snowflake-snowpark-python and psycopg2.

  2. Configure the external access integration (EAI), which is required to allow Snowflake to connect to the source endpoint.

    Note

    PrivateLink is recommended for secure data transfer, especially when you're dealing with sensitive information. Ensure that your Snowflake account has the necessary PrivateLink privileges enabled, and that the PrivateLink feature is configured and active in your Snowflake notebook environment.

  3. Configure the secret, a network rule to allow egress to the source endpoint, and the EAI:

    -- Configure the secret
    
    CREATE OR REPLACE SECRET pg_secret
        TYPE = PASSWORD
        USERNAME = 'pg_username'
        PASSWORD = 'pg_password';
    
    ALTER NOTEBOOK pg_notebook SET SECRETS = ('snowflake-secret-object' = pg_secret);
    
    -- Configure the network rule to allow egress to the source endpoint
    
    CREATE OR REPLACE NETWORK RULE pg_network_rule
        MODE = EGRESS
        TYPE = HOST_PORT
        VALUE_LIST = ('pg_host:pg_port');
    
    -- Configure external access integration
    
    CREATE OR REPLACE EXTERNAL ACCESS INTEGRATION pg_access_integration
        ALLOWED_NETWORK_RULES = (pg_network_rule)
        ALLOWED_AUTHENTICATION_SECRETS = (pg_secret)
        ENABLED = true;
    
  4. Configure external access for Snowflake Notebooks, and then restart the notebook session.

  5. Use the DB-API to pull data from PostgreSQL in a Python cell of a Snowflake notebook:

    # Get the user name and password from pg_secret
    
    import _snowflake
    username_password_object = _snowflake.get_username_password('snowflake-secret-object')
    USER = username_password_object.username
    PASSWORD = username_password_object.password
    
    import snowflake.snowpark.context
    session = snowflake.snowpark.context.get_active_session()
    
    # Define the factory method for creating a connection to PostgreSQL
    
    def create_pg_connection():
        import psycopg2
        connection = psycopg2.connect(
            host="pg_host",
            port=pg_port,
            dbname="pg_dbname",
            user=USER,
            password=PASSWORD,
            # Optional: include this parameter for source tracing
            application_name="snowflake-snowpark-python"
        )
        return connection
    
    # Feel free to combine local/udtf ingestion and partition column/predicates as
    # stated in the understanding parallelism section
    
    # Call dbapi to pull data from target table
    
    df = session.read.dbapi(
        create_pg_connection,
        table="target_table"
    )
    
    # Call dbapi to pull data from target query
    
    df_query = session.read.dbapi(
        create_pg_connection,
        query="select * from target_table"
    )
    
    # Pull data from target table with parallelism using partition column
    
    df_local_par_column = session.read.dbapi(
        create_pg_connection,
        table="target_table",
        fetch_size=100000,
        num_partitions=4,
        column="ID",  # swap with the column you want your partition based on
        upper_bound=10000,
        lower_bound=0
    )
    
    udtf_configs = {
        "external_access_integration": "<your external access integration>"
    }
    
    # Pull data from target table with udtf ingestion with parallelism using predicates
    
    df_udtf_predicates = session.read.dbapi(
        create_pg_connection,
        table="target_table",
        udtf_configs=udtf_configs,
        fetch_size=100000,
        predicates=[
            "ID < 3",
            "ID >= 3"
        ]
    )
    
    # Save data into sf_table
    
    df.write.mode("overwrite").save_as_table('sf_table')
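The `fetch_size` option used in the examples above controls how many rows are pulled from the source per round trip. The sketch below illustrates the same idea with the standard DB-API `cursor.fetchmany()` call. It uses the built-in `sqlite3` module as a stand-in for a PostgreSQL driver, and the helper `fetch_in_batches` is hypothetical, not part of Snowpark.

```python
import sqlite3


def fetch_in_batches(connection, query, fetch_size):
    """Yield lists of at most fetch_size rows until the cursor is exhausted."""
    cursor = connection.cursor()
    try:
        cursor.execute(query)
        while True:
            rows = cursor.fetchmany(fetch_size)
            if not rows:
                break
            yield rows
    finally:
        cursor.close()


# In-memory stand-in for the external source table.
connection = sqlite3.connect(":memory:")
connection.execute("CREATE TABLE target_table (ID INTEGER)")
connection.executemany(
    "INSERT INTO target_table VALUES (?)", [(i,) for i in range(10)]
)

batch_sizes = [
    len(batch)
    for batch in fetch_in_batches(connection, "SELECT ID FROM target_table", 4)
]
print(batch_sizes)  # [4, 4, 2]
```

A larger fetch size means fewer round trips but more memory held per batch, so the right value depends on row width and available memory.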

Source tracing when using DB-API to connect to PostgreSQL

  1. Include a Snowpark tag in your connection-creation function:

    def create_pg_connection():
        import psycopg2
        connection = psycopg2.connect(
            host="pg_host",
            port=pg_port,
            dbname="pg_dbname",
            user="pg_user",
            password="pg_password",
            # Include this parameter for source tracing
            application_name="snowflake-snowpark-python"
        )
        return connection
    
  2. Run the following SQL in your data source to capture queries from Snowpark that are still live:

    SELECT
        pid,
        usename AS username,
        datname AS database,
        application_name,
        client_addr,
        state,
        query_start,
        query
    FROM
        pg_stat_activity
    WHERE
        application_name = 'snowflake-snowpark-python';
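If several connection functions need the same tracing tag, one option is to build them from a shared helper so that the tag is never forgotten. The sketch below is an assumption for illustration: `make_connection_factory` is a hypothetical helper, and `recording_connect` is a stand-in for a real driver call such as `psycopg2.connect` that only echoes its arguments.

```python
APPLICATION_TAG = "snowflake-snowpark-python"


def make_connection_factory(connect, tag_kwargs, **connect_kwargs):
    """Return a zero-argument callable that opens a tagged connection."""
    settings = {**connect_kwargs, **tag_kwargs}

    def create_connection():
        return connect(**settings)

    return create_connection


def recording_connect(**kwargs):
    # Stand-in for a real driver's connect(); it only echoes its arguments.
    return kwargs


create_tagged_connection = make_connection_factory(
    recording_connect,
    {"application_name": APPLICATION_TAG},
    host="pg_host",
    dbname="pg_dbname",
)
print(create_tagged_connection()["application_name"])  # snowflake-snowpark-python
```

With a real driver, the first argument would be the driver's `connect` function and the tag keyword would be whichever parameter that driver uses for tracing.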
    

MySQL

To connect to MySQL from Snowpark, you need the following two packages: snowflake-snowpark-python and pymysql.

The following code examples show how to connect to MySQL from a Snowpark client, stored procedures, and a Snowflake notebook.

Using DB-API to connect to MySQL from a Snowpark client

  1. Install pymysql:

    pip install snowflake-snowpark-python[pandas]
    pip install pymysql
    
  2. Define the factory method for creating a connection to MySQL:

    def create_mysql_connection():
        import pymysql
        connection = pymysql.connect(
            host="mysql_host",
            port=mysql_port,
            database="mysql_db",
            user="mysql_user",
            password="mysql_password",
            # Optional: include this parameter for source tracing
            init_command="SET @program_name='snowflake-snowpark-python';"
        )
        return connection
    
    # Feel free to combine local/udtf ingestion and partition column/predicates as
    # stated in the understanding parallelism section
    
    # Call dbapi to pull data from target table
    
    df = session.read.dbapi(
        create_mysql_connection,
        table="target_table"
    )
    
    # Call dbapi to pull data from target query
    
    df_query = session.read.dbapi(
        create_mysql_connection,
        query="select * from target_table"
    )
    
    # Pull data from target table with parallelism using partition column
    
    df_local_par_column = session.read.dbapi(
        create_mysql_connection,
        table="target_table",
        fetch_size=100000,
        num_partitions=4,
        column="ID",  # swap with the column you want your partition based on
        upper_bound=10000,
        lower_bound=0
    )
    
    udtf_configs = {
        "external_access_integration": "<your external access integration>"
    }
    
    # Pull data from target table with udtf ingestion with parallelism using predicates
    
    df_udtf_predicates = session.read.dbapi(
        create_mysql_connection,
        table="target_table",
        udtf_configs=udtf_configs,
        fetch_size=100000,
        predicates=[
            "ID < 3",
            "ID >= 3"
        ]
    )
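When you supply `predicates` yourself, as in the last example above, each predicate defines one partition, so it helps to check that the predicates together cover every row without overlap. The following sketch runs that check against an in-memory `sqlite3` table used as a stand-in for the MySQL source. The helper `check_predicates` is hypothetical. Note how a row with a NULL `ID` matches neither `ID < 3` nor `ID >= 3`.

```python
import sqlite3


def check_predicates(connection, table, predicates):
    """Return (total_rows, rows_matched_per_predicate) for a sanity check."""
    cursor = connection.cursor()
    cursor.execute(f"SELECT COUNT(*) FROM {table}")
    total = cursor.fetchone()[0]
    counts = []
    for predicate in predicates:
        cursor.execute(f"SELECT COUNT(*) FROM {table} WHERE {predicate}")
        counts.append(cursor.fetchone()[0])
    cursor.close()
    return total, counts


# Stand-in source table: IDs 0..5 plus one row whose ID is NULL.
connection = sqlite3.connect(":memory:")
connection.execute("CREATE TABLE target_table (ID INTEGER)")
connection.executemany(
    "INSERT INTO target_table VALUES (?)",
    [(i,) for i in range(6)] + [(None,)],
)

total, counts = check_predicates(connection, "target_table", ["ID < 3", "ID >= 3"])
print(total, counts, total - sum(counts))  # 7 [3, 3] 1
```

A nonzero difference means some rows match no predicate. Matching totals alone do not rule out overlapping predicates, so mutually exclusive conditions are still needed.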
    

Using DB-API to connect to MySQL from a stored procedure

  1. Configure the external access integration (EAI), which is required to allow Snowflake to connect to the source endpoint.

    Note

    PrivateLink is recommended for secure data transfer, especially when you're dealing with sensitive information. Ensure that your Snowflake account has the necessary PrivateLink privileges enabled, and that the PrivateLink feature is configured and active in your Snowflake notebook environment.

  2. Configure the secret, a network rule to allow egress to the source endpoint, and the EAI:

    CREATE OR REPLACE SECRET mysql_secret
        TYPE = PASSWORD
        USERNAME = 'mysql_username'
        PASSWORD = 'mysql_password';
    
    -- configure a network rule.
    
    CREATE OR REPLACE NETWORK RULE mysql_network_rule
        MODE = EGRESS
        TYPE = HOST_PORT
        VALUE_LIST = ('mysql_host:mysql_port');
    
    -- configure an external access integration
    
    CREATE OR REPLACE EXTERNAL ACCESS INTEGRATION mysql_access_integration
        ALLOWED_NETWORK_RULES = (mysql_network_rule)
        ALLOWED_AUTHENTICATION_SECRETS = (mysql_secret)
        ENABLED = true;
    
  3. Use Snowpark Python DB-API to pull data from MySQL in a Python stored procedure:

    CREATE OR REPLACE PROCEDURE sp_mysql_dbapi()
        RETURNS TABLE()
        LANGUAGE PYTHON
        RUNTIME_VERSION='3.11'
        HANDLER='run'
        PACKAGES=('snowflake-snowpark-python', 'pymysql')
        EXTERNAL_ACCESS_INTEGRATIONS = (mysql_access_integration)
        SECRETS = ('cred' = mysql_secret )
    AS $$
    
    # Get user name and password from mysql_secret
    
    import _snowflake
    username_password_object = _snowflake.get_username_password('cred')
    USER = username_password_object.username
    PASSWORD = username_password_object.password
    
    # Define the factory method for creating a connection to MySQL
    
    from snowflake.snowpark import Session
    
    def create_mysql_connection():
        import pymysql
        connection = pymysql.connect(
            host="mysql_host",
            port=mysql_port,
            database="mysql_dbname",
            user=USER,
            password=PASSWORD,
            # Optional: include this parameter for source tracing
            init_command="SET @program_name='snowflake-snowpark-python';"
        )
        return connection
    
    # Using Snowpark Python DB-API to pull data from MySQL in a Python stored procedure.
    
    def run(session: Session):
        # Feel free to combine local/udtf ingestion and partition column/predicates
        # as stated in the understanding parallelism section
    
        # Call dbapi to pull data from target table
    
        df = session.read.dbapi(
            create_mysql_connection,
            table="target_table"
        )
    
        # Call dbapi to pull data from target query
    
        df_query = session.read.dbapi(
            create_mysql_connection,
            query="select * from target_table"
        )
    
        # Pull data from target table with parallelism using partition column
    
        df_local_par_column = session.read.dbapi(
            create_mysql_connection,
            table="target_table",
            fetch_size=100000,
            num_partitions=4,
            column="ID",  # swap with the column you want your partition based on
            upper_bound=10000,
            lower_bound=0
        )
    
        udtf_configs = {
            "external_access_integration": "<your external access integration>"
        }
    
        # Pull data from target table with udtf ingestion with parallelism using predicates
    
        df_udtf_predicates = session.read.dbapi(
            create_mysql_connection,
            table="target_table",
            udtf_configs=udtf_configs,
            fetch_size=100000,
            predicates=[
                "ID < 3",
                "ID >= 3"
            ]
        )
        return df
    $$;
    
    CALL sp_mysql_dbapi();
    

Using DB-API to connect to MySQL from a Snowflake notebook

  1. From Snowflake Notebook packages, select snowflake-snowpark-python and pymysql.

  2. Configure the external access integration (EAI), which is required to allow Snowflake to connect to the source endpoint.

    Note

    PrivateLink is recommended for secure data transfer, especially when you're dealing with sensitive information. Ensure that your Snowflake account has the necessary PrivateLink privileges enabled, and that the PrivateLink feature is configured and active in your Snowflake notebook environment.

  3. Configure the secret, a network rule to allow egress to the source endpoint, and the EAI:

    CREATE OR REPLACE SECRET mysql_secret
        TYPE = PASSWORD
        USERNAME = 'mysql_username'
        PASSWORD = 'mysql_password';
    
    ALTER NOTEBOOK mynotebook SET SECRETS = ('snowflake-secret-object' = mysql_secret);
    
    -- configure a network rule.
    CREATE OR REPLACE NETWORK RULE mysql_network_rule
        MODE = EGRESS
        TYPE = HOST_PORT
        VALUE_LIST = ('mysql_host:mysql_port');
    
    -- configure an EAI
    CREATE OR REPLACE EXTERNAL ACCESS INTEGRATION mysql_access_integration
        ALLOWED_NETWORK_RULES = (mysql_network_rule)
        ALLOWED_AUTHENTICATION_SECRETS = (mysql_secret)
        ENABLED = true;
    
  4. Configure external access for Snowflake Notebooks, and then restart the notebook session.

  5. Use the DB-API to pull data from MySQL in a Python cell of a Snowflake notebook:

    # Get user name and password from mysql_secret
    import _snowflake
    username_password_object = _snowflake.get_username_password('snowflake-secret-object')
    USER = username_password_object.username
    PASSWORD = username_password_object.password
    
    import snowflake.snowpark.context
    session = snowflake.snowpark.context.get_active_session()
    
    # Define the factory method for creating a connection to MySQL
    
    def create_mysql_connection():
        import pymysql
        connection = pymysql.connect(
            host="mysql_host",
            port=mysql_port,
            database="mysql_dbname",
            user=USER,
            password=PASSWORD,
            # Optional: include this parameter for source tracing
            init_command="SET @program_name='snowflake-snowpark-python';"
        )
        return connection
    
    # Feel free to combine local/udtf ingestion and partition column/predicates as
    # stated in the understanding parallelism section
    
    # Call dbapi to pull data from target table
    
    df = session.read.dbapi(
        create_mysql_connection,
        table="target_table"
    )
    
    # Call dbapi to pull data from target query
    
    df_query = session.read.dbapi(
        create_mysql_connection,
        query="select * from target_table"
    )
    
    # Pull data from target table with parallelism using partition column
    
    df_local_par_column = session.read.dbapi(
        create_mysql_connection,
        table="target_table",
        fetch_size=100000,
        num_partitions=4,
        column="ID",  # swap with the column you want your partition based on
        upper_bound=10000,
        lower_bound=0
    )
    
    udtf_configs = {
        "external_access_integration": "<your external access integration>"
    }
    
    # Pull data from target table with udtf ingestion with parallelism using predicates
    
    df_udtf_predicates = session.read.dbapi(
        create_mysql_connection,
        table="target_table",
        udtf_configs=udtf_configs,
        fetch_size=100000,
        predicates=[
            "ID < 3",
            "ID >= 3"
        ]
    )
    
    # Save data into sf_table
    
    df.write.mode("overwrite").save_as_table('sf_table')
    

Source tracing when using DB-API to connect to MySQL

  1. Include a Snowpark tag in your connection-creation function:

    def create_mysql_connection():
        import pymysql
        connection = pymysql.connect(
            host="mysql_host",
            port=mysql_port,
            database="mysql_db",
            user="mysql_user",
            password="mysql_password",
            # include this parameter for source tracing
            init_command="SET @program_name='snowflake-snowpark-python';"
        )
        return connection
    
  2. Run the following SQL in your data source to capture queries from Snowpark:

    SELECT *
    FROM performance_schema.events_statements_history_long
    WHERE THREAD_ID = (
        SELECT THREAD_ID
        FROM performance_schema.events_statements_history_long
        WHERE SQL_TEXT = "SET @program_name='snowflake-snowpark-python'"
        ORDER BY EVENT_ID DESC
        LIMIT 1
    )
    

Databricks

To connect to Databricks from Snowpark, you need the following two packages: snowflake-snowpark-python and databricks-sql-connector.

The following code examples show how to connect to Databricks from a Snowpark client, stored procedures, and a Snowflake notebook.

Using DB-API to connect to Databricks from a Snowpark client

  1. Install databricks-sql-connector:

    pip install snowflake-snowpark-python[pandas]
    pip install databricks-sql-connector
    
  2. Define the factory method for creating a connection to Databricks:

    def create_dbx_connection():
        import databricks.sql
        connection = databricks.sql.connect(
            server_hostname=HOST,
            http_path=PATH,
            access_token=ACCESS_TOKEN
        )
        return connection
    
    # Feel free to combine local/udtf ingestion and partition column/predicates as
    # stated in the understanding parallelism section
    
    # Call dbapi to pull data from target table
    
    df = session.read.dbapi(
        create_dbx_connection,
        table="target_table"
    )
    
    # Call dbapi to pull data from target query
    
    df_query = session.read.dbapi(
        create_dbx_connection,
        query="select * from target_table"
    )
    
    # Pull data from target table with parallelism using partition column
    
    df_local_par_column = session.read.dbapi(
        create_dbx_connection,
        table="target_table",
        fetch_size=100000,
        num_partitions=4,
        column="ID",  # swap with the column you want your partition based on
        upper_bound=10000,
        lower_bound=0
    )
    
    udtf_configs = {
        "external_access_integration": "<your external access integration>"
    }
    
    # Pull data from target table with udtf ingestion with parallelism using predicates
    
    df_udtf_predicates = session.read.dbapi(
        create_dbx_connection,
        table="target_table",
        udtf_configs=udtf_configs,
        fetch_size=100000,
        predicates=[
            "ID < 3",
            "ID >= 3"
        ]
    )
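The client example above refers to `HOST`, `PATH`, and `ACCESS_TOKEN` without showing where they come from. A minimal sketch of one option is to read them from environment variables so that the token does not appear in source code. The variable names below are assumptions chosen for illustration, and `load_dbx_settings` is a hypothetical helper.

```python
import os


def load_dbx_settings(environ=None):
    """Collect Databricks connection settings from environment variables."""
    environ = os.environ if environ is None else environ
    # Maps connect() keyword names to assumed environment variable names.
    names = {
        "server_hostname": "DATABRICKS_SERVER_HOSTNAME",
        "http_path": "DATABRICKS_HTTP_PATH",
        "access_token": "DATABRICKS_TOKEN",
    }
    missing = [env for env in names.values() if not environ.get(env)]
    if missing:
        raise RuntimeError("missing environment variables: " + ", ".join(missing))
    return {key: environ[env] for key, env in names.items()}


# Demonstration with a plain dict in place of the real environment.
sample_environ = {
    "DATABRICKS_SERVER_HOSTNAME": "dbx_host",
    "DATABRICKS_HTTP_PATH": "dbx_path",
    "DATABRICKS_TOKEN": "dbx_access_token",
}
settings = load_dbx_settings(sample_environ)
print(sorted(settings))  # ['access_token', 'http_path', 'server_hostname']
```

Because the returned keys match the keyword arguments used in the example above, the factory could call `databricks.sql.connect(**load_dbx_settings())`.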
    

Using DB-API to connect to Databricks from a stored procedure

  1. Configure the external access integration (EAI), which is required to allow Snowflake to connect to the source endpoint.

    Note

    PrivateLink is recommended for secure data transfer, especially when you're dealing with sensitive information. Ensure that your Snowflake account has the necessary PrivateLink privileges enabled, and that the PrivateLink feature is configured and active in your Snowflake notebook environment.

  2. Configure the secret, a network rule to allow egress to the source endpoint, and the EAI:

    CREATE OR REPLACE SECRET dbx_secret
        TYPE = GENERIC_STRING
        SECRET_STRING = 'dbx_access_token';
    
    CREATE OR REPLACE NETWORK RULE dbx_network_rule
        MODE = EGRESS
        TYPE = HOST_PORT
        VALUE_LIST = ('dbx_host:dbx_port');
    
    CREATE OR REPLACE EXTERNAL ACCESS INTEGRATION dbx_access_integration
        ALLOWED_NETWORK_RULES = (dbx_network_rule)
        ALLOWED_AUTHENTICATION_SECRETS = (dbx_secret)
        ENABLED = true;
    
  3. Use Snowpark Python DB-API to pull data from Databricks in a Python stored procedure:

    CREATE OR REPLACE PROCEDURE sp_dbx_dbapi()
        RETURNS TABLE()
        LANGUAGE PYTHON
        RUNTIME_VERSION='3.11'
        HANDLER='run'
        PACKAGES=('snowflake-snowpark-python', 'databricks-sql-connector')
        EXTERNAL_ACCESS_INTEGRATIONS = (dbx_access_integration)
        SECRETS = ('cred' = dbx_secret )
    AS $$
    
    # Get the access token from dbx_secret
    
    import _snowflake
    ACCESS_TOKEN = _snowflake.get_generic_secret_string('cred')
    
    from snowflake.snowpark import Session
    
    # Define the method for creating a connection to Databricks
    def create_dbx_connection():
        import databricks.sql
        connection = databricks.sql.connect(
            server_hostname="dbx_host",
            http_path="dbx_path",
            access_token=ACCESS_TOKEN,
        )
        return connection
    
    # Using Snowpark Python DB-API to pull data from Databricks in a Python stored procedure.
    
    def run(session: Session):
        # Feel free to combine local/udtf ingestion and partition column/predicates
        # as stated in the understanding parallelism section
    
        # Call dbapi to pull data from target table
    
        df = session.read.dbapi(
            create_dbx_connection,
            table="target_table"
        )
    
        # Call dbapi to pull data from target query
    
        df_query = session.read.dbapi(
            create_dbx_connection,
            query="select * from target_table"
        )
    
        # Pull data from target table with parallelism using partition column
    
        df_local_par_column = session.read.dbapi(
            create_dbx_connection,
            table="target_table",
            fetch_size=100000,
            num_partitions=4,
            column="ID",  # swap with the column you want your partition based on
            upper_bound=10000,
            lower_bound=0
        )
    
        udtf_configs = {
            "external_access_integration": "<your external access integration>"
        }
    
        # Pull data from target table with udtf ingestion with parallelism using predicates
    
        df_udtf_predicates = session.read.dbapi(
            create_dbx_connection,
            table="target_table",
            udtf_configs=udtf_configs,
            fetch_size=100000,
            predicates=[
                "ID < 3",
                "ID >= 3"
            ]
        )
        return df
    
    $$;
    
    CALL sp_dbx_dbapi();
    

Using DB-API to connect to Databricks from a Snowflake notebook

  1. From Snowflake Notebook packages, select snowflake-snowpark-python and databricks-sql-connector.

  2. Configure the external access integration (EAI), which is required to allow Snowflake to connect to the source endpoint.

    Note

    PrivateLink is recommended for secure data transfer, especially when you're dealing with sensitive information. Ensure that your Snowflake account has the necessary PrivateLink privileges enabled, and that the PrivateLink feature is configured and active in your Snowflake notebook environment.

  3. Configure the secret, a network rule to allow egress to the source endpoint, and the EAI:

    CREATE OR REPLACE SECRET dbx_secret
        TYPE = GENERIC_STRING
        SECRET_STRING = 'dbx_access_token';
    
    ALTER NOTEBOOK mynotebook SET SECRETS = ('snowflake-secret-object' = dbx_secret);
    
    CREATE OR REPLACE NETWORK RULE dbx_network_rule
        MODE = EGRESS
        TYPE = HOST_PORT
        VALUE_LIST = ('dbx_host:dbx_port');
    
    CREATE OR REPLACE EXTERNAL ACCESS INTEGRATION dbx_access_integration
        ALLOWED_NETWORK_RULES = (dbx_network_rule)
        ALLOWED_AUTHENTICATION_SECRETS = (dbx_secret)
        ENABLED = true;
    
  4. Configure external access for Snowflake Notebooks, and then restart the notebook session.

  5. Use the DB-API to pull data from Databricks in a Python cell of a Snowflake notebook:

    # Get the access token from dbx_secret
    
    import _snowflake
    ACCESS_TOKEN = _snowflake.get_generic_secret_string('snowflake-secret-object')
    
    import snowflake.snowpark.context
    session = snowflake.snowpark.context.get_active_session()
    
    # Define the factory method for creating a connection to Databricks
    
    def create_dbx_connection():
        import databricks.sql
        connection = databricks.sql.connect(
            server_hostname="dbx_host",
            http_path="dbx_path",
            access_token=ACCESS_TOKEN,
        )
        return connection
    
    # Feel free to combine local/udtf ingestion and partition column/predicates as
    # stated in the understanding parallelism section
    
    # Call dbapi to pull data from target table
    
    df = session.read.dbapi(
        create_dbx_connection,
        table="target_table"
    )
    
    # Call dbapi to pull data from target query
    
    df_query = session.read.dbapi(
        create_dbx_connection,
        query="select * from target_table"
    )
    
    # Pull data from target table with parallelism using partition column
    
    df_local_par_column = session.read.dbapi(
        create_dbx_connection,
        table="target_table",
        fetch_size=100000,
        num_partitions=4,
        column="ID",  # swap with the column you want your partition based on
        upper_bound=10000,
        lower_bound=0
    )
    
    udtf_configs = {
        "external_access_integration": "<your external access integration>"
    }
    
    # Pull data from target table with udtf ingestion with parallelism using predicates
    
    df_udtf_predicates = session.read.dbapi(
        create_dbx_connection,
        table="target_table",
        udtf_configs=udtf_configs,
        fetch_size=100000,
        predicates=[
            "ID < 3",
            "ID >= 3"
        ]
    )
    
    # Save data into sf_table
    
    df.write.mode("overwrite").save_as_table('sf_table')
    

Source tracing when using DB-API to connect to Databricks

  1. Include a Snowpark tag in your connection-creation function:

    def create_dbx_connection():
        import databricks.sql
        connection = databricks.sql.connect(
            server_hostname=HOST,
            http_path=PATH,
            access_token=ACCESS_TOKEN,
            # include this parameter for source tracing
            user_agent_entry="snowflake-snowpark-python"
        )
        return connection
    
  2. Navigate to the query history on the Databricks console and search for the query whose source is snowflake-snowpark-python.

Limitations

Snowpark Python DB-API supports only Python DB-API 2.0 compliant drivers (for example, pyodbc or oracledb). JDBC drivers are not supported in this release.
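As a rough way to tell whether a driver module advertises DB-API 2.0 support, you can inspect the module-level attributes that PEP 249 defines. The sketch below checks only those declared attributes and says nothing about whether Snowpark supports a given driver. The helper `dbapi_compliance_issues` is hypothetical, and the built-in `sqlite3` module is used only as a convenient compliant example.

```python
import json
import sqlite3

PARAM_STYLES = {"qmark", "numeric", "named", "format", "pyformat"}


def dbapi_compliance_issues(module):
    """List the PEP 249 module-level attributes that look wrong or missing."""
    issues = []
    if getattr(module, "apilevel", None) != "2.0":
        issues.append("apilevel is not '2.0'")
    if not callable(getattr(module, "connect", None)):
        issues.append("missing connect()")
    if getattr(module, "paramstyle", None) not in PARAM_STYLES:
        issues.append("unknown paramstyle")
    if getattr(module, "threadsafety", None) not in {0, 1, 2, 3}:
        issues.append("unknown threadsafety")
    return issues


print(dbapi_compliance_issues(sqlite3))    # []
print(len(dbapi_compliance_issues(json)))  # 4 (json is not a database driver)
```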