Lecture de données à partir de sources de données externes à l’aide de Snowpark Python DB-API

Avec Snowpark Python DB-API, les utilisateurs de Snowpark Python peuvent extraire par programmation des données de bases de données externes dans Snowflake. Cela comprend :

  • Prise en charge Python DB-API : connexion à des bases de données externes à l’aide des pilotes 2.0 DB-API standard de Python.

  • Configuration rationalisée : utilisez pip pour installer les pilotes nécessaires, sans avoir à gérer de dépendances supplémentaires.

Avec ces APIs, vous pouvez facilement extraire des données dans des tables Snowflake et les transformer à l’aide de DataFramesSnowpark pour des analyses avancées.

Utilisation de Snowpark Python DB-API

La DB-API peut être utilisée de la même manière que l’API `Spark JDBC<https://spark.apache.org/docs/3.5.4/sql-data-sources-jdbc.html>`_. La plupart des paramètres sont conçus pour être identiques ou similaires pour une meilleure parité. En outre, Snowpark met l’accent sur une conception Python en premier lieu, avec des conventions de nommage intuitives, évitant ainsi les configurations JDBC spécifiques. Cela fournit aux développeurs Python une expérience familière. Pour plus d’informations sur la comparaison entre Snowpark Python DB-API et l’API Spark JDBC, voir Paramètres DB-API.

Paramètres DB-API

Paramètre

Snowpark DB-API

create_connection

Fonction pour créer une connexion Python DB-API.

table

Spécifie la table dans la base de données source.

query

Requête SQL englobée en tant que sous-requête pour la lecture des données.

column

Colonne de partitionnement pour les lectures parallèles.

lower_bound

Limite inférieure du partitionnement.

upper_bound

Limite supérieure du partitionnement.

num_partitions

Nombre de partitions pour le parallélisme.

query_timeout

Délai d’inactivité pour l’exécution SQL (en secondes).

fetch_size

Nombre de lignes extraites par aller-retour.

custom_schema

Schéma personnalisé pour l’extraction de données dans des bases de données externes.

max_workers

Nombre de travailleurs pour la récupération parallèle et l’extraction de données à partir de bases de données externes.

predicates

Liste des conditions pour les partitions de clause WHERE.

session_init_statement

Exécute une instruction SQL ou PL/SQL lors de l’initialisation de la session.

udtf_configs

Exécute la charge de travail à l’aide d’une UDTF Snowflake pour de meilleures performances.

fetch_merge_count

Nombre de lots extraits à fusionner dans un seul fichier Parquet avant le chargement.

Comprendre le parallélisme

Snowpark Python DB-API utilise deux formes indépendantes de parallélisme basées sur les entrées de l’utilisateur :

  • Parallélisme basé sur les partitions

    Lorsque les utilisateurs spécifient des informations de partitionnement (par exemple, column, lower_bound, upper_bound, num_partitions) ou des prédicats, Snowflake divise la requête en plusieurs partitions. Celles-ci sont traitées en parallèle à l’aide du multitraitement, chaque travailleur récupérant et écrivant sa propre partition.

  • Parallélisme basé sur la taille de la récupération dans chaque partition

    Dans une partition, l” API extrait les lignes dans des morceaux définis par fetch_size. Ces lignes sont écrites dans Snowflake en parallèle au fur et à mesure qu’elles sont extraites, ce qui permet à la lecture et à l’écriture de se chevaucher et d’optimiser le débit.

Ces deux formes de parallélisme fonctionnent indépendamment. Si aucun partitionnement ou fetch_size n’est spécifié, la fonction charge le tableau source entier en mémoire avant d’écrire dans Snowflake. Cela peut augmenter l’utilisation de la mémoire et réduire les performances pour les grands ensembles de données.

SQL Server

Utilisation de la DB-API pour se connecter à SQL Server à partir d’un client Snowpark

Pour se connecter à SQL Server de Snowpark, vous aurez besoin des trois paquets suivants :

Vous trouverez ci-dessous les exemples de code nécessaires pour vous connecter à SQL Server à partir du client Snowpark et d’une procédure stockée.

  • Installez le pilote Python SQL

/bin/bash -c "$(curl -fsSL https://raw.githubusercontent.com/Homebrew/install/master/install.sh)"
  brew tap microsoft/mssql-release https://github.com/Microsoft/homebrew-mssql-release
  brew update
  HOMEBREW_ACCEPT_EULA=Y brew install msodbcsql18 mssql-tools18
Copy
  • Installez snowflake-snowpark-python[pandas] et pyodbc

pip install snowflake-snowpark-python[pandas]
pip install pyodbc
Copy
  • Définissez la méthode d’usine pour créer une connexion à SQL Server

def create_sql_server_connection():
    import pyodbc
    HOST = "mssql_host"
    PORT = "mssql_port"
    USERNAME = "mssql_username"
    PASSWORD = "mssql_password"
    DATABASE = "mssql_db"
    connection_str = (
        "DRIVER={{ODBC Driver 18 for SQL Server}};"
        "SERVER={HOST},{PORT};"
        "DATABASE={DATABASE};"
        "UID={USERNAME};"
        "PWD={PASSWORD};"
    )
    connection = pyodbc.connect(connection_str)
    return connection


# Call dbapi to pull data from mssql_table

df = session.read.dbapi(
  create_sql_server_connection,
    table="mssql_table")
Copy

Utilisation de la DB-API pour se connecter à SQL Server à partir d’une procédure stockée

  • Configurez l’intégration d’accès externe, qui est nécessaire pour permettre à Snowflake de se connecter au point de terminaison source.

    Note

    PrivateLink est recommandé pour le transfert de données sécurisé, en particulier lorsqu’il s’agit d’informations sensibles. Assurez-vous que les privilèges PrivateLink nécessaires de votre compte Snowflake sont activés, et que la fonctionnalité PrivateLink est configurée et active dans l’environnement de votre notebook Snowflake.

  • Configurez le secret, une règle de réseau pour autoriser la sortie vers le point de terminaison source et l’intégration d’accès externe.

    CREATE OR REPLACE SECRET mssql_secret
      TYPE = PASSWORD
      USERNAME = 'mssql_username'
      PASSWORD = 'mssql_password';
    
    -- Configure a network rule to allow egress to the source endpoint
    
    CREATE OR REPLACE NETWORK RULE mssql_network_rule
      MODE = EGRESS
      TYPE = HOST_PORT
      VALUE_LIST = ('mssql_host:mssql_port');
    
    -- Configure an external access integration
    
    CREATE OR REPLACE EXTERNAL ACCESS INTEGRATION mssql_access_integration
      ALLOWED_NETWORK_RULES = (mssql_network_rule)
      ALLOWED_AUTHENTICATION_SECRETS = (mssql_secret)
      ENABLED = true;
    
    -- Create or replace a Python stored procedure
    
    CREATE OR REPLACE PROCEDURE sp_mssql_dbapi()
      RETURNS TABLE()
      LANGUAGE PYTHON
      RUNTIME_VERSION='3.11'
      HANDLER='run'
      PACKAGES=('snowflake-snowpark-python', 'pyodbc', 'msodbcsql')
            EXTERNAL_ACCESS_INTEGRATIONS = (mssql_access_integration)
            SECRETS = ('cred' = mssql_secret )
    
    AS $$
    
    # Get user name and password from mssql_secret
    
    import _snowflake
    username_password_object = _snowflake.get_username_password('cred')
    USER = username_password_object.username
    PASSWORD = username_password_object.password
    
    # Define a method to connect to SQL server_hostname
    from snowflake.snowpark import Session
    def create_sql_server_connection():
        import pyodbc
    
        host = "mssql_host"
        port = mssql_port
        username = USER
        password = PASSWORD
        database = "mssql_database"
        connection_str = (
          "DRIVER={{ODBC Driver 18 for SQL Server}};"
          "SERVER={host},{port};"
          "DATABASE={database};"
          "UID={username};"
          "PWD={password};"
        )
    
        connection = pyodbc.connect(connection_str)
        return connection
    
    def run(session: Session):
        df = session.read.dbapi(
            create_sql_server_connection,
            table="mssql_table"
        )
        return df
    $$;
    
    CALL sp_mssql_dbapi();
    
    Copy

Oracle

Pour vous connecter à Oracle depuis Snowpark, vous aurez besoin des deux paquets suivants :

Vous trouverez ci-dessous les exemples de code nécessaires pour vous connecter à Oracle à partir d’un client Snowpark, de procédures stockées et de Snowflake Notebooks.

Utilisation de la DB-API pour se connecter à Oracle à partir d’un client Snowpark

  • Installez snowflake-snowpark-python[pandas] et oracledb

    pip install snowflake-snowpark-python[pandas]
    pip install oradb
    
    Copy
  • Utilisez la DB-API pour extraire des données d’Oracle et définir la méthode d’usine pour créer une connexion à Oracle

    def create_oracle_db_connection():
        import oracledb
        HOST = "myhost"
        PORT = "myport"
        SERVICE_NAME = "myservice"
        USER = "myuser"
        PASSWORD = "mypassword"
        DSN = "{HOST}:{PORT}/{SERVICE_NAME}"
        connection = oracledb.connect(
            user=USER,
            password=PASSWORD,
            dsn=DSN
        )
        return connection
    
    
    # Call dbapi to pull data from mytable
    
    df = session.read.dbapi(
      create_oracle_db_connection,
      table="mytable")
    
    Copy

Utilisation de la DB-API pour se connecter à Oracle à partir d’une procédure stockée

L’intégration d’accès externe est nécessaire pour permettre à Snowflake de se connecter au point de terminaison source.

Note

PrivateLink est recommandé pour le transfert de données sécurisé, en particulier lorsqu’il s’agit d’informations sensibles. Assurez-vous que les privilèges PrivateLink nécessaires de votre compte Snowflake sont activés, et que la fonctionnalité PrivateLink est configurée et active dans l’environnement de votre notebook Snowflake.

  • Configurez le secret, une règle de réseau pour autoriser la sortie vers le point de terminaison source et l’intégration d’accès externe.

    -- Configure the secret, a network rule to allow egress to the source endpoint and external access integration.
    
    CREATE OR REPLACE SECRET ora_secret
      TYPE = PASSWORD
      USERNAME = 'ora_username'
      PASSWORD = 'ora_password';
    
    -- Configure a network rule to allow egress to the source endpoint
    
    CREATE OR REPLACE NETWORK RULE ora_network_rule
      MODE = EGRESS
      TYPE = HOST_PORT
      VALUE_LIST = ('ora_host:ora_port');
    
    -- Configure an external access integration
    
    CREATE OR REPLACE EXTERNAL ACCESS INTEGRATION ora_access_integration
      ALLOWED_NETWORK_RULES = (ora_network_rule)
      ALLOWED_AUTHENTICATION_SECRETS = (ora_secret)
      ENABLED = true;
    
    Copy

Utilisation de Snowpark Python DB-API pour extraire des données d’Oracle dans une procédure stockée Python

CREATE OR REPLACE PROCEDURE sp_ora_dbapi()
  RETURNS TABLE()
  LANGUAGE PYTHON
  RUNTIME_VERSION='3.11'
  HANDLER='run'
  PACKAGES=('snowflake-snowpark-python', 'oracledb')
  EXTERNAL_ACCESS_INTEGRATIONS = (ora_access_integration)
  SECRETS = ('cred' = ora_secret )
AS $$

# Get user name and password from ora_secret
import _snowflake
username_password_object = _snowflake.get_username_password('cred')
USER = username_password_object.username
PASSWORD = username_password_object.password

# Define the factory method for creating a connection to Oracle

from snowflake.snowpark import Session

def create_oracle_db_connection():
    import oracledb
    host = "ora_host"
    port = "ora_port"
    service_name = "ora_service"
    user = USER
    password = PASSWORD
    DSN = "{host}:{port}/{service_name}"
    connection = oracledb.connect(
        user=USER,
        password=PASSWORD,
        dsn=DSN
    )
    return connection

def run(session: Session):
    df = session.read.dbapi(
        create_ora_connection,
        table="ora_table"
    )
    return df
$$;

CALL sp_ora_dbapi();
Copy

Utilisation de la DB-API pour se connecter à Oracle depuis un Notebook Snowflake

  • Sélectionnez snowflake-snowpark-python et oracledb dans les paquets de Notebook.

  • Configurez le secret, une règle de réseau pour autoriser la sortie vers le point de terminaison source et l’intégration d’accès externe.

    CREATE OR REPLACE SECRET ora_secret
      TYPE = PASSWORD
      USERNAME = 'ora_username'
      PASSWORD = 'ora_password';
      ALTER NOTEBOOK mynotebook SET SECRETS = ('snowflake-secret-object' = ora_secret);
    
    -- Configure a network rule to allow egress to the source endpoint
    
    CREATE OR REPLACE NETWORK RULE ora_network_rule
      MODE = EGRESS
      TYPE = HOST_PORT
      VALUE_LIST = ('ora_host:ora_port');
    
    -- Configure an external access integration.
    
    CREATE OR REPLACE EXTERNAL ACCESS INTEGRATION ora_access_integration
      ALLOWED_NETWORK_RULES = (ora_network_rule)
      ALLOWED_AUTHENTICATION_SECRETS = (ora_secret)
      ENABLED = true;
    
    Copy
  • Configurer l’accès externe pour Snowflake Notebooks puis redémarrez la session du Notebook.

Utilisation de Snowpark Python DB-API pour extraire des données d’Oracle dans une cellule Python de Snowflake Notebook

# Get user name and password from ora_secret

import _snowflake
username_password_object = _snowflake.get_username_password('snowflake-secret-object')
USER = username_password_object.username
PASSWORD = username_password_object.password

import snowflake.snowpark.context
session = snowflake.snowpark.context.get_active_session()

# Define the factory method for creating a connection to Oracle

def create_oracle_db_connection():
    import oracledb
    host = "ora_host"
    port = "ora_port"
    service_name = "ora_service"
    user = USER
    password = PASSWORD
    DSN = "{host}:{port}/{service_name}"
    connection = oracledb.connect(
        user=USER,
        password=PASSWORD,
        dsn=DSN
    )
    return connection

# Use dbapi to read data from ora_table

df_ora = session.read.dbapi(
  create_oracle_db_connection,
  table='ora_table'
)

# Save data into sf_table

df_ora.write.mode("overwrite").save_as_table('sf_table')
Copy

PostgreSQL

Pour vous connecter à PostgreSQL depuis Snowpark, vous aurez besoin des deux paquets suivants :

Vous trouverez ci-dessous les exemples de code nécessaires pour vous connecter à PostgreSQL à partir du client Snowpark, des procédures stockées et des Notebooks Snowflake.

Utilisation de la DB-API pour se connecter à PostgreSQL depuis un client Snowpark

  • Installer psycopg2

    pip install psycopg2
    
    Copy
  • Définir la méthode d’usine pour créer une connexion à PostgreSQL

    def create_pg_connection():
        import psycopg2
        connection = psycopg2.connect(
            host="pg_host",
            port=pg_port,
            dbname="pg_dbname",
            user="pg_user",
            password="pg_password",
        )
        return connection
    
    
    # Call dbapi to pull data from pg_table
    
    df = session.read.dbapi(
      create_pg_connection,
      table="pg_table")
    
    Copy

Utilisation de la DB-API pour se connecter à PostgreSQL à partir d’une procédure stockée

  • Configurez l’intégration d’accès externe, qui est nécessaire pour permettre à Snowflake de se connecter au point de terminaison source.

    Note

    PrivateLink est recommandé pour le transfert de données sécurisé, en particulier lorsqu’il s’agit d’informations sensibles. Assurez-vous que les privilèges PrivateLink nécessaires de votre compte Snowflake sont activés, et que la fonctionnalité PrivateLink est configurée et active dans l’environnement de votre notebook Snowflake.

  • Configurez le secret, une règle de réseau pour autoriser la sortie vers le point de terminaison source et l’intégration d’accès externe.

CREATE OR REPLACE SECRET pg_secret
  TYPE = PASSWORD
  USERNAME = 'pg_username'
  PASSWORD = 'pg_password';

-- Configure a network rule.

CREATE OR REPLACE NETWORK RULE pg_network_rule
  MODE = EGRESS
  TYPE = HOST_PORT
  VALUE_LIST = ('pg_host:pg_port');

-- Configure an external access integration.

CREATE OR REPLACE EXTERNAL ACCESS INTEGRATION pg_access_integration
  ALLOWED_NETWORK_RULES = (pg_network_rule)
  ALLOWED_AUTHENTICATION_SECRETS = (pg_secret)
  ENABLED = true;
Copy
  • Utiliser Snowpark Python DB-API pour extraire des données depuis PostgreSQL dans une procédure stockée Python

CREATE OR REPLACE PROCEDURE sp_pg_dbapi()
RETURNS TABLE()
LANGUAGE PYTHON
RUNTIME_VERSION='3.11'
HANDLER='run'
PACKAGES=('snowflake-snowpark-python', 'psycopg2')
EXTERNAL_ACCESS_INTEGRATIONS = (pg_access_integration)
SECRETS = ('cred' = pg_secret )
AS $$

# Get user name and password from pg_secret

import _snowflake
username_password_object = _snowflake.get_username_password('cred')
USER = username_password_object.username
PASSWORD = username_password_object.password

# Define the factory method for creating a connection to PostgreSQL

from snowflake.snowpark import Session

def create_pg_connection():
    import psycopg2
    connection = psycopg2.connect(
        host="pg_host",
        port=pg_port,
        dbname="pg_dbname",
        user=USER,
        password=PASSWORD,
    )
    return connection

def run(session: Session):
    df = session.read.dbapi(
        create_pg_connection,
        table="pg_table"
    )
    return df

$$;
CALL sp_pg_dbapi();
Copy

Utilisation de la DB-API pour se connecter à PostgreSQL depuis un Notebook Snowflake

  • Sélectionnez snowflake-snowpark-python et psycopg2 dans les paquets Snowflake Notebook.

  • Configurez l’intégration d’accès externe, qui est nécessaire pour permettre à Snowflake de se connecter au point de terminaison source.

    Note

    PrivateLink est recommandé pour le transfert de données sécurisé, en particulier lorsqu’il s’agit d’informations sensibles. Assurez-vous que les privilèges PrivateLink nécessaires de votre compte Snowflake sont activés, et que la fonctionnalité PrivateLink est configurée et active dans l’environnement de votre notebook Snowflake.

  • Configurez le secret, une règle de réseau pour autoriser la sortie vers le point de terminaison source et l’intégration d’accès externe.

-- Configure the secret

CREATE OR REPLACE SECRET pg_secret
  TYPE = PASSWORD
  USERNAME = 'pg_username'
  PASSWORD = 'pg_password';
  ALTER NOTEBOOK pg_notebook SET SECRETS = ('snowflake-secret-object' = pg_secret);

 -- Configure the network rule to allow egress to the source endpoint

 CREATE OR REPLACE NETWORK RULE pg_network_rule
    MODE = EGRESS
    TYPE = HOST_PORT
    VALUE_LIST = ('pg_host:pg_port');

 -- Configure external access integration

 CREATE OR REPLACE EXTERNAL ACCESS INTEGRATION pg_access_integration
    ALLOWED_NETWORK_RULES = (pg_network_rule)
    ALLOWED_AUTHENTICATION_SECRETS = (pg_secret)
    ENABLED = true;
Copy
  • Configurer l’accès externe pour Snowflake Notebooks puis redémarrez la session du Notebook.

    # Get the user name and password from :code:`pg_secret`
    
    import _snowflake
    username_password_object = _snowflake.get_username_password('snowflake-secret-object')
    USER = username_password_object.username
    PASSWORD = username_password_object.password
    
    import snowflake.snowpark.context
    session = snowflake.snowpark.context.get_active_session()
    
    # Define the factory method for creating a connection to PostgreSQL
    
    def create_pg_connection():
        import psycopg2
        connection = psycopg2.connect(
            host="pg_host",
            port=pg_port,
            dbname="pg_dbname",
            user=USER,
            password=PASSWORD,
        )
        return connection
    
    # Use dbapi to read and save data from pg_table
    
    df = session.read.dbapi(
        create_pg_connection,
        table="pg_table"
    )
    
    # Save data into sf_table
    
    df.write.mode("overwrite").save_as_table('sf_table')
    
    Copy

MySQL

Pour vous connecter à MySQL depuis Snowpark, vous aurez besoin des deux paquets suivants :

Vous trouverez ci-dessous les exemples de code nécessaires pour vous connecter à MySQL à partir du client Snowpark, des procédures stockées et du Notebook Snowflake.

Utilisation de la DB-API pour se connecter à MySQL depuis un client Snowpark

  • Installez pymysql

pip install snowflake-snowpark-python[pandas]
pip install pymysql
Copy
  • Définir la méthode d’usine pour créer une connexion à MySQL

def create_mysql_connection():
    import pymysql
    connection = pymysql.connect(
        host="mysql_host",
        port=mysql_port,
        database="mysql_db",
        user="mysql_user",
        password="mysql_password"
    )
    return connection


# Call dbapi to pull data from mysql_table

df = session.read.dbapi(
    create_mysql_connection,
    table="mysql_table"
)
Copy

Utilisation de la DB-API pour se connecter à MySQL à partir d’une procédure stockée

  • Configurez l’intégration d’accès externe, qui est nécessaire pour permettre à Snowflake de se connecter au point de terminaison source.

    Note

    PrivateLink est recommandé pour le transfert de données sécurisé, en particulier lorsqu’il s’agit d’informations sensibles. Assurez-vous que les privilèges PrivateLink nécessaires de votre compte Snowflake sont activés, et que la fonctionnalité PrivateLink est configurée et active dans l’environnement de votre notebook Snowflake.

  • Configurez le secret, une règle de réseau pour autoriser la sortie vers le point de terminaison source et l’intégration d’accès externe.

CREATE OR REPLACE SECRET mysql_secret
  TYPE = PASSWORD
  USERNAME = 'mysql_username'
  PASSWORD = 'mysql_password';

-- Configure a network rule.

CREATE OR REPLACE NETWORK RULE mysql_network_rule
  MODE = EGRESS
  TYPE = HOST_PORT
  VALUE_LIST = ('mysql_host:mysql_port');

-- Configure an external access integration

CREATE OR REPLACE EXTERNAL ACCESS INTEGRATION mysql_access_integration
  ALLOWED_NETWORK_RULES = (mysql_network_rule)
  ALLOWED_AUTHENTICATION_SECRETS = (mysql_secret)
  ENABLED = true;
Copy
  • Utilisation de Snowpark Python DB-API pour extraire des données depuis MySQL dans une procédure stockée Python.

CREATE OR REPLACE PROCEDURE sp_mysql_dbapi()
RETURNS TABLE()
LANGUAGE PYTHON
RUNTIME_VERSION='3.11'
HANDLER='run'
PACKAGES=('snowflake-snowpark-python', 'pymysql')
EXTERNAL_ACCESS_INTEGRATIONS = (mysql_access_integration)
SECRETS = ('cred' = mysql_secret )
AS $$

# Get user name and password from mysql_secret

import _snowflake
    username_password_object = _snowflake.get_username_password('cred')
    USER = username_password_object.username
    PASSWORD = username_password_object.password

# Define the factory method for creating a connection to MySQL

from snowflake.snowpark import session

def create_mysql_connection():
    import pymysql
    connection = pymysql.connect(
        host="mysql_host",
        port=mysql_port,
        dbname="mysql_dbname",
        user=USER,
        password=PASSWORD,
    )
    return connection

def run(session: Session):
    df = session.read.dbapi(
        create_mysql_connection,
        table="mysql_table"
    )
    return df
$$;

CALL sp_mysql_dbapi();
Copy

Utilisation de la DB-API pour se connecter à MySQL depuis un Notebook Snowflake

  • Sélectionnez snowflake-snowpark-python et pymysql dans les paquets d’un Notebook Snowflake.

  • Configurez l’intégration d’accès externe, qui est nécessaire pour permettre à Snowflake de se connecter au point de terminaison source.

    Note

    PrivateLink est recommandé pour le transfert de données sécurisé, en particulier lorsqu’il s’agit d’informations sensibles. Assurez-vous que les privilèges PrivateLink nécessaires de votre compte Snowflake sont activés, et que la fonctionnalité PrivateLink est configurée et active dans l’environnement de votre notebook Snowflake.

  • Configurez le secret et ajoutez-le au Notebook Snowflake.

CREATE OR REPLACE SECRET mysql_secret
  TYPE = PASSWORD
  USERNAME = 'mysql_username'
  PASSWORD = 'mysql_password';

  ALTER NOTEBOOK mynotebook SET SECRETS = ('snowflake-secret-object' = mysql_secret);
Copy
  • Configurez une règle réseau et une intégration d’accès externe.

CREATE OR REPLACE NETWORK RULE mysql_network_rule
  MODE = EGRESS
  TYPE = HOST_PORT
  VALUE_LIST = ('mysql_host:mysql_port');

CREATE OR REPLACE EXTERNAL ACCESS INTEGRATION mysql_access_integration
  ALLOWED_NETWORK_RULES = (mysql_network_rule)
  ALLOWED_AUTHENTICATION_SECRETS = (mysql_secret)
  ENABLED = true;
Copy
# Get user name and password from mysql_secret
import _snowflake
username_password_object = _snowflake.get_username_password('snowflake-secret-object')
USER = username_password_object.username
PASSWORD = username_password_object.password

import snowflake.snowpark.context
session = snowflake.snowpark.context.get_active_session()

# Define the factory method for creating a connection to MySQL

def create_mysql_connection():
    import pymysql
    connection = pymysql.connect(
        host="mysql_host",
        port=mysql_port,
        dbname="mysql_dbname",
        user=USER,
        password=PASSWORD,
    )
    return connection

# Call dbapi to pull data from mysql_table

df = session.read.dbapi(
    create_mysql_connection,
    table="mysql_table")

# Save data into sf_table

df.write.mode("overwrite").save_as_table('sf_table')
Copy

Databricks

Pour vous connecter à Databricks depuis Snowpark, vous aurez besoin des deux paquets suivants :

Vous trouverez ci-dessous les exemples de code nécessaires pour vous connecter à Databricks à partir du client Snowpark, des procédures stockées et du Notebook Snowflake.

Utilisation de la DB-API pour se connecter à Databricks à partir d’un client Snowpark

  • Installez databricks-sql-connector.

pip install snowflake-snowpark-python[pandas]
pip install databricks-sql-connector
Copy
  • Définissez la méthode d’usine pour créer une connexion à Databricks

def create_dbx_connection():
    import databricks.sql
    connection = databricks.sql.connect(
        server_hostname=HOST,
        http_path=PATH,
        access_token=ACCESS_TOKEN
    )
    return connection


#Call dbapi to pull data from mytable

df = session.read.dbapi(
    create_dbx_connection,
    table="dbx_table")
Copy

Utilisation de la DB-API pour se connecter à Databricks à partir d’une procédure stockée

  • Sélectionnez snowflake-snowpark-python et pymysql dans les paquets Snowflake Notebook.

  • L’intégration d’accès externe est nécessaire pour permettre à Snowflake de se connecter au point de terminaison source.

    Note

    PrivateLink est recommandé pour le transfert de données sécurisé, en particulier lorsqu’il s’agit d’informations sensibles. Assurez-vous que les privilèges PrivateLink nécessaires de votre compte Snowflake sont activés, et que la fonctionnalité PrivateLink est configurée et active dans l’environnement de votre notebook Snowflake.

  • Configurez le secret, une règle de réseau pour autoriser la sortie vers le point de terminaison source et l’intégration d’accès externe.

CREATE OR REPLACE SECRET dbx_secret
  TYPE = GENERIC_STRING
  SECRET_STRING = 'dbx_access_token';

-- Configure a network rule

CREATE OR REPLACE NETWORK RULE dbx_network_rule
  MODE = EGRESS
  TYPE = HOST_PORT
  VALUE_LIST = ('dbx_host:dbx_port');

--  Configure an external access integration

CREATE OR REPLACE EXTERNAL ACCESS INTEGRATION dbx_access_integration
  ALLOWED_NETWORK_RULES = (dbx_network_rule)
  ALLOWED_AUTHENTICATION_SECRETS = (dbx_secret)
  ENABLED = true;
Copy
  • Utilisation de Snowpark Python DB-API pour extraire des données de Databricks dans une procédure stockée Python

CREATE OR REPLACE PROCEDURE sp_dbx_dbapi()
  RETURNS TABLE()
  LANGUAGE PYTHON
  RUNTIME_VERSION='3.11'
  HANDLER='run'
  PACKAGES=('snowflake-snowpark-python', 'databricks-sql-connector')
  EXTERNAL_ACCESS_INTEGRATIONS = (dbx_access_integration)
  SECRETS = ('cred' = dbx_secret )
AS $$

# Get user name and password from dbx_secret

import _snowflake
ACCESS_TOKEN = _snowflake.get_generic_secret_string('cred')

from snowflake.snowpark import Session

# define the method for creating a connection to Databricks
def create_dbx_connection():
    import databricks.sql
    connection = databricks.sql.connect(
        server_hostname="dbx_host",
        http_path="dbx_path",
        access_token=ACCESS_TOKEN,
    )
    return connection

def run(session: Session):
    df = session.read.dbapi(
        create_dbx_connection,
        table="dbx_table"
    )
    return df

$$;

CALL sp_dbx_dbapi();
Copy

Utilisation de la DB-API pour se connecter à Databricks à partir d’un Notebook Snowflake

  • Sélectionnez snowflake-snowpark-python et pymysql dans les paquets Snowflake Notebook.

  • Configurez l’intégration d’accès externe, qui est nécessaire pour permettre à Snowflake de se connecter au point de terminaison source.

    Note

    PrivateLink est recommandé pour le transfert de données sécurisé, en particulier lorsqu’il s’agit d’informations sensibles. Assurez-vous que les privilèges PrivateLink nécessaires de votre compte Snowflake sont activés, et que la fonctionnalité PrivateLink est configurée et active dans l’environnement de votre notebook Snowflake.

  • Configurez le secret et ajoutez-le au Notebook Snowflake.

    CREATE OR REPLACE SECRET dbx_secret
      TYPE = GENERIC_STRING
      SECRET_STRING = 'dbx_access_token';
      ALTER NOTEBOOK mynotebook SET SECRETS = ('snowflake-secret-object' = dbx_secret);
    
    Copy
  • Configurer

    CREATE OR REPLACE NETWORK RULE dbx_network_rule
      MODE = EGRESS
      TYPE = HOST_PORT
      VALUE_LIST = ('dbx_host:dbx_port');
    
    CREATE OR REPLACE EXTERNAL ACCESS INTEGRATION dbx_access_integration
      ALLOWED_NETWORK_RULES = (dbx_network_rule)
      ALLOWED_AUTHENTICATION_SECRETS = (dbx_secret)
      ENABLED = true;
    
    Copy
  • Configurer l’accès externe pour Snowflake Notebooks puis redémarrez la session du Notebook.

    # Get user name and password from dbx_secret
    
    import _snowflake
    ACCESS_TOKEN = _snowflake.get_generic_secret_string('cred')
    
    import snowflake.snowpark.context
    session = snowflake.snowpark.context.get_active_session()
    
    # Define the factory method for creating a connection to Databricks
    
    def create_dbx_connection():
        import databricks.sql
        connection = databricks.sql.connect(
            server_hostname="dbx_host",
            http_path="dbx_path",
            access_token=ACCESS_TOKEN,
        )
        return connection
    
    # use dbapi to read data from dbx_table
    
    df = session.read.dbapi(
        create_dbx_connection,
        table="dbx_table"
    )
    
    # save data into sf_table
    
    df.write.mode("overwrite").save_as_table('sf_table')
    
    Copy

Limitations

Pilotes

Snowpark Python DB-API prend en charge uniquement les pilotes conformes 2.0 DB-API (par exemple, pyodbc, oracledb).Les pilotes JDBC ne sont pas pris en charge dans cette version.