Using the Snowpark Python DB-API¶
With the Snowpark Python DB-API, Snowpark Python users can programmatically pull data from external databases into Snowflake. It includes:
- Python DB-API support: Connect to external databases using Python's standard DB-API 2.0 drivers.
- Streamlined setup: Use pip to install the required drivers, with no need to manage additional dependencies.
With these APIs, you can pull data into Snowflake tables and transform it using Snowpark DataFrames for advanced analytics.
The DB-API can be used in a similar way to the Spark JDBC API. Most parameters are designed to be identical or similar for better parity. At the same time, Snowpark emphasizes a Python-first design with intuitive naming conventions that avoid JDBC-specific configurations. This gives Python developers a familiar experience. For more information comparing the Snowpark Python DB-API with the Spark JDBC API, see the following table:
DB-API parameters¶
| Parameter | Snowpark Python DB-API |
|---|---|
| create_connection | Function to create a Python DB-API connection. |
| table | Specifies the table in the source database. |
| query | SQL query wrapped as a subquery for reading data. |
| column | Partitioning column for parallel reads. |
| lower_bound | Lower bound for partitioning. |
| upper_bound | Upper bound for partitioning. |
| num_partitions | Number of partitions for parallelism. |
| query_timeout | Timeout for SQL execution (in seconds). |
| fetch_size | Number of rows fetched per round trip. |
| custom_schema | Custom schema for pulling data from external databases. |
| max_workers | Number of workers for fetching data from external databases in parallel. |
| predicates | List of conditions for WHERE-clause partitions. |
| session_init_statement | Executes a SQL or PL/SQL statement at session initialization. |
| udtf_configs | Runs the workload using a Snowflake UDTF for better performance. |
| fetch_merge_count | Number of fetched batches to merge into a single Parquet file before uploading. |
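The create_connection argument used throughout this page is a zero-argument callable that returns a Python DB-API 2.0 connection. As a minimal sketch of that contract, the example below uses Python's built-in sqlite3 module (itself a DB-API 2.0 driver) as a stand-in for an external database. The helper read_all and the sample rows are hypothetical and exist only for illustration; the Snowpark call appears as a comment because it needs a live Snowflake session.

```python
import sqlite3


def create_connection():
    """Zero-argument factory that returns a new DB-API 2.0 connection."""
    # Stand-in for an external database: an in-memory SQLite database.
    connection = sqlite3.connect(":memory:")
    connection.execute("CREATE TABLE target_table (ID INTEGER, NAME TEXT)")
    connection.executemany(
        "INSERT INTO target_table VALUES (?, ?)",
        [(1, "a"), (2, "b"), (3, "c")],
    )
    connection.commit()
    return connection


def read_all(factory, table):
    """Hypothetical helper: open a connection, read one table, close the connection."""
    connection = factory()
    try:
        cursor = connection.cursor()
        cursor.execute(f"SELECT * FROM {table} ORDER BY ID")
        return cursor.fetchall()
    finally:
        connection.close()


rows = read_all(create_connection, "target_table")

# With a live Snowpark session, the same factory would be passed as:
#   df = session.read.dbapi(create_connection, table="target_table")
```

Passing a factory rather than an open connection lets the caller open as many independent connections as it needs, which is what parallel reads require.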
Understanding parallelism¶
The Snowpark Python DB-API has two underlying ingestion mechanisms.
- Local ingestion
  In local ingestion, Snowpark first fetches data from external sources to the local environment where the dbapi() function is called and converts it to Parquet files. Next, Snowpark uploads these Parquet files to a temporary Snowflake stage and copies them from the stage into a temporary table.
- UDTF ingestion
  In UDTF ingestion, all workloads run on the Snowflake server. Snowpark first creates a UDTF and executes it; the UDTF ingests data directly into Snowflake and stores it in a temporary table.
The Snowpark Python DB-API also has two ways to parallelize and accelerate ingestion.
- Partition column
  This method divides source data into a number of partitions based on four parameters when users call dbapi(): column, lower_bound, upper_bound, and num_partitions. These four parameters must be set together, and column must be a numeric or date type.
- Predicates
  This method divides source data into partitions based on the predicates parameter, a list of expressions suitable for inclusion in WHERE clauses, where each expression defines a partition. Predicates provide a more flexible way of dividing partitions; for example, you can partition on boolean or non-numeric columns.
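To make the relationship between the two methods concrete, the following sketch shows one way the four partition-column parameters could be turned into per-partition WHERE expressions. It follows the stride-based scheme used by Spark's JDBC reader, in which the bounds set the stride and rows outside them are still read. This is an assumption for illustration only: partition_predicates is a hypothetical helper, and Snowpark's internal logic may differ.

```python
def partition_predicates(column, lower_bound, upper_bound, num_partitions):
    """Return one WHERE-clause expression per partition (Spark-JDBC-style strides)."""
    if num_partitions < 1:
        raise ValueError("num_partitions must be at least 1")
    if lower_bound >= upper_bound:
        raise ValueError("lower_bound must be smaller than upper_bound")
    if num_partitions == 1:
        return ["1 = 1"]  # a single partition reads every row

    # The bounds only decide the stride; they do not filter rows out.
    stride = max((upper_bound - lower_bound) // num_partitions, 1)
    boundaries = [lower_bound + i * stride for i in range(1, num_partitions)]

    # First partition is open-ended below and also collects NULL values.
    predicates = [f"{column} < {boundaries[0]} OR {column} IS NULL"]
    # Middle partitions are half-open intervals [low, high).
    for low, high in zip(boundaries, boundaries[1:]):
        predicates.append(f"{column} >= {low} AND {column} < {high}")
    # Last partition is open-ended above.
    predicates.append(f"{column} >= {boundaries[-1]}")
    return predicates


predicates = partition_predicates("ID", 0, 10000, 4)
```

With these inputs the stride is 2500, so the four expressions split at 2500, 5000, and 7500. A list like this could also be passed directly as the predicates parameter.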
The Snowpark Python DB-API also lets you adjust the level of parallelism within a partition.
- fetch_size
  Within a partition, the API fetches rows in chunks defined by fetch_size. These rows are written to Snowflake in parallel as they are fetched, so reading and writing overlap and throughput is maximized.
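Chunked fetching can be sketched with the standard DB-API cursor.fetchmany() call. The example below uses the built-in sqlite3 driver as a stand-in for an external database. The helper fetch_in_chunks is hypothetical, and whether Snowpark uses fetchmany() internally is an assumption. Only the read side is shown; the parallel write to Snowflake is omitted.

```python
import sqlite3


def fetch_in_chunks(cursor, fetch_size):
    """Yield lists of at most fetch_size rows until the result set is exhausted."""
    while True:
        rows = cursor.fetchmany(fetch_size)
        if not rows:
            break
        yield rows


# Stand-in source table with ten rows.
connection = sqlite3.connect(":memory:")
connection.execute("CREATE TABLE target_table (ID INTEGER)")
connection.executemany(
    "INSERT INTO target_table VALUES (?)", [(i,) for i in range(10)]
)

cursor = connection.cursor()
cursor.execute("SELECT ID FROM target_table ORDER BY ID")
chunk_sizes = [len(chunk) for chunk in fetch_in_chunks(cursor, fetch_size=4)]
connection.close()
```

Ten rows with a fetch size of 4 arrive as chunks of 4, 4, and 2 rows. A larger fetch_size means fewer round trips to the source database at the cost of more memory per chunk.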
By combining these ingestion mechanisms and parallelism methods, Snowpark supports four ways of ingestion:
Local ingestion with partition column
    df_local_par_column = session.read.dbapi(
        create_connection,
        table="target_table",
        fetch_size=100000,
        num_partitions=4,
        column="ID",  # Swap with the column you want to partition on
        upper_bound=10000,
        lower_bound=0
    )
Local ingestion with predicates
    df_local_predicates = session.read.dbapi(
        create_connection,
        table="target_table",
        fetch_size=100000,
        predicates=["ID < 3", "ID >= 3"]
    )
UDTF ingestion with partition column
    udtf_configs = {
        "external_access_integration": "<your external access integration>"
    }
    df_udtf_par_column = session.read.dbapi(
        create_connection,
        table="target_table",
        udtf_configs=udtf_configs,
        fetch_size=100000,
        num_partitions=4,
        column="ID",  # Swap with the column you want to partition on
        upper_bound=10000,
        lower_bound=0
    )
UDTF ingestion with predicates
    udtf_configs = {
        "external_access_integration": "<your external access integration>"
    }
    df_udtf_predicates = session.read.dbapi(
        create_connection,
        table="target_table",
        udtf_configs=udtf_configs,
        fetch_size=100000,
        predicates=["ID < 3", "ID >= 3"]
    )
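Because each predicate defines one partition, a list such as "ID < 3" and "ID >= 3" should select every source row exactly once. The sketch below is a quick sanity check of that property, run against a stand-in sqlite3 table. The helper rows_per_predicate and the sample data are hypothetical, and the check is necessary rather than sufficient: overlapping and missing rows can cancel out in the total.

```python
import sqlite3


def rows_per_predicate(connection, table, predicates):
    """Count how many rows of the table each predicate selects."""
    counts = []
    for predicate in predicates:
        cursor = connection.execute(
            f"SELECT COUNT(*) FROM {table} WHERE {predicate}"
        )
        counts.append(cursor.fetchone()[0])
    return counts


# Stand-in source table with IDs 1 through 5.
connection = sqlite3.connect(":memory:")
connection.execute("CREATE TABLE target_table (ID INTEGER)")
connection.executemany(
    "INSERT INTO target_table VALUES (?)", [(i,) for i in range(1, 6)]
)

predicates = ["ID < 3", "ID >= 3"]
counts = rows_per_predicate(connection, "target_table", predicates)
total = connection.execute("SELECT COUNT(*) FROM target_table").fetchone()[0]
connection.close()

# If the per-predicate counts do not add up to the table total,
# the predicates overlap or leave rows out.
covers_all_rows = sum(counts) == total
```

Rows whose ID is NULL match neither expression, so a source column that allows NULL values needs an extra predicate such as "ID IS NULL".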
SQL Server¶
To connect to SQL Server from Snowpark, you need the following three packages:
- Snowpark: snowflake-snowpark-python[pandas]
- SQL Server ODBC driver: Microsoft ODBC Driver for SQL Server
  By installing the driver, you agree to Microsoft's EULA.
- The open source pyodbc library: pyodbc
The following code examples show how to connect to SQL Server from a Snowpark client and a stored procedure.
Using DB-API to connect to SQL Server from a Snowpark client¶
Install the Python SQL driver (the commands below use Homebrew on macOS):
    /bin/bash -c "$(curl -fsSL https://raw.githubusercontent.com/Homebrew/install/master/install.sh)"
    brew tap microsoft/mssql-release https://github.com/Microsoft/homebrew-mssql-release
    brew update
    HOMEBREW_ACCEPT_EULA=Y brew install msodbcsql18 mssql-tools18
Install snowflake-snowpark-python[pandas] and pyodbc:
    pip install "snowflake-snowpark-python[pandas]"
    pip install pyodbc
Define the factory method for creating a connection to SQL Server:
    def create_sql_server_connection():
        import pyodbc
        SERVER = "<your host name>"
        PORT = <your port>
        UID = "<your user name>"
        PWD = "<your password>"
        DATABASE = "<your database name>"
        connection_str = (
            f"DRIVER={{ODBC Driver 18 for SQL Server}};"
            f"SERVER={SERVER},{PORT};"  # host and port are separated by a comma
            f"UID={UID};"
            f"PWD={PWD};"
            f"DATABASE={DATABASE};"
            "TrustServerCertificate=yes;"
            "Encrypt=yes;"
            # Optional to identify source of queries
            "APP=snowflake-snowpark-python;"
        )
        connection = pyodbc.connect(connection_str)
        return connection

    # Any combination of local/UDTF ingestion and partition column/predicates
    # described in the "Understanding parallelism" section can be used here.

    # Call dbapi to pull data from the target table
    df = session.read.dbapi(create_sql_server_connection, table="target_table")

    # Call dbapi to pull data from the target query
    df_query = session.read.dbapi(
        create_sql_server_connection,
        query="select * from target_table"
    )

    # Pull data from the target table in parallel using a partition column
    df_local_par_column = session.read.dbapi(
        create_sql_server_connection,
        table="target_table",
        fetch_size=100000,
        num_partitions=4,
        column="ID",  # Swap with the column you want to partition on
        upper_bound=10000,
        lower_bound=0
    )

    udtf_configs = {
        "external_access_integration": "<your external access integration>"
    }

    # Pull data from the target table with UDTF ingestion, in parallel using predicates
    df_udtf_predicates = session.read.dbapi(
        create_sql_server_connection,
        table="target_table",
        udtf_configs=udtf_configs,
        fetch_size=100000,
        predicates=["ID < 3", "ID >= 3"]
    )
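An ODBC connection string is a sequence of KEYWORD=value pairs separated by semicolons. Python concatenates adjacent string literals without any separator, so a missing semicolon silently merges two keywords. One way to avoid that is to build the string from a dictionary, as in the sketch below. The helper build_odbc_connection_string and all sample values (myhost, 1433, myuser, and so on) are hypothetical.

```python
def build_odbc_connection_string(params):
    """Join key/value pairs into 'KEY=value;' segments of an ODBC connection string."""
    # Limitation of this sketch: values that themselves contain ';' or '}'
    # need ODBC brace escaping, which is not handled here.
    return "".join(f"{key}={value};" for key, value in params.items())


connection_str = build_odbc_connection_string(
    {
        "DRIVER": "{ODBC Driver 18 for SQL Server}",
        "SERVER": "myhost,1433",  # host and port are separated by a comma
        "UID": "myuser",
        "PWD": "mypassword",
        "DATABASE": "mydb",
        "TrustServerCertificate": "yes",
        "Encrypt": "yes",
        "APP": "snowflake-snowpark-python",  # optional source-tracing tag
    }
)

# The resulting string would then be passed to pyodbc.connect(connection_str).
```

Because every pair is terminated by the helper, adding or removing a keyword cannot leave two settings fused together.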
Using DB-API to connect to SQL Server from a stored procedure¶
Configure the external access integration (EAI), which is required to allow Snowflake to connect to the source endpoint.
Note
PrivateLink is recommended for secure data transfer, especially when you are dealing with sensitive information. Ensure that your Snowflake account has the necessary PrivateLink privileges enabled and that the PrivateLink feature is configured and active in your Snowflake Notebook environment.
Configure the secret, a network rule to allow egress to the source endpoint, and the EAI:
    -- Configure a secret that stores the credentials for the source endpoint
    CREATE OR REPLACE SECRET mssql_secret
      TYPE = PASSWORD
      USERNAME = 'mssql_username'
      PASSWORD = 'mssql_password';

    -- Configure a network rule to allow egress to the source endpoint
    CREATE OR REPLACE NETWORK RULE mssql_network_rule
      MODE = EGRESS
      TYPE = HOST_PORT
      VALUE_LIST = ('mssql_host:mssql_port');

    -- Configure an external access integration
    CREATE OR REPLACE EXTERNAL ACCESS INTEGRATION mssql_access_integration
      ALLOWED_NETWORK_RULES = (mssql_network_rule)
      ALLOWED_AUTHENTICATION_SECRETS = (mssql_secret)
      ENABLED = true;
Use the DB-API to pull data from SQL Server in a Python stored procedure:
    CREATE OR REPLACE PROCEDURE sp_mssql_dbapi()
      RETURNS TABLE()
      LANGUAGE PYTHON
      RUNTIME_VERSION='3.11'
      HANDLER='run'
      PACKAGES=('snowflake-snowpark-python', 'pyodbc', 'msodbcsql')
      EXTERNAL_ACCESS_INTEGRATIONS = (mssql_access_integration)
      SECRETS = ('cred' = mssql_secret )
    AS $$
    # Get user name and password from mssql_secret
    import _snowflake
    username_password_object = _snowflake.get_username_password('cred')
    USER = username_password_object.username
    PASSWORD = username_password_object.password

    # Define a method to connect to SQL Server
    from snowflake.snowpark import Session

    def create_sql_server_connection():
        import pyodbc
        host = "<your host>"
        port = <your port>
        username = USER
        password = PASSWORD
        database = "<your database name>"
        connection_str = (
            f"DRIVER={{ODBC Driver 18 for SQL Server}};"
            f"SERVER={host},{port};"
            f"DATABASE={database};"
            f"UID={username};"
            f"PWD={password};"
            "TrustServerCertificate=yes;"
            "Encrypt=yes;"
            # Optional to identify source of queries
            "APP=snowflake-snowpark-python;"
        )
        connection = pyodbc.connect(connection_str)
        return connection

    def run(session: Session):
        # Any combination of local/UDTF ingestion and partition column/predicates
        # described in the "Understanding parallelism" section can be used here.

        # Call dbapi to pull data from the target table
        df = session.read.dbapi(create_sql_server_connection, table="target_table")

        # Call dbapi to pull data from the target query
        df_query = session.read.dbapi(
            create_sql_server_connection,
            query="select * from target_table"
        )

        # Pull data from the target table in parallel using a partition column
        df_local_par_column = session.read.dbapi(
            create_sql_server_connection,
            table="target_table",
            fetch_size=100000,
            num_partitions=4,
            column="ID",  # Swap with the column you want to partition on
            upper_bound=10000,
            lower_bound=0
        )

        udtf_configs = {
            "external_access_integration": "<your external access integration>"
        }

        # Pull data from the target table with UDTF ingestion, in parallel using predicates
        df_udtf_predicates = session.read.dbapi(
            create_sql_server_connection,
            table="target_table",
            udtf_configs=udtf_configs,
            fetch_size=100000,
            predicates=["ID < 3", "ID >= 3"]
        )
        return df
    $$;

    CALL sp_mssql_dbapi();
Using DB-API to connect to SQL Server from a Snowflake notebook¶
From Snowflake Notebook packages, select snowflake-snowpark-python and pyodbc.
In the Files tab on the left side, open the file environment.yml and add the following line after the other entries under dependencies:
    - msodbcsql18
Configure the secret, a network rule to allow egress to the source endpoint, and the external access integration:
    -- Configure a secret that stores the credentials for the source endpoint
    CREATE OR REPLACE SECRET mssql_secret
      TYPE = PASSWORD
      USERNAME = 'mssql_username'
      PASSWORD = 'mssql_password';

    ALTER NOTEBOOK mynotebook SET SECRETS = ('snowflake-secret-object' = mssql_secret);

    -- Configure a network rule to allow egress to the source endpoint
    CREATE OR REPLACE NETWORK RULE mssql_network_rule
      MODE = EGRESS
      TYPE = HOST_PORT
      VALUE_LIST = ('mssql_host:mssql_port');

    -- Configure an external access integration
    CREATE OR REPLACE EXTERNAL ACCESS INTEGRATION mssql_access_integration
      ALLOWED_NETWORK_RULES = (mssql_network_rule)
      ALLOWED_AUTHENTICATION_SECRETS = (mssql_secret)
      ENABLED = true;
Set up external access for Snowflake Notebooks, and then restart the notebook session.
Use the DB-API to pull data from SQL Server in a Python cell of a Snowflake notebook:
    # Get user name and password from mssql_secret
    import _snowflake
    username_password_object = _snowflake.get_username_password('snowflake-secret-object')
    USER = username_password_object.username
    PASSWORD = username_password_object.password

    import snowflake.snowpark.context
    session = snowflake.snowpark.context.get_active_session()

    def create_sql_server_connection():
        import pyodbc
        SERVER = "<your host name>,<your port>"
        UID = USER        # user name read from the secret
        PWD = PASSWORD    # password read from the secret
        DATABASE = "<your database name>"
        connection_str = (
            f"DRIVER={{ODBC Driver 18 for SQL Server}};"
            f"SERVER={SERVER};"
            f"UID={UID};"
            f"PWD={PWD};"
            f"DATABASE={DATABASE};"
            "TrustServerCertificate=yes;"
            "Encrypt=yes;"
            # Optional to identify source of queries
            "APP=snowflake-snowpark-python;"
        )
        connection = pyodbc.connect(connection_str)
        return connection

    # Any combination of local/UDTF ingestion and partition column/predicates
    # described in the "Understanding parallelism" section can be used here.

    # Call dbapi to pull data from the target table
    df = session.read.dbapi(create_sql_server_connection, table="target_table")

    # Call dbapi to pull data from the target query
    df_query = session.read.dbapi(
        create_sql_server_connection,
        query="select * from target_table"
    )

    # Pull data from the target table in parallel using a partition column
    df_local_par_column = session.read.dbapi(
        create_sql_server_connection,
        table="target_table",
        fetch_size=100000,
        num_partitions=4,
        column="ID",  # Swap with the column you want to partition on
        upper_bound=10000,
        lower_bound=0
    )

    udtf_configs = {
        "external_access_integration": "<your external access integration>"
    }

    # Pull data from the target table with UDTF ingestion, in parallel using predicates
    df_udtf_predicates = session.read.dbapi(
        create_sql_server_connection,
        table="target_table",
        udtf_configs=udtf_configs,
        fetch_size=100000,
        predicates=["ID < 3", "ID >= 3"]
    )

    # Save data into sf_table
    df.write.mode("overwrite").save_as_table('sf_table')
Source tracing when using DB-API to connect to SQL Server¶
Include a Snowpark tag in your connection factory function:
    def create_sql_server_connection():
        import pyodbc
        SERVER = "<your host name>"
        PORT = <your port>
        UID = "<your user name>"
        PWD = "<your password>"
        DATABASE = "<your database name>"
        connection_str = (
            f"DRIVER={{ODBC Driver 18 for SQL Server}};"
            f"SERVER={SERVER},{PORT};"  # host and port are separated by a comma
            f"UID={UID};"
            f"PWD={PWD};"
            f"DATABASE={DATABASE};"
            "TrustServerCertificate=yes;"
            "Encrypt=yes;"
            # Include this parameter for source tracing
            "APP=snowflake-snowpark-python;"
        )
        connection = pyodbc.connect(connection_str)
        return connection
Run the following SQL in your data source to capture queries from Snowpark that are still running:
    SELECT
        s.session_id,
        s.program_name,
        r.status,
        t.text AS sql_text
    FROM sys.dm_exec_sessions s
    JOIN sys.dm_exec_requests r
        ON s.session_id = r.session_id
    CROSS APPLY sys.dm_exec_sql_text(r.sql_handle) AS t
    WHERE s.program_name = 'snowflake-snowpark-python';
Oracle¶
To connect to Oracle from Snowpark, you need the following two packages:
- Snowpark: snowflake-snowpark-python[pandas]
- The open source oracledb library: oracledb
The following code examples show how to connect to Oracle from a Snowpark client, stored procedures, and a Snowflake notebook:
Using DB-API to connect to Oracle from a Snowpark client¶
Install snowflake-snowpark-python[pandas] and oracledb:
    pip install "snowflake-snowpark-python[pandas]"
    pip install oracledb
Use the DB-API to pull data from Oracle, and define the factory method for creating a connection to Oracle:
    def create_oracle_db_connection():
        import oracledb
        HOST = "<your host>"
        PORT = <your port>
        SERVICE_NAME = "<your service name>"
        USER = "<your user name>"
        PASSWORD = "<your password>"
        DSN = f"{HOST}:{PORT}/{SERVICE_NAME}"
        connection = oracledb.connect(
            user=USER,
            password=PASSWORD,
            dsn=DSN
        )
        # Optional: include this parameter for source tracing
        connection.clientinfo = "snowflake-snowpark-python"
        return connection

    # Any combination of local/UDTF ingestion and partition column/predicates
    # described in the "Understanding parallelism" section can be used here.

    # Call dbapi to pull data from the target table
    df = session.read.dbapi(create_oracle_db_connection, table="target_table")

    # Call dbapi to pull data from the target query
    df_query = session.read.dbapi(
        create_oracle_db_connection,
        query="select * from target_table"
    )

    # Pull data from the target table in parallel using a partition column
    df_local_par_column = session.read.dbapi(
        create_oracle_db_connection,
        table="target_table",
        fetch_size=100000,
        num_partitions=4,
        column="ID",  # Swap with the column you want to partition on
        upper_bound=10000,
        lower_bound=0
    )

    udtf_configs = {
        "external_access_integration": "<your external access integration>"
    }

    # Pull data from the target table with UDTF ingestion, in parallel using predicates
    df_udtf_predicates = session.read.dbapi(
        create_oracle_db_connection,
        table="target_table",
        udtf_configs=udtf_configs,
        fetch_size=100000,
        predicates=["ID < 3", "ID >= 3"]
    )
Using DB-API to connect to Oracle from a stored procedure¶
Configure the external access integration (EAI), which is required to allow Snowflake to connect to the source endpoint.
Note
PrivateLink is recommended for secure data transfer, especially when you are dealing with sensitive information. Ensure that your Snowflake account has the necessary PrivateLink privileges enabled and that the PrivateLink feature is configured and active in your Snowflake Notebook environment.
Configure the secret, a network rule to allow egress to the source endpoint, and the EAI:
    -- Configure a secret that stores the credentials for the source endpoint
    CREATE OR REPLACE SECRET ora_secret
      TYPE = PASSWORD
      USERNAME = 'ora_username'
      PASSWORD = 'ora_password';

    -- Configure a network rule to allow egress to the source endpoint
    CREATE OR REPLACE NETWORK RULE ora_network_rule
      MODE = EGRESS
      TYPE = HOST_PORT
      VALUE_LIST = ('ora_host:ora_port');

    -- Configure an external access integration
    CREATE OR REPLACE EXTERNAL ACCESS INTEGRATION ora_access_integration
      ALLOWED_NETWORK_RULES = (ora_network_rule)
      ALLOWED_AUTHENTICATION_SECRETS = (ora_secret)
      ENABLED = true;
Use Snowpark Python DB-API to pull data from Oracle in a Python stored procedure:
    CREATE OR REPLACE PROCEDURE sp_ora_dbapi()
      RETURNS TABLE()
      LANGUAGE PYTHON
      RUNTIME_VERSION='3.11'
      HANDLER='run'
      PACKAGES=('snowflake-snowpark-python', 'oracledb')
      EXTERNAL_ACCESS_INTEGRATIONS = (ora_access_integration)
      SECRETS = ('cred' = ora_secret )
    AS $$
    # Get user name and password from ora_secret
    import _snowflake
    username_password_object = _snowflake.get_username_password('cred')
    USER = username_password_object.username
    PASSWORD = username_password_object.password

    # Define the factory method for creating a connection to Oracle
    from snowflake.snowpark import Session

    def create_oracle_db_connection():
        import oracledb
        host = "ora_host"
        port = "ora_port"
        service_name = "ora_service"
        user = USER
        password = PASSWORD
        DSN = f"{host}:{port}/{service_name}"
        connection = oracledb.connect(
            user=user,
            password=password,
            dsn=DSN
        )
        # Optional: include this parameter for source tracing
        connection.clientinfo = "snowflake-snowpark-python"
        return connection

    def run(session: Session):
        # Any combination of local/UDTF ingestion and partition column/predicates
        # described in the "Understanding parallelism" section can be used here.

        # Call dbapi to pull data from the target table
        df = session.read.dbapi(create_oracle_db_connection, table="target_table")

        # Call dbapi to pull data from the target query
        df_query = session.read.dbapi(
            create_oracle_db_connection,
            query="select * from target_table"
        )

        # Pull data from the target table in parallel using a partition column
        df_local_par_column = session.read.dbapi(
            create_oracle_db_connection,
            table="target_table",
            fetch_size=100000,
            num_partitions=4,
            column="ID",  # Swap with the column you want to partition on
            upper_bound=10000,
            lower_bound=0
        )

        udtf_configs = {
            "external_access_integration": "<your external access integration>"
        }

        # Pull data from the target table with UDTF ingestion, in parallel using predicates
        df_udtf_predicates = session.read.dbapi(
            create_oracle_db_connection,
            table="target_table",
            udtf_configs=udtf_configs,
            fetch_size=100000,
            predicates=["ID < 3", "ID >= 3"]
        )
        return df
    $$;

    CALL sp_ora_dbapi();
Using DB-API to connect to Oracle from a Snowflake notebook¶
From Snowflake Notebook packages, select snowflake-snowpark-python and oracledb.
Configure the external access integration (EAI), which is required to allow Snowflake to connect to the source endpoint.
Note
PrivateLink is recommended for secure data transfer, especially when you are dealing with sensitive information. Ensure that your Snowflake account has the necessary PrivateLink privileges enabled and that the PrivateLink feature is configured and active in your Snowflake Notebook environment.
Configure the secret, a network rule to allow egress to the source endpoint, and the EAI:
    -- Configure a secret that stores the credentials for the source endpoint
    CREATE OR REPLACE SECRET ora_secret
      TYPE = PASSWORD
      USERNAME = 'ora_username'
      PASSWORD = 'ora_password';

    ALTER NOTEBOOK mynotebook SET SECRETS = ('snowflake-secret-object' = ora_secret);

    -- Configure a network rule to allow egress to the source endpoint
    CREATE OR REPLACE NETWORK RULE ora_network_rule
      MODE = EGRESS
      TYPE = HOST_PORT
      VALUE_LIST = ('ora_host:ora_port');

    -- Configure an external access integration
    CREATE OR REPLACE EXTERNAL ACCESS INTEGRATION ora_access_integration
      ALLOWED_NETWORK_RULES = (ora_network_rule)
      ALLOWED_AUTHENTICATION_SECRETS = (ora_secret)
      ENABLED = true;
Set up external access for Snowflake Notebooks, and then restart the notebook session.
Use the DB-API to pull data from Oracle in a Python cell of a Snowflake notebook:
    # Get user name and password from ora_secret
    import _snowflake
    username_password_object = _snowflake.get_username_password('snowflake-secret-object')
    USER = username_password_object.username
    PASSWORD = username_password_object.password

    import snowflake.snowpark.context
    session = snowflake.snowpark.context.get_active_session()

    # Define the factory method for creating a connection to Oracle
    def create_oracle_db_connection():
        import oracledb
        host = "ora_host"
        port = "ora_port"
        service_name = "ora_service"
        user = USER
        password = PASSWORD
        DSN = f"{host}:{port}/{service_name}"
        connection = oracledb.connect(
            user=user,
            password=password,
            dsn=DSN,
        )
        # Optional: include this parameter for source tracing
        connection.clientinfo = "snowflake-snowpark-python"
        return connection

    # Any combination of local/UDTF ingestion and partition column/predicates
    # described in the "Understanding parallelism" section can be used here.

    # Call dbapi to pull data from the target table
    df = session.read.dbapi(create_oracle_db_connection, table="target_table")

    # Call dbapi to pull data from the target query
    df_query = session.read.dbapi(
        create_oracle_db_connection,
        query="select * from target_table"
    )

    # Pull data from the target table in parallel using a partition column
    df_local_par_column = session.read.dbapi(
        create_oracle_db_connection,
        table="target_table",
        fetch_size=100000,
        num_partitions=4,
        column="ID",  # Swap with the column you want to partition on
        upper_bound=10000,
        lower_bound=0
    )

    udtf_configs = {
        "external_access_integration": "<your external access integration>"
    }

    # Pull data from the target table with UDTF ingestion, in parallel using predicates
    df_udtf_predicates = session.read.dbapi(
        create_oracle_db_connection,
        table="target_table",
        udtf_configs=udtf_configs,
        fetch_size=100000,
        predicates=["ID < 3", "ID >= 3"]
    )

    # Save data into sf_table
    df.write.mode("overwrite").save_as_table('sf_table')
Source tracing when using DB-API to connect to Oracle¶
Include a Snowpark tag in your connection factory function:
    def create_oracle_db_connection():
        import oracledb
        HOST = "myhost"
        PORT = "myport"
        SERVICE_NAME = "myservice"
        USER = "myuser"
        PASSWORD = "mypassword"
        DSN = f"{HOST}:{PORT}/{SERVICE_NAME}"
        connection = oracledb.connect(
            user=USER,
            password=PASSWORD,
            dsn=DSN,
        )
        # Include this parameter for source tracing
        connection.clientinfo = "snowflake-snowpark-python"
        return connection
Run the following SQL in your data source to capture queries from Snowpark that are still running:
    SELECT
        s.sid,
        s.serial#,
        s.username,
        s.module,
        q.sql_id,
        q.sql_text,
        q.last_active_time
    FROM v$session s
    JOIN v$sql q
        ON s.sql_id = q.sql_id
    WHERE s.client_info = 'snowflake-snowpark-python'
PostgreSQL¶
To connect to PostgreSQL from Snowpark, you need the following two packages:
- Snowpark: snowflake-snowpark-python[pandas]
- The open source psycopg2 library: psycopg2
The following code examples show how to connect to PostgreSQL from a Snowpark client, stored procedures, and a Snowflake notebook.
Using DB-API to connect to PostgreSQL from a Snowpark client¶
Install psycopg2:
    pip install psycopg2
Define the factory method for creating a connection to PostgreSQL:
    def create_pg_connection():
        import psycopg2
        connection = psycopg2.connect(
            host="pg_host",
            port=pg_port,  # Replace pg_port with your port number
            dbname="pg_dbname",
            user="pg_user",
            password="pg_password",
            # Optional: include this parameter for source tracing
            application_name="snowflake-snowpark-python"
        )
        return connection

    # Any combination of local/UDTF ingestion and partition column/predicates
    # described in the "Understanding parallelism" section can be used here.

    # Call dbapi to pull data from the target table
    df = session.read.dbapi(create_pg_connection, table="target_table")

    # Call dbapi to pull data from the target query
    df_query = session.read.dbapi(
        create_pg_connection,
        query="select * from target_table"
    )

    # Pull data from the target table in parallel using a partition column
    df_local_par_column = session.read.dbapi(
        create_pg_connection,
        table="target_table",
        fetch_size=100000,
        num_partitions=4,
        column="ID",  # Swap with the column you want to partition on
        upper_bound=10000,
        lower_bound=0
    )

    udtf_configs = {
        "external_access_integration": "<your external access integration>"
    }

    # Pull data from the target table with UDTF ingestion, in parallel using predicates
    df_udtf_predicates = session.read.dbapi(
        create_pg_connection,
        table="target_table",
        udtf_configs=udtf_configs,
        fetch_size=100000,
        predicates=["ID < 3", "ID >= 3"]
    )
Using DB-API to connect to PostgreSQL from a stored procedure¶
Configure the external access integration (EAI), which is required to allow Snowflake to connect to the source endpoint.
Note
PrivateLink is recommended for secure data transfer, especially when you are dealing with sensitive information. Ensure that your Snowflake account has the necessary PrivateLink privileges enabled and that the PrivateLink feature is configured and active in your Snowflake Notebook environment.
Configure the secret, a network rule to allow egress to the source endpoint, and the EAI:
    -- Configure a secret
    CREATE OR REPLACE SECRET pg_secret
      TYPE = PASSWORD
      USERNAME = 'pg_username'
      PASSWORD = 'pg_password';

    -- Configure a network rule
    CREATE OR REPLACE NETWORK RULE pg_network_rule
      MODE = EGRESS
      TYPE = HOST_PORT
      VALUE_LIST = ('pg_host:pg_port');

    -- Configure an external access integration
    CREATE OR REPLACE EXTERNAL ACCESS INTEGRATION pg_access_integration
      ALLOWED_NETWORK_RULES = (pg_network_rule)
      ALLOWED_AUTHENTICATION_SECRETS = (pg_secret)
      ENABLED = true;
Use Snowpark Python DB-API to pull data from PostgreSQL in a Python stored procedure:
    CREATE OR REPLACE PROCEDURE sp_pg_dbapi()
      RETURNS TABLE()
      LANGUAGE PYTHON
      RUNTIME_VERSION='3.11'
      HANDLER='run'
      PACKAGES=('snowflake-snowpark-python', 'psycopg2')
      EXTERNAL_ACCESS_INTEGRATIONS = (pg_access_integration)
      SECRETS = ('cred' = pg_secret )
    AS $$
    # Get user name and password from pg_secret
    import _snowflake
    username_password_object = _snowflake.get_username_password('cred')
    USER = username_password_object.username
    PASSWORD = username_password_object.password

    # Define the factory method for creating a connection to PostgreSQL
    from snowflake.snowpark import Session

    def create_pg_connection():
        import psycopg2
        connection = psycopg2.connect(
            host="pg_host",
            port=pg_port,  # Replace pg_port with your port number
            dbname="pg_dbname",
            user=USER,
            password=PASSWORD,
            # Optional: include this parameter for source tracing
            application_name="snowflake-snowpark-python"
        )
        return connection

    def run(session: Session):
        # Any combination of local/UDTF ingestion and partition column/predicates
        # described in the "Understanding parallelism" section can be used here.

        # Call dbapi to pull data from the target table
        df = session.read.dbapi(create_pg_connection, table="target_table")

        # Call dbapi to pull data from the target query
        df_query = session.read.dbapi(
            create_pg_connection,
            query="select * from target_table"
        )

        # Pull data from the target table in parallel using a partition column
        df_local_par_column = session.read.dbapi(
            create_pg_connection,
            table="target_table",
            fetch_size=100000,
            num_partitions=4,
            column="ID",  # Swap with the column you want to partition on
            upper_bound=10000,
            lower_bound=0
        )

        udtf_configs = {
            "external_access_integration": "<your external access integration>"
        }

        # Pull data from the target table with UDTF ingestion, in parallel using predicates
        df_udtf_predicates = session.read.dbapi(
            create_pg_connection,
            table="target_table",
            udtf_configs=udtf_configs,
            fetch_size=100000,
            predicates=["ID < 3", "ID >= 3"]
        )
        return df
    $$;

    CALL sp_pg_dbapi();
Using DB-API to connect to PostgreSQL from a Snowflake notebook¶
From Snowflake Notebook packages, select snowflake-snowpark-python and psycopg2.
Configure the external access integration (EAI), which is required to allow Snowflake to connect to the source endpoint.
Note
PrivateLink is recommended for secure data transfer, especially when you are dealing with sensitive information. Ensure that your Snowflake account has the necessary PrivateLink privileges enabled and that the PrivateLink feature is configured and active in your Snowflake Notebook environment.
Configure the secret, a network rule to allow egress to the source endpoint, and the EAI:
    -- Configure the secret
    CREATE OR REPLACE SECRET pg_secret
      TYPE = PASSWORD
      USERNAME = 'pg_username'
      PASSWORD = 'pg_password';

    ALTER NOTEBOOK pg_notebook SET SECRETS = ('snowflake-secret-object' = pg_secret);

    -- Configure the network rule to allow egress to the source endpoint
    CREATE OR REPLACE NETWORK RULE pg_network_rule
      MODE = EGRESS
      TYPE = HOST_PORT
      VALUE_LIST = ('pg_host:pg_port');

    -- Configure the external access integration
    CREATE OR REPLACE EXTERNAL ACCESS INTEGRATION pg_access_integration
      ALLOWED_NETWORK_RULES = (pg_network_rule)
      ALLOWED_AUTHENTICATION_SECRETS = (pg_secret)
      ENABLED = true;
Set up external access for Snowflake Notebooks, and then restart the notebook session.
Use the DB-API to pull data from PostgreSQL in a Python cell of a Snowflake notebook:
# Get the user name and password from pg_secret
import _snowflake
username_password_object = _snowflake.get_username_password('snowflake-secret-object')
USER = username_password_object.username
PASSWORD = username_password_object.password

import snowflake.snowpark.context
session = snowflake.snowpark.context.get_active_session()

# Define the factory method for creating a connection to PostgreSQL
def create_pg_connection():
    import psycopg2
    connection = psycopg2.connect(
        host="pg_host",
        port=pg_port,
        dbname="pg_dbname",
        user=USER,
        password=PASSWORD,
        # Optional: include this parameter for source tracing
        application_name="snowflake-snowpark-python"
    )
    return connection

# Feel free to combine local/udtf ingestion and partition column/predicates as
# stated in the understanding parallelism section

# Call dbapi to pull data from target table
df = session.read.dbapi(
    create_pg_connection,
    table="target_table"
)

# Call dbapi to pull data from target query
df_query = session.read.dbapi(
    create_pg_connection,
    query="select * from target_table"
)

# Pull data from target table with parallelism using partition column
df_local_par_column = session.read.dbapi(
    create_pg_connection,
    table="target_table",
    fetch_size=100000,
    num_partitions=4,
    column="ID",  # swap with the column you want your partition based on
    upper_bound=10000,
    lower_bound=0
)

udtf_configs = {
    "external_access_integration": "<your external access integration>"
}

# Pull data from target table with udtf ingestion with parallelism using predicates
df_udtf_predicates = session.read.dbapi(
    create_pg_connection,
    table="target_table",
    udtf_configs=udtf_configs,
    fetch_size=100000,
    predicates=[
        "ID < 3",
        "ID >= 3"
    ]
)

# Save data into sf_table
df.write.mode("overwrite").save_as_table('sf_table')
Source tracing when using DB-API to connect to PostgreSQL
Tag the connection with a Snowpark identifier in your connection factory function.
def create_pg_connection():
    import psycopg2
    connection = psycopg2.connect(
        host="pg_host",
        port=pg_port,
        dbname="pg_dbname",
        user="pg_user",
        password="pg_password",
        # Include this parameter for source tracing
        application_name="snowflake-snowpark-python"
    )
    return connection
Run the following SQL in your data source to capture queries from Snowpark that are still live:
SELECT
  pid,
  usename AS username,
  datname AS database,
  application_name,
  client_addr,
  state,
  query_start,
  query
FROM pg_stat_activity
WHERE application_name = 'snowflake-snowpark-python';
MySQL
To connect to MySQL from Snowpark, you need the following two packages:
Snowpark: snowflake-snowpark-python[pandas]
The open source pymysql library: PyMySQL
The following code examples show how to connect to MySQL from a Snowpark client, stored procedures, and a Snowflake notebook.
Using DB-API to connect to MySQL from a Snowpark client
Install pymysql:
pip install snowflake-snowpark-python[pandas]
pip install pymysql
Define the factory method for creating a connection to MySQL:
def create_mysql_connection():
    import pymysql
    connection = pymysql.connect(
        host="mysql_host",
        port=mysql_port,
        database="mysql_db",
        user="mysql_user",
        password="mysql_password",
        # Optional: include this parameter for source tracing
        init_command="SET @program_name='snowflake-snowpark-python';"
    )
    return connection

# Feel free to combine local/udtf ingestion and partition column/predicates as
# stated in the understanding parallelism section

# Call dbapi to pull data from target table
df = session.read.dbapi(
    create_mysql_connection,
    table="target_table"
)

# Call dbapi to pull data from target query
df_query = session.read.dbapi(
    create_mysql_connection,
    query="select * from target_table"
)

# Pull data from target table with parallelism using partition column
df_local_par_column = session.read.dbapi(
    create_mysql_connection,
    table="target_table",
    fetch_size=100000,
    num_partitions=4,
    column="ID",  # swap with the column you want your partition based on
    upper_bound=10000,
    lower_bound=0
)

udtf_configs = {
    "external_access_integration": "<your external access integration>"
}

# Pull data from target table with udtf ingestion with parallelism using predicates
df_udtf_predicates = session.read.dbapi(
    create_mysql_connection,
    table="target_table",
    udtf_configs=udtf_configs,
    fetch_size=100000,
    predicates=[
        "ID < 3",
        "ID >= 3"
    ]
)
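The partition-column read above passes column, lower_bound, upper_bound, and num_partitions. As a rough illustration of how such settings can translate into one WHERE clause per partition, here is a minimal sketch that assumes Spark-JDBC-style semantics, where the bounds only set the stride and the outer ranges stay open so that no rows are filtered out. The helper name partition_predicates is hypothetical, and the splitting that Snowpark actually performs may differ.

```python
def partition_predicates(column, lower_bound, upper_bound, num_partitions):
    """Hypothetical helper: derive one WHERE clause per partition.

    Assumes integer bounds and Spark-JDBC-style striding; this is not
    taken from the Snowpark implementation.
    """
    if num_partitions < 1:
        raise ValueError("num_partitions must be at least 1")
    if lower_bound >= upper_bound:
        raise ValueError("lower_bound must be smaller than upper_bound")

    # Never create more partitions than there are integer values in the range.
    num_partitions = min(num_partitions, upper_bound - lower_bound)
    stride = (upper_bound - lower_bound) // num_partitions

    # Interior cut points between neighbouring partitions.
    cuts = [lower_bound + i * stride for i in range(1, num_partitions)]
    if not cuts:
        return ["1 = 1"]  # a single partition reads everything

    # The first range is open below and also collects NULL values;
    # the last range is open above.
    predicates = [f"{column} < {cuts[0]} OR {column} IS NULL"]
    for low, high in zip(cuts, cuts[1:]):
        predicates.append(f"{column} >= {low} AND {column} < {high}")
    predicates.append(f"{column} >= {cuts[-1]}")
    return predicates


for clause in partition_predicates("ID", 0, 10000, 4):
    print(clause)
```

With the values from the example (ID, 0 to 10000, 4 partitions), the sketch yields four ranges with cut points at 2500, 5000, and 7500. Under this assumption, bounds that match the real minimum and maximum of the column give evenly sized partitions, while badly chosen bounds push most rows into the first or last partition.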
Using DB-API to connect to MySQL from a stored procedure
Configure the external access integration (EAI), which is required to allow Snowflake to connect to the source endpoint.
Note
PrivateLink is recommended for secure data transfer, especially when you're dealing with sensitive information. Ensure that your Snowflake account has the necessary PrivateLink privileges enabled and that the PrivateLink feature is configured and active in your Snowflake Notebook environment.
Configure the secret, a network rule to allow egress to the source endpoint, and the EAI:
CREATE OR REPLACE SECRET mysql_secret
  TYPE = PASSWORD
  USERNAME = 'mysql_username'
  PASSWORD = 'mysql_password';

-- configure a network rule.
CREATE OR REPLACE NETWORK RULE mysql_network_rule
  MODE = EGRESS
  TYPE = HOST_PORT
  VALUE_LIST = ('mysql_host:mysql_port');

-- configure an external access integration
CREATE OR REPLACE EXTERNAL ACCESS INTEGRATION mysql_access_integration
  ALLOWED_NETWORK_RULES = (mysql_network_rule)
  ALLOWED_AUTHENTICATION_SECRETS = (mysql_secret)
  ENABLED = true;
Use the Snowpark Python DB-API to pull data from MySQL in a Python stored procedure:
CREATE OR REPLACE PROCEDURE sp_mysql_dbapi()
RETURNS TABLE()
LANGUAGE PYTHON
RUNTIME_VERSION='3.11'
HANDLER='run'
PACKAGES=('snowflake-snowpark-python', 'pymysql')
EXTERNAL_ACCESS_INTEGRATIONS = (mysql_access_integration)
SECRETS = ('cred' = mysql_secret )
AS $$
# Get user name and password from mysql_secret
import _snowflake
username_password_object = _snowflake.get_username_password('cred')
USER = username_password_object.username
PASSWORD = username_password_object.password

# Define the factory method for creating a connection to MySQL
from snowflake.snowpark import Session

def create_mysql_connection():
    import pymysql
    connection = pymysql.connect(
        host="mysql_host",
        port=mysql_port,
        database="mysql_dbname",
        user=USER,
        password=PASSWORD,
        # Optional: include this parameter for source tracing
        init_command="SET @program_name='snowflake-snowpark-python';"
    )
    return connection

# Using Snowpark Python DB-API to pull data from MySQL in a Python stored procedure.
def run(session: Session):
    # Feel free to combine local/udtf ingestion and partition column/predicates
    # as stated in the understanding parallelism section

    # Call dbapi to pull data from target table
    df = session.read.dbapi(
        create_mysql_connection,
        table="target_table"
    )

    # Call dbapi to pull data from target query
    df_query = session.read.dbapi(
        create_mysql_connection,
        query="select * from target_table"
    )

    # Pull data from target table with parallelism using partition column
    df_local_par_column = session.read.dbapi(
        create_mysql_connection,
        table="target_table",
        fetch_size=100000,
        num_partitions=4,
        column="ID",  # swap with the column you want your partition based on
        upper_bound=10000,
        lower_bound=0
    )

    udtf_configs = {
        "external_access_integration": "<your external access integration>"
    }

    # Pull data from target table with udtf ingestion with parallelism using predicates
    df_udtf_predicates = session.read.dbapi(
        create_mysql_connection,
        table="target_table",
        udtf_configs=udtf_configs,
        fetch_size=100000,
        predicates=[
            "ID < 3",
            "ID >= 3"
        ]
    )
    return df
$$;

CALL sp_mysql_dbapi();
Using DB-API to connect to MySQL from a Snowflake notebook
From Snowflake Notebook packages, select snowflake-snowpark-python and pymysql.
Configure the external access integration (EAI), which is required to allow Snowflake to connect to the source endpoint.
Note
PrivateLink is recommended for secure data transfer, especially when you're dealing with sensitive information. Ensure that your Snowflake account has the necessary PrivateLink privileges enabled and that the PrivateLink feature is configured and active in your Snowflake Notebook environment.
Configure the secret, a network rule to allow egress to the source endpoint, and the EAI:
CREATE OR REPLACE SECRET mysql_secret
  TYPE = PASSWORD
  USERNAME = 'mysql_username'
  PASSWORD = 'mysql_password';

ALTER NOTEBOOK mynotebook SET SECRETS = ('snowflake-secret-object' = mysql_secret);

-- configure a network rule.
CREATE OR REPLACE NETWORK RULE mysql_network_rule
  MODE = EGRESS
  TYPE = HOST_PORT
  VALUE_LIST = ('mysql_host:mysql_port');

-- configure an EAI
CREATE OR REPLACE EXTERNAL ACCESS INTEGRATION mysql_access_integration
  ALLOWED_NETWORK_RULES = (mysql_network_rule)
  ALLOWED_AUTHENTICATION_SECRETS = (mysql_secret)
  ENABLED = true;
Set up external access for Snowflake Notebooks, and then restart the notebook session.
Use the DB-API to pull data from MySQL in a Python cell of a Snowflake notebook:
# Get user name and password from mysql_secret
import _snowflake
username_password_object = _snowflake.get_username_password('snowflake-secret-object')
USER = username_password_object.username
PASSWORD = username_password_object.password

import snowflake.snowpark.context
session = snowflake.snowpark.context.get_active_session()

# Define the factory method for creating a connection to MySQL
def create_mysql_connection():
    import pymysql
    connection = pymysql.connect(
        host="mysql_host",
        port=mysql_port,
        database="mysql_dbname",
        user=USER,
        password=PASSWORD,
        # Optional: include this parameter for source tracing
        init_command="SET @program_name='snowflake-snowpark-python';"
    )
    return connection

# Feel free to combine local/udtf ingestion and partition column/predicates as
# stated in the understanding parallelism section

# Call dbapi to pull data from target table
df = session.read.dbapi(
    create_mysql_connection,
    table="target_table"
)

# Call dbapi to pull data from target query
df_query = session.read.dbapi(
    create_mysql_connection,
    query="select * from target_table"
)

# Pull data from target table with parallelism using partition column
df_local_par_column = session.read.dbapi(
    create_mysql_connection,
    table="target_table",
    fetch_size=100000,
    num_partitions=4,
    column="ID",  # swap with the column you want your partition based on
    upper_bound=10000,
    lower_bound=0
)

udtf_configs = {
    "external_access_integration": "<your external access integration>"
}

# Pull data from target table with udtf ingestion with parallelism using predicates
df_udtf_predicates = session.read.dbapi(
    create_mysql_connection,
    table="target_table",
    udtf_configs=udtf_configs,
    fetch_size=100000,
    predicates=[
        "ID < 3",
        "ID >= 3"
    ]
)

# Save data into sf_table
df.write.mode("overwrite").save_as_table('sf_table')
Source tracing when using DB-API to connect to MySQL
Tag the connection with a Snowpark identifier in your connection factory function.
def create_mysql_connection():
    import pymysql
    connection = pymysql.connect(
        host="mysql_host",
        port=mysql_port,
        database="mysql_db",
        user="mysql_user",
        password="mysql_password",
        # include this parameter for source tracing
        init_command="SET @program_name='snowflake-snowpark-python';"
    )
    return connection
Run the following SQL in your data source to capture queries from Snowpark:
SELECT *
FROM performance_schema.events_statements_history_long
WHERE THREAD_ID = (
  SELECT THREAD_ID
  FROM performance_schema.events_statements_history_long
  WHERE SQL_TEXT = "SET @program_name='snowflake-snowpark-python'"
  ORDER BY EVENT_ID DESC
  LIMIT 1
)
Databricks
To connect to Databricks from Snowpark, you need the following two packages:
Snowpark: snowflake-snowpark-python[pandas]
The open source Databricks SQL connector library: databricks-sql-connector
The following code examples show how to connect to Databricks from a Snowpark client, stored procedures, and a Snowflake notebook.
Using DB-API to connect to Databricks from a Snowpark client
Install databricks-sql-connector:
pip install snowflake-snowpark-python[pandas]
pip install databricks-sql-connector
Define the factory method for creating a connection to Databricks:
def create_dbx_connection():
    import databricks.sql
    connection = databricks.sql.connect(
        server_hostname=HOST,
        http_path=PATH,
        access_token=ACCESS_TOKEN
    )
    return connection

# Feel free to combine local/udtf ingestion and partition column/predicates as
# stated in the understanding parallelism section

# Call dbapi to pull data from target table
df = session.read.dbapi(
    create_dbx_connection,
    table="target_table"
)

# Call dbapi to pull data from target query
df_query = session.read.dbapi(
    create_dbx_connection,
    query="select * from target_table"
)

# Pull data from target table with parallelism using partition column
df_local_par_column = session.read.dbapi(
    create_dbx_connection,
    table="target_table",
    fetch_size=100000,
    num_partitions=4,
    column="ID",  # swap with the column you want your partition based on
    upper_bound=10000,
    lower_bound=0
)

udtf_configs = {
    "external_access_integration": "<your external access integration>"
}

# Pull data from target table with udtf ingestion with parallelism using predicates
df_udtf_predicates = session.read.dbapi(
    create_dbx_connection,
    table="target_table",
    udtf_configs=udtf_configs,
    fetch_size=100000,
    predicates=[
        "ID < 3",
        "ID >= 3"
    ]
)
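Because each entry in predicates is used as a WHERE clause for one partition, a row that matches no predicate is never read, and a row that matches two is read twice. The following sketch shows one way to check a predicate list before using it. It runs against an in-memory sqlite3 table as a stand-in for the source database; the helper name rows_not_covered_once and the sample data are assumptions for illustration, not part of Snowpark.

```python
import sqlite3


def rows_not_covered_once(conn, table, predicates):
    """Count rows that match zero predicates or more than one predicate.

    Works with any DB-API 2.0 connection. `table` and `predicates` are
    interpolated into the SQL text, so pass trusted values only.
    """
    hits = " + ".join(f"CASE WHEN ({p}) THEN 1 ELSE 0 END" for p in predicates)
    cur = conn.cursor()
    cur.execute(f"SELECT COUNT(*) FROM {table} WHERE ({hits}) <> 1")
    return cur.fetchone()[0]


# Stand-in source table: five IDs plus one row whose ID is NULL.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE target_table (ID INTEGER)")
conn.executemany(
    "INSERT INTO target_table VALUES (?)",
    [(1,), (2,), (3,), (4,), (5,), (None,)],
)

gaps = rows_not_covered_once(conn, "target_table", ["ID < 3", "ID >= 3"])
fixed = rows_not_covered_once(
    conn, "target_table", ["ID < 3 OR ID IS NULL", "ID >= 3"]
)
print(gaps, fixed)  # prints: 1 0
```

The first predicate list misses the row with a NULL ID, because a comparison with NULL is never true. Adding an explicit IS NULL branch to one predicate closes the gap.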
Using DB-API to connect to Databricks from a stored procedure
Configure the external access integration (EAI), which is required to allow Snowflake to connect to the source endpoint.
Note
PrivateLink is recommended for secure data transfer, especially when you're dealing with sensitive information. Ensure that your Snowflake account has the necessary PrivateLink privileges enabled and that the PrivateLink feature is configured and active in your Snowflake Notebook environment.
Configure the secret, a network rule to allow egress to the source endpoint, and the EAI:
CREATE OR REPLACE SECRET dbx_secret
  TYPE = GENERIC_STRING
  SECRET_STRING = 'dbx_access_token';

CREATE OR REPLACE NETWORK RULE dbx_network_rule
  MODE = EGRESS
  TYPE = HOST_PORT
  VALUE_LIST = ('dbx_host:dbx_port');

CREATE OR REPLACE EXTERNAL ACCESS INTEGRATION dbx_access_integration
  ALLOWED_NETWORK_RULES = (dbx_network_rule)
  ALLOWED_AUTHENTICATION_SECRETS = (dbx_secret)
  ENABLED = true;
Use the Snowpark Python DB-API to pull data from Databricks in a Python stored procedure:
CREATE OR REPLACE PROCEDURE sp_dbx_dbapi()
RETURNS TABLE()
LANGUAGE PYTHON
RUNTIME_VERSION='3.11'
HANDLER='run'
PACKAGES=('snowflake-snowpark-python', 'databricks-sql-connector')
EXTERNAL_ACCESS_INTEGRATIONS = (dbx_access_integration)
SECRETS = ('cred' = dbx_secret )
AS $$
# Get the access token from dbx_secret
import _snowflake
ACCESS_TOKEN = _snowflake.get_generic_secret_string('cred')

from snowflake.snowpark import Session

# Define the method for creating a connection to Databricks
def create_dbx_connection():
    import databricks.sql
    connection = databricks.sql.connect(
        server_hostname="dbx_host",
        http_path="dbx_path",
        access_token=ACCESS_TOKEN,
    )
    return connection

# Using Snowpark Python DB-API to pull data from Databricks in a Python stored procedure.
def run(session: Session):
    # Feel free to combine local/udtf ingestion and partition column/predicates
    # as stated in the understanding parallelism section

    # Call dbapi to pull data from target table
    df = session.read.dbapi(
        create_dbx_connection,
        table="target_table"
    )

    # Call dbapi to pull data from target query
    df_query = session.read.dbapi(
        create_dbx_connection,
        query="select * from target_table"
    )

    # Pull data from target table with parallelism using partition column
    df_local_par_column = session.read.dbapi(
        create_dbx_connection,
        table="target_table",
        fetch_size=100000,
        num_partitions=4,
        column="ID",  # swap with the column you want your partition based on
        upper_bound=10000,
        lower_bound=0
    )

    udtf_configs = {
        "external_access_integration": "<your external access integration>"
    }

    # Pull data from target table with udtf ingestion with parallelism using predicates
    df_udtf_predicates = session.read.dbapi(
        create_dbx_connection,
        table="target_table",
        udtf_configs=udtf_configs,
        fetch_size=100000,
        predicates=[
            "ID < 3",
            "ID >= 3"
        ]
    )
    return df
$$;

CALL sp_dbx_dbapi();
Using DB-API to connect to Databricks from a Snowflake notebook
From Snowflake Notebook packages, select snowflake-snowpark-python and databricks-sql-connector.
Configure the external access integration (EAI), which is required to allow Snowflake to connect to the source endpoint.
Note
PrivateLink is recommended for secure data transfer, especially when you're dealing with sensitive information. Ensure that your Snowflake account has the necessary PrivateLink privileges enabled and that the PrivateLink feature is configured and active in your Snowflake Notebook environment.
Configure the secret, a network rule to allow egress to the source endpoint, and the EAI:
CREATE OR REPLACE SECRET dbx_secret
  TYPE = GENERIC_STRING
  SECRET_STRING = 'dbx_access_token';

ALTER NOTEBOOK mynotebook SET SECRETS = ('snowflake-secret-object' = dbx_secret);

CREATE OR REPLACE NETWORK RULE dbx_network_rule
  MODE = EGRESS
  TYPE = HOST_PORT
  VALUE_LIST = ('dbx_host:dbx_port');

CREATE OR REPLACE EXTERNAL ACCESS INTEGRATION dbx_access_integration
  ALLOWED_NETWORK_RULES = (dbx_network_rule)
  ALLOWED_AUTHENTICATION_SECRETS = (dbx_secret)
  ENABLED = true;
Set up external access for Snowflake Notebooks, and then restart the notebook session.
Use the DB-API to pull data from Databricks in a Python cell of a Snowflake notebook:
# Get the access token from dbx_secret, using the alias set with ALTER NOTEBOOK
import _snowflake
ACCESS_TOKEN = _snowflake.get_generic_secret_string('snowflake-secret-object')

import snowflake.snowpark.context
session = snowflake.snowpark.context.get_active_session()

# Define the factory method for creating a connection to Databricks
def create_dbx_connection():
    import databricks.sql
    connection = databricks.sql.connect(
        server_hostname="dbx_host",
        http_path="dbx_path",
        access_token=ACCESS_TOKEN,
    )
    return connection

# Feel free to combine local/udtf ingestion and partition column/predicates as
# stated in the understanding parallelism section

# Call dbapi to pull data from target table
df = session.read.dbapi(
    create_dbx_connection,
    table="target_table"
)

# Call dbapi to pull data from target query
df_query = session.read.dbapi(
    create_dbx_connection,
    query="select * from target_table"
)

# Pull data from target table with parallelism using partition column
df_local_par_column = session.read.dbapi(
    create_dbx_connection,
    table="target_table",
    fetch_size=100000,
    num_partitions=4,
    column="ID",  # swap with the column you want your partition based on
    upper_bound=10000,
    lower_bound=0
)

udtf_configs = {
    "external_access_integration": "<your external access integration>"
}

# Pull data from target table with udtf ingestion with parallelism using predicates
df_udtf_predicates = session.read.dbapi(
    create_dbx_connection,
    table="target_table",
    udtf_configs=udtf_configs,
    fetch_size=100000,
    predicates=[
        "ID < 3",
        "ID >= 3"
    ]
)

# Save data into sf_table
df.write.mode("overwrite").save_as_table('sf_table')
Source tracing when using DB-API to connect to Databricks
Tag the connection with a Snowpark identifier in your connection factory function.
def create_dbx_connection():
    import databricks.sql
    connection = databricks.sql.connect(
        server_hostname=HOST,
        http_path=PATH,
        access_token=ACCESS_TOKEN,
        # include this parameter for source tracing
        user_agent_entry="snowflake-snowpark-python"
    )
    return connection
Navigate to the query history in the Databricks console and search for queries whose source is snowflake-snowpark-python.
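The three source-tracing examples on this page differ only in which connection parameter carries the tag. As a minimal sketch, the mapping can be kept in one place so that every connection factory uses the same tag value. The helper name tracing_kwargs and the driver keys are hypothetical; the parameter names are the ones used in the examples above.

```python
SNOWPARK_TAG = "snowflake-snowpark-python"


def tracing_kwargs(driver):
    """Hypothetical helper: return the extra connect() keyword arguments
    that tag a connection for source tracing, keyed by driver name."""
    tags = {
        "psycopg2": {"application_name": SNOWPARK_TAG},
        "pymysql": {"init_command": f"SET @program_name='{SNOWPARK_TAG}';"},
        "databricks": {"user_agent_entry": SNOWPARK_TAG},
    }
    if driver not in tags:
        raise ValueError(f"no tracing parameter known for driver {driver!r}")
    # Return a copy so that callers cannot mutate the shared mapping.
    return dict(tags[driver])


# Example use inside a connection factory (not executed here):
#   psycopg2.connect(host="pg_host", ..., **tracing_kwargs("psycopg2"))
print(tracing_kwargs("pymysql"))
```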
Limitations
The Snowpark Python DB-API supports only Python DB-API 2.0 compliant drivers (for example, pyodbc or oracledb). JDBC drivers are not supported in this release.
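A quick way to see whether a driver module advertises DB-API 2.0 compliance is to inspect the module-level attributes that the DB-API 2.0 specification (PEP 249) requires. The sketch below uses the standard library's sqlite3 module only as a convenient example; the helper name looks_like_dbapi2 is hypothetical, and passing this check does not by itself mean that Snowpark supports the driver.

```python
import json
import sqlite3


def looks_like_dbapi2(module):
    """Return True if the module exposes the PEP 249 basics:
    apilevel == "2.0" and a callable connect()."""
    return (
        getattr(module, "apilevel", None) == "2.0"
        and callable(getattr(module, "connect", None))
    )


print(looks_like_dbapi2(sqlite3))  # a DB-API 2.0 driver
print(looks_like_dbapi2(json))     # not a database driver
```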