Lesen von Daten aus externen Datenquellen mit Snowpark Python DB-API¶
Mit Snowpark Python DB-API können Snowpark Python-Benutzer programmgesteuert Daten aus externen Datenbanken in Snowflake abrufen. Der Abschnitt umfasst folgende Themen:
Python- DB-API-Unterstützung: Stellen Sie eine Verbindung mit externen Datenbanken über Standard-DB-API 2.0-Treiber von Python her.
Optimierte Einrichtung: Verwenden Sie
pip
um die erforderlichen Treiber zu installieren, ohne zusätzliche Abhängigkeiten verwalten zu müssen.
Mit diesen APIs können Sie Daten nahtlos in Snowflake-Tabellen ziehen und mit Snowpark-DataFrames für erweiterte Analysen umwandeln.
Verwenden von Snowpark Python DB-API¶
Die DB-API kann auf ähnliche Weise wie die Spark JDBC-API verwendet werden. Die meisten Parameter sind so konzipiert, dass sie für eine bessere Parität identisch oder ähnlich sind. Gleichzeitig legt Snowpark Wert auf ein Python-First-Design mit intuitiven Namenskonventionen, damit JDBC-spezifische Konfigurationen vermieden werden. Dies bietet Python-Entwicklern eine vertraute Erfahrung. Weitere Informationen über Vergleiche zwischen Snowpark Python DB-API und der Spark JDBC-API, siehe DB-API-Parameter.
DB-API-Parameter¶
Parameter |
Snowpark DB-API |
---|---|
|
Funktion zum Erstellen einer Python DB-API-Verbindung |
|
Gibt die Tabelle in der Quelldatenbank an. |
|
SQL-Abfrage, die als Unterabfrage zum Lesen von Daten eingeschlossen ist. |
|
Partitionierungsspalte für parallele Lesevorgänge. |
|
Untere Grenze für die Partitionierung. |
|
Obere Grenze für die Partitionierung. |
|
Anzahl der Partitionen für Parallelität. |
|
Timeout für SQL-Ausführung (in Sekunden). |
|
Anzahl der Zeilen, die pro Roundtrip abgerufen wurden. |
|
Kundenspezifisches Schema zum Abrufen von Daten aus externen Datenbanken. |
|
Anzahl der Worker für das parallele Abrufen von Daten aus externen Datenbanken. |
|
Auflistung der Bedingungen für Partitionen mit WHERE-Klausel. |
|
Führt eine SQL- oder PL/SQL-Anweisung bei der Initialisierung der Sitzung aus. |
|
Führen Sie den Workload mit einer Snowflake-UDTF-Datei aus, um eine bessere Leistung zu erzielen. |
|
Anzahl der abgerufenen Batches, die vor dem Hochladen in eine einzige Parquet-Datei zusammengeführt werden sollen. |
Erläuterungen zur Parallelität¶
Snowpark Python DB-API verwendet zwei unabhängige Formen der Parallelität, die auf Benutzereingaben basieren:
Partitionsbasierte Parallelität
Wenn Benutzer Partitionierungsinformationen (z. B.
column
,lower_bound
,upper_bound
,num_partitions
) oder Prädikate angeben, teilt Snowflake die Abfrage in mehrere Partitionen auf. Diese werden parallel durch Multiprocessing verarbeitet, wobei jeder Worker seine eigene Partition abruft und schreibt.Abrufgrößenbasierte Parallelität innerhalb jeder Partition
Innerhalb einer Partition ruft die API Zeilen in Blöcken ab, die durch
fetch_size
definiert sind. Diese Zeilen werden beim Abrufen parallel in Snowflake geschrieben, sodass sich Lesen und Schreiben überschneiden und der Durchsatz maximiert werden kann.
Diese beiden Formen der Parallelität funktionieren unabhängig voneinander. Wenn weder Partitionierung noch fetch_size
angegeben ist, lädt die Funktion vor dem Schreibvorgang in Snowflake die gesamte Quelltabelle in den Arbeitsspeicher. Dies kann den Verbrauch an Arbeitsspeicher erhöhen und die Leistung bei großen Datensets verringern.
SQL Server¶
Verwenden von DB-API zur Herstellung einer Verbindung zu SQL Server von einem Snowpark-Client aus¶
Um von Snowpark aus eine Verbindung zu SQL Server herzustellen, benötigen Sie die folgenden drei Pakete:
Snowpark: snowflake-snowpark-python[pandas]
SQL Server-ODBC-Treiber: Microsoft-ODBC-Treiber für SQL Server. Durch die Installation des Treibers erklären Sie sich mit der EULA von Microsoft einverstanden.
Die Open Source-Bibliothek pyodbc: pyodbc
Nachfolgend finden Sie die Codebeispiele für die Verbindung von einem Snowpark-Client sowie einer gespeicherten Prozedur aus zu SQL Server herzustellen.
Installieren des Python-SQL-Treibers
/bin/bash -c "$(curl -fsSL https://raw.githubusercontent.com/Homebrew/install/master/install.sh)" brew tap microsoft/mssql-release https://github.com/Microsoft/homebrew-mssql-release brew update HOMEBREW_ACCEPT_EULA=Y brew install msodbcsql18 mssql-tools18
Installieren Sie
snowflake-snowpark-python[pandas]
undpyodbc
pip install snowflake-snowpark-python[pandas] pip install pyodbc
Definieren Sie die Factory-Methode zum Herstellen der Verbindung zu SQL Server
def create_sql_server_connection(): import pyodbc HOST = "mssql_host" PORT = "mssql_port" USERNAME = "mssql_username" PASSWORD = "mssql_password" DATABASE = "mssql_db" connection_str = ( "DRIVER={{ODBC Driver 18 for SQL Server}};" "SERVER={HOST},{PORT};" "DATABASE={DATABASE};" "UID={USERNAME};" "PWD={PASSWORD};" ) connection = pyodbc.connect(connection_str) return connection # Call dbapi to pull data from mssql_table df = session.read.dbapi( create_sql_server_connection, table="mssql_table")
Verwenden von DB-API zur Herstellung einer Verbindung zu SQL Server von einer gespeicherten Prozedur aus¶
Konfigurieren Sie die Integration für den externen Zugriff, die erforderlich ist, damit Snowflake eine Verbindung mit dem Quellendpunkt herstellen kann.
Bemerkung
PrivateLink wird für eine sichere Datenübertragung empfohlen, insbesondere im Umgang mit sensiblen Informationen. Stellen Sie sicher, dass für Ihr Snowflake-Konto die erforderlichen PrivateLink-Berechtigungen aktiviert sind und dass das PrivateLink-Feature in Ihrer Snowflake Notebook-Umgebung konfiguriert und aktiv ist.
Konfigurieren Sie das Geheimnis, eine Netzwerkregel für ausgehenden Datenverkehr zum Quellendpunkt und die Integration für den externen Zugriff.
CREATE OR REPLACE SECRET mssql_secret TYPE = PASSWORD USERNAME = 'mssql_username' PASSWORD = 'mssql_password'; -- Configure a network rule to allow egress to the source endpoint CREATE OR REPLACE NETWORK RULE mssql_network_rule MODE = EGRESS TYPE = HOST_PORT VALUE_LIST = ('mssql_host:mssql_port'); -- Configure an external access integration CREATE OR REPLACE EXTERNAL ACCESS INTEGRATION mssql_access_integration ALLOWED_NETWORK_RULES = (mssql_network_rule) ALLOWED_AUTHENTICATION_SECRETS = (mssql_secret) ENABLED = true; -- Create or replace a Python stored procedure CREATE OR REPLACE PROCEDURE sp_mssql_dbapi() RETURNS TABLE() LANGUAGE PYTHON RUNTIME_VERSION='3.11' HANDLER='run' PACKAGES=('snowflake-snowpark-python', 'pyodbc', 'msodbcsql') EXTERNAL_ACCESS_INTEGRATIONS = (mssql_access_integration) SECRETS = ('cred' = mssql_secret ) AS $$ # Get user name and password from mssql_secret import _snowflake username_password_object = _snowflake.get_username_password('cred') USER = username_password_object.username PASSWORD = username_password_object.password # Define a method to connect to SQL server_hostname from snowflake.snowpark import Session def create_sql_server_connection(): import pyodbc host = "mssql_host" port = mssql_port username = USER password = PASSWORD database = "mssql_database" connection_str = ( "DRIVER={{ODBC Driver 18 for SQL Server}};" "SERVER={host},{port};" "DATABASE={database};" "UID={username};" "PWD={password};" ) connection = pyodbc.connect(connection_str) return connection def run(session: Session): df = session.read.dbapi( create_sql_server_connection, table="mssql_table" ) return df $$; CALL sp_mssql_dbapi();
Oracle¶
Um von Snowpark aus eine Verbindung zu Oracle herzustellen, benötigen Sie die folgenden beiden Pakete:
Snowpark: snowflake-snowpark-python[pandas]
Die Open Source-Bibliothek oracledb: oracledb
Im Folgenden finden Sie die Codebeispiele zur Herstellung einer Verbindung von einem Snowpark-Client zu Oracle, gespeicherte Prozeduren und Snowflake Notebooks.
Verwenden von DB-API zur Verbindung zu Oracle von einem Snowpark-Client aus¶
Installieren Sie
snowflake-snowpark-python[pandas]
undoracledb
pip install snowflake-snowpark-python[pandas] pip install oradb
Verwenden Sie DB-API, um Daten aus Oracle abzurufen, und definieren Sie die Factory-Methode zum Erstellen einer Verbindung zu Oracle
def create_oracle_db_connection(): import oracledb HOST = "myhost" PORT = "myport" SERVICE_NAME = "myservice" USER = "myuser" PASSWORD = "mypassword" DSN = "{HOST}:{PORT}/{SERVICE_NAME}" connection = oracledb.connect( user=USER, password=PASSWORD, dsn=DSN ) return connection # Call dbapi to pull data from mytable df = session.read.dbapi( create_oracle_db_connection, table="mytable")
Verwenden von DB-API, um von einer gespeicherten Prozedur aus eine Verbindung zu Oracle herzustellen¶
Die Integration für den externen Zugriff ist erforderlich, damit Snowflake eine Verbindung mit dem Quellendpunkt herstellen kann.
Bemerkung
PrivateLink wird für eine sichere Datenübertragung empfohlen, insbesondere im Umgang mit sensiblen Informationen. Stellen Sie sicher, dass für Ihr Snowflake-Konto die erforderlichen PrivateLink-Berechtigungen aktiviert sind und dass das PrivateLink-Feature in Ihrer Snowflake Notebook-Umgebung konfiguriert und aktiv ist.
Konfigurieren Sie das Geheimnis, eine Netzwerkregel für ausgehenden Datenverkehr zum Quellendpunkt und die Integration für den externen Zugriff.
-- Configure the secret, a network rule to allow egress to the source endpoint and external access integration. CREATE OR REPLACE SECRET ora_secret TYPE = PASSWORD USERNAME = 'ora_username' PASSWORD = 'ora_password'; -- Configure a network rule to allow egress to the source endpoint CREATE OR REPLACE NETWORK RULE ora_network_rule MODE = EGRESS TYPE = HOST_PORT VALUE_LIST = ('ora_host:ora_port'); -- Configure an external access integration CREATE OR REPLACE EXTERNAL ACCESS INTEGRATION ora_access_integration ALLOWED_NETWORK_RULES = (ora_network_rule) ALLOWED_AUTHENTICATION_SECRETS = (ora_secret) ENABLED = true;
Verwenden von Snowpark Python DB-API, um Daten aus Oracle in eine gespeicherte Python-Prozedur zu ziehen¶
CREATE OR REPLACE PROCEDURE sp_ora_dbapi()
RETURNS TABLE()
LANGUAGE PYTHON
RUNTIME_VERSION='3.11'
HANDLER='run'
PACKAGES=('snowflake-snowpark-python', 'oracledb')
EXTERNAL_ACCESS_INTEGRATIONS = (ora_access_integration)
SECRETS = ('cred' = ora_secret )
AS $$
# Get user name and password from ora_secret
import _snowflake
username_password_object = _snowflake.get_username_password('cred')
USER = username_password_object.username
PASSWORD = username_password_object.password
# Define the factory method for creating a connection to Oracle
from snowflake.snowpark import Session
def create_oracle_db_connection():
import oracledb
host = "ora_host"
port = "ora_port"
service_name = "ora_service"
user = USER
password = PASSWORD
DSN = "{host}:{port}/{service_name}"
connection = oracledb.connect(
user=USER,
password=PASSWORD,
dsn=DSN
)
return connection
def run(session: Session):
df = session.read.dbapi(
create_ora_connection,
table="ora_table"
)
return df
$$;
CALL sp_ora_dbapi();
Verwenden von DB-API, um von einem Snowflake-Notizbuch aus eine Verbindung zu Oracle herzustellen¶
Wählen Sie
snowflake-snowpark-python
undoracledb
aus Notebook-Paketen aus.Konfigurieren Sie das Geheimnis, eine Netzwerkregel für ausgehenden Datenverkehr zum Quellendpunkt und die Integration für den externen Zugriff.
CREATE OR REPLACE SECRET ora_secret TYPE = PASSWORD USERNAME = 'ora_username' PASSWORD = 'ora_password'; ALTER NOTEBOOK mynotebook SET SECRETS = ('snowflake-secret-object' = ora_secret); -- Configure a network rule to allow egress to the source endpoint CREATE OR REPLACE NETWORK RULE ora_network_rule MODE = EGRESS TYPE = HOST_PORT VALUE_LIST = ('ora_host:ora_port'); -- Configure an external access integration. CREATE OR REPLACE EXTERNAL ACCESS INTEGRATION ora_access_integration ALLOWED_NETWORK_RULES = (ora_network_rule) ALLOWED_AUTHENTICATION_SECRETS = (ora_secret) ENABLED = true;
Einrichten des externen Zugriffs für Snowflake Notebooks und starten Sie dann die Notebook-Sitzung neu.
Verwenden von Snowpark Python DB-API, um Daten aus Oracle in eine Python-Zelle von Snowflake Notebook zu ziehen¶
# Get user name and password from ora_secret import _snowflake username_password_object = _snowflake.get_username_password('snowflake-secret-object') USER = username_password_object.username PASSWORD = username_password_object.password import snowflake.snowpark.context session = snowflake.snowpark.context.get_active_session() # Define the factory method for creating a connection to Oracle def create_oracle_db_connection(): import oracledb host = "ora_host" port = "ora_port" service_name = "ora_service" user = USER password = PASSWORD DSN = "{host}:{port}/{service_name}" connection = oracledb.connect( user=USER, password=PASSWORD, dsn=DSN ) return connection # Use dbapi to read data from ora_table df_ora = session.read.dbapi( create_oracle_db_connection, table='ora_table' ) # Save data into sf_table df_ora.write.mode("overwrite").save_as_table('sf_table')
PostgreSQL¶
Um von Snowpark aus eine Verbindung mit PostgreSQL herzustellen, benötigen Sie die folgenden beiden Pakete:
Snowpark: snowflake-snowpark-python[pandas]
Die Open-Source-Bibliothek psycopg2: psycopg2
Nachfolgend finden Sie die Codebeispiele, die für die Verbindung mit PostgreSQL von einem Snowpark-Client aus benötigt werden, sowie gespeicherten Prozeduren und Snowflake Notebooks
Verwenden von DB-API, um eine Verbindung zu PostgreSQL von einem Snowpark-Client aus herzustellen¶
psycopg2
installierenpip install psycopg2
Definieren Sie die Factory-Methode zum Herstellen einer Verbindung zu PostgreSQL
def create_pg_connection(): import psycopg2 connection = psycopg2.connect( host="pg_host", port=pg_port, dbname="pg_dbname", user="pg_user", password="pg_password", ) return connection # Call dbapi to pull data from pg_table df = session.read.dbapi( create_pg_connection, table="pg_table")
Verwenden von DB-API, um von einer gespeicherten Prozedur aus eine Verbindung zu PostgreSQL herzustellen¶
Konfigurieren Sie die Integration für den externen Zugriff, die erforderlich ist, damit Snowflake eine Verbindung mit dem Quellendpunkt herstellen kann.
Bemerkung
PrivateLink wird für eine sichere Datenübertragung empfohlen, insbesondere im Umgang mit sensiblen Informationen. Stellen Sie sicher, dass für Ihr Snowflake-Konto die erforderlichen PrivateLink-Berechtigungen aktiviert sind und dass das PrivateLink-Feature in Ihrer Snowflake Notebook-Umgebung konfiguriert und aktiv ist.
Konfigurieren Sie das Geheimnis, eine Netzwerkregel für ausgehenden Datenverkehr zum Quellendpunkt und die Integration für den externen Zugriff.
CREATE OR REPLACE SECRET pg_secret
TYPE = PASSWORD
USERNAME = 'pg_username'
PASSWORD = 'pg_password';
-- Configure a network rule.
CREATE OR REPLACE NETWORK RULE pg_network_rule
MODE = EGRESS
TYPE = HOST_PORT
VALUE_LIST = ('pg_host:pg_port');
-- Configure an external access integration.
CREATE OR REPLACE EXTERNAL ACCESS INTEGRATION pg_access_integration
ALLOWED_NETWORK_RULES = (pg_network_rule)
ALLOWED_AUTHENTICATION_SECRETS = (pg_secret)
ENABLED = true;
Verwenden Sie Snowpark Python DB-API, um Daten in einer gespeicherten Python-Prozedur aus PostgreSQL abzurufen
CREATE OR REPLACE PROCEDURE sp_pg_dbapi()
RETURNS TABLE()
LANGUAGE PYTHON
RUNTIME_VERSION='3.11'
HANDLER='run'
PACKAGES=('snowflake-snowpark-python', 'psycopg2')
EXTERNAL_ACCESS_INTEGRATIONS = (pg_access_integration)
SECRETS = ('cred' = pg_secret )
AS $$
# Get user name and password from pg_secret
import _snowflake
username_password_object = _snowflake.get_username_password('cred')
USER = username_password_object.username
PASSWORD = username_password_object.password
# Define the factory method for creating a connection to PostgreSQL
from snowflake.snowpark import Session
def create_pg_connection():
import psycopg2
connection = psycopg2.connect(
host="pg_host",
port=pg_port,
dbname="pg_dbname",
user=USER,
password=PASSWORD,
)
return connection
def run(session: Session):
df = session.read.dbapi(
create_pg_connection,
table="pg_table"
)
return df
$$;
CALL sp_pg_dbapi();
Verwenden von DB-API, um von einem Snowflake-Notizbuch aus eine Verbindung zu PostgreSQL herzustellen¶
Wählen Sie
snowflake-snowpark-python
undpsycopg2
aus Snowflake Notebook-Pakete aus.Konfigurieren Sie die Integration für den externen Zugriff, die erforderlich ist, damit Snowflake eine Verbindung mit dem Quellendpunkt herstellen kann.
Bemerkung
PrivateLink wird für eine sichere Datenübertragung empfohlen, insbesondere im Umgang mit sensiblen Informationen. Stellen Sie sicher, dass für Ihr Snowflake-Konto die erforderlichen PrivateLink-Berechtigungen aktiviert sind und dass das PrivateLink-Feature in Ihrer Snowflake Notebook-Umgebung konfiguriert und aktiv ist.
Konfigurieren Sie das Geheimnis, eine Netzwerkregel für ausgehenden Datenverkehr zum Quellendpunkt und die Integration für den externen Zugriff.
-- Configure the secret
CREATE OR REPLACE SECRET pg_secret
TYPE = PASSWORD
USERNAME = 'pg_username'
PASSWORD = 'pg_password';
ALTER NOTEBOOK pg_notebook SET SECRETS = ('snowflake-secret-object' = pg_secret);
-- Configure the network rule to allow egress to the source endpoint
CREATE OR REPLACE NETWORK RULE pg_network_rule
MODE = EGRESS
TYPE = HOST_PORT
VALUE_LIST = ('pg_host:pg_port');
-- Configure external access integration
CREATE OR REPLACE EXTERNAL ACCESS INTEGRATION pg_access_integration
ALLOWED_NETWORK_RULES = (pg_network_rule)
ALLOWED_AUTHENTICATION_SECRETS = (pg_secret)
ENABLED = true;
Einrichten des externen Zugriffs für Snowflake Notebooks und starten Sie dann die Notebook-Sitzung neu.
# Get the user name and password from :code:`pg_secret` import _snowflake username_password_object = _snowflake.get_username_password('snowflake-secret-object') USER = username_password_object.username PASSWORD = username_password_object.password import snowflake.snowpark.context session = snowflake.snowpark.context.get_active_session() # Define the factory method for creating a connection to PostgreSQL def create_pg_connection(): import psycopg2 connection = psycopg2.connect( host="pg_host", port=pg_port, dbname="pg_dbname", user=USER, password=PASSWORD, ) return connection # Use dbapi to read and save data from pg_table df = session.read.dbapi( create_pg_connection, table="pg_table" ) # Save data into sf_table df.write.mode("overwrite").save_as_table('sf_table')
MySQL¶
Um von Snowpark aus eine Verbindung mit MySQL herzustellen, benötigen Sie die folgenden beiden Pakete:
Snowpark: snowflake-snowpark-python[pandas]
Die Open Source-Bibliothek pymysql: PyMySQL
Nachfolgend finden Sie die Codebeispiele, die für die Verbindung mit MySQL vom Snowpark-Client aus benötigt werden, sowie gespeicherte Prozeduren und Snowflake Notebook.
Verwenden von DB-API, um eine Verbindung zu MySQL von einem Snowpark-Client aus herzustellen¶
Installieren Sie pymysql
pip install snowflake-snowpark-python[pandas] pip install pymysql
Definieren Sie die Factory-Methode zum Herstellen einer Verbindung zu MySQL
def create_mysql_connection():
import pymysql
connection = pymysql.connect(
host="mysql_host",
port=mysql_port,
database="mysql_db",
user="mysql_user",
password="mysql_password"
)
return connection
# Call dbapi to pull data from mysql_table
df = session.read.dbapi(
create_mysql_connection,
table="mysql_table"
)
Verwenden von DB-API, um von einer gespeicherten Prozedur aus eine Verbindung zu MySQL herzustellen¶
Konfigurieren Sie die Integration für den externen Zugriff, die erforderlich ist, damit Snowflake eine Verbindung mit dem Quellendpunkt herstellen kann.
Bemerkung
PrivateLink wird für eine sichere Datenübertragung empfohlen, insbesondere im Umgang mit sensiblen Informationen. Stellen Sie sicher, dass für Ihr Snowflake-Konto die erforderlichen PrivateLink-Berechtigungen aktiviert sind und dass das PrivateLink-Feature in Ihrer Snowflake Notebook-Umgebung konfiguriert und aktiv ist.
Konfigurieren Sie das Geheimnis, eine Netzwerkregel für ausgehenden Datenverkehr zum Quellendpunkt und die Integration für den externen Zugriff.
CREATE OR REPLACE SECRET mysql_secret
TYPE = PASSWORD
USERNAME = 'mysql_username'
PASSWORD = 'mysql_password';
-- Configure a network rule.
CREATE OR REPLACE NETWORK RULE mysql_network_rule
MODE = EGRESS
TYPE = HOST_PORT
VALUE_LIST = ('mysql_host:mysql_port');
-- Configure an external access integration
CREATE OR REPLACE EXTERNAL ACCESS INTEGRATION mysql_access_integration
ALLOWED_NETWORK_RULES = (mysql_network_rule)
ALLOWED_AUTHENTICATION_SECRETS = (mysql_secret)
ENABLED = true;
Verwenden Sie Snowpark Python DB-API, um Daten aus MySQL in einer gespeicherten Python-Prozedur abzurufen,
CREATE OR REPLACE PROCEDURE sp_mysql_dbapi()
RETURNS TABLE()
LANGUAGE PYTHON
RUNTIME_VERSION='3.11'
HANDLER='run'
PACKAGES=('snowflake-snowpark-python', 'pymysql')
EXTERNAL_ACCESS_INTEGRATIONS = (mysql_access_integration)
SECRETS = ('cred' = mysql_secret )
AS $$
# Get user name and password from mysql_secret
import _snowflake
username_password_object = _snowflake.get_username_password('cred')
USER = username_password_object.username
PASSWORD = username_password_object.password
# Define the factory method for creating a connection to MySQL
from snowflake.snowpark import session
def create_mysql_connection():
import pymysql
connection = pymysql.connect(
host="mysql_host",
port=mysql_port,
dbname="mysql_dbname",
user=USER,
password=PASSWORD,
)
return connection
def run(session: Session):
df = session.read.dbapi(
create_mysql_connection,
table="mysql_table"
)
return df
$$;
CALL sp_mysql_dbapi();
Verwenden von DB-API, um von einem Snowflake-Notizbuch aus eine Verbindung zu MySQL herzustellen¶
Wählen Sie
snowflake-snowpark-python
undpymysql
aus den Snowflake Notebook-Paketen aus.Konfigurieren Sie die Integration für den externen Zugriff, die erforderlich ist, damit Snowflake eine Verbindung mit dem Quellendpunkt herstellen kann.
Bemerkung
PrivateLink wird für eine sichere Datenübertragung empfohlen, insbesondere im Umgang mit sensiblen Informationen. Stellen Sie sicher, dass für Ihr Snowflake-Konto die erforderlichen PrivateLink-Berechtigungen aktiviert sind und dass das PrivateLink-Feature in Ihrer Snowflake Notebook-Umgebung konfiguriert und aktiv ist.
Konfigurieren Sie das Geheimnis und fügen Sie es dem Snowflake-Notizbuch hinzu.
CREATE OR REPLACE SECRET mysql_secret
TYPE = PASSWORD
USERNAME = 'mysql_username'
PASSWORD = 'mysql_password';
ALTER NOTEBOOK mynotebook SET SECRETS = ('snowflake-secret-object' = mysql_secret);
Konfigurieren Sie eine Netzwerkregel und eine Integration für den externen Zugriff.
CREATE OR REPLACE NETWORK RULE mysql_network_rule
MODE = EGRESS
TYPE = HOST_PORT
VALUE_LIST = ('mysql_host:mysql_port');
CREATE OR REPLACE EXTERNAL ACCESS INTEGRATION mysql_access_integration
ALLOWED_NETWORK_RULES = (mysql_network_rule)
ALLOWED_AUTHENTICATION_SECRETS = (mysql_secret)
ENABLED = true;
Einrichten des externen Zugriffs für Snowflake Notebooks und starten Sie dann die Notebook-Sitzung neu.
# Get user name and password from mysql_secret
import _snowflake
username_password_object = _snowflake.get_username_password('snowflake-secret-object')
USER = username_password_object.username
PASSWORD = username_password_object.password
import snowflake.snowpark.context
session = snowflake.snowpark.context.get_active_session()
# Define the factory method for creating a connection to MySQL
def create_mysql_connection():
import pymysql
connection = pymysql.connect(
host="mysql_host",
port=mysql_port,
dbname="mysql_dbname",
user=USER,
password=PASSWORD,
)
return connection
# Call dbapi to pull data from mysql_table
df = session.read.dbapi(
create_mysql_connection,
table="mysql_table")
# Save data into sf_table
df.write.mode("overwrite").save_as_table('sf_table')
Databricks¶
Um eine Verbindung zu Databricks von Snowpark aus herzustellen, benötigen Sie die folgenden beiden Pakete:
Snowpark: snowflake-snowpark-python[pandas]
Die Open-Source-Bibliothek psycopg2: databricks-sql-connector
Im Folgenden finden Sie die Codebeispiele, die benötigt werden, um vom Snowpark-Client, gespeicherten Prozeduren und Snowflake Notebook aus eine Verbindung zu Databricks herzustellen.
Verwenden von DB-API, um von einem Snowpark-Client aus eine Verbindung zu Databricks herzustellen¶
Installieren Sie
databricks-sql-connector
:
pip install snowflake-snowpark-python[pandas]
pip install databricks-sql-connector
Definieren Sie die Factory-Methode zum Herstellen einer Verbindung zu Databricks
def create_dbx_connection():
import databricks.sql
connection = databricks.sql.connect(
server_hostname=HOST,
http_path=PATH,
access_token=ACCESS_TOKEN
)
return connection
#Call dbapi to pull data from mytable
df = session.read.dbapi(
create_dbx_connection,
table="dbx_table")
Verwenden von DB-API, um von einer gespeicherten Prozedur aus eine Verbindung zu Databricks herzustellen¶
Wählen Sie
snowflake-snowpark-python
undpymysql
aus Snowflake Notebook-Pakete aus.Die Integration für den externen Zugriff ist erforderlich, damit Snowflake eine Verbindung mit dem Quellendpunkt herstellen kann.
Bemerkung
PrivateLink wird für eine sichere Datenübertragung empfohlen, insbesondere im Umgang mit sensiblen Informationen. Stellen Sie sicher, dass für Ihr Snowflake-Konto die erforderlichen PrivateLink-Berechtigungen aktiviert sind und dass das PrivateLink-Feature in Ihrer Snowflake Notebook-Umgebung konfiguriert und aktiv ist.
Konfigurieren Sie das Geheimnis, eine Netzwerkregel für ausgehenden Datenverkehr zum Quellendpunkt und die Integration für den externen Zugriff.
CREATE OR REPLACE SECRET dbx_secret
TYPE = GENERIC_STRING
SECRET_STRING = 'dbx_access_token';
-- Configure a network rule
CREATE OR REPLACE NETWORK RULE dbx_network_rule
MODE = EGRESS
TYPE = HOST_PORT
VALUE_LIST = ('dbx_host:dbx_port');
-- Configure an external access integration
CREATE OR REPLACE EXTERNAL ACCESS INTEGRATION dbx_access_integration
ALLOWED_NETWORK_RULES = (dbx_network_rule)
ALLOWED_AUTHENTICATION_SECRETS = (dbx_secret)
ENABLED = true;
Verwenden Sie Snowpark Python DB-API, um Daten aus Databricks in einer gespeicherten Python-Prozedur abzurufen
CREATE OR REPLACE PROCEDURE sp_dbx_dbapi() RETURNS TABLE() LANGUAGE PYTHON RUNTIME_VERSION='3.11' HANDLER='run' PACKAGES=('snowflake-snowpark-python', 'databricks-sql-connector') EXTERNAL_ACCESS_INTEGRATIONS = (dbx_access_integration) SECRETS = ('cred' = dbx_secret ) AS $$ # Get user name and password from dbx_secret import _snowflake ACCESS_TOKEN = _snowflake.get_generic_secret_string('cred') from snowflake.snowpark import Session # define the method for creating a connection to Databricks def create_dbx_connection(): import databricks.sql connection = databricks.sql.connect( server_hostname="dbx_host", http_path="dbx_path", access_token=ACCESS_TOKEN, ) return connection def run(session: Session): df = session.read.dbapi( create_dbx_connection, table="dbx_table" ) return df $$; CALL sp_dbx_dbapi();
Verwenden von DB-API, um von einem Snowflake-Notizbuch aus eine Verbindung zu Databricks herzustellen¶
Wählen Sie
snowflake-snowpark-python
undpymysql
aus Snowflake Notebook-Pakete aus.Konfigurieren Sie die Integration für den externen Zugriff, die erforderlich ist, damit Snowflake eine Verbindung mit dem Quellendpunkt herstellen kann.
Bemerkung
PrivateLink wird für eine sichere Datenübertragung empfohlen, insbesondere im Umgang mit sensiblen Informationen. Stellen Sie sicher, dass für Ihr Snowflake-Konto die erforderlichen PrivateLink-Berechtigungen aktiviert sind und dass das PrivateLink-Feature in Ihrer Snowflake Notebook-Umgebung konfiguriert und aktiv ist.
Konfigurieren Sie das Geheimnis und fügen Sie es dem Snowflake-Notizbuch hinzu.
CREATE OR REPLACE SECRET dbx_secret TYPE = GENERIC_STRING SECRET_STRING = 'dbx_access_token'; ALTER NOTEBOOK mynotebook SET SECRETS = ('snowflake-secret-object' = dbx_secret);
Konfigurieren
CREATE OR REPLACE NETWORK RULE dbx_network_rule MODE = EGRESS TYPE = HOST_PORT VALUE_LIST = ('dbx_host:dbx_port'); CREATE OR REPLACE EXTERNAL ACCESS INTEGRATION dbx_access_integration ALLOWED_NETWORK_RULES = (dbx_network_rule) ALLOWED_AUTHENTICATION_SECRETS = (dbx_secret) ENABLED = true;
Einrichten des externen Zugriffs für Snowflake Notebooks und starten Sie dann die Notebook-Sitzung neu.
# Get user name and password from dbx_secret import _snowflake ACCESS_TOKEN = _snowflake.get_generic_secret_string('cred') import snowflake.snowpark.context session = snowflake.snowpark.context.get_active_session() # Define the factory method for creating a connection to Databricks def create_dbx_connection(): import databricks.sql connection = databricks.sql.connect( server_hostname="dbx_host", http_path="dbx_path", access_token=ACCESS_TOKEN, ) return connection # use dbapi to read data from dbx_table df = session.read.dbapi( create_dbx_connection, table="dbx_table" ) # save data into sf_table df.write.mode("overwrite").save_as_table('sf_table')
Einschränkungen¶
Treiber¶
Snowpark Python DB-API unterstützt nur Python DB-API-2.0-kompatible Treiber (z. B. pyodbc
, oracledb
). JDBC-Treiber werden in diesem Release nicht unterstützt.