With the Snowpark Python DB-API, Snowpark Python users can programmatically pull data from external databases into Snowflake. The DB-API includes:
Python DB-API support: Connect to external databases using Python’s standard DB-API 2.0 drivers.
Streamlined setup: Use pip to install the necessary drivers, with no need to manage additional dependencies.
With these APIs, you can seamlessly pull data into Snowflake tables and transform it using Snowpark DataFrames for advanced analytics.
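For example, here is a minimal sketch of that flow. In this sketch, create_connection is a placeholder factory that returns a DB-API 2.0 connection, session is an existing Snowpark session, and the table and column names are illustrative:
from snowflake.snowpark.functions import col

# Pull a table from the external database into a Snowpark DataFrame
df = session.read.dbapi(create_connection, table="source_table")

# Transform the data with Snowpark DataFrame operations
filtered = df.filter(col("STATUS") == "ACTIVE").select("ID", "AMOUNT")

# Persist the transformed result as a Snowflake table
filtered.write.mode("overwrite").save_as_table("sf_table")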
The DB-API can be used in much the same way as the Spark JDBC API. Most parameters are designed to be identical or similar for better parity, while Snowpark emphasizes a Python-first design with intuitive naming conventions that avoid JDBC-specific configuration. This gives Python developers a familiar experience while keeping parameter names close to their Spark JDBC counterparts.
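As a rough illustration of this parity, the following sketch reads the same table with partition-based parallelism in both APIs. The Spark call is standard Spark JDBC usage and is not part of Snowpark; create_connection and the connection details are placeholders:
# Spark JDBC (for comparison)
spark_df = (
    spark.read.format("jdbc")
    .option("url", "jdbc:sqlserver://<host>:<port>;databaseName=<db>")
    .option("dbtable", "target_table")
    .option("partitionColumn", "ID")
    .option("lowerBound", 0)
    .option("upperBound", 10000)
    .option("numPartitions", 4)
    .load()
)

# Snowpark Python DB-API equivalent
snowpark_df = session.read.dbapi(
    create_connection,  # Python factory that returns a DB-API 2.0 connection
    table="target_table",
    column="ID",
    lower_bound=0,
    upper_bound=10000,
    num_partitions=4,
)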
The Snowpark Python DB-API has two underlying forms of ingestion mechanisms:
Local ingestion
In local ingestion, Snowpark first fetches data from the external source into the local environment where the dbapi() function is called, and converts it to Parquet files. Snowpark then uploads these Parquet files to a temporary Snowflake stage and copies them from the stage into a temporary table.
UDTF ingestion
In UDTF ingestion, the entire workload runs on the Snowflake server. Snowpark creates and executes a UDTF, which ingests the data directly into Snowflake and stores it in a temporary table (see the sketch after this list).
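For example, the same read can be expressed with either mechanism. In this sketch, create_connection is a placeholder connection factory; passing udtf_configs is what switches the read to UDTF ingestion, as the examples later in this document also show:
# Local ingestion: data is fetched to the client environment, converted to
# Parquet, staged, and copied into a temporary table
df_local = session.read.dbapi(create_connection, table="target_table")

# UDTF ingestion: the read runs inside Snowflake via a UDTF, so an
# external access integration is required
df_udtf = session.read.dbapi(
    create_connection,
    table="target_table",
    udtf_configs={"external_access_integration": "<your external access integration>"},
)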
The Snowpark Python DB-API also has two ways to parallelize and accelerate ingestion:
Partition column
This method divides the source data into multiple partitions based on four parameters passed to dbapi():
column
lower_bound
upper_bound
num_partitions
All four parameters must be set together, and column must be a numeric or date column.
Predicates
This method divides the source data into partitions based on the predicates parameter, a list of expressions suitable for inclusion in WHERE clauses, where each expression defines one partition. Predicates provide a more flexible way to define partitions; for example, you can partition on Boolean or non-numeric columns, as shown in the sketch after this list.
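To make the two approaches concrete, a call with column="ID", lower_bound=0, upper_bound=10000, and num_partitions=4 splits the ID range into four partitions, roughly as if you had supplied range predicates yourself. The exact boundaries that the API generates may differ, so treat the following sketch only as an illustration of the idea; create_connection is a placeholder factory:
# Partition column: the API derives four ranges over ID
df_by_column = session.read.dbapi(
    create_connection,
    table="target_table",
    column="ID",
    lower_bound=0,
    upper_bound=10000,
    num_partitions=4,
)

# Predicates: you spell out one expression per partition yourself
df_by_predicates = session.read.dbapi(
    create_connection,
    table="target_table",
    predicates=[
        "ID < 2500",
        "ID >= 2500 AND ID < 5000",
        "ID >= 5000 AND ID < 7500",
        "ID >= 7500",
    ],
)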
The Snowpark Python DB-API also lets you adjust the level of parallelism within a partition:
fetch_size
Within a partition, the API fetches rows in chunks defined by fetch_size. These rows are written to Snowflake in parallel as they are
fetched, which allows reading and writing to overlap and maximizes throughput.
By combining these ingestion mechanisms and parallelism options, Snowflake supports four ways of ingesting data:
Local ingestion with partition column
df_local_par_column = session.read.dbapi(
create_connection,
table="target_table",
fetch_size=100000,
num_partitions=4,
column="ID", # Swap with the column you want your partition based on
upper_bound=10000,
lower_bound=0
)
Define the factory method for creating a connection to SQL Server:
def create_sql_server_connection():
import pyodbc
SERVER = "<your host name>"
PORT = <your port>
UID = "<your user name>"
PWD = "<your password>"
DATABASE = "<your database name>"
connection_str = (
f"DRIVER={{ODBC Driver 18 for SQL Server}};"f"SERVER={SERVER}:{PORT};"f"UID={UID};"f"PWD={PWD};"f"DATABASE={DATABASE};""TrustServerCertificate=yes""Encrypt=yes"# Optional to identify source of queries"APP=snowflake-snowpark-python;"
)
connection = pyodbc.connect(connection_str)
return connection
# Feel free to combine local/udtf ingestion and partition column/predicates
# as stated in the understanding parallelism section
# Call dbapi to pull data from target table
df = session.read.dbapi(
create_sql_server_connection,
table="target_table"
)
# Call dbapi to pull data from target query
df_query = session.read.dbapi(
create_sql_server_connection,
query="select * from target_table"
)
# Pull data from target table with parallelism using partition column
df_local_par_column = session.read.dbapi(
create_sql_server_connection,
table="target_table",
fetch_size=100000,
num_partitions=4,
column="ID", # Swap with the column you want your partition based on
upper_bound=10000,
lower_bound=0
)
udtf_configs = {
"external_access_integration": "<your external access integration>"
}
# Pull data from target table with udtf ingestion with parallelism using predicates
df_udtf_predicates = session.read.dbapi(
create_sql_server_connection,
table="target_table",
udtf_configs=udtf_configs,
fetch_size=100000,
predicates=[
"ID < 3",
"ID >= 3"
]
)
Use the DB-API to connect to SQL Server from a stored procedure¶
Configure an external access integration (EAI), which is required to allow Snowflake to connect to the source endpoint.
Note
PrivateLink is recommended for secure data transfer, especially when you’re dealing with
sensitive information. Ensure that your Snowflake account has the necessary PrivateLink privileges enabled and that the
PrivateLink feature is configured and active in your Snowflake Notebook environment.
Configure the secret, a network rule to allow egress to the source endpoint, and EAI:
-- Configure a secret to allow egress to the source endpoint
CREATE OR REPLACE SECRET mssql_secret
  TYPE = PASSWORD
  USERNAME = 'mssql_username'
  PASSWORD = 'mssql_password';

-- Configure a network rule to allow egress to the source endpoint
CREATE OR REPLACE NETWORK RULE mssql_network_rule
  MODE = EGRESS
  TYPE = HOST_PORT
  VALUE_LIST = ('mssql_host:mssql_port');

-- Configure an external access integration
CREATE OR REPLACE EXTERNAL ACCESS INTEGRATION mssql_access_integration
  ALLOWED_NETWORK_RULES = (mssql_network_rule)
  ALLOWED_AUTHENTICATION_SECRETS = (mssql_secret)
  ENABLED = true;
Use the DB-API to pull data from SQL Server in a Python stored procedure:
CREATE OR REPLACE PROCEDURE sp_mssql_dbapi()
RETURNS TABLE()
LANGUAGE PYTHON
RUNTIME_VERSION='3.11'
HANDLER='run'
PACKAGES=('snowflake-snowpark-python', 'pyodbc', 'msodbcsql')
EXTERNAL_ACCESS_INTEGRATIONS = (mssql_access_integration)
SECRETS = ('cred' = mssql_secret )
AS $$
# Get user name and password from mssql_secret
import _snowflake
username_password_object = _snowflake.get_username_password('cred')
USER = username_password_object.username
PASSWORD = username_password_object.password
# Define a method to connect to SQL Server
from snowflake.snowpark import Session
def create_sql_server_connection():
import pyodbc
host = "<your host>"
port = <your port>
username = USER
password = PASSWORD
database = "<your database name>"
connection_str = (
f"DRIVER={{ODBC Driver 18 for SQL Server}};"f"SERVER={host},{port};"f"DATABASE={database};"f"UID={username};"f"PWD={password};""TrustServerCertificate=yes""Encrypt=yes"# Optional to identify source of queries"APP=snowflake-snowpark-python;"
)
connection = pyodbc.connect(connection_str)
return connection
def run(session: Session):
# Feel free to combine local/udtf ingestion and partition column/predicates
# as stated in the understanding parallelism section
# Call dbapi to pull data from target table
df = session.read.dbapi(
create_sql_server_connection,
table="target_table"
)
# Call dbapi to pull data from target query
df_query = session.read.dbapi(
create_sql_server_connection,
query="select * from target_table"
)
# Pull data from target table with parallelism using partition column
df_local_par_column = session.read.dbapi(
create_sql_server_connection,
table="target_table",
fetch_size=100000,
num_partitions=4,
column="ID", # swap with the column you want your partition based on
upper_bound=10000,
lower_bound=0
)
udtf_configs = {
"external_access_integration": "<your external access integration>"
}
# Pull data from target table with udtf ingestion with parallelism using predicates
df_udtf_predicates = session.read.dbapi(
create_sql_server_connection,
table="target_table",
udtf_configs=udtf_configs,
fetch_size=100000,
predicates=[
"ID < 3",
"ID >= 3"
]
)
return df
$$;
CALL sp_mssql_dbapi();
Use the DB-API to connect to SQL Server from a Snowflake notebook¶
In the Files pane, open the environment.yml file and, under dependencies, add the following line after the existing entries:
- msodbcsql
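For reference, environment.yml then looks roughly like the following sketch; the name and channels entries are illustrative defaults, and only the msodbcsql line is the addition described above:
name: app_environment
channels:
  - snowflake
dependencies:
  - msodbcsql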
Configure the secret, a network rule to allow egress to the source endpoint, and EAI:
-- Configure a secret to allow egress to the source endpoint
CREATE OR REPLACE SECRET mssql_secret
  TYPE = PASSWORD
  USERNAME = 'mssql_username'
  PASSWORD = 'mssql_password';

ALTER NOTEBOOK mynotebook SET SECRETS = ('snowflake-secret-object' = mssql_secret);

-- Configure a network rule to allow egress to the source endpoint
CREATE OR REPLACE NETWORK RULE mssql_network_rule
  MODE = EGRESS
  TYPE = HOST_PORT
  VALUE_LIST = ('mssql_host:mssql_port');

-- Configure an external access integration
CREATE OR REPLACE EXTERNAL ACCESS INTEGRATION mssql_access_integration
  ALLOWED_NETWORK_RULES = (mssql_network_rule)
  ALLOWED_AUTHENTICATION_SECRETS = (mssql_secret)
  ENABLED = true;
Use the DB-API to pull data from SQL Server in a Python cell of a Snowflake notebook:
# Get user name and password from mssql_secret
import _snowflake
username_password_object = _snowflake.get_username_password('snowflake-secret-object')
USER = username_password_object.username
PASSWORD = username_password_object.password
import snowflake.snowpark.context
session = snowflake.snowpark.context.get_active_session()
def create_sql_server_connection():
import pyodbc
SERVER = "<your host name>"
UID = USER
PWD = PASSWORD
DATABASE = "<your database name>"
connection_str = (
f"DRIVER={{ODBC Driver 18 for SQL Server}};"f"SERVER={SERVER};"f"UID={UID};"f"PWD={PWD};"f"DATABASE={DATABASE};""TrustServerCertificate=yes;""Encrypt=yes;"# Optional to identify source of queries"APP=snowflake-snowpark-python;"
)
connection = pyodbc.connect(connection_str)
return connection
# Feel free to combine local/udtf ingestion and partition column/predicates
# as stated in the understanding parallelism section
# Call dbapi to pull data from target table
df = session.read.dbapi(
create_sql_server_connection,
table="target_table"
)
# Call dbapi to pull data from target query
df_query = session.read.dbapi(
create_sql_server_connection,
query="select * from target_table"
)
# Pull data from target table with parallelism using partition column
df_local_par_column = session.read.dbapi(
create_sql_server_connection,
table="target_table",
fetch_size=100000,
num_partitions=4,
column="ID", # swap with the column you want your partition based on
upper_bound=10000,
lower_bound=0
)
udtf_configs = {
"external_access_integration": "<your external access integration>"
}
# Pull data from target table with udtf ingestion with parallelism using predicates
df_udtf_predicates = session.read.dbapi(
create_sql_server_connection,
table="target_table",
udtf_configs=udtf_configs,
fetch_size=100000,
predicates=[
"ID < 3",
"ID >= 3"
]
)
# Save data into sf_table
df.write.mode("overwrite").save_as_table('sf_table')
Source tracing when using the DB-API to connect to SQL Server¶
Include a Snowpark tag in the function that creates the connection:
def create_sql_server_connection():
import pyodbc
SERVER = "<your host name>"
PORT = <your port>
UID = "<your user name>"
PWD = "<your password>"
DATABASE = "<your database name>"
connection_str = (
f"DRIVER={{ODBC Driver 18 for SQL Server}};"f"SERVER={SERVER}:{PORT};"f"UID={UID};"f"PWD={PWD};"f"DATABASE={DATABASE};""TrustServerCertificate=yes""Encrypt=yes"# include this parameter for source tracing"APP=snowflake-snowpark-python;"
)
connection = pyodbc.connect(connection_str)
return connection
Run the following SQL in your data source to capture queries from Snowpark that are still live:
SELECT
s.session_id,
s.program_name,
r.status,
t.text AS sql_text
FROM sys.dm_exec_sessions s
JOIN sys.dm_exec_requests r ON s.session_id = r.session_id
CROSS APPLY sys.dm_exec_sql_text(r.sql_handle) AS t
WHERE s.program_name = 'snowflake-snowpark-python';
To use the DB-API to pull data from Oracle, define the factory method for creating a connection to Oracle:
def create_oracle_db_connection():
import oracledb
HOST = "<your host>"
PORT = <your port>
SERVICE_NAME = "<your service name>"
USER = "<your user name>"
PASSWORD = "your password"
DSN = f"{HOST}:{PORT}/{SERVICE_NAME}"
connection = oracledb.connect(
user=USER,
password=PASSWORD,
dsn=DSN
)
# Optional: include this parameter for source tracing
connection.clientinfo = "snowflake-snowpark-python"
return connection
# Feel free to combine local/udtf ingestion and partition column/predicates
# as stated in the understanding parallelism section
# Call dbapi to pull data from target table
df = session.read.dbapi(
create_oracle_db_connection,
table="target_table"
)
# Call dbapi to pull data from target query
df_query = session.read.dbapi(
create_oracle_db_connection,
query="select * from target_table"
)
# Pull data from target table with parallelism using partition column
df_local_par_column = session.read.dbapi(
create_oracle_db_connection,
table="target_table",
fetch_size=100000,
num_partitions=4,
column="ID", # swap with the column you want your partition based on
upper_bound=10000,
lower_bound=0
)
udtf_configs = {
"external_access_integration": "<your external access integration>"
}
# Pull data from target table with udtf ingestion with parallelism using predicates
df_udtf_predicates = session.read.dbapi(
create_oracle_db_connection,
table="target_table",
udtf_configs=udtf_configs,
fetch_size=100000,
predicates=[
"ID < 3",
"ID >= 3"
]
)
Use the DB-API to connect to Oracle from a stored procedure¶
Configure an external access integration (EAI), which is required to allow Snowflake to connect to the source endpoint.
Note
PrivateLink is recommended for secure data transfer, especially when you’re dealing with
sensitive information. Ensure that your Snowflake account has the necessary PrivateLink privileges enabled and that the
PrivateLink feature is configured and active in your Snowflake Notebook environment.
Configure the secret, a network rule to allow egress to the source endpoint, and EAI:
-- Configure a secret to allow access to the source endpoint
CREATE OR REPLACE SECRET ora_secret
  TYPE = PASSWORD
  USERNAME = 'ora_username'
  PASSWORD = 'ora_password';

-- Configure a network rule to allow egress to the source endpoint
CREATE OR REPLACE NETWORK RULE ora_network_rule
  MODE = EGRESS
  TYPE = HOST_PORT
  VALUE_LIST = ('ora_host:ora_port');

-- Configure an external access integration
CREATE OR REPLACE EXTERNAL ACCESS INTEGRATION ora_access_integration
  ALLOWED_NETWORK_RULES = (ora_network_rule)
  ALLOWED_AUTHENTICATION_SECRETS = (ora_secret)
  ENABLED = true;
Use the Snowpark Python DB-API to pull data from Oracle in a Python stored procedure:
CREATE OR REPLACE PROCEDURE sp_ora_dbapi()
RETURNS TABLE()
LANGUAGE PYTHON
RUNTIME_VERSION='3.11'
HANDLER='run'
PACKAGES=('snowflake-snowpark-python', 'oracledb')
EXTERNAL_ACCESS_INTEGRATIONS = (ora_access_integration)
SECRETS = ('cred' = ora_secret )
AS $$
# Get user name and password from ora_secret
import _snowflake
username_password_object = _snowflake.get_username_password('cred')
USER = username_password_object.username
PASSWORD = username_password_object.password
# Define the factory method for creating a connection to Oracle
from snowflake.snowpark import Session
def create_oracle_db_connection():
import oracledb
host = "ora_host"
port = "ora_port"
service_name = "ora_service"
user = USER
password = PASSWORD
DSN = f"{host}:{port}/{service_name}"
connection = oracledb.connect(
user=USER,
password=PASSWORD,
dsn=DSN
)
# Optional: include this parameter for source tracing
connection.clientinfo = "snowflake-snowpark-python"
return connection
def run(session: Session):
# Feel free to combine local/udtf ingestion and partition column/predicates
# as stated in the understanding parallelism section
# Call dbapi to pull data from target table
df = session.read.dbapi(
create_oracle_db_connection,
table="target_table"
)
# Call dbapi to pull data from target query
df_query = session.read.dbapi(
create_oracle_db_connection,
query="select * from target_table"
)
# Pull data from target table with parallelism using partition column
df_local_par_column = session.read.dbapi(
create_oracle_db_connection,
table="target_table",
fetch_size=100000,
num_partitions=4,
column="ID", # swap with the column you want your partition based on
upper_bound=10000,
lower_bound=0
)
udtf_configs = {
"external_access_integration": "<your external access integration>"
}
# Pull data from target table with udtf ingestion with parallelism using predicates
df_udtf_predicates = session.read.dbapi(
create_oracle_db_connection,
table="target_table",
udtf_configs=udtf_configs,
fetch_size=100000,
predicates=[
"ID < 3",
"ID >= 3"
]
)
return df
$$;
CALL sp_ora_dbapi();
Use the DB-API to connect to Oracle from a Snowflake notebook¶
Configure an external access integration (EAI), which is required to allow Snowflake to connect to the source endpoint.
Note
PrivateLink is recommended for secure data transfer, especially when you’re dealing with
sensitive information. Ensure that your Snowflake account has the necessary PrivateLink privileges enabled and that the
PrivateLink feature is configured and active in your Snowflake Notebook environment.
Configure the secret, a network rule, and EAI to allow egress to the source endpoint:
-- Configure a secret to allow access to the source endpoint
CREATE OR REPLACE SECRET ora_secret
  TYPE = PASSWORD
  USERNAME = 'ora_username'
  PASSWORD = 'ora_password';

ALTER NOTEBOOK mynotebook SET SECRETS = ('snowflake-secret-object' = ora_secret);

-- Configure a network rule to allow egress to the source endpoint
CREATE OR REPLACE NETWORK RULE ora_network_rule
  MODE = EGRESS
  TYPE = HOST_PORT
  VALUE_LIST = ('ora_host:ora_port');

-- Configure an external access integration
CREATE OR REPLACE EXTERNAL ACCESS INTEGRATION ora_access_integration
  ALLOWED_NETWORK_RULES = (ora_network_rule)
  ALLOWED_AUTHENTICATION_SECRETS = (ora_secret)
  ENABLED = true;
Use the DB-API to pull data from Oracle in a Python cell of a Snowflake notebook:
# Get user name and password from ora_secret
import _snowflake
username_password_object = _snowflake.get_username_password('snowflake-secret-object')
USER = username_password_object.username
PASSWORD = username_password_object.password
import snowflake.snowpark.context
session = snowflake.snowpark.context.get_active_session()
# Define the factory method for creating a connection to Oracle
def create_oracle_db_connection():
import oracledb
host = "ora_host"
port = "ora_port"
service_name = "ora_service"
user = USER
password = PASSWORD
DSN = f"{host}:{port}/{service_name}"
connection = oracledb.connect(
user=USER,
password=PASSWORD,
dsn=DSN,
)
# Optional: include this parameter for source tracing
connection.clientinfo = "snowflake-snowpark-python"
return connection
# Feel free to combine local/udtf ingestion and partition column/predicates
# as stated in the understanding parallelism section
# Call dbapi to pull data from target table
df = session.read.dbapi(
create_oracle_db_connection,
table="target_table"
)
# Call dbapi to pull data from target query
df_query = session.read.dbapi(
create_oracle_db_connection,
query="select * from target_table"
)
# Pull data from target table with parallelism using partition column
df_local_par_column = session.read.dbapi(
create_oracle_db_connection,
table="target_table",
fetch_size=100000,
num_partitions=4,
column="ID", # swap with the column you want your partition based on
upper_bound=10000,
lower_bound=0
)
udtf_configs = {
"external_access_integration": "<your external access integration>"
}
# Pull data from target table with udtf ingestion with parallelism using predicates
df_udtf_predicates = session.read.dbapi(
create_oracle_db_connection,
table="target_table",
udtf_configs=udtf_configs,
fetch_size=100000,
predicates=[
"ID < 3",
"ID >= 3"
]
)
# Save data into sf_table
df.write.mode("overwrite").save_as_table('sf_table')
Source tracing when using the DB-API to connect to Oracle¶
Include a Snowpark tag in the function that creates the connection:
def create_oracle_db_connection():
import oracledb
HOST = "myhost"
PORT = "myport"
SERVICE_NAME = "myservice"
USER = "myuser"
PASSWORD = "mypassword"
DSN = f"{HOST}:{PORT}/{SERVICE_NAME}"
connection = oracledb.connect(
user=USER,
password=PASSWORD,
dsn=DSN,
)
# include this parameter for source tracing
connection.clientinfo = "snowflake-snowpark-python"
return connection
Run the following SQL in your data source to capture queries from Snowpark that are still live:
SELECT
s.sid,
s.serial#,
s.username,
s.module,
q.sql_id,
q.sql_text,
q.last_active_time
FROM
v$session s
JOIN v$sql q ON s.sql_id = q.sql_id
WHERE
s.client_info = 'snowflake-snowpark-python';
The following code examples show how to connect to PostgreSQL from a Snowpark client, stored procedures, and a Snowflake notebook.
Use the DB-API to connect to PostgreSQL from a Snowpark client¶
Install psycopg2:
pip install psycopg2
Define the factory method for creating a connection to PostgreSQL:
def create_pg_connection():
import psycopg2
connection = psycopg2.connect(
host="pg_host",
port=pg_port,
dbname="pg_dbname",
user="pg_user",
password="pg_password",
# Optional: include this parameter for source tracing
application_name="snowflake-snowpark-python"
)
return connection
# Feel free to combine local/udtf ingestion and partition column/predicates
# as stated in the understanding parallelism section
# Call dbapi to pull data from target table
df = session.read.dbapi(
create_pg_connection,
table="target_table"
)
# Call dbapi to pull data from target query
df_query = session.read.dbapi(
create_pg_connection,
query="select * from target_table"
)
# Pull data from target table with parallelism using partition column
df_local_par_column = session.read.dbapi(
create_pg_connection,
table="target_table",
fetch_size=100000,
num_partitions=4,
column="ID", # Swap with the column you want your partition based on
upper_bound=10000,
lower_bound=0
)
udtf_configs = {
"external_access_integration": "<your external access integration>"
}
# Pull data from target table with udtf ingestion with parallelism using predicates
df_udtf_predicates = session.read.dbapi(
create_pg_connection,
table="target_table",
udtf_configs=udtf_configs,
fetch_size=100000,
predicates=[
"ID < 3",
"ID >= 3"
]
)
Use the DB-API to connect to PostgreSQL from a stored procedure¶
Configure an external access integration (EAI), which is required to allow Snowflake to connect to the source endpoint.
Note
PrivateLink is recommended for secure data transfer, especially when you’re dealing with
sensitive information. Ensure that your Snowflake account has the necessary PrivateLink privileges enabled and that the
PrivateLink feature is configured and active in your Snowflake Notebook environment.
Configure the secret, a network rule to allow egress to the source endpoint, and EAI:
-- Configure a secret
CREATE OR REPLACE SECRET pg_secret
  TYPE = PASSWORD
  USERNAME = 'pg_username'
  PASSWORD = 'pg_password';

-- Configure a network rule
CREATE OR REPLACE NETWORK RULE pg_network_rule
  MODE = EGRESS
  TYPE = HOST_PORT
  VALUE_LIST = ('pg_host:pg_port');

-- Configure an external access integration
CREATE OR REPLACE EXTERNAL ACCESS INTEGRATION pg_access_integration
  ALLOWED_NETWORK_RULES = (pg_network_rule)
  ALLOWED_AUTHENTICATION_SECRETS = (pg_secret)
  ENABLED = true;
Use the Snowpark Python DB-API to pull data from PostgreSQL in a Python stored procedure:
CREATE OR REPLACE PROCEDURE sp_pg_dbapi()
RETURNS TABLE()
LANGUAGE PYTHON
RUNTIME_VERSION='3.11'
HANDLER='run'
PACKAGES=('snowflake-snowpark-python', 'psycopg2')
EXTERNAL_ACCESS_INTEGRATIONS = (pg_access_integration)
SECRETS = ('cred' = pg_secret )
AS $$
# Get user name and password from pg_secret
import _snowflake
username_password_object = _snowflake.get_username_password('cred')
USER = username_password_object.username
PASSWORD = username_password_object.password
# Define the factory method for creating a connection to PostgreSQL
from snowflake.snowpark import Session
def create_pg_connection():
import psycopg2
connection = psycopg2.connect(
host="pg_host",
port=pg_port,
dbname="pg_dbname",
user=USER,
password=PASSWORD,
# Optional: include this parameter for source tracing
application_name="snowflake-snowpark-python"
)
return connection
def run(session: Session):
# Feel free to combine local/udtf ingestion and partition column/predicates
# as stated in the understanding parallelism section
# Call dbapi to pull data from target table
df = session.read.dbapi(
create_pg_connection,
table="target_table"
)
# Call dbapi to pull data from target query
df_query = session.read.dbapi(
create_pg_connection,
query="select * from target_table"
)
# Pull data from target table with parallelism using partition column
df_local_par_column = session.read.dbapi(
create_pg_connection,
table="target_table",
fetch_size=100000,
num_partitions=4,
column="ID", # swap with the column you want your partition based on
upper_bound=10000,
lower_bound=0
)
udtf_configs = {
"external_access_integration": "<your external access integration>"
}
# Pull data from target table with udtf ingestion with parallelism using predicates
df_udtf_predicates = session.read.dbapi(
create_pg_connection,
table="target_table",
udtf_configs=udtf_configs,
fetch_size=100000,
predicates=[
"ID < 3",
"ID >= 3"
]
)
return df
$$;
CALL sp_pg_dbapi();
Use the DB-API to connect to PostgreSQL from a Snowflake notebook¶
Configure an external access integration (EAI), which is required to allow Snowflake to connect to the source endpoint.
Note
PrivateLink is recommended for secure data transfer, especially when you’re dealing with
sensitive information. Ensure that your Snowflake account has the necessary PrivateLink privileges enabled and that the
PrivateLink feature is configured and active in your Snowflake Notebook environment.
Configure the secret, a network rule to allow egress to the source endpoint, and EAI:
-- Configure the secret
CREATE OR REPLACE SECRET pg_secret
  TYPE = PASSWORD
  USERNAME = 'pg_username'
  PASSWORD = 'pg_password';

ALTER NOTEBOOK pg_notebook SET SECRETS = ('snowflake-secret-object' = pg_secret);

-- Configure the network rule to allow egress to the source endpoint
CREATE OR REPLACE NETWORK RULE pg_network_rule
  MODE = EGRESS
  TYPE = HOST_PORT
  VALUE_LIST = ('pg_host:pg_port');

-- Configure the external access integration
CREATE OR REPLACE EXTERNAL ACCESS INTEGRATION pg_access_integration
  ALLOWED_NETWORK_RULES = (pg_network_rule)
  ALLOWED_AUTHENTICATION_SECRETS = (pg_secret)
  ENABLED = true;
Use the DB-API to pull data from PostgreSQL in a Python cell of a Snowflake notebook:
# Get the user name and password from pg_secret
import _snowflake
username_password_object = _snowflake.get_username_password('snowflake-secret-object')
USER = username_password_object.username
PASSWORD = username_password_object.password
import snowflake.snowpark.context
session = snowflake.snowpark.context.get_active_session()
# Define the factory method for creating a connection to PostgreSQL
def create_pg_connection():
import psycopg2
connection = psycopg2.connect(
host="pg_host",
port=pg_port,
dbname="pg_dbname",
user=USER,
password=PASSWORD,
# Optional: include this parameter for source tracing
application_name="snowflake-snowpark-python"
)
return connection
# Feel free to combine local/udtf ingestion and partition column/predicates
# as stated in the understanding parallelism section
# Call dbapi to pull data from target table
df = session.read.dbapi(
create_pg_connection,
table="target_table"
)
# Call dbapi to pull data from target query
df_query = session.read.dbapi(
create_pg_connection,
query="select * from target_table"
)
# Pull data from target table with parallelism using partition column
df_local_par_column = session.read.dbapi(
create_pg_connection,
table="target_table",
fetch_size=100000,
num_partitions=4,
column="ID", # swap with the column you want your partition based on
upper_bound=10000,
lower_bound=0
)
udtf_configs = {
"external_access_integration": "<your external access integration>"
}
# Pull data from target table with udtf ingestion with parallelism using predicates
df_udtf_predicates = session.read.dbapi(
create_pg_connection,
table="target_table",
udtf_configs=udtf_configs,
fetch_size=100000,
predicates=[
"ID < 3",
"ID >= 3"
]
)
# Save data into sf_table
df.write.mode("overwrite").save_as_table('sf_table')
Source tracing when using the DB-API to connect to PostgreSQL¶
Include a Snowpark tag in the function that creates the connection:
def create_pg_connection():
import psycopg2
connection = psycopg2.connect(
host="pg_host",
port=pg_port,
dbname="pg_dbname",
user="pg_user",
password="pg_password",
# Include this parameter for source tracing
application_name="snowflake-snowpark-python"
)
return connection
Run the following SQL in your data source to capture queries from Snowpark that are still live:
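A query along the following lines against pg_stat_activity, the standard PostgreSQL activity view, surfaces live sessions tagged with the application_name set above (adjust the selected columns as needed):
SELECT
  pid,
  usename,
  application_name,
  state,
  query
FROM pg_stat_activity
WHERE application_name = 'snowflake-snowpark-python';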
Define the factory method for creating a connection to MySQL:
def create_mysql_connection():
import pymysql
connection = pymysql.connect(
host="mysql_host",
port=mysql_port,
database="mysql_db",
user="mysql_user",
password="mysql_password",
# Optional: include this parameter for source tracing
init_command="SET @program_name='snowflake-snowpark-python';"
)
return connection
# Feel free to combine local/udtf ingestion and partition column/predicates
# as stated in the understanding parallelism section
# Call dbapi to pull data from target table
df = session.read.dbapi(
create_mysql_connection,
table="target_table"
)
# Call dbapi to pull data from target query
df_query = session.read.dbapi(
create_mysql_connection,
query="select * from target_table"
)
# Pull data from target table with parallelism using partition column
df_local_par_column = session.read.dbapi(
create_mysql_connection,
table="target_table",
fetch_size=100000,
num_partitions=4,
column="ID", # swap with the column you want your partition based on
upper_bound=10000,
lower_bound=0
)
udtf_configs = {
"external_access_integration": "<your external access integration>"
}
# Pull data from target table with udtf ingestion with parallelism using predicates
df_udtf_predicates = session.read.dbapi(
create_mysql_connection,
table="target_table",
udtf_configs=udtf_configs,
fetch_size=100000,
predicates=[
"ID < 3",
"ID >= 3"
]
)
Use the DB-API to connect to MySQL from a stored procedure¶
Configure an external access integration (EAI), which is required to allow Snowflake to connect to the source endpoint.
Note
PrivateLink is recommended for secure data transfer, especially when you’re dealing with
sensitive information. Ensure that your Snowflake account has the necessary PrivateLink privileges enabled and that the
PrivateLink feature is configured and active in your Snowflake Notebook environment.
Configure the secret, a network rule to allow egress to the source endpoint, and EAI:
CREATE OR REPLACE SECRET mysql_secret
  TYPE = PASSWORD
  USERNAME = 'mysql_username'
  PASSWORD = 'mysql_password';

-- Configure a network rule
CREATE OR REPLACE NETWORK RULE mysql_network_rule
  MODE = EGRESS
  TYPE = HOST_PORT
  VALUE_LIST = ('mysql_host:mysql_port');

-- Configure an external access integration
CREATE OR REPLACE EXTERNAL ACCESS INTEGRATION mysql_access_integration
  ALLOWED_NETWORK_RULES = (mysql_network_rule)
  ALLOWED_AUTHENTICATION_SECRETS = (mysql_secret)
  ENABLED = true;
Use the Snowpark Python DB-API to pull data from MySQL in a Python stored procedure:
CREATE OR REPLACE PROCEDURE sp_mysql_dbapi()
RETURNS TABLE()
LANGUAGE PYTHON
RUNTIME_VERSION='3.11'
HANDLER='run'
PACKAGES=('snowflake-snowpark-python', 'pymysql')
EXTERNAL_ACCESS_INTEGRATIONS = (mysql_access_integration)
SECRETS = ('cred' = mysql_secret )
AS $$
# Get user name and password from mysql_secret
import _snowflake
username_password_object = _snowflake.get_username_password('cred')
USER = username_password_object.username
PASSWORD = username_password_object.password
# Define the factory method for creating a connection to MySQL
from snowflake.snowpark import Session
def create_mysql_connection():
import pymysql
connection = pymysql.connect(
host="mysql_host",
port=mysql_port,
dbname="mysql_dbname",
user=USER,
password=PASSWORD,
# Optional: include this parameter for source tracing
init_command="SET @program_name='snowflake-snowpark-python';"
)
return connection
# Using Snowpark Python DB-API to pull data from MySQL in a Python stored procedure.
def run(session: Session):
# Feel free to combine local/udtf ingestion and partition column/predicates
# as stated in the understanding parallelism section
# Call dbapi to pull data from target table
df = session.read.dbapi(
create_mysql_connection,
table="target_table"
)
# Call dbapi to pull data from target query
df_query = session.read.dbapi(
create_mysql_connection,
query="select * from target_table"
)
# Pull data from target table with parallelism using partition column
df_local_par_column = session.read.dbapi(
create_mysql_connection,
table="target_table",
fetch_size=100000,
num_partitions=4,
column="ID", # swap with the column you want your partition based on
upper_bound=10000,
lower_bound=0
)
udtf_configs = {
"external_access_integration": "<your external access integration>"
}
# Pull data from target table with udtf ingestion with parallelism using predicates
df_udtf_predicates = session.read.dbapi(
create_mysql_connection,
table="target_table",
udtf_configs=udtf_configs,
fetch_size=100000,
predicates=[
"ID < 3",
"ID >= 3"
]
)
return df
$$;
CALL sp_mysql_dbapi();
Use the DB-API to connect to MySQL from a Snowflake notebook¶
Configure an external access integration (EAI), which is required to allow Snowflake to connect to the source endpoint.
Note
PrivateLink is recommended for secure data transfer, especially when you’re dealing with
sensitive information. Ensure that your Snowflake account has the necessary PrivateLink privileges enabled and that the
PrivateLink feature is configured and active in your Snowflake Notebook environment.
Configure the secret, a network rule to allow egress to the source endpoint, and EAI:
CREATE OR REPLACE SECRET mysql_secret
  TYPE = PASSWORD
  USERNAME = 'mysql_username'
  PASSWORD = 'mysql_password';

ALTER NOTEBOOK mynotebook SET SECRETS = ('snowflake-secret-object' = mysql_secret);

-- Configure a network rule
CREATE OR REPLACE NETWORK RULE mysql_network_rule
  MODE = EGRESS
  TYPE = HOST_PORT
  VALUE_LIST = ('mysql_host:mysql_port');

-- Configure an EAI
CREATE OR REPLACE EXTERNAL ACCESS INTEGRATION mysql_access_integration
  ALLOWED_NETWORK_RULES = (mysql_network_rule)
  ALLOWED_AUTHENTICATION_SECRETS = (mysql_secret)
  ENABLED = true;
Use the DB-API to pull data from MySQL in a Python cell of a Snowflake notebook:
# Get user name and password from mysql_secret
import _snowflake
username_password_object = _snowflake.get_username_password('snowflake-secret-object')
USER = username_password_object.username
PASSWORD = username_password_object.password
import snowflake.snowpark.context
session = snowflake.snowpark.context.get_active_session()
# Define the factory method for creating a connection to MySQL
def create_mysql_connection():
import pymysql
connection = pymysql.connect(
host="mysql_host",
port=mysql_port,
dbname="mysql_dbname",
user=USER,
password=PASSWORD,
# Optional: include this parameter for source tracing
init_command="SET @program_name='snowflake-snowpark-python';"
)
return connection
# Feel free to combine local/udtf ingestion and partition column/predicates
# as stated in the understanding parallelism section
# Call dbapi to pull data from target table
df = session.read.dbapi(
create_mysql_connection,
table="target_table"
)
# Call dbapi to pull data from target query
df_query = session.read.dbapi(
create_mysql_connection,
query="select * from target_table"
)
# Pull data from target table with parallelism using partition column
df_local_par_column = session.read.dbapi(
create_mysql_connection,
table="target_table",
fetch_size=100000,
num_partitions=4,
column="ID", # swap with the column you want your partition based on
upper_bound=10000,
lower_bound=0
)
udtf_configs = {
"external_access_integration": "<your external access integration>"
}
# Pull data from target table with udtf ingestion with parallelism using predicates
df_udtf_predicates = session.read.dbapi(
create_mysql_connection,
table="target_table",
udtf_configs=udtf_configs,
fetch_size=100000,
predicates=[
"ID < 3",
"ID >= 3"
]
)
# Save data into sf_table
df.write.mode("overwrite").save_as_table('sf_table')
Source tracing when using the DB-API to connect to MySQL¶
Include a Snowpark tag in the function that creates the connection:
def create_mysql_connection():
import pymysql
connection = pymysql.connect(
host="mysql_host",
port=mysql_port,
database="mysql_db",
user="mysql_user",
password="mysql_password",
# include this parameter for source tracing
init_command="SET @program_name='snowflake-snowpark-python';"
)
return connection
Run the following SQL in your data source to capture queries from Snowpark:
SELECT * FROM performance_schema.events_statements_history_long
WHERE THREAD_ID = (
  SELECT THREAD_ID
  FROM performance_schema.events_statements_history_long
  WHERE SQL_TEXT = "SET @program_name='snowflake-snowpark-python'"
  ORDER BY EVENT_ID DESC
  LIMIT 1
);
Define the factory method for creating a connection to Databricks:
def create_dbx_connection():
import databricks.sql
connection = databricks.sql.connect(
server_hostname=HOST,
http_path=PATH,
access_token=ACCESS_TOKEN
)
return connection
# Feel free to combine local/udtf ingestion and partition column/predicates
# as stated in the understanding parallelism section
# Call dbapi to pull data from target table
df = session.read.dbapi(
create_dbx_connection,
table="target_table"
)
# Call dbapi to pull data from target query
df_query = session.read.dbapi(
create_dbx_connection,
query="select * from target_table"
)
# Pull data from target table with parallelism using partition column
df_local_par_column = session.read.dbapi(
create_dbx_connection,
table="target_table",
fetch_size=100000,
num_partitions=4,
column="ID", # swap with the column you want your partition based on
upper_bound=10000,
lower_bound=0
)
udtf_configs = {
"external_access_integration": "<your external access integration>"
}
# Pull data from target table with udtf ingestion with parallelism using predicates
df_udtf_predicates = session.read.dbapi(
create_dbx_connection,
table="target_table",
udtf_configs=udtf_configs,
fetch_size=100000,
predicates=[
"ID < 3",
"ID >= 3"
]
)
Use the DB-API to connect to Databricks from a stored procedure¶
Configure an external access integration (EAI), which is required to allow Snowflake to connect to the source endpoint.
Note
PrivateLink is recommended for secure data transfer, especially when you’re dealing with
sensitive information. Ensure that your Snowflake account has the necessary PrivateLink privileges enabled and that the
PrivateLink feature is configured and active in your Snowflake Notebook environment.
Configure the secret, a network rule to allow egress to the source endpoint, and EAI:
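The stored procedure below reads the Databricks access token with _snowflake.get_generic_secret_string('cred'), so the secret holds a single token string rather than a username and password. The following is a sketch of that setup; the secret and integration names match the procedure below, while the network rule name and the host:port value are illustrative:
-- Configure a secret that holds the Databricks access token
CREATE OR REPLACE SECRET dbx_secret
  TYPE = GENERIC_STRING
  SECRET_STRING = '<your Databricks access token>';

-- Configure a network rule to allow egress to the source endpoint
CREATE OR REPLACE NETWORK RULE dbx_network_rule
  MODE = EGRESS
  TYPE = HOST_PORT
  VALUE_LIST = ('dbx_host:dbx_port');

-- Configure an external access integration
CREATE OR REPLACE EXTERNAL ACCESS INTEGRATION dbx_access_integration
  ALLOWED_NETWORK_RULES = (dbx_network_rule)
  ALLOWED_AUTHENTICATION_SECRETS = (dbx_secret)
  ENABLED = true;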
Use the Snowpark Python DB-API to pull data from Databricks in a Python stored procedure:
CREATE OR REPLACE PROCEDURE sp_dbx_dbapi()
RETURNS TABLE()
LANGUAGE PYTHON
RUNTIME_VERSION='3.11'
HANDLER='run'
PACKAGES=('snowflake-snowpark-python', 'databricks-sql-connector')
EXTERNAL_ACCESS_INTEGRATIONS = (dbx_access_integration)
SECRETS = ('cred' = dbx_secret )
AS $$
# Get the access token from dbx_secret
import _snowflake
ACCESS_TOKEN = _snowflake.get_generic_secret_string('cred')
from snowflake.snowpark import Session
# Define the method for creating a connection to Databricks
def create_dbx_connection():
import databricks.sql
connection = databricks.sql.connect(
server_hostname="dbx_host",
http_path="dbx_path",
access_token=ACCESS_TOKEN,
)
return connection
# Using Snowpark Python DB-API to pull data from Databricks in a Python stored procedure.
def run(session: Session):
# Feel free to combine local/udtf ingestion and partition column/predicates
# as stated in the understanding parallelism section
# Call dbapi to pull data from target table
df = session.read.dbapi(
create_dbx_connection,
table="target_table"
)
# Call dbapi to pull data from target query
df_query = session.read.dbapi(
create_dbx_connection,
query="select * from target_table"
)
# Pull data from target table with parallelism using partition column
df_local_par_column = session.read.dbapi(
create_dbx_connection,
table="target_table",
fetch_size=100000,
num_partitions=4,
column="ID", # swap with the column you want your partition based on
upper_bound=10000,
lower_bound=0
)
udtf_configs = {
"external_access_integration": "<your external access integration>"
}
# Pull data from target table with udtf ingestion with parallelism using predicates
df_udtf_predicates = session.read.dbapi(
create_dbx_connection,
table="target_table",
udtf_configs=udtf_configs,
fetch_size=100000,
predicates=[
"ID < 3",
"ID >= 3"
]
)
return df
$$;
CALL sp_dbx_dbapi();
Use the DB-API to connect to Databricks from a Snowflake notebook¶
Configure an external access integration (EAI), which is required to allow Snowflake to connect to the source endpoint.
Note
PrivateLink is recommended for secure data transfer, especially when you’re dealing with
sensitive information. Ensure that your Snowflake account has the necessary PrivateLink privileges enabled and that the
PrivateLink feature is configured and active in your Snowflake Notebook environment.
Configure the secret, a network rule to allow egress to the source endpoint, and EAI:
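The notebook cell below reads the Databricks access token with _snowflake.get_generic_secret_string('cred'), so the secret holds a single token string. The following is a sketch of that setup; mynotebook, the network rule name, and the host:port value are illustrative, while the 'cred' mapping matches the cell below:
-- Configure a secret that holds the Databricks access token
CREATE OR REPLACE SECRET dbx_secret
  TYPE = GENERIC_STRING
  SECRET_STRING = '<your Databricks access token>';

ALTER NOTEBOOK mynotebook SET SECRETS = ('cred' = dbx_secret);

-- Configure a network rule to allow egress to the source endpoint
CREATE OR REPLACE NETWORK RULE dbx_network_rule
  MODE = EGRESS
  TYPE = HOST_PORT
  VALUE_LIST = ('dbx_host:dbx_port');

-- Configure an external access integration
CREATE OR REPLACE EXTERNAL ACCESS INTEGRATION dbx_access_integration
  ALLOWED_NETWORK_RULES = (dbx_network_rule)
  ALLOWED_AUTHENTICATION_SECRETS = (dbx_secret)
  ENABLED = true;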
Use the DB-API to pull data from Databricks in a Python cell of a Snowflake notebook:
# Get the access token from dbx_secret
import _snowflake
ACCESS_TOKEN = _snowflake.get_generic_secret_string('cred')
import snowflake.snowpark.context
session = snowflake.snowpark.context.get_active_session()
# Define the factory method for creating a connection to Databricks
def create_dbx_connection():
import databricks.sql
connection = databricks.sql.connect(
server_hostname="dbx_host",
http_path="dbx_path",
access_token=ACCESS_TOKEN,
)
return connection
# Feel free to combine local/udtf ingestion and partition column/predicates
# as stated in the understanding parallelism section
# Call dbapi to pull data from target table
df = session.read.dbapi(
create_dbx_connection,
table="target_table"
)
# Call dbapi to pull data from target query
df_query = session.read.dbapi(
create_dbx_connection,
query="select * from target_table"
)
# Pull data from target table with parallelism using partition column
df_local_par_column = session.read.dbapi(
create_dbx_connection,
table="target_table",
fetch_size=100000,
num_partitions=4,
column="ID", # swap with the column you want your partition based on
upper_bound=10000,
lower_bound=0
)
udtf_configs = {
"external_access_integration": "<your external access integration>"
}
# Pull data from target table with udtf ingestion with parallelism using predicates
df_udtf_predicates = session.read.dbapi(
create_dbx_connection,
table="target_table",
udtf_configs=udtf_configs,
fetch_size=100000,
predicates=[
"ID < 3",
"ID >= 3"
]
)
# Save data into sf_table
df.write.mode("overwrite").save_as_table('sf_table')
Source tracing when using the DB-API to connect to Databricks¶
Include a Snowpark tag in the function that creates the connection:
def create_dbx_connection():
import databricks.sql
connection = databricks.sql.connect(
server_hostname=HOST,
http_path=PATH,
access_token=ACCESS_TOKEN,
# include this parameter for source tracing
user_agent_entry="snowflake-snowpark-python"
)
return connection
Navigate to query history in the Databricks console and search for queries whose source is snowflake-snowpark-python.
The Snowpark Python DB-API supports only Python DB-API 2.0–compliant drivers (for example, pyodbc or oracledb). JDBC drivers are not supported in this release.