Using the Snowpark Python DB-API
With the Snowpark Python DB-API, Snowpark Python users can programmatically pull data from external databases into Snowflake. The DB-API includes:
Python DB-API support: connect to external databases using standard Python DB-API 2.0 drivers.
Streamlined setup: use pip to install the required drivers, with no extra dependencies to manage.
With these APIs, you can easily pull data into Snowflake tables and transform it with Snowpark DataFrames for advanced analytics.
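For orientation, here is a minimal sketch of the end-to-end flow; the connection factory body, the ID column, and the table names are placeholders rather than part of the API:

from snowflake.snowpark import Session

# session = Session.builder.configs({...}).create()  # your Snowflake connection

def create_connection():
    # Return any DB-API 2.0 connection (pyodbc, oracledb, psycopg2, and so on)
    ...

# Pull the source table into a Snowpark DataFrame, transform it, and persist it
df = session.read.dbapi(create_connection, table="target_table")
df.filter(df["ID"] > 0).write.mode("overwrite").save_as_table("sf_table")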
The DB-API can be used in much the same way as the Spark JDBC API (https://spark.apache.org/docs/3.5.4/sql-data-sources-jdbc.html). Most parameters are designed to be identical or similar for better parity. In addition, Snowpark emphasizes a Python-first design with intuitive naming conventions, avoiding JDBC-specific configuration, which gives Python developers a familiar experience. For more information comparing the Snowpark Python DB-API with the Spark JDBC API, see the following table:
Understanding parallelism
The Snowpark Python DB-API has two underlying ingestion mechanisms:
- Local ingestion
In local ingestion, Snowpark first fetches data from the external source into the local environment where the dbapi() function is called, and converts it to Parquet files. Next, Snowpark uploads these Parquet files to a temporary Snowflake stage and copies them from the stage into a temporary table.
- UDTF ingestion
In UDTF ingestion, all workloads run on the Snowflake server. Snowpark first creates and executes a UDTF, and the UDTF ingests the data directly into Snowflake and stores it in a temporary table.
The Snowpark Python DB-API also has two ways to parallelize and accelerate ingestion:
- Partition column
This method divides source data into multiple partitions based on four parameters when users call dbapi():
column
lower_bound
upper_bound
num_partitions
These four parameters must be set together, and column must be a numeric or date type (see the sketch after this list).
- Predicates
This method divides source data into partitions based on parameter predicates, which are a list of expressions suitable for inclusion
in WHERE clauses, where each expression defines a partition. Predicates provide a more flexible way of dividing partitions; for example,
you can divide partitions on Boolean or non-numeric columns.
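The sketch below illustrates how column, lower_bound, upper_bound, and num_partitions translate into per-partition WHERE clauses, assuming even range splitting similar to Spark's JDBC reader; the exact boundaries Snowpark generates may differ:

def sketch_partition_predicates(column, lower_bound, upper_bound, num_partitions):
    # Evenly split [lower_bound, upper_bound) into num_partitions ranges
    stride = (upper_bound - lower_bound) // num_partitions
    cuts = [lower_bound + i * stride for i in range(1, num_partitions)]
    if not cuts:
        return ["1 = 1"]  # a single partition reads everything
    preds = [f"{column} < {cuts[0]}"]  # first range also catches low outliers
    preds += [f"{column} >= {lo} AND {column} < {hi}" for lo, hi in zip(cuts, cuts[1:])]
    preds.append(f"{column} >= {cuts[-1]}")  # last range catches high outliers
    return preds

print(sketch_partition_predicates("ID", 0, 10000, 4))
# ['ID < 2500', 'ID >= 2500 AND ID < 5000', 'ID >= 5000 AND ID < 7500', 'ID >= 7500']

Predicates let you express such splits directly, including on non-numeric columns. For example, assuming a hypothetical REGION column (session and create_connection are as in the examples that follow):

predicates = [
    "REGION = 'EMEA'",
    "REGION = 'AMER'",
    "REGION = 'APAC'",
    "REGION IS NULL",  # keep rows with no region from being dropped
]
df = session.read.dbapi(create_connection, table="target_table", predicates=predicates)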
The Snowpark Python DB-API also allows the adjustment of parallelism level within a partition:
- fetch_size
Within a partition, the API fetches rows in chunks defined by fetch_size. These rows are written to Snowflake in parallel as they are
fetched, which allows reading and writing to overlap and maximizes throughput.
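As a rough sizing heuristic (an assumption, not part of the API), you can derive fetch_size from an estimated row width so that each chunk stays within a memory budget:

# Hypothetical sizing heuristic: target roughly 64 MB per fetched chunk
avg_row_bytes = 500                                   # your estimate for the source table
target_chunk_bytes = 64 * 1024 * 1024
fetch_size = max(1000, target_chunk_bytes // avg_row_bytes)
df = session.read.dbapi(create_connection, table="target_table", fetch_size=fetch_size)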
Combining these ingestion mechanisms and parallelism methods gives four ways to ingest data:
Local ingestion with a partition column
df_local_par_column = session.read.dbapi(
create_connection,
table="target_table",
fetch_size=100000,
num_partitions=4,
column="ID", # Swap with the column you want your partition based on
upper_bound=10000,
lower_bound=0
)
Local ingestion with predicates
df_local_predicates = session.read.dbapi(
create_connection,
table="target_table",
fetch_size=100000,
predicates=[
"ID < 3",
"ID >= 3"
]
)
UDTF ingestion with a partition column
udtf_configs = {
"external_access_integration": "<your external access integration>"
}
df_udtf_par_column = session.read.dbapi(
create_connection,
table="target_table",
udtf_configs=udtf_configs,
fetch_size=100000,
num_partitions=4,
column="ID", # Swap with the column you want your partition based on
upper_bound=10000,
lower_bound=0
)
UDTF ingestion with predicates
udtf_configs = {
"external_access_integration": "<your external access integration>"
}
df_udtf_predicates = session.read.dbapi(
create_connection,
table="target_table",
udtf_configs=udtf_configs,
fetch_size=100000,
predicates=[
"ID < 3",
"ID >= 3"
]
)
SQL Server
To connect to SQL Server from Snowpark, you need the following three packages:
The following code examples show how to connect to SQL Server from a Snowpark client, a stored procedure, and a Snowflake notebook.
Use the DB-API to connect to SQL Server from a Snowpark client
Install the Microsoft ODBC driver for SQL Server (macOS example using Homebrew):
/bin/bash -c "$(curl -fsSL https://raw.githubusercontent.com/Homebrew/install/master/install.sh)"
brew tap microsoft/mssql-release https://github.com/Microsoft/homebrew-mssql-release
brew update
HOMEBREW_ACCEPT_EULA=Y brew install msodbcsql mssql-tools
Install snowflake-snowpark-python[pandas] and pyodbc:
pip install snowflake-snowpark-python[pandas]
pip install pyodbc
Define the factory method for creating a connection to SQL Server:
def create_sql_server_connection():
import pyodbc
SERVER = "<your host name>"
PORT = <your port>
UID = "<your user name>"
PWD = "<your password>"
DATABASE = "<your database name>"
connection_str = (
f"DRIVER={{ODBC Driver 18 for SQL Server}};"
f"SERVER={SERVER}:{PORT};"
f"UID={UID};"
f"PWD={PWD};"
f"DATABASE={DATABASE};"
"TrustServerCertificate=yes"
"Encrypt=yes"
# Optional to identify source of queries
"APP=snowflake-snowpark-python;"
)
connection = pyodbc.connect(connection_str)
return connection
# Feel free to combine local/udtf ingestion and partition column/predicates as
# stated in the understanding parallelism section
# Call dbapi to pull data from target table
df = session.read.dbapi(
create_sql_server_connection,
table="target_table"
)
# Call dbapi to pull data from target query
df_query = session.read.dbapi(
create_sql_server_connection,
query="select * from target_table"
)
# Pull data from target table with parallelism using partition column
df_local_par_column = session.read.dbapi(
create_sql_server_connection,
table="target_table",
fetch_size=100000,
num_partitions=4,
column="ID", # Swap with the column you want your partition based on
upper_bound=10000,
lower_bound=0
)
udtf_configs = {
"external_access_integration": "<your external access integration>"
}
# Pull data from target table with udtf ingestion with parallelism using predicates
df_udtf_predicates = session.read.dbapi(
create_sql_server_connection,
table="target_table",
udtf_configs=udtf_configs,
fetch_size=100000,
predicates=[
"ID < 3",
"ID >= 3"
]
)
Use the DB-API to connect to SQL Server from a stored procedure
Set up the external access integration (EAI), which is required to allow Snowflake to connect to the source endpoint.
Note
PrivateLink is recommended for secure data transfer, especially when handling sensitive information. Make sure the required PrivateLink privileges are enabled for your Snowflake account, and that the PrivateLink feature is configured and active in your Snowflake environment.
Configure the secret, a network rule to allow egress to the source endpoint, and the EAI:
-- Configure a secret to allow egress to the source endpoint
CREATE OR REPLACE SECRET mssql_secret
TYPE = PASSWORD
USERNAME = 'mssql_username'
PASSWORD = 'mssql_password';
-- Configure a network rule to allow egress to the source endpoint
CREATE OR REPLACE NETWORK RULE mssql_network_rule
MODE = EGRESS
TYPE = HOST_PORT
VALUE_LIST = ('mssql_host:mssql_port');
-- Configure an external access integration
CREATE OR REPLACE EXTERNAL ACCESS INTEGRATION mssql_access_integration
ALLOWED_NETWORK_RULES = (mssql_network_rule)
ALLOWED_AUTHENTICATION_SECRETS = (mssql_secret)
ENABLED = true;
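Depending on your role setup, you may also need to grant usage on these objects to the role that owns the stored procedure. A sketch, assuming a hypothetical role my_role:

-- Optional, depending on your role setup
GRANT USAGE ON INTEGRATION mssql_access_integration TO ROLE my_role;
GRANT READ ON SECRET mssql_secret TO ROLE my_role;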
Use the DB-API to pull data from SQL Server in a Python stored procedure:
CREATE OR REPLACE PROCEDURE sp_mssql_dbapi()
RETURNS TABLE()
LANGUAGE PYTHON
RUNTIME_VERSION='3.11'
HANDLER='run'
PACKAGES=('snowflake-snowpark-python', 'pyodbc', 'msodbcsql')
EXTERNAL_ACCESS_INTEGRATIONS = (mssql_access_integration)
SECRETS = ('cred' = mssql_secret )
AS $$
# Get user name and password from mssql_secret
import _snowflake
username_password_object = _snowflake.get_username_password('cred')
USER = username_password_object.username
PASSWORD = username_password_object.password
# Define the factory method for creating a connection to SQL Server
from snowflake.snowpark import Session
def create_sql_server_connection():
import pyodbc
host = "<your host>"
port = <your port>
username = USER
password = PASSWORD
database = "<your database name>"
connection_str = (
f"DRIVER={{ODBC Driver 18 for SQL Server}};"
f"SERVER={host},{port};"
f"DATABASE={database};"
f"UID={username};"
f"PWD={password};"
"TrustServerCertificate=yes"
"Encrypt=yes"
# Optional to identify source of queries
"APP=snowflake-snowpark-python;"
)
connection = pyodbc.connect(connection_str)
return connection
def run(session: Session):
# Feel free to combine local/udtf ingestion and partition column/predicates
# as stated in the understanding parallelism section
# Call dbapi to pull data from target table
df = session.read.dbapi(
create_sql_server_connection,
table="target_table"
)
# Call dbapi to pull data from target query
df_query = session.read.dbapi(
create_sql_server_connection,
query="select * from target_table"
)
# Pull data from target table with parallelism using partition column
df_local_par_column = session.read.dbapi(
create_sql_server_connection,
table="target_table",
fetch_size=100000,
num_partitions=4,
column="ID", # swap with the column you want your partition based on
upper_bound=10000,
lower_bound=0
)
udtf_configs = {
"external_access_integration": "<your external access integration>"
}
# Pull data from target table with udtf ingestion with parallelism using predicates
df_udtf_predicates = session.read.dbapi(
create_sql_server_connection,
table="target_table",
udtf_configs=udtf_configs,
fetch_size=100000,
predicates=[
"ID < 3",
"ID >= 3"
]
)
return df
$$;
CALL sp_mssql_dbapi();
Use the DB-API to connect to SQL Server from a Snowflake notebook
In the Snowflake notebook packages, select snowflake-snowpark-python and pyodbc.
In the Files pane, open the file environment.yml, and under Dependencies, add the following line of code after other entries:
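The entry names the Microsoft ODBC driver package. The package name below is an assumption based on the msodbcsql package used in the stored-procedure example; check your Snowflake package listing for the exact name:

dependencies:
  - msodbcsql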
Configure the secret, a network rule to allow egress to the source endpoint, and the EAI:
-- Configure a secret to allow egress to the source endpoint
CREATE OR REPLACE SECRET mssql_secret
TYPE = PASSWORD
USERNAME = 'mssql_username'
PASSWORD = 'mssql_password';
ALTER NOTEBOOK mynotebook SET SECRETS = ('snowflake-secret-object' = mssql_secret);
-- Configure a network rule to allow egress to the source endpoint
CREATE OR REPLACE NETWORK RULE mssql_network_rule
MODE = EGRESS
TYPE = HOST_PORT
VALUE_LIST = ('mssql_host:mssql_port');
-- Configure an external access integration
CREATE OR REPLACE EXTERNAL ACCESS INTEGRATION mssql_access_integration
ALLOWED_NETWORK_RULES = (mssql_network_rule)
ALLOWED_AUTHENTICATION_SECRETS = (mssql_secret)
ENABLED = true;
Configure external access for Snowflake Notebooks, and then restart the notebook session.
Use the DB-API to pull data from SQL Server in a Python cell of a Snowflake notebook:
# Get user name and password from mssql_secret
import _snowflake
username_password_object = _snowflake.get_username_password('snowflake-secret-object')
USER = username_password_object.username
PASSWORD = username_password_object.password
import snowflake.snowpark.context
session = snowflake.snowpark.context.get_active_session()
def create_sql_server_connection():
import pyodbc
SERVER = "<your host name>"
UID = USER
PWD = PASSWORD
DATABASE = "<your database name>"
connection_str = (
f"DRIVER={{ODBC Driver 18 for SQL Server}};"
f"SERVER={SERVER};"
f"UID={UID};"
f"PWD={PWD};"
f"DATABASE={DATABASE};"
"TrustServerCertificate=yes;"
"Encrypt=yes;"
# Optional to identify source of queries
"APP=snowflake-snowpark-python;"
)
connection = pyodbc.connect(connection_str)
return connection
# Feel free to combine local/udtf ingestion and partition column/predicates as
# stated in the understanding parallelism section
# Call dbapi to pull data from target table
df = session.read.dbapi(
create_sql_server_connection,
table="target_table"
)
# Call dbapi to pull data from target query
df_query = session.read.dbapi(
create_sql_server_connection,
query="select * from target_table"
)
# Pull data from target table with parallelism using partition column
df_local_par_column = session.read.dbapi(
create_sql_server_connection,
table="target_table",
fetch_size=100000,
num_partitions=4,
column="ID", # swap with the column you want your partition based on
upper_bound=10000,
lower_bound=0
)
udtf_configs = {
"external_access_integration": "<your external access integration>"
}
# Pull data from target table with udtf ingestion with parallelism using predicates
df_udtf_predicates = session.read.dbapi(
create_sql_server_connection,
table="target_table",
udtf_configs=udtf_configs,
fetch_size=100000,
predicates=[
"ID < 3",
"ID >= 3"
]
)
# Save data into sf_table
df.write.mode("overwrite").save_as_table('sf_table')
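To confirm the load, you can read the saved table back in a follow-up cell:

# Read back the saved table to verify the load
session.table("sf_table").show()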
Source tracing when using the DB-API to connect to SQL Server
Include a Snowpark tag in your connection creation function:
def create_sql_server_connection():
import pyodbc
SERVER = "<your host name>"
PORT = <your port>
UID = "<your user name>"
PWD = "<your password>"
DATABASE = "<your database name>"
connection_str = (
f"DRIVER={{ODBC Driver 18 for SQL Server}};"
f"SERVER={SERVER}:{PORT};"
f"UID={UID};"
f"PWD={PWD};"
f"DATABASE={DATABASE};"
"TrustServerCertificate=yes"
"Encrypt=yes"
# include this parameter for source tracing
"APP=snowflake-snowpark-python;"
)
connection = pyodbc.connect(connection_str)
return connection
Run the following SQL in your data source to capture Snowpark queries that are still active:
SELECT
s.session_id,
s.program_name,
r.status,
t.text AS sql_text
FROM sys.dm_exec_sessions s
JOIN sys.dm_exec_requests r ON s.session_id = r.session_id
CROSS APPLY sys.dm_exec_sql_text(r.sql_handle) AS t
WHERE s.program_name = 'snowflake-snowpark-python';
Oracle
To connect to Oracle from Snowpark, you need the following two packages:
The following code examples show how to connect to Oracle from a Snowpark client, stored procedures, and a Snowflake notebook.
Use the DB-API to connect to Oracle from a Snowpark client
Install snowflake-snowpark-python[pandas] and oracledb:
pip install snowflake-snowpark-python[pandas]
pip install oracledb
Define the factory method for creating a connection to Oracle, and then use the DB-API to pull data from Oracle:
def create_oracle_db_connection():
import oracledb
HOST = "<your host>"
PORT = <your port>
SERVICE_NAME = "<your service name>"
USER = "<your user name>"
PASSWORD = "your password"
DSN = f"{HOST}:{PORT}/{SERVICE_NAME}"
connection = oracledb.connect(
user=USER,
password=PASSWORD,
dsn=DSN
)
# Optional: include this parameter for source tracing
connection.clientinfo = "snowflake-snowpark-python"
return connection
# Feel free to combine local/udtf ingestion and partition column/predicates as
# stated in the understanding parallelism section
# Call dbapi to pull data from target table
df = session.read.dbapi(
create_oracle_db_connection,
table="target_table"
)
# Call dbapi to pull data from target query
df_query = session.read.dbapi(
create_oracle_db_connection,
query="select * from target_table"
)
# Pull data from target table with parallelism using partition column
df_local_par_column = session.read.dbapi(
create_oracle_db_connection,
table="target_table",
fetch_size=100000,
num_partitions=4,
column="ID", # swap with the column you want your partition based on
upper_bound=10000,
lower_bound=0
)
udtf_configs = {
"external_access_integration": "<your external access integration>"
}
# Pull data from target table with udtf ingestion with parallelism using predicates
df_udtf_predicates = session.read.dbapi(
create_oracle_db_connection,
table="target_table",
udtf_configs=udtf_configs,
fetch_size=100000,
predicates=[
"ID < 3",
"ID >= 3"
]
)
Use the DB-API to connect to Oracle from a stored procedure
Set up the external access integration (EAI), which is required to allow Snowflake to connect to the source endpoint.
Note
PrivateLink is recommended for secure data transfer, especially when handling sensitive information. Make sure the required PrivateLink privileges are enabled for your Snowflake account, and that the PrivateLink feature is configured and active in your Snowflake environment.
Configure the secret, a network rule to allow egress to the source endpoint, and the EAI:
-- Configure the secret, a network rule to allow egress to the source endpoint, and EAI:
CREATE OR REPLACE SECRET ora_secret
TYPE = PASSWORD
USERNAME = 'ora_username'
PASSWORD = 'ora_password';
-- configure a network rule to allow egress to the source endpoint
CREATE OR REPLACE NETWORK RULE ora_network_rule
MODE = EGRESS
TYPE = HOST_PORT
VALUE_LIST = ('ora_host:ora_port');
-- configure an external access integration
CREATE OR REPLACE EXTERNAL ACCESS INTEGRATION ora_access_integration
ALLOWED_NETWORK_RULES = (ora_network_rule)
ALLOWED_AUTHENTICATION_SECRETS = (ora_secret)
ENABLED = true;
Use the Snowpark Python DB-API to pull data from Oracle in a Python stored procedure:
CREATE OR REPLACE PROCEDURE sp_ora_dbapi()
RETURNS TABLE()
LANGUAGE PYTHON
RUNTIME_VERSION='3.11'
HANDLER='run'
PACKAGES=('snowflake-snowpark-python', 'oracledb')
EXTERNAL_ACCESS_INTEGRATIONS = (ora_access_integration)
SECRETS = ('cred' = ora_secret )
AS $$
# Get user name and password from ora_secret
import _snowflake
username_password_object = _snowflake.get_username_password('cred')
USER = username_password_object.username
PASSWORD = username_password_object.password
# Define the factory method for creating a connection to Oracle
from snowflake.snowpark import Session
def create_oracle_db_connection():
import oracledb
host = "ora_host"
port = "ora_port"
service_name = "ora_service"
DSN = f"{host}:{port}/{service_name}"
connection = oracledb.connect(
user=USER,
password=PASSWORD,
dsn=DSN
)
# Optional: include this parameter for source tracing
connection.clientinfo = "snowflake-snowpark-python"
return connection
def run(session: Session):
# Feel free to combine local/udtf ingestion and partition column/predicates
# as stated in the understanding parallelism section
# Call dbapi to pull data from target table
df = session.read.dbapi(
create_oracle_db_connection,
table="target_table"
)
# Call dbapi to pull data from target query
df_query = session.read.dbapi(
create_oracle_db_connection,
query="select * from target_table"
)
# Pull data from target table with parallelism using partition column
df_local_par_column = session.read.dbapi(
create_oracle_db_connection,
table="target_table",
fetch_size=100000,
num_partitions=4,
column="ID", # swap with the column you want your partition based on
upper_bound=10000,
lower_bound=0
)
udtf_configs = {
"external_access_integration": "<your external access integration>"
}
# Pull data from target table with udtf ingestion with parallelism using predicates
df_udtf_predicates = session.read.dbapi(
create_oracle_db_connection,
table="target_table",
udtf_configs=udtf_configs,
fetch_size=100000,
predicates=[
"ID < 3",
"ID >= 3"
]
)
return df
$$;
CALL sp_ora_dbapi();
Use the DB-API to connect to Oracle from a Snowflake notebook
In the Snowflake notebook packages, select snowflake-snowpark-python and oracledb.
Set up the external access integration (EAI), which is required to allow Snowflake to connect to the source endpoint.
Note
PrivateLink is recommended for secure data transfer, especially when handling sensitive information. Make sure the required PrivateLink privileges are enabled for your Snowflake account, and that the PrivateLink feature is configured and active in your Snowflake notebook environment.
Configure the secret, a network rule to allow egress to the source endpoint, and the EAI:
-- Configure the secret, a network rule to allow egress to the source endpoint, and EAI:
CREATE OR REPLACE SECRET ora_secret
TYPE = PASSWORD
USERNAME = 'ora_username'
PASSWORD = 'ora_password';
ALTER NOTEBOOK mynotebook SET SECRETS = ('snowflake-secret-object' = ora_secret);
-- configure a network rule to allow egress to the source endpoint
CREATE OR REPLACE NETWORK RULE ora_network_rule
MODE = EGRESS
TYPE = HOST_PORT
VALUE_LIST = ('ora_host:ora_port');
-- configure an external access integration
CREATE OR REPLACE EXTERNAL ACCESS INTEGRATION ora_access_integration
ALLOWED_NETWORK_RULES = (ora_network_rule)
ALLOWED_AUTHENTICATION_SECRETS = (ora_secret)
ENABLED = true;
Configure external access for Snowflake Notebooks, and then restart the notebook session.
Use the DB-API to pull data from Oracle in a Python cell of a Snowflake notebook:
# Get user name and password from ora_secret
import _snowflake
username_password_object = _snowflake.get_username_password('snowflake-secret-object')
USER = username_password_object.username
PASSWORD = username_password_object.password
import snowflake.snowpark.context
session = snowflake.snowpark.context.get_active_session()
# Define the factory method for creating a connection to Oracle
def create_oracle_db_connection():
import oracledb
host = "ora_host"
port = "ora_port"
service_name = "ora_service"
DSN = f"{host}:{port}/{service_name}"
connection = oracledb.connect(
user=USER,
password=PASSWORD,
dsn=DSN,
)
# Optional: include this parameter for source tracing
connection.clientinfo = "snowflake-snowpark-python"
return connection
# Feel free to combine local/udtf ingestion and partition column/predicates as
# stated in the understanding parallelism section
# Call dbapi to pull data from target table
df = session.read.dbapi(
create_oracle_db_connection,
table="target_table"
)
# Call dbapi to pull data from target query
df_query = session.read.dbapi(
create_oracle_db_connection,
query="select * from target_table"
)
# Pull data from target table with parallelism using partition column
df_local_par_column = session.read.dbapi(
create_oracle_db_connection,
table="target_table",
fetch_size=100000,
num_partitions=4,
column="ID", # swap with the column you want your partition based on
upper_bound=10000,
lower_bound=0
)
udtf_configs = {
"external_access_integration": "<your external access integration>"
}
# Pull data from target table with udtf ingestion with parallelism using predicates
df_udtf_predicates = session.read.dbapi(
create_oracle_db_connection,
table="target_table",
udtf_configs=udtf_configs,
fetch_size=100000,
predicates=[
"ID < 3",
"ID >= 3"
]
)
# Save data into sf_table
df.write.mode("overwrite").save_as_table('sf_table')
Source tracing when using the DB-API to connect to Oracle
Include a Snowpark tag in your connection creation function:
def create_oracle_db_connection():
import oracledb
HOST = "myhost"
PORT = "myport"
SERVICE_NAME = "myservice"
USER = "myuser"
PASSWORD = "mypassword"
DSN = f"{HOST}:{PORT}/{SERVICE_NAME}"
connection = oracledb.connect(
user=USER,
password=PASSWORD,
dsn=DSN,
)
# include this parameter for source tracing
connection.clientinfo = "snowflake-snowpark-python"
return connection
Run the following SQL in your data source to capture Snowpark queries that are still active:
SELECT
s.sid,
s.serial#,
s.username,
s.module,
q.sql_id,
q.sql_text,
q.last_active_time
FROM
v$session s
JOIN v$sql q ON s.sql_id = q.sql_id
WHERE
s.client_info = 'snowflake-snowpark-python';
PostgreSQL
To connect to PostgreSQL from Snowpark, you need the following two packages:
The following code examples show how to connect to PostgreSQL from a Snowpark client, stored procedures, and a Snowflake notebook.
Use the DB-API to connect to PostgreSQL from a Snowpark client
Install snowflake-snowpark-python[pandas] and psycopg2:
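The pip commands below mirror the install steps shown in the other sections:

pip install snowflake-snowpark-python[pandas]
pip install psycopg2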
Define the factory method for creating a connection to PostgreSQL:
def create_pg_connection():
import psycopg2
connection = psycopg2.connect(
host="pg_host",
port=pg_port,
dbname="pg_dbname",
user="pg_user",
password="pg_password",
# Optional: include this parameter for source tracing
application_name="snowflake-snowpark-python"
)
return connection
# Feel free to combine local/udtf ingestion and partition column/predicates as
# stated in the understanding parallelism section
# Call dbapi to pull data from target table
df = session.read.dbapi(
create_pg_connection,
table="target_table"
)
# Call dbapi to pull data from target query
df_query = session.read.dbapi(
create_pg_connection,
query="select * from target_table"
)
# Pull data from target table with parallelism using partition column
df_local_par_column = session.read.dbapi(
create_pg_connection,
table="target_table",
fetch_size=100000,
num_partitions=4,
column="ID", # Swap with the column you want your partition based on
upper_bound=10000,
lower_bound=0
)
udtf_configs = {
"external_access_integration": "<your external access integration>"
}
# Pull data from target table with udtf ingestion with parallelism using predicates
df_udtf_predicates = session.read.dbapi(
create_pg_connection,
table="target_table",
udtf_configs=udtf_configs,
fetch_size=100000,
predicates=[
"ID < 3",
"ID >= 3"
]
)
Use the DB-API to connect to PostgreSQL from a stored procedure
Set up the external access integration (EAI), which is required to allow Snowflake to connect to the source endpoint.
Note
PrivateLink is recommended for secure data transfer, especially when handling sensitive information. Make sure the required PrivateLink privileges are enabled for your Snowflake account, and that the PrivateLink feature is configured and active in your Snowflake environment.
Configure the secret, a network rule to allow egress to the source endpoint, and the EAI:
-- configure a secret
CREATE OR REPLACE SECRET pg_secret
TYPE = PASSWORD
USERNAME = 'pg_username'
PASSWORD = 'pg_password';
-- configure a network rule.
CREATE OR REPLACE NETWORK RULE pg_network_rule
MODE = EGRESS
TYPE = HOST_PORT
VALUE_LIST = ('pg_host:pg_port');
-- configure an external access integration.
CREATE OR REPLACE EXTERNAL ACCESS INTEGRATION pg_access_integration
ALLOWED_NETWORK_RULES = (pg_network_rule)
ALLOWED_AUTHENTICATION_SECRETS = (pg_secret)
ENABLED = true;
Use the Snowpark Python DB-API to pull data from PostgreSQL in a Python stored procedure:
CREATE OR REPLACE PROCEDURE sp_pg_dbapi()
RETURNS TABLE()
LANGUAGE PYTHON
RUNTIME_VERSION='3.11'
HANDLER='run'
PACKAGES=('snowflake-snowpark-python', 'psycopg2')
EXTERNAL_ACCESS_INTEGRATIONS = (pg_access_integration)
SECRETS = ('cred' = pg_secret )
AS $$
# Get user name and password from pg_secret
import _snowflake
username_password_object = _snowflake.get_username_password('cred')
USER = username_password_object.username
PASSWORD = username_password_object.password
# Define the factory method for creating a connection to PostgreSQL
from snowflake.snowpark import Session
def create_pg_connection():
import psycopg2
connection = psycopg2.connect(
host="pg_host",
port=pg_port,
dbname="pg_dbname",
user=USER,
password=PASSWORD,
# Optional: include this parameter for source tracing
application_name="snowflake-snowpark-python"
)
return connection
def run(session: Session):
# Feel free to combine local/udtf ingestion and partition column/predicates
# as stated in the understanding parallelism section
# Call dbapi to pull data from target table
df = session.read.dbapi(
create_pg_connection,
table="target_table"
)
# Call dbapi to pull data from target query
df_query = session.read.dbapi(
create_pg_connection,
query="select * from target_table"
)
# Pull data from target table with parallelism using partition column
df_local_par_column = session.read.dbapi(
create_pg_connection,
table="target_table",
fetch_size=100000,
num_partitions=4,
column="ID", # swap with the column you want your partition based on
upper_bound=10000,
lower_bound=0
)
udtf_configs = {
"external_access_integration": "<your external access integration>"
}
# Pull data from target table with udtf ingestion with parallelism using predicates
df_udtf_predicates = session.read.dbapi(
create_pg_connection,
table="target_table",
udtf_configs=udtf_configs,
fetch_size=100000,
predicates=[
"ID < 3",
"ID >= 3"
]
)
return df
$$;
CALL sp_pg_dbapi();
Use the DB-API to connect to PostgreSQL from a Snowflake notebook
In the Snowflake notebook packages, select snowflake-snowpark-python and psycopg2.
Set up the external access integration (EAI), which is required to allow Snowflake to connect to the source endpoint.
Note
PrivateLink is recommended for secure data transfer, especially when handling sensitive information. Make sure the required PrivateLink privileges are enabled for your Snowflake account, and that the PrivateLink feature is configured and active in your Snowflake notebook environment.
Configure the secret, a network rule to allow egress to the source endpoint, and the EAI:
-- Configure the secret
CREATE OR REPLACE SECRET pg_secret
TYPE = PASSWORD
USERNAME = 'pg_username'
PASSWORD = 'pg_password';
ALTER NOTEBOOK pg_notebook SET SECRETS = ('snowflake-secret-object' = pg_secret);
-- Configure the network rule to allow egress to the source endpoint
CREATE OR REPLACE NETWORK RULE pg_network_rule
MODE = EGRESS
TYPE = HOST_PORT
VALUE_LIST = ('pg_host:pg_port');
-- Configure external access integration
CREATE OR REPLACE EXTERNAL ACCESS INTEGRATION pg_access_integration
ALLOWED_NETWORK_RULES = (pg_network_rule)
ALLOWED_AUTHENTICATION_SECRETS = (pg_secret)
ENABLED = true;
Configure external access for Snowflake Notebooks, and then restart the notebook session.
Use the DB-API to pull data from PostgreSQL in a Python cell of a Snowflake notebook:
# Get the user name and password from pg_secret
import _snowflake
username_password_object = _snowflake.get_username_password('snowflake-secret-object')
USER = username_password_object.username
PASSWORD = username_password_object.password
import snowflake.snowpark.context
session = snowflake.snowpark.context.get_active_session()
# Define the factory method for creating a connection to PostgreSQL
def create_pg_connection():
import psycopg2
connection = psycopg2.connect(
host="pg_host",
port=pg_port,
dbname="pg_dbname",
user=USER,
password=PASSWORD,
# Optional: include this parameter for source tracing
application_name="snowflake-snowpark-python"
)
return connection
# Feel free to combine local/udtf ingestion and partition column/predicates as
# stated in the understanding parallelism section
# Call dbapi to pull data from target table
df = session.read.dbapi(
create_pg_connection,
table="target_table"
)
# Call dbapi to pull data from target query
df_query = session.read.dbapi(
create_pg_connection,
query="select * from target_table"
)
# Pull data from target table with parallelism using partition column
df_local_par_column = session.read.dbapi(
create_pg_connection,
table="target_table",
fetch_size=100000,
num_partitions=4,
column="ID", # swap with the column you want your partition based on
upper_bound=10000,
lower_bound=0
)
udtf_configs = {
"external_access_integration": "<your external access integration>"
}
# Pull data from target table with udtf ingestion with parallelism using predicates
df_udtf_predicates = session.read.dbapi(
create_pg_connection,
table="target_table",
udtf_configs=udtf_configs,
fetch_size=100000,
predicates=[
"ID < 3",
"ID >= 3"
]
)
# Save data into sf_table
df.write.mode("overwrite").save_as_table('sf_table')
Source tracing when using the DB-API to connect to PostgreSQL
Include a Snowpark tag in your connection creation function:
def create_pg_connection():
import psycopg2
connection = psycopg2.connect(
host="pg_host",
port=pg_port,
dbname="pg_dbname",
user="pg_user",
password="pg_password",
# Include this parameter for source tracing
application_name="snowflake-snowpark-python"
)
return connection
Run the following SQL in your data source to capture Snowpark queries that are still active:
SELECT
pid,
usename AS username,
datname AS database,
application_name,
client_addr,
state,
query_start,
query
FROM
pg_stat_activity
WHERE
application_name = 'snowflake-snowpark-python';
MySQL
To connect to MySQL from Snowpark, you need the following two packages:
The following code examples show how to connect to MySQL from a Snowpark client, stored procedures, and a Snowflake notebook.
Use the DB-API to connect to MySQL from a Snowpark client
Install snowflake-snowpark-python[pandas] and pymysql:
pip install snowflake-snowpark-python[pandas]
pip install pymysql
Define the factory method for creating a connection to MySQL:
def create_mysql_connection():
import pymysql
connection = pymysql.connect(
host="mysql_host",
port=mysql_port,
database="mysql_db",
user="mysql_user",
password="mysql_password",
# Optional: include this parameter for source tracing
init_command="SET @program_name='snowflake-snowpark-python';"
)
return connection
# Feel free to combine local/udtf ingestion and partition column/predicates as
# stated in the understanding parallelism section
# Call dbapi to pull data from target table
df = session.read.dbapi(
create_mysql_connection,
table="target_table"
)
# Call dbapi to pull data from target query
df_query = session.read.dbapi(
create_mysql_connection,
query="select * from target_table"
)
# Pull data from target table with parallelism using partition column
df_local_par_column = session.read.dbapi(
create_mysql_connection,
table="target_table",
fetch_size=100000,
num_partitions=4,
column="ID", # swap with the column you want your partition based on
upper_bound=10000,
lower_bound=0
)
udtf_configs = {
"external_access_integration": "<your external access integration>"
}
# Pull data from target table with udtf ingestion with parallelism using predicates
df_udtf_predicates = session.read.dbapi(
create_mysql_connection,
table="target_table",
udtf_configs=udtf_configs,
fetch_size=100000,
predicates=[
"ID < 3",
"ID >= 3"
]
)
Use the DB-API to connect to MySQL from a stored procedure
Set up the external access integration (EAI), which is required to allow Snowflake to connect to the source endpoint.
Note
PrivateLink is recommended for secure data transfer, especially when handling sensitive information. Make sure the required PrivateLink privileges are enabled for your Snowflake account, and that the PrivateLink feature is configured and active in your Snowflake environment.
Configure the secret, a network rule to allow egress to the source endpoint, and the EAI:
CREATE OR REPLACE SECRET mysql_secret
TYPE = PASSWORD
USERNAME = 'mysql_username'
PASSWORD = 'mysql_password';
-- configure a network rule.
CREATE OR REPLACE NETWORK RULE mysql_network_rule
MODE = EGRESS
TYPE = HOST_PORT
VALUE_LIST = ('mysql_host:mysql_port');
-- configure an external access integration
CREATE OR REPLACE EXTERNAL ACCESS INTEGRATION mysql_access_integration
ALLOWED_NETWORK_RULES = (mysql_network_rule)
ALLOWED_AUTHENTICATION_SECRETS = (mysql_secret)
ENABLED = true;
Use the Snowpark Python DB-API to pull data from MySQL in a Python stored procedure:
CREATE OR REPLACE PROCEDURE sp_mysql_dbapi()
RETURNS TABLE()
LANGUAGE PYTHON
RUNTIME_VERSION='3.11'
HANDLER='run'
PACKAGES=('snowflake-snowpark-python', 'pymysql')
EXTERNAL_ACCESS_INTEGRATIONS = (mysql_access_integration)
SECRETS = ('cred' = mysql_secret )
AS $$
# Get user name and password from mysql_secret
import _snowflake
username_password_object = _snowflake.get_username_password('cred')
USER = username_password_object.username
PASSWORD = username_password_object.password
# Define the factory method for creating a connection to MySQL
from snowflake.snowpark import Session
def create_mysql_connection():
import pymysql
connection = pymysql.connect(
host="mysql_host",
port=mysql_port,
dbname="mysql_dbname",
user=USER,
password=PASSWORD,
# Optional: include this parameter for source tracing
init_command="SET @program_name='snowflake-snowpark-python';"
)
return connection
# Using Snowpark Python DB-API to pull data from MySQL in a Python stored procedure.
def run(session: Session):
# Feel free to combine local/udtf ingestion and partition column/predicates
# as stated in the understanding parallelism section
# Call dbapi to pull data from target table
df = session.read.dbapi(
create_mysql_connection,
table="target_table"
)
# Call dbapi to pull data from target query
df_query = session.read.dbapi(
create_mysql_connection,
query="select * from target_table"
)
# Pull data from target table with parallelism using partition column
df_local_par_column = session.read.dbapi(
create_mysql_connection,
table="target_table",
fetch_size=100000,
num_partitions=4,
column="ID", # swap with the column you want your partition based on
upper_bound=10000,
lower_bound=0
)
udtf_configs = {
"external_access_integration": "<your external access integration>"
}
# Pull data from target table with udtf ingestion with parallelism using predicates
df_udtf_predicates = session.read.dbapi(
create_mysql_connection,
table="target_table",
udtf_configs=udtf_configs,
fetch_size=100000,
predicates=[
"ID < 3",
"ID >= 3"
]
)
return df
$$;
CALL sp_mysql_dbapi();
Use the DB-API to connect to MySQL from a Snowflake notebook
In the Snowflake notebook packages, select snowflake-snowpark-python and pymysql.
Set up the external access integration (EAI), which is required to allow Snowflake to connect to the source endpoint.
Note
PrivateLink is recommended for secure data transfer, especially when handling sensitive information. Make sure the required PrivateLink privileges are enabled for your Snowflake account, and that the PrivateLink feature is configured and active in your Snowflake notebook environment.
Configure the secret, a network rule to allow egress to the source endpoint, and the EAI:
CREATE OR REPLACE SECRET mysql_secret
TYPE = PASSWORD
USERNAME = 'mysql_username'
PASSWORD = 'mysql_password';
ALTER NOTEBOOK mynotebook SET SECRETS = ('snowflake-secret-object' = mysql_secret);
-- configure a network rule.
CREATE OR REPLACE NETWORK RULE mysql_network_rule
MODE = EGRESS
TYPE = HOST_PORT
VALUE_LIST = ('mysql_host:mysql_port');
-- configure an EAI
CREATE OR REPLACE EXTERNAL ACCESS INTEGRATION mysql_access_integration
ALLOWED_NETWORK_RULES = (mysql_network_rule)
ALLOWED_AUTHENTICATION_SECRETS = (mysql_secret)
ENABLED = true;
Configure external access for Snowflake Notebooks, and then restart the notebook session.
Use the DB-API to pull data from MySQL in a Python cell of a Snowflake notebook:
# Get user name and password from mysql_secret
import _snowflake
username_password_object = _snowflake.get_username_password('snowflake-secret-object')
USER = username_password_object.username
PASSWORD = username_password_object.password
import snowflake.snowpark.context
session = snowflake.snowpark.context.get_active_session()
# Define the factory method for creating a connection to MySQL
def create_mysql_connection():
import pymysql
connection = pymysql.connect(
host="mysql_host",
port=mysql_port,
dbname="mysql_dbname",
user=USER,
password=PASSWORD,
# Optional: include this parameter for source tracing
init_command="SET @program_name='snowflake-snowpark-python';"
)
return connection
# Feel free to combine local/udtf ingestion and partition column/predicates as
# stated in the understanding parallelism section
# Call dbapi to pull data from target table
df = session.read.dbapi(
create_mysql_connection,
table="target_table"
)
# Call dbapi to pull data from target query
df_query = session.read.dbapi(
create_mysql_connection,
query="select * from target_table"
)
# Pull data from target table with parallelism using partition column
df_local_par_column = session.read.dbapi(
create_mysql_connection,
table="target_table",
fetch_size=100000,
num_partitions=4,
column="ID", # swap with the column you want your partition based on
upper_bound=10000,
lower_bound=0
)
udtf_configs = {
"external_access_integration": "<your external access integration>"
}
# Pull data from target table with udtf ingestion with parallelism using predicates
df_udtf_predicates = session.read.dbapi(
create_mysql_connection,
table="target_table",
udtf_configs=udtf_configs,
fetch_size=100000,
predicates=[
"ID < 3",
"ID >= 3"
]
)
# Save data into sf_table
df.write.mode("overwrite").save_as_table('sf_table')
Source tracing when using the DB-API to connect to MySQL
Include a Snowpark tag in your connection creation function:
def create_mysql_connection():
import pymysql
connection = pymysql.connect(
host="mysql_host",
port=mysql_port,
database="mysql_db",
user="mysql_user",
password="mysql_password",
# include this parameter for source tracing
init_command="SET @program_name='snowflake-snowpark-python';"
)
return connection
Run the following SQL in your data source to capture Snowpark queries:
SELECT *
FROM performance_schema.events_statements_history_long
WHERE THREAD_ID = (
SELECT THREAD_ID
FROM performance_schema.events_statements_history_long
WHERE SQL_TEXT = "SET @program_name='snowflake-snowpark-python'"
ORDER BY EVENT_ID DESC
LIMIT 1
);
Databricks
To connect to Databricks from Snowpark, you need the following two packages:
The following code examples show how to connect to Databricks from a Snowpark client, stored procedures, and a Snowflake notebook.
Use the DB-API to connect to Databricks from a Snowpark client
Install snowflake-snowpark-python[pandas] and databricks-sql-connector:
pip install snowflake-snowpark-python[pandas]
pip install databricks-sql-connector
Define the factory method for creating a connection to Databricks:
def create_dbx_connection():
import databricks.sql
# Placeholders: fill in your Databricks workspace details
HOST = "<your server hostname>"
PATH = "<your http path>"
ACCESS_TOKEN = "<your access token>"
connection = databricks.sql.connect(
server_hostname=HOST,
http_path=PATH,
access_token=ACCESS_TOKEN
)
return connection
# Feel free to combine local/udtf ingestion and partition column/predicates as
# stated in the understanding parallelism section
# Call dbapi to pull data from target table
df = session.read.dbapi(
create_dbx_connection,
table="target_table"
)
# Call dbapi to pull data from target query
df_query = session.read.dbapi(
create_dbx_connection,
query="select * from target_table"
)
# Pull data from target table with parallelism using partition column
df_local_par_column = session.read.dbapi(
create_dbx_connection,
table="target_table",
fetch_size=100000,
num_partitions=4,
column="ID", # swap with the column you want your partition based on
upper_bound=10000,
lower_bound=0
)
udtf_configs = {
"external_access_integration": "<your external access integration>"
}
# Pull data from target table with udtf ingestion with parallelism using predicates
df_udtf_predicates = session.read.dbapi(
create_dbx_connection,
table="target_table",
udtf_configs=udtf_configs,
fetch_size=100000,
predicates=[
"ID < 3",
"ID >= 3"
]
)
Use the DB-API to connect to Databricks from a stored procedure
Set up the external access integration (EAI), which is required to allow Snowflake to connect to the source endpoint.
Note
PrivateLink is recommended for secure data transfer, especially when handling sensitive information. Make sure the required PrivateLink privileges are enabled for your Snowflake account, and that the PrivateLink feature is configured and active in your Snowflake environment.
Configure the secret, a network rule to allow egress to the source endpoint, and the EAI:
CREATE OR REPLACE SECRET dbx_secret
TYPE = GENERIC_STRING
SECRET_STRING = 'dbx_access_token';
CREATE OR REPLACE NETWORK RULE dbx_network_rule
MODE = EGRESS
TYPE = HOST_PORT
VALUE_LIST = ('dbx_host:dbx_port');
CREATE OR REPLACE EXTERNAL ACCESS INTEGRATION dbx_access_integration
ALLOWED_NETWORK_RULES = (dbx_network_rule)
ALLOWED_AUTHENTICATION_SECRETS = (dbx_secret)
ENABLED = true;
Use the Snowpark Python DB-API to pull data from Databricks in a Python stored procedure:
CREATE OR REPLACE PROCEDURE sp_dbx_dbapi()
RETURNS TABLE()
LANGUAGE PYTHON
RUNTIME_VERSION='3.11'
HANDLER='run'
PACKAGES=('snowflake-snowpark-python', 'databricks-sql-connector')
EXTERNAL_ACCESS_INTEGRATIONS = (dbx_access_integration)
SECRETS = ('cred' = dbx_secret )
AS $$
# Get the access token from dbx_secret
import _snowflake
ACCESS_TOKEN = _snowflake.get_generic_secret_string('cred')
from snowflake.snowpark import Session
# Define the method for creating a connection to Databricks
def create_dbx_connection():
import databricks.sql
connection = databricks.sql.connect(
server_hostname="dbx_host",
http_path="dbx_path",
access_token=ACCESS_TOKEN,
)
return connection
# Using the Snowpark Python DB-API to pull data from Databricks in a Python stored procedure.
def run(session: Session):
# Feel free to combine local/udtf ingestion and partition column/predicates
# as stated in the understanding parallelism section
# Call dbapi to pull data from target table
df = session.read.dbapi(
create_dbx_connection,
table="target_table"
)
# Call dbapi to pull data from target query
df_query = session.read.dbapi(
create_dbx_connection,
query="select * from target_table"
)
# Pull data from target table with parallelism using partition column
df_local_par_column = session.read.dbapi(
create_dbx_connection,
table="target_table",
fetch_size=100000,
num_partitions=4,
column="ID", # swap with the column you want your partition based on
upper_bound=10000,
lower_bound=0
)
udtf_configs = {
"external_access_integration": "<your external access integration>"
}
# Pull data from target table with udtf ingestion with parallelism using predicates
df_udtf_predicates = session.read.dbapi(
create_dbx_connection,
table="target_table",
udtf_configs=udtf_configs,
fetch_size=100000,
predicates=[
"ID < 3",
"ID >= 3"
]
)
return df
$$;
CALL sp_dbx_dbapi();
Use the DB-API to connect to Databricks from a Snowflake notebook
In the Snowflake notebook packages, select snowflake-snowpark-python and databricks-sql-connector.
Set up the external access integration (EAI), which is required to allow Snowflake to connect to the source endpoint.
Note
PrivateLink is recommended for secure data transfer, especially when handling sensitive information. Make sure the required PrivateLink privileges are enabled for your Snowflake account, and that the PrivateLink feature is configured and active in your Snowflake notebook environment.
Configure the secret, a network rule to allow egress to the source endpoint, and the EAI:
CREATE OR REPLACE SECRET dbx_secret
TYPE = GENERIC_STRING
SECRET_STRING = 'dbx_access_token';
ALTER NOTEBOOK mynotebook SET SECRETS = ('snowflake-secret-object' = dbx_secret);
CREATE OR REPLACE NETWORK RULE dbx_network_rule
MODE = EGRESS
TYPE = HOST_PORT
VALUE_LIST = ('dbx_host:dbx_port');
CREATE OR REPLACE EXTERNAL ACCESS INTEGRATION dbx_access_integration
ALLOWED_NETWORK_RULES = (dbx_network_rule)
ALLOWED_AUTHENTICATION_SECRETS = (dbx_secret)
ENABLED = true;
Configure external access for Snowflake Notebooks, and then restart the notebook session.
Use the DB-API to pull data from Databricks in a Python cell of a Snowflake notebook:
# Get the access token from dbx_secret
import _snowflake
ACCESS_TOKEN = _snowflake.get_generic_secret_string('snowflake-secret-object')
import snowflake.snowpark.context
session = snowflake.snowpark.context.get_active_session()
# Define the factory method for creating a connection to Databricks
def create_dbx_connection():
import databricks.sql
connection = databricks.sql.connect(
server_hostname="dbx_host",
http_path="dbx_path",
access_token=ACCESS_TOKEN,
)
return connection
# Feel free to combine local/udtf ingestion and partition column/predicates as
# stated in the understanding parallelism section
# Call dbapi to pull data from target table
df = session.read.dbapi(
create_dbx_connection,
table="target_table"
)
# Call dbapi to pull data from target query
df_query = session.read.dbapi(
create_dbx_connection,
query="select * from target_table"
)
# Pull data from target table with parallelism using partition column
df_local_par_column = session.read.dbapi(
create_dbx_connection,
table="target_table",
fetch_size=100000,
num_partitions=4,
column="ID", # swap with the column you want your partition based on
upper_bound=10000,
lower_bound=0
)
udtf_configs = {
"external_access_integration": "<your external access integration>"
}
# Pull data from target table with udtf ingestion with parallelism using predicates
df_udtf_predicates = session.read.dbapi(
create_dbx_connection,
table="target_table",
udtf_configs=udtf_configs,
fetch_size=100000,
predicates=[
"ID < 3",
"ID >= 3"
]
)
# Save data into sf_table
df.write.mode("overwrite").save_as_table('sf_table')
Source tracing when using the DB-API to connect to Databricks
Include a Snowpark tag in your connection creation function:
def create_dbx_connection():
import databricks.sql
connection = databricks.sql.connect(
server_hostname=HOST,
http_path=PATH,
access_token=ACCESS_TOKEN,
# include this parameter for source tracing
user_agent_entry="snowflake-snowpark-python"
)
return connection
Go to the query history in the Databricks console and look for queries whose source is snowflake-snowpark-python.
Limitations
The Snowpark Python DB-API supports only DB-API 2.0-compliant drivers (for example, pyodbc or oracledb). JDBC drivers are not supported in this release.
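A quick way to check whether a driver advertises DB-API 2.0 compliance is the module-level apilevel attribute defined by PEP 249, for example:

import pyodbc
import oracledb

# DB-API 2.0-compliant modules expose apilevel = "2.0"
print(pyodbc.apilevel, oracledb.apilevel)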