Using the Snowpark Python DB-API¶
With the Snowpark Python DB-API, Snowpark Python users can programmatically pull data from external databases into Snowflake. This includes:
Python DB-API support: Connect to external databases using Python’s standard DB-API 2.0 drivers.
Simplified setup: Use pip to install the required drivers, with no additional dependencies to manage.
With these APIs, you can seamlessly pull data into Snowflake tables and transform it using Snowpark DataFrames for advanced analysis.
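For example, a minimal end-to-end flow looks like the following sketch. The connection string, source table, target table, and connection_parameters values are placeholders, and the exact connection factory depends on which DB-API 2.0 driver your source database uses:

from snowflake.snowpark import Session

def create_connection():
    # Any DB-API 2.0-compliant driver can be used here; pyodbc is just an example.
    import pyodbc
    return pyodbc.connect("<your connection string>")

# connection_parameters is your usual Snowflake connection configuration.
session = Session.builder.configs(connection_parameters).create()

# Pull the external table into a Snowpark DataFrame ...
df = session.read.dbapi(create_connection, table="<your source table>")

# ... then persist it in Snowflake and transform it with Snowpark DataFrames.
df.write.mode("overwrite").save_as_table("<your snowflake table>")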
The DB-API can be used in much the same way as the Spark JDBC API. Most parameters are designed to be identical or similar for better parity. At the same time, Snowpark emphasizes a Python-first design with intuitive naming conventions that avoid JDBC-specific configuration, giving Python developers a familiar experience. The following table describes the Snowpark Python DB-API parameters:
DB-API parameters¶
| Parameter | Description |
|---|---|
| create_connection | Function that creates a Python DB-API connection. |
| table | Specifies the table in the source database. |
| query | SQL query wrapped as a subquery for reading data. |
| column | Partitioning column for parallel reads. |
| lower_bound | Lower bound for partitioning. |
| upper_bound | Upper bound for partitioning. |
| num_partitions | Number of partitions for parallelism. |
| query_timeout | Timeout for SQL execution (in seconds). |
| fetch_size | Number of rows fetched per round trip. |
| custom_schema | Custom schema for pulling data from external databases. |
| max_workers | Number of workers for fetching and ingesting data from external databases in parallel. |
| predicates | List of conditions for WHERE clause partitions. |
| session_init_statement | Executes an SQL or PL/SQL statement at session initialization. |
| udtf_configs | Executes the workload using a Snowflake UDTF for better performance. |
| fetch_merge_count | Number of fetched batches to be merged into a single Parquet file before it is uploaded. |
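For illustration, the following sketch combines several of these parameters in a single dbapi() call; the connection factory, table, and column names are hypothetical placeholders that mirror the examples later on this page:

# Hypothetical example combining several DB-API parameters in one call.
df = session.read.dbapi(
    create_connection,        # function that creates a DB-API connection
    table="target_table",     # table in the source database
    column="ID",              # partitioning column for parallel reads
    lower_bound=0,            # lower bound for partitioning
    upper_bound=10000,        # upper bound for partitioning
    num_partitions=4,         # number of partitions for parallelism
    fetch_size=100000         # rows fetched per round trip
)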
Understanding parallelism¶
The Snowpark Python DB-API has two underlying ingestion mechanisms:
- Local ingestion
With local ingestion, Snowpark first fetches the data from the external source into the local environment where dbapi() is called and converts it into Parquet files. Snowpark then uploads the Parquet files to a temporary Snowflake stage and copies them from the stage into a temporary table.
- UDTF ingestion
With UDTF ingestion, the entire workload runs on the Snowflake server. Snowpark first creates and executes a UDTF, and the UDTF ingests the data directly into Snowflake and stores it in a temporary table.
The Snowpark Python DB-API also offers two ways to parallelize and accelerate ingestion:
- Partition column
This method splits the source data into a series of partitions based on four parameters that you pass when calling dbapi(): column, lower_bound, upper_bound, and num_partitions. All four parameters must be set together, and column must be a date or numeric type.
- Predicates
This method splits the source data into partitions based on the predicates parameter, a list of expressions suitable for inclusion in WHERE clauses, where each expression defines one partition. Predicates offer a more flexible way to split partitions; for example, you can split on Boolean or non-numeric columns, as shown in the sketch after this list.
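For instance, the following sketch partitions on a string column; the REGION column and its values are hypothetical placeholders:

# Hypothetical example: use predicates to partition on a non-numeric column.
df_by_region = session.read.dbapi(
    create_connection,
    table="target_table",
    predicates=[
        "REGION = 'EMEA'",
        "REGION = 'AMER'",
        "REGION = 'APAC'"
    ]
)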
The Snowpark Python DB-API also lets you tune the level of parallelism within a partition.
- fetch_size
Within a partition, the API fetches rows in chunks defined by fetch_size. These rows are written to Snowflake in parallel as they are fetched, allowing reading and writing to overlap and maximize throughput.
By combining the ingestion mechanisms and parallelism methods above, Snowflake offers four forms of ingestion:
Local ingestion with partition column
df_local_par_column = session.read.dbapi(
    create_connection,
    table="target_table",
    fetch_size=100000,
    num_partitions=4,
    column="ID",  # Swap with the column you want to partition on
    upper_bound=10000,
    lower_bound=0
)
Local ingestion with predicates
df_local_predicates = session.read.dbapi(
    create_connection,
    table="target_table",
    fetch_size=100000,
    predicates=[
        "ID < 3",
        "ID >= 3"
    ]
)
UDTF ingestion with partition column
udtf_configs = {
    "external_access_integration": "<your external access integration>"
}

df_udtf_par_column = session.read.dbapi(
    create_connection,
    table="target_table",
    udtf_configs=udtf_configs,
    fetch_size=100000,
    num_partitions=4,
    column="ID",  # Swap with the column you want to partition on
    upper_bound=10000,
    lower_bound=0
)
UDTF ingestion with predicates
udtf_configs = {
    "external_access_integration": "<your external access integration>"
}

df_udtf_predicates = session.read.dbapi(
    create_connection,
    table="target_table",
    udtf_configs=udtf_configs,
    fetch_size=100000,
    predicates=[
        "ID < 3",
        "ID >= 3"
    ]
)
SQL Server¶
To connect to SQL Server from Snowpark, you need the following three packages:
Snowpark: snowflake-snowpark-python[pandas]
SQL Server ODBC Driver: Microsoft ODBC Driver for SQL Server
By installing the driver, you agree to the Microsoft EULA.
The open-source pyodbc library: pyodbc
The following code examples show how to connect to SQL Server from a Snowpark client, a stored procedure, and a Snowflake notebook.
Use the DB-API to connect to SQL Server from a Snowpark client¶
Install the Python SQL Driver:
/bin/bash -c "$(curl -fsSL https://raw.githubusercontent.com/Homebrew/install/master/install.sh)"
brew tap microsoft/mssql-release https://github.com/Microsoft/homebrew-mssql-release
brew update
HOMEBREW_ACCEPT_EULA=Y brew install msodbcsql18 mssql-tools18
Install snowflake-snowpark-python[pandas] and pyodbc:
pip install snowflake-snowpark-python[pandas]
pip install pyodbc
Define the factory method for creating a connection to SQL Server:
def create_sql_server_connection():
    import pyodbc
    SERVER = "<your host name>"
    PORT = <your port>
    UID = "<your user name>"
    PWD = "<your password>"
    DATABASE = "<your database name>"
    connection_str = (
        f"DRIVER={{ODBC Driver 18 for SQL Server}};"
        f"SERVER={SERVER},{PORT};"
        f"UID={UID};"
        f"PWD={PWD};"
        f"DATABASE={DATABASE};"
        "TrustServerCertificate=yes;"
        "Encrypt=yes;"
        # Optional to identify source of queries
        "APP=snowflake-snowpark-python;"
    )
    connection = pyodbc.connect(connection_str)
    return connection

# Feel free to combine local/udtf ingestion and partition column/predicates as
# stated in the understanding parallelism section

# Call dbapi to pull data from target table
df = session.read.dbapi(
    create_sql_server_connection,
    table="target_table"
)

# Call dbapi to pull data from target query
df_query = session.read.dbapi(
    create_sql_server_connection,
    query="select * from target_table"
)

# Pull data from target table with parallelism using partition column
df_local_par_column = session.read.dbapi(
    create_sql_server_connection,
    table="target_table",
    fetch_size=100000,
    num_partitions=4,
    column="ID",  # Swap with the column you want to partition on
    upper_bound=10000,
    lower_bound=0
)

udtf_configs = {
    "external_access_integration": "<your external access integration>"
}

# Pull data from target table with udtf ingestion with parallelism using predicates
df_udtf_predicates = session.read.dbapi(
    create_sql_server_connection,
    table="target_table",
    udtf_configs=udtf_configs,
    fetch_size=100000,
    predicates=[
        "ID < 3",
        "ID >= 3"
    ]
)
Use the DB-API to connect to SQL Server from a stored procedure¶
Configure an external access integration (EAI), which is required to allow Snowflake to connect to the source endpoint.
Note
PrivateLink is recommended for secure data transfer, especially when you’re dealing with sensitive information. Ensure that your Snowflake account has the necessary PrivateLink privileges enabled and that the PrivateLink feature is configured and active in your Snowflake Notebook environment.
Configure the secret, a network rule to allow egress to the source endpoint, and EAI:
-- Configure a secret to allow egress to the source endpoint CREATE OR REPLACE SECRET mssql_secret TYPE = PASSWORD USERNAME = 'mssql_username' PASSWORD = 'mssql_password'; -- Configure a network rule to allow egress to the source endpoint CREATE OR REPLACE NETWORK RULE mssql_network_rule MODE = EGRESS TYPE = HOST_PORT VALUE_LIST = ('mssql_host:mssql_port'); -- Configure an external access integration CREATE OR REPLACE EXTERNAL ACCESS INTEGRATION mssql_access_integration ALLOWED_NETWORK_RULES = (mssql_network_rule) ALLOWED_AUTHENTICATION_SECRETS = (mssql_secret) ENABLED = true;
Use the DB-API to pull data from SQL Server in a Python stored procedure:
CREATE OR REPLACE PROCEDURE sp_mssql_dbapi()
RETURNS TABLE()
LANGUAGE PYTHON
RUNTIME_VERSION='3.11'
HANDLER='run'
PACKAGES=('snowflake-snowpark-python', 'pyodbc', 'msodbcsql')
EXTERNAL_ACCESS_INTEGRATIONS = (mssql_access_integration)
SECRETS = ('cred' = mssql_secret )
AS
$$
# Get user name and password from mssql_secret
import _snowflake
username_password_object = _snowflake.get_username_password('cred')
USER = username_password_object.username
PASSWORD = username_password_object.password

# Define a method to connect to SQL Server
from snowflake.snowpark import Session

def create_sql_server_connection():
    import pyodbc
    host = "<your host>"
    port = <your port>
    username = USER
    password = PASSWORD
    database = "<your database name>"
    connection_str = (
        f"DRIVER={{ODBC Driver 18 for SQL Server}};"
        f"SERVER={host},{port};"
        f"DATABASE={database};"
        f"UID={username};"
        f"PWD={password};"
        "TrustServerCertificate=yes;"
        "Encrypt=yes;"
        # Optional to identify source of queries
        "APP=snowflake-snowpark-python;"
    )
    connection = pyodbc.connect(connection_str)
    return connection

def run(session: Session):
    # Feel free to combine local/udtf ingestion and partition column/predicates
    # as stated in the understanding parallelism section

    # Call dbapi to pull data from target table
    df = session.read.dbapi(
        create_sql_server_connection,
        table="target_table"
    )

    # Call dbapi to pull data from target query
    df_query = session.read.dbapi(
        create_sql_server_connection,
        query="select * from target_table"
    )

    # Pull data from target table with parallelism using partition column
    df_local_par_column = session.read.dbapi(
        create_sql_server_connection,
        table="target_table",
        fetch_size=100000,
        num_partitions=4,
        column="ID",  # Swap with the column you want to partition on
        upper_bound=10000,
        lower_bound=0
    )

    udtf_configs = {
        "external_access_integration": "<your external access integration>"
    }

    # Pull data from target table with udtf ingestion with parallelism using predicates
    df_udtf_predicates = session.read.dbapi(
        create_sql_server_connection,
        table="target_table",
        udtf_configs=udtf_configs,
        fetch_size=100000,
        predicates=[
            "ID < 3",
            "ID >= 3"
        ]
    )

    return df
$$;

CALL sp_mssql_dbapi();
Using DB-API to connect to SQL Server from a Snowflake notebook¶
From Snowflake Notebook packages, select snowflake-snowpark-python and pyodbc.
In the Files tab on the left, open the environment.yml file and add the following line after the other entries under dependencies:
- msodbcsql18
Configure the secret, a network rule to allow egress to the source endpoint, and external access integration:
-- Configure a secret to allow egress to the source endpoint CREATE OR REPLACE SECRET mssql_secret TYPE = PASSWORD USERNAME = 'mssql_username' PASSWORD = 'mssql_password'; ALTER NOTEBOOK mynotebook SET SECRETS = ('snowflake-secret-object' = mssql_secret); -- Configure a network rule to allow egress to the source endpoint CREATE OR REPLACE NETWORK RULE mssql_network_rule MODE = EGRESS TYPE = HOST_PORT VALUE_LIST = ('mssql_host:mssql_port'); -- Configure an external access integration CREATE OR REPLACE EXTERNAL ACCESS INTEGRATION mssql_access_integration ALLOWED_NETWORK_RULES = (mssql_network_rule) ALLOWED_AUTHENTICATION_SECRETS = (mssql_secret) ENABLED = true;
Configure external access for Snowflake Notebooks, and then restart the notebook session.
Use the DB-API to pull data from SQL Server in a Python cell of a Snowflake notebook:
# Get user name and password from mssql_secret
import _snowflake
username_password_object = _snowflake.get_username_password('snowflake-secret-object')
USER = username_password_object.username
PASSWORD = username_password_object.password

import snowflake.snowpark.context
session = snowflake.snowpark.context.get_active_session()

def create_sql_server_connection():
    import pyodbc
    SERVER = "<your host name>"
    UID = USER
    PWD = PASSWORD
    DATABASE = "<your database name>"
    connection_str = (
        f"DRIVER={{ODBC Driver 18 for SQL Server}};"
        f"SERVER={SERVER};"
        f"UID={UID};"
        f"PWD={PWD};"
        f"DATABASE={DATABASE};"
        "TrustServerCertificate=yes;"
        "Encrypt=yes;"
        # Optional to identify source of queries
        "APP=snowflake-snowpark-python;"
    )
    connection = pyodbc.connect(connection_str)
    return connection

# Feel free to combine local/udtf ingestion and partition column/predicates as
# stated in the understanding parallelism section

# Call dbapi to pull data from target table
df = session.read.dbapi(
    create_sql_server_connection,
    table="target_table"
)

# Call dbapi to pull data from target query
df_query = session.read.dbapi(
    create_sql_server_connection,
    query="select * from target_table"
)

# Pull data from target table with parallelism using partition column
df_local_par_column = session.read.dbapi(
    create_sql_server_connection,
    table="target_table",
    fetch_size=100000,
    num_partitions=4,
    column="ID",  # Swap with the column you want to partition on
    upper_bound=10000,
    lower_bound=0
)

udtf_configs = {
    "external_access_integration": "<your external access integration>"
}

# Pull data from target table with udtf ingestion with parallelism using predicates
df_udtf_predicates = session.read.dbapi(
    create_sql_server_connection,
    table="target_table",
    udtf_configs=udtf_configs,
    fetch_size=100000,
    predicates=[
        "ID < 3",
        "ID >= 3"
    ]
)

# Save data into sf_table
df.write.mode("overwrite").save_as_table('sf_table')
Source tracing when using DB-API to connect to SQL Server¶
Include a Snowpark tag in the connection creation function:
def create_sql_server_connection():
    import pyodbc
    SERVER = "<your host name>"
    PORT = <your port>
    UID = "<your user name>"
    PWD = "<your password>"
    DATABASE = "<your database name>"
    connection_str = (
        f"DRIVER={{ODBC Driver 18 for SQL Server}};"
        f"SERVER={SERVER},{PORT};"
        f"UID={UID};"
        f"PWD={PWD};"
        f"DATABASE={DATABASE};"
        "TrustServerCertificate=yes;"
        "Encrypt=yes;"
        # Include this parameter for source tracing
        "APP=snowflake-snowpark-python;"
    )
    connection = pyodbc.connect(connection_str)
    return connection
Run the following SQL on the data source to capture Snowpark queries that are still active:
SELECT
    s.session_id,
    s.program_name,
    r.status,
    t.text AS sql_text
FROM sys.dm_exec_sessions s
JOIN sys.dm_exec_requests r
    ON s.session_id = r.session_id
CROSS APPLY sys.dm_exec_sql_text(r.sql_handle) AS t
WHERE s.program_name = 'snowflake-snowpark-python';
Oracle¶
To connect to Oracle from Snowpark, you need the following two packages:
Snowpark: snowflake-snowpark-python[pandas]
The open-source oracledb library: oracledb
The following code examples show how to connect to Oracle from a Snowpark client, stored procedures, and a Snowflake notebook.
Use the DB-API to connect to Oracle from a Snowpark client¶
Install snowflake-snowpark-python[pandas] and oracledb:
pip install snowflake-snowpark-python[pandas]
pip install oracledb
Define the factory method for creating a connection to Oracle, and then use the DB-API to pull data from Oracle:
def create_oracle_db_connection(): import oracledb HOST = "<your host>" PORT = <your port> SERVICE_NAME = "<your service name>" USER = "<your user name>" PASSWORD = "your password" DSN = f"{HOST}:{PORT}/{SERVICE_NAME}" connection = oracledb.connect( user=USER, password=PASSWORD, dsn=DSN ) # Optional: include this parameter for source tracing connection.clientinfo = "snowflake-snowpark-python" return connection # Feel free to combine local/udtf ingestion and partition column/predicates as # stated in the understanding parallelism section # Call dbapi to pull data from target table df = session.read.dbapi( create_oracle_db_connection, table="target_table" ) # Call dbapi to pull data from target query df_query = session.read.dbapi( create_oracle_db_connection, query="select * from target_table" ) # Pull data from target table with parallelism using partition column df_local_par_column = session.read.dbapi( create_oracle_db_connection, table="target_table", fetch_size=100000, num_partitions=4, column="ID", # swap with the column you want your partition based on upper_bound=10000, lower_bound=0 ) udtf_configs = { "external_access_integration": "<your external access integration>" } # Pull data from target table with udtf ingestion with parallelism using predicates df_udtf_predicates = session.read.dbapi( create_oracle_db_connection, table="target_table", udtf_configs=udtf_configs, fetch_size=100000, predicates=[ "ID < 3", "ID >= 3" ] )
Use the DB-API to connect to Oracle from a stored procedure¶
Configure an external access integration (EAI), which is required to allow Snowflake to connect to the source endpoint.
Note
PrivateLink is recommended for secure data transfer, especially when you’re dealing with sensitive information. Ensure that your Snowflake account has the necessary PrivateLink privileges enabled and that the PrivateLink feature is configured and active in your Snowflake Notebook environment.
Configure the secret, a network rule to allow egress to the source endpoint, and EAI:
-- Configure the secret, a network rule to allow egress to the source endpoint, and EAI: CREATE OR REPLACE SECRET ora_secret TYPE = PASSWORD USERNAME = 'ora_username' PASSWORD = 'ora_password'; -- configure a network rule to allow egress to the source endpoint CREATE OR REPLACE NETWORK RULE ora_network_rule MODE = EGRESS TYPE = HOST_PORT VALUE_LIST = ('ora_host:ora_port'); -- configure an external access integration CREATE OR REPLACE EXTERNAL ACCESS INTEGRATION ora_access_integration ALLOWED_NETWORK_RULES = (ora_network_rule) ALLOWED_AUTHENTICATION_SECRETS = (ora_secret) ENABLED = true;
Use Snowpark Python DB-API to pull data from Oracle in a Python stored procedure:
CREATE OR REPLACE PROCEDURE sp_ora_dbapi() RETURNS TABLE() LANGUAGE PYTHON RUNTIME_VERSION='3.11' HANDLER='run' PACKAGES=('snowflake-snowpark-python', 'oracledb') EXTERNAL_ACCESS_INTEGRATIONS = (ora_access_integration) SECRETS = ('cred' = ora_secret ) AS $$ # Get user name and password from ora_secret import _snowflake username_password_object = _snowflake.get_username_password('cred') USER = username_password_object.username PASSWORD = username_password_object.password # Define the factory method for creating a connection to Oracle from snowflake.snowpark import Session def create_oracle_db_connection(): import oracledb host = "ora_host" port = "ora_port" service_name = "ora_service" user = USER password = PASSWORD DSN = f"{host}:{port}/{service_name}" connection = oracledb.connect( user=USER, password=PASSWORD, dsn=DSN ) # Optional: include this parameter for source tracing connection.clientinfo = "snowflake-snowpark-python" return connection def run(session: Session): # Feel free to combine local/udtf ingestion and partition column/predicates # as stated in the understanding parallelism section # Call dbapi to pull data from target table df = session.read.dbapi( create_oracle_db_connection, table="target_table" ) # Call dbapi to pull data from target query df_query = session.read.dbapi( create_oracle_db_connection, query="select * from target_table" ) # Pull data from target table with parallelism using partition column df_local_par_column = session.read.dbapi( create_oracle_db_connection, table="target_table", fetch_size=100000, num_partitions=4, column="ID", # swap with the column you want your partition based on upper_bound=10000, lower_bound=0 ) udtf_configs = { "external_access_integration": "<your external access integration>" } # Pull data from target table with udtf ingestion with parallelism using predicates df_udtf_predicates = session.read.dbapi( create_oracle_db_connection, table="target_table", udtf_configs=udtf_configs, fetch_size=100000, predicates=[ "ID < 3", "ID >= 3" ] ) return df $$; CALL sp_ora_dbapi();
Using DB-API to connect to Oracle from a Snowflake notebook¶
From Snowflake Notebook packages, select snowflake-snowpark-python and oracledb.
Configure an external access integration (EAI), which is required to allow Snowflake to connect to the source endpoint.
Note
PrivateLink is recommended for secure data transfer, especially when you’re dealing with sensitive information. Ensure that your Snowflake account has the necessary PrivateLink privileges enabled and that the PrivateLink feature is configured and active in your Snowflake Notebook environment.
Configure the secret, a network rule, and EAI to allow egress to the source endpoint:
-- Configure the secret
CREATE OR REPLACE SECRET ora_secret
  TYPE = PASSWORD
  USERNAME = 'ora_username'
  PASSWORD = 'ora_password';

ALTER NOTEBOOK mynotebook SET SECRETS = ('snowflake-secret-object' = ora_secret);

-- Configure a network rule to allow egress to the source endpoint
CREATE OR REPLACE NETWORK RULE ora_network_rule
  MODE = EGRESS
  TYPE = HOST_PORT
  VALUE_LIST = ('ora_host:ora_port');

-- Configure an external access integration
CREATE OR REPLACE EXTERNAL ACCESS INTEGRATION ora_access_integration
  ALLOWED_NETWORK_RULES = (ora_network_rule)
  ALLOWED_AUTHENTICATION_SECRETS = (ora_secret)
  ENABLED = true;
Configure external access for Snowflake Notebooks, and then restart the notebook session.
Use the DB-API to pull data from Oracle in a Python cell of a Snowflake notebook:
# Get user name and password from ora_secret
import _snowflake
username_password_object = _snowflake.get_username_password('snowflake-secret-object')
USER = username_password_object.username
PASSWORD = username_password_object.password

import snowflake.snowpark.context
session = snowflake.snowpark.context.get_active_session()

# Define the factory method for creating a connection to Oracle
def create_oracle_db_connection():
    import oracledb
    host = "ora_host"
    port = "ora_port"
    service_name = "ora_service"
    DSN = f"{host}:{port}/{service_name}"
    connection = oracledb.connect(
        user=USER,
        password=PASSWORD,
        dsn=DSN,
    )
    # Optional: include this parameter for source tracing
    connection.clientinfo = "snowflake-snowpark-python"
    return connection

# Feel free to combine local/udtf ingestion and partition column/predicates as
# stated in the understanding parallelism section

# Call dbapi to pull data from target table
df = session.read.dbapi(
    create_oracle_db_connection,
    table="target_table"
)

# Call dbapi to pull data from target query
df_query = session.read.dbapi(
    create_oracle_db_connection,
    query="select * from target_table"
)

# Pull data from target table with parallelism using partition column
df_local_par_column = session.read.dbapi(
    create_oracle_db_connection,
    table="target_table",
    fetch_size=100000,
    num_partitions=4,
    column="ID",  # Swap with the column you want to partition on
    upper_bound=10000,
    lower_bound=0
)

udtf_configs = {
    "external_access_integration": "<your external access integration>"
}

# Pull data from target table with udtf ingestion with parallelism using predicates
df_udtf_predicates = session.read.dbapi(
    create_oracle_db_connection,
    table="target_table",
    udtf_configs=udtf_configs,
    fetch_size=100000,
    predicates=[
        "ID < 3",
        "ID >= 3"
    ]
)

# Save data into sf_table
df.write.mode("overwrite").save_as_table('sf_table')
Source tracing when using DB-API to connect to Oracle¶
Include a Snowpark tag in the connection creation function:
def create_oracle_db_connection():
    import oracledb
    HOST = "myhost"
    PORT = "myport"
    SERVICE_NAME = "myservice"
    USER = "myuser"
    PASSWORD = "mypassword"
    DSN = f"{HOST}:{PORT}/{SERVICE_NAME}"
    connection = oracledb.connect(
        user=USER,
        password=PASSWORD,
        dsn=DSN,
    )
    # Include this parameter for source tracing
    connection.clientinfo = "snowflake-snowpark-python"
    return connection
Run the following SQL on the data source to capture Snowpark queries that are still active:
SELECT
    s.sid,
    s.serial#,
    s.username,
    s.module,
    q.sql_id,
    q.sql_text,
    q.last_active_time
FROM v$session s
JOIN v$sql q
    ON s.sql_id = q.sql_id
WHERE s.client_info = 'snowflake-snowpark-python';
PostgreSQL¶
To connect to PostgreSQL from Snowpark, you need the following two packages:
Snowpark: snowflake-snowpark-python[pandas]
The open-source psycopg2 library: psycopg2
The following code examples show how to connect to PostgreSQL from a Snowpark client, stored procedures, and a Snowflake notebook.
Use the DB-API to connect to PostgreSQL from a Snowpark client¶
Install psycopg2:
pip install psycopg2
Define the factory method for creating a connection to PostgreSQL:
def create_pg_connection(): import psycopg2 connection = psycopg2.connect( host="pg_host", port=pg_port, dbname="pg_dbname", user="pg_user", password="pg_password", # Optional: include this parameter for source tracing application_name="snowflake-snowpark-python" ) return connection # Feel free to combine local/udtf ingestion and partition column/predicates as # stated in the understanding parallelism section # Call dbapi to pull data from target table df = session.read.dbapi( create_pg_connection, table="target_table" ) # Call dbapi to pull data from target query df_query = session.read.dbapi( create_pg_connection, query="select * from target_table" ) # Pull data from target table with parallelism using partition column df_local_par_column = session.read.dbapi( create_pg_connection, table="target_table", fetch_size=100000, num_partitions=4, column="ID", # Swap with the column you want your partition based on upper_bound=10000, lower_bound=0 ) udtf_configs = { "external_access_integration": "<your external access integration>" } # Pull data from target table with udtf ingestion with parallelism using predicates df_udtf_predicates = session.read.dbapi( create_pg_connection, table="target_table", udtf_configs=udtf_configs, fetch_size=100000, predicates=[ "ID < 3", "ID >= 3" ] )
Use the DB-API to connect to PostgreSQL from a stored procedure¶
Configure an external access integration (EAI), which is required to allow Snowflake to connect to the source endpoint.
Note
PrivateLink is recommended for secure data transfer, especially when you’re dealing with sensitive information. Ensure that your Snowflake account has the necessary PrivateLink privileges enabled and that the PrivateLink feature is configured and active in your Snowflake Notebook environment.
Configure the secret, a network rule to allow egress to the source endpoint, and EAI:
-- configure a secret CREATE OR REPLACE SECRET pg_secret TYPE = PASSWORD USERNAME = 'pg_username' PASSWORD = 'pg_password'; -- configure a network rule. CREATE OR REPLACE NETWORK RULE pg_network_rule MODE = EGRESS TYPE = HOST_PORT VALUE_LIST = ('pg_host:pg_port'); -- configure an external access integration. CREATE OR REPLACE EXTERNAL ACCESS INTEGRATION pg_access_integration ALLOWED_NETWORK_RULES = (pg_network_rule) ALLOWED_AUTHENTICATION_SECRETS = (pg_secret) ENABLED = true;
Use Snowpark Python DB-API to pull data from PostgreSQL in a Python stored procedure:
CREATE OR REPLACE PROCEDURE sp_pg_dbapi() RETURNS TABLE() LANGUAGE PYTHON RUNTIME_VERSION='3.11' HANDLER='run' PACKAGES=('snowflake-snowpark-python', 'psycopg2') EXTERNAL_ACCESS_INTEGRATIONS = (pg_access_integration) SECRETS = ('cred' = pg_secret ) AS $$ # Get user name and password from pg_secret import _snowflake username_password_object = _snowflake.get_username_password('cred') USER = username_password_object.username PASSWORD = username_password_object.password # Define the factory method for creating a connection to PostgreSQL from snowflake.snowpark import Session def create_pg_connection(): import psycopg2 connection = psycopg2.connect( host="pg_host", port=pg_port, dbname="pg_dbname", user=USER, password=PASSWORD, # Optional: include this parameter for source tracing application_name="snowflake-snowpark-python" ) return connection def run(session: Session): # Feel free to combine local/udtf ingestion and partition column/predicates # as stated in the understanding parallelism section # Call dbapi to pull data from target table df = session.read.dbapi( create_pg_connection, table="target_table" ) # Call dbapi to pull data from target query df_query = session.read.dbapi( create_pg_connection, query="select * from target_table" ) # Pull data from target table with parallelism using partition column df_local_par_column = session.read.dbapi( create_pg_connection, table="target_table", fetch_size=100000, num_partitions=4, column="ID", # swap with the column you want your partition based on upper_bound=10000, lower_bound=0 ) udtf_configs = { "external_access_integration": "<your external access integration>" } # Pull data from target table with udtf ingestion with parallelism using predicates df_udtf_predicates = session.read.dbapi( create_pg_connection, table="target_table", udtf_configs=udtf_configs, fetch_size=100000, predicates=[ "ID < 3", "ID >= 3" ] ) return df $$; CALL sp_pg_dbapi();
Using DB-API to connect to PostgreSQL from a Snowflake notebook¶
From Snowflake Notebook packages, select snowflake-snowpark-python and psycopg2.
Configure an external access integration (EAI), which is required to allow Snowflake to connect to the source endpoint.
Note
PrivateLink is recommended for secure data transfer, especially when you’re dealing with sensitive information. Ensure that your Snowflake account has the necessary PrivateLink privileges enabled and that the PrivateLink feature is configured and active in your Snowflake Notebook environment.
Configure the secret, a network rule to allow egress to the source endpoint, and EAI:
-- Configure the secret CREATE OR REPLACE SECRET pg_secret TYPE = PASSWORD USERNAME = 'pg_username' PASSWORD = 'pg_password'; ALTER NOTEBOOK pg_notebook SET SECRETS = ('snowflake-secret-object' = pg_secret); -- Configure the network rule to allow egress to the source endpoint CREATE OR REPLACE NETWORK RULE pg_network_rule MODE = EGRESS TYPE = HOST_PORT VALUE_LIST = ('pg_host:pg_port'); -- Configure external access integration CREATE OR REPLACE EXTERNAL ACCESS INTEGRATION pg_access_integration ALLOWED_NETWORK_RULES = (pg_network_rule) ALLOWED_AUTHENTICATION_SECRETS = (pg_secret) ENABLED = true;
Configure external access for Snowflake Notebooks, and then restart the notebook session.
Use the DB-API to pull data from PostgreSQL in a Python cell of a Snowflake notebook:
# Get the user name and password from pg_secret
import _snowflake
username_password_object = _snowflake.get_username_password('snowflake-secret-object')
USER = username_password_object.username
PASSWORD = username_password_object.password

import snowflake.snowpark.context
session = snowflake.snowpark.context.get_active_session()

# Define the factory method for creating a connection to PostgreSQL
def create_pg_connection():
    import psycopg2
    connection = psycopg2.connect(
        host="pg_host",
        port=pg_port,
        dbname="pg_dbname",
        user=USER,
        password=PASSWORD,
        # Optional: include this parameter for source tracing
        application_name="snowflake-snowpark-python"
    )
    return connection

# Feel free to combine local/udtf ingestion and partition column/predicates as
# stated in the understanding parallelism section

# Call dbapi to pull data from target table
df = session.read.dbapi(
    create_pg_connection,
    table="target_table"
)

# Call dbapi to pull data from target query
df_query = session.read.dbapi(
    create_pg_connection,
    query="select * from target_table"
)

# Pull data from target table with parallelism using partition column
df_local_par_column = session.read.dbapi(
    create_pg_connection,
    table="target_table",
    fetch_size=100000,
    num_partitions=4,
    column="ID",  # Swap with the column you want to partition on
    upper_bound=10000,
    lower_bound=0
)

udtf_configs = {
    "external_access_integration": "<your external access integration>"
}

# Pull data from target table with udtf ingestion with parallelism using predicates
df_udtf_predicates = session.read.dbapi(
    create_pg_connection,
    table="target_table",
    udtf_configs=udtf_configs,
    fetch_size=100000,
    predicates=[
        "ID < 3",
        "ID >= 3"
    ]
)

# Save data into sf_table
df.write.mode("overwrite").save_as_table('sf_table')
Source tracing when using DB-API to connect to PostgreSQL¶
Include a Snowpark tag in the connection creation function:
def create_pg_connection():
    import psycopg2
    connection = psycopg2.connect(
        host="pg_host",
        port=pg_port,
        dbname="pg_dbname",
        user="pg_user",
        password="pg_password",
        # Include this parameter for source tracing
        application_name="snowflake-snowpark-python"
    )
    return connection
Run the following SQL on the data source to capture Snowpark queries that are still active:
SELECT pid, usename AS username, datname AS database, application_name, client_addr, state, query_start, query FROM pg_stat_activity WHERE application_name = 'snowflake-snowpark-python';
MySQL¶
To connect to MySQL from Snowpark, you need the following two packages:
Snowpark: snowflake-snowpark-python[pandas]
The open-source pymysql library: PyMySQL
The following code examples show how to connect to MySQL from a Snowpark client, stored procedures, and a Snowflake notebook.
Use the DB-API to connect to MySQL from a Snowpark client¶
Install snowflake-snowpark-python[pandas] and pymysql:
pip install snowflake-snowpark-python[pandas]
pip install pymysql
Define the factory method for creating a connection to MySQL:
def create_mysql_connection(): import pymysql connection = pymysql.connect( host="mysql_host", port=mysql_port, database="mysql_db", user="mysql_user", password="mysql_password", # Optional: include this parameter for source tracing init_command="SET @program_name='snowflake-snowpark-python';" ) return connection # Feel free to combine local/udtf ingestion and partition column/predicates as # stated in the understanding parallelism section # Call dbapi to pull data from target table df = session.read.dbapi( create_mysql_connection, table="target_table" ) # Call dbapi to pull data from target query df_query = session.read.dbapi( create_mysql_connection, query="select * from target_table" ) # Pull data from target table with parallelism using partition column df_local_par_column = session.read.dbapi( create_mysql_connection, table="target_table", fetch_size=100000, num_partitions=4, column="ID", # swap with the column you want your partition based on upper_bound=10000, lower_bound=0 ) udtf_configs = { "external_access_integration": "<your external access integration>" } # Pull data from target table with udtf ingestion with parallelism using predicates df_udtf_predicates = session.read.dbapi( create_mysql_connection, table="target_table", udtf_configs=udtf_configs, fetch_size=100000, predicates=[ "ID < 3", "ID >= 3" ] )
Use the DB-API to connect to MySQL from a stored procedure¶
Configure an external access integration (EAI), which is required to allow Snowflake to connect to the source endpoint.
Note
PrivateLink is recommended for secure data transfer, especially when you’re dealing with sensitive information. Ensure that your Snowflake account has the necessary PrivateLink privileges enabled and that the PrivateLink feature is configured and active in your Snowflake Notebook environment.
Configure the secret, a network rule to allow egress to the source endpoint, and EAI:
CREATE OR REPLACE SECRET mysql_secret TYPE = PASSWORD USERNAME = 'mysql_username' PASSWORD = 'mysql_password'; -- configure a network rule. CREATE OR REPLACE NETWORK RULE mysql_network_rule MODE = EGRESS TYPE = HOST_PORT VALUE_LIST = ('mysql_host:mysql_port'); -- configure an external access integration CREATE OR REPLACE EXTERNAL ACCESS INTEGRATION mysql_access_integration ALLOWED_NETWORK_RULES = (mysql_network_rule) ALLOWED_AUTHENTICATION_SECRETS = (mysql_secret) ENABLED = true;
Use the Snowpark Python DB-API to pull data from MySQL in a Python stored procedure:
CREATE OR REPLACE PROCEDURE sp_mysql_dbapi()
RETURNS TABLE()
LANGUAGE PYTHON
RUNTIME_VERSION='3.11'
HANDLER='run'
PACKAGES=('snowflake-snowpark-python', 'pymysql')
EXTERNAL_ACCESS_INTEGRATIONS = (mysql_access_integration)
SECRETS = ('cred' = mysql_secret )
AS
$$
# Get user name and password from mysql_secret
import _snowflake
username_password_object = _snowflake.get_username_password('cred')
USER = username_password_object.username
PASSWORD = username_password_object.password

# Define the factory method for creating a connection to MySQL
from snowflake.snowpark import Session

def create_mysql_connection():
    import pymysql
    connection = pymysql.connect(
        host="mysql_host",
        port=mysql_port,
        database="mysql_dbname",
        user=USER,
        password=PASSWORD,
        # Optional: include this parameter for source tracing
        init_command="SET @program_name='snowflake-snowpark-python';"
    )
    return connection

# Use the Snowpark Python DB-API to pull data from MySQL in a Python stored procedure.
def run(session: Session):
    # Feel free to combine local/udtf ingestion and partition column/predicates
    # as stated in the understanding parallelism section

    # Call dbapi to pull data from target table
    df = session.read.dbapi(
        create_mysql_connection,
        table="target_table"
    )

    # Call dbapi to pull data from target query
    df_query = session.read.dbapi(
        create_mysql_connection,
        query="select * from target_table"
    )

    # Pull data from target table with parallelism using partition column
    df_local_par_column = session.read.dbapi(
        create_mysql_connection,
        table="target_table",
        fetch_size=100000,
        num_partitions=4,
        column="ID",  # Swap with the column you want to partition on
        upper_bound=10000,
        lower_bound=0
    )

    udtf_configs = {
        "external_access_integration": "<your external access integration>"
    }

    # Pull data from target table with udtf ingestion with parallelism using predicates
    df_udtf_predicates = session.read.dbapi(
        create_mysql_connection,
        table="target_table",
        udtf_configs=udtf_configs,
        fetch_size=100000,
        predicates=[
            "ID < 3",
            "ID >= 3"
        ]
    )

    return df
$$;

CALL sp_mysql_dbapi();
Using DB-API to connect to MySQL from a Snowflake notebook¶
From Snowflake Notebook packages, select snowflake-snowpark-python and pymysql.
Configure an external access integration (EAI), which is required to allow Snowflake to connect to the source endpoint.
Note
PrivateLink is recommended for secure data transfer, especially when you’re dealing with sensitive information. Ensure that your Snowflake account has the necessary PrivateLink privileges enabled and that the PrivateLink feature is configured and active in your Snowflake Notebook environment.
Configure the secret, a network rule to allow egress to the source endpoint, and EAI:
CREATE OR REPLACE SECRET mysql_secret TYPE = PASSWORD USERNAME = 'mysql_username' PASSWORD = 'mysql_password'; ALTER NOTEBOOK mynotebook SET SECRETS = ('snowflake-secret-object' = mysql_secret); -- configure a network rule. CREATE OR REPLACE NETWORK RULE mysql_network_rule MODE = EGRESS TYPE = HOST_PORT VALUE_LIST = ('mysql_host:mysql_port'); -- configure an EAI CREATE OR REPLACE EXTERNAL ACCESS INTEGRATION mysql_access_integration ALLOWED_NETWORK_RULES = (mysql_network_rule) ALLOWED_AUTHENTICATION_SECRETS = (mysql_secret) ENABLED = true;
Configure external access for Snowflake Notebooks, and then restart the notebook session.
Use the DB-API to pull data from MySQL in a Python cell of a Snowflake notebook:
# Get user name and password from mysql_secret
import _snowflake
username_password_object = _snowflake.get_username_password('snowflake-secret-object')
USER = username_password_object.username
PASSWORD = username_password_object.password

import snowflake.snowpark.context
session = snowflake.snowpark.context.get_active_session()

# Define the factory method for creating a connection to MySQL
def create_mysql_connection():
    import pymysql
    connection = pymysql.connect(
        host="mysql_host",
        port=mysql_port,
        database="mysql_dbname",
        user=USER,
        password=PASSWORD,
        # Optional: include this parameter for source tracing
        init_command="SET @program_name='snowflake-snowpark-python';"
    )
    return connection

# Feel free to combine local/udtf ingestion and partition column/predicates as
# stated in the understanding parallelism section

# Call dbapi to pull data from target table
df = session.read.dbapi(
    create_mysql_connection,
    table="target_table"
)

# Call dbapi to pull data from target query
df_query = session.read.dbapi(
    create_mysql_connection,
    query="select * from target_table"
)

# Pull data from target table with parallelism using partition column
df_local_par_column = session.read.dbapi(
    create_mysql_connection,
    table="target_table",
    fetch_size=100000,
    num_partitions=4,
    column="ID",  # Swap with the column you want to partition on
    upper_bound=10000,
    lower_bound=0
)

udtf_configs = {
    "external_access_integration": "<your external access integration>"
}

# Pull data from target table with udtf ingestion with parallelism using predicates
df_udtf_predicates = session.read.dbapi(
    create_mysql_connection,
    table="target_table",
    udtf_configs=udtf_configs,
    fetch_size=100000,
    predicates=[
        "ID < 3",
        "ID >= 3"
    ]
)

# Save data into sf_table
df.write.mode("overwrite").save_as_table('sf_table')
Source tracing when using DB-API to connect to MySQL¶
Include a Snowpark tag in the connection creation function:
def create_mysql_connection():
    import pymysql
    connection = pymysql.connect(
        host="mysql_host",
        port=mysql_port,
        database="mysql_db",
        user="mysql_user",
        password="mysql_password",
        # Include this parameter for source tracing
        init_command="SET @program_name='snowflake-snowpark-python';"
    )
    return connection
Run the following SQL on the data source to capture Snowpark queries:
SELECT * FROM performance_schema.events_statements_history_long WHERE THREAD_ID = ( SELECT THREAD_ID FROM performance_schema.events_statements_history_long WHERE SQL_TEXT = "SET @program_name='snowflake-snowpark-python'" ORDER BY EVENT_ID DESC LIMIT 1 )
Databricks¶
To connect to Databricks from Snowpark, you need the following two packages:
Snowpark: snowflake-snowpark-python[pandas]
The open-source databricks-sql-connector library: databricks-sql-connector
The following code examples show how to connect to Databricks from a Snowpark client, stored procedures, and a Snowflake notebook.
Use the DB-API to connect to Databricks from a Snowpark client¶
Install snowflake-snowpark-python[pandas] and databricks-sql-connector:
pip install snowflake-snowpark-python[pandas]
pip install databricks-sql-connector
Define the factory method for creating a connection to Databricks:
def create_dbx_connection(): import databricks.sql connection = databricks.sql.connect( server_hostname=HOST, http_path=PATH, access_token=ACCESS_TOKEN ) return connection # Feel free to combine local/udtf ingestion and partition column/predicates as # stated in the understanding parallelism section # Call dbapi to pull data from target table df = session.read.dbapi( create_dbx_connection, table="target_table" ) # Call dbapi to pull data from target query df_query = session.read.dbapi( create_dbx_connection, query="select * from target_table" ) # Pull data from target table with parallelism using partition column df_local_par_column = session.read.dbapi( create_dbx_connection, table="target_table", fetch_size=100000, num_partitions=4, column="ID", # swap with the column you want your partition based on upper_bound=10000, lower_bound=0 ) udtf_configs = { "external_access_integration": "<your external access integration>" } # Pull data from target table with udtf ingestion with parallelism using predicates df_udtf_predicates = session.read.dbapi( create_dbx_connection, table="target_table", udtf_configs=udtf_configs, fetch_size=100000, predicates=[ "ID < 3", "ID >= 3" ] )
Use the DB-API to connect to Databricks from a stored procedure¶
Configure an external access integration (EAI), which is required to allow Snowflake to connect to the source endpoint.
Note
PrivateLink is recommended for secure data transfer, especially when you’re dealing with sensitive information. Ensure that your Snowflake account has the necessary PrivateLink privileges enabled and that the PrivateLink feature is configured and active in your Snowflake Notebook environment.
Configure the secret, a network rule to allow egress to the source endpoint, and EAI:
CREATE OR REPLACE SECRET dbx_secret TYPE = GENERIC_STRING SECRET_STRING = 'dbx_access_token'; CREATE OR REPLACE NETWORK RULE dbx_network_rule MODE = EGRESS TYPE = HOST_PORT VALUE_LIST = ('dbx_host:dbx_port'); CREATE OR REPLACE EXTERNAL ACCESS INTEGRATION dbx_access_integration ALLOWED_NETWORK_RULES = (dbx_network_rule) ALLOWED_AUTHENTICATION_SECRETS = (dbx_secret) ENABLED = true;
Use the Snowpark Python DB-API to pull data from Databricks in a Python stored procedure:
CREATE OR REPLACE PROCEDURE sp_dbx_dbapi() RETURNS TABLE() LANGUAGE PYTHON RUNTIME_VERSION='3.11' HANDLER='run' PACKAGES=('snowflake-snowpark-python', 'databricks-sql-connector') EXTERNAL_ACCESS_INTEGRATIONS = (dbx_access_integration) SECRETS = ('cred' = dbx_secret ) AS $$ # Get user name and password from dbx_secret import _snowflake ACCESS_TOKEN = _snowflake.get_generic_secret_string('cred') from snowflake.snowpark import Session # Define the method for creating a connection to Databricks def create_dbx_connection(): import databricks.sql connection = databricks.sql.connect( server_hostname="dbx_host", http_path="dbx_path", access_token=ACCESS_TOKEN, ) return connection # Using Snowpark Python DB-API to pull data from DataBricks in a Python stored procedure. def run(session: Session): # Feel free to combine local/udtf ingestion and partition column/predicates # as stated in the understanding parallelism section # Call dbapi to pull data from target table df = session.read.dbapi( create_dbx_connection, table="target_table" ) # Call dbapi to pull data from target query df_query = session.read.dbapi( create_dbx_connection, query="select * from target_table" ) # Pull data from target table with parallelism using partition column df_local_par_column = session.read.dbapi( create_dbx_connection, table="target_table", fetch_size=100000, num_partitions=4, column="ID", # swap with the column you want your partition based on upper_bound=10000, lower_bound=0 ) udtf_configs = { "external_access_integration": "<your external access integration>" } # Pull data from target table with udtf ingestion with parallelism using predicates df_udtf_predicates = session.read.dbapi( create_dbx_connection, table="target_table", udtf_configs=udtf_configs, fetch_size=100000, predicates=[ "ID < 3", "ID >= 3" ] ) return df $$; CALL sp_dbx_dbapi();
Using DB-API to connect to Databricks from a Snowflake notebook¶
From Snowflake Notebook packages, select snowflake-snowpark-python and databricks-sql-connector.
Configure an external access integration (EAI), which is required to allow Snowflake to connect to the source endpoint.
Note
PrivateLink is recommended for secure data transfer, especially when you’re dealing with sensitive information. Ensure that your Snowflake account has the necessary PrivateLink privileges enabled and that the PrivateLink feature is configured and active in your Snowflake Notebook environment.
Configure the secret, a network rule to allow egress to the source endpoint, and EAI:
CREATE OR REPLACE SECRET dbx_secret TYPE = GENERIC_STRING SECRET_STRING = 'dbx_access_token'; ALTER NOTEBOOK mynotebook SET SECRETS = ('snowflake-secret-object' = dbx_secret); CREATE OR REPLACE NETWORK RULE dbx_network_rule MODE = EGRESS TYPE = HOST_PORT VALUE_LIST = ('dbx_host:dbx_port'); CREATE OR REPLACE EXTERNAL ACCESS INTEGRATION dbx_access_integration ALLOWED_NETWORK_RULES = (dbx_network_rule) ALLOWED_AUTHENTICATION_SECRETS = (dbx_secret) ENABLED = true;
Configure external access for Snowflake Notebooks, and then restart the notebook session.
Use the DB-API to pull data from Databricks in a Python cell of a Snowflake notebook:
# Get the access token from dbx_secret
import _snowflake
ACCESS_TOKEN = _snowflake.get_generic_secret_string('snowflake-secret-object')

import snowflake.snowpark.context
session = snowflake.snowpark.context.get_active_session()

# Define the factory method for creating a connection to Databricks
def create_dbx_connection():
    import databricks.sql
    connection = databricks.sql.connect(
        server_hostname="dbx_host",
        http_path="dbx_path",
        access_token=ACCESS_TOKEN,
    )
    return connection

# Feel free to combine local/udtf ingestion and partition column/predicates as
# stated in the understanding parallelism section

# Call dbapi to pull data from target table
df = session.read.dbapi(
    create_dbx_connection,
    table="target_table"
)

# Call dbapi to pull data from target query
df_query = session.read.dbapi(
    create_dbx_connection,
    query="select * from target_table"
)

# Pull data from target table with parallelism using partition column
df_local_par_column = session.read.dbapi(
    create_dbx_connection,
    table="target_table",
    fetch_size=100000,
    num_partitions=4,
    column="ID",  # Swap with the column you want to partition on
    upper_bound=10000,
    lower_bound=0
)

udtf_configs = {
    "external_access_integration": "<your external access integration>"
}

# Pull data from target table with udtf ingestion with parallelism using predicates
df_udtf_predicates = session.read.dbapi(
    create_dbx_connection,
    table="target_table",
    udtf_configs=udtf_configs,
    fetch_size=100000,
    predicates=[
        "ID < 3",
        "ID >= 3"
    ]
)

# Save data into sf_table
df.write.mode("overwrite").save_as_table('sf_table')
Source tracing when using DB-API to connect to Databricks¶
Include a Snowpark tag in the connection creation function:
def create_dbx_connection():
    import databricks.sql
    connection = databricks.sql.connect(
        server_hostname=HOST,
        http_path=PATH,
        access_token=ACCESS_TOKEN,
        # Include this parameter for source tracing
        user_agent_entry="snowflake-snowpark-python"
    )
    return connection
Navigate to the query history in the Databricks console and look for the query whose source is snowflake-snowpark-python.
Limitations¶
The Snowpark Python DB-API supports only Python DB-API 2.0–compliant drivers (for example, pyodbc or oracledb). JDBC drivers are not supported in this release.
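As a quick sanity check, you can confirm that a driver module declares DB-API 2.0 compliance before passing it to dbapi(). This is a minimal sketch; PEP 249 requires compliant modules to expose an apilevel attribute, and pyodbc is used here only as an example:

# Minimal check that the driver module declares DB-API 2.0 compliance (PEP 249).
import pyodbc

assert pyodbc.apilevel == "2.0", "Driver does not report DB-API 2.0 compliance"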