Verwenden der Snowpark Python-JDBC¶
Mit der Snowpark Python-JDBC können Snowpark Python-Benutzer programmgesteuert Daten aus externen Datenbanken in Snowflake abrufen. Auf diese Weise können Sie über JDBC-Treiber eine Verbindung zu externen Datenbanken herstellen.
Mit diesen APIs können Sie Daten nahtlos in Snowflake-Tabellen ziehen und mit Snowpark-DataFrames für erweiterte Analysen umwandeln.
`JDBC <https://docs.snowflake.com/en/developer-guide/snowpark/reference/python/latest/snowpark/api/snowflake.snowpark.DataFrameReader.jdbc> `_ kann auf ähnliche Weise wie die Spark JDBC-API verwendet werden. Die meisten Parameter sind so konzipiert, dass sie für eine bessere Parität identisch oder ähnlich sind. Weitere Informationen zum Vergleich der Snowpark Python-JDBC mit der Spark-JDBC-API finden Sie in der folgenden Tabelle:
Snowpark-JDBC-Parameter¶
Parameter |
Snowpark Python-JDBC |
|---|---|
|
Eine Verbindungszeichenfolge, die verwendet wird, um über den JDBC-Treiber die Verbindung zur externen Datenquelle herzustellen |
|
Ein Dictionary mit den notwendigen Konfigurationen für die UDTF-Erstellung |
|
Ein Dictionary mit dem Schlüssel-Wert-Paar, das beim Einrichten der JDBC-Verbindung benötigt wird |
|
Tabelle in der Quelldatenbank |
|
SQL-Abfrage, die als Unterabfrage zum Lesen von Daten eingeschlossen ist |
|
Partitionierungsspalte für parallele Lesevorgänge |
|
Untere Grenze für die Partitionierung |
|
Obere Grenze für die Partitionierung |
|
Anzahl der Partitionen für Parallelität |
|
Die Timeout-Dauer für die SQL-Ausführung, gemessen in Sekunden |
|
Anzahl der Zeilen, die pro Roundtrip abgerufen wurden |
|
Kundenspezifisches Schema zum Abrufen von Daten aus externen Datenbanken |
|
Auflistung der Bedingungen für Partitionen mit WHERE-Klausel |
|
Führt eine SQL- oder PL/SQL-Anweisung bei der Initialisierung der Sitzung aus |
Erläuterungen zur Parallelität¶
Snowpark Python-JDBC hat derzeit eine Form des zugrunde liegenden Datenaufnahmemechanismus:
- UDTF-Datenaufnahme
Alle Workloads werden auf dem Snowflake-Server ausgeführt. Snowpark erstellt eine Java-UDTF und ruft sie parallel auf, um Daten in einer temporären Snowflake-Tabelle zu erfassen. Somit ist der Parameter
udtf_configsfür dieses Feature erforderlich.
Die Snowpark Python-JDBC bietet zwei Möglichkeiten, die Datenaufnahme zu parallelisieren und zu beschleunigen:
- Partitionsspalte
Diese Methode unterteilt die Quelldaten anhand von vier Parametern in eine Anzahl von Partitionen, wenn Benutzende
jdbc()aufrufen:columnlower_boundupper_boundnum_partitions
Diese vier Parameter müssen gleichzeitig und eingestellt werden.
columnmuss numerisch oder vom Typ date sein.- Prädikate
Diese Methode unterteilt die Quelldaten in Partitionen auf der Grundlage von Parameterprädikaten, die eine Liste von Ausdrücken sind, die für die Aufnahme in
WHERE-Klauseln geeignet sind, wobei jeder Ausdruck eine Partition definiert. Prädikate bieten eine flexiblere Möglichkeit zum Aufteilen von Partitionen. Sie können beispielsweise Partitionen auf booleschen oder nicht-numerischen Spalten dividieren.
Die Snowpark Python-JDBC ermöglicht auch die Anpassung des Parallelitätsgrads innerhalb einer Partition.
- Fetch_size
Innerhalb einer Partition ruft die API Zeilen in Blöcken ab, die durch
fetch_sizedefiniert sind. Diese Zeilen werden beim Abrufen parallel in Snowflake geschrieben, sodass sich Lesen und Schreiben überschneiden und der Durchsatz maximiert werden kann.
Verwenden von JDBC zum Erfassen von Daten aus externen Datenquellen¶
Verwenden von JDBC zum Erfassen von Daten von einem Snowpark-Client¶
Hochladen der JDBC-Treiber jar-Datei mit Snowpark oder Snowsight in einen Snowflake-Stagingbereich
Laden Sie mit Snowpark hoch.
Führen Sie in Snowpark nach dem Erstellen einer Sitzung den folgenden Code aus:
session.file.put("<your directory>/<your file name>", "@<your stage name>/<stage path>")
Laden Sie mit Snowsight hoch, wie in den folgenden Schritten beschrieben.
Klicken Sie in Snowsight auf Catalog -> Database Explorer.
Klicken Sie in der linken Suchleiste von Datenbanken auf [your database name] -> [your schema name] -> stages -> [your stage name].
Klicken Sie auf die Schaltfläche “+File” oben rechts auf der Stagingbereichsseite.
Konfigurieren Sie das Geheimnis, die Netzwerkregel und die Integration für den externen Zugriff.
-- Configure a secret to allow egress to the source endpoint CREATE OR REPLACE SECRET <your secret> TYPE = PASSWORD USERNAME = '<your username>' PASSWORD = '<your password>'; -- Configure a network rule to allow egress to the source endpoint CREATE OR REPLACE NETWORK RULE <your network rule> TYPE = HOST_PORT MODE = EGRESS VALUE_LIST = ('<your host>:<your port>'); -- Configure an external access integration CREATE OR REPLACE EXTERNAL ACCESS INTEGRATION <your integration> ALLOWED_NETWORK_RULES = (<your network rule>) ALLOWED_AUTHENTICATION_SECRETS = (<your secret>) ENABLED = true;
Rufen sie Daten aus dem Ziel mit Snowpark JDBC von einem Snowpark-Client ab.
connection_str=f"jdbc:<your dbms>://<your host>:<your port>/<your db>" udtf_configs = { "external_access_integration": "<your integration>", "secret": "<your secret>", "imports": ["<your stage path to jdbc jar file>"] } # Call jdbc to pull data from target table df_table = session.read.jdbc( url=connection_str, udtf_configs=udtf_configs, table="<your table>", ) # Call jdbc to pull data from target query df_query = session.read.jdbc( url=connection_str, udtf_configs=udtf_configs, query="select * from <your table>", ) # Pull data from target table with parallelism using partition column df_table_partition_column = session.read.jdbc( url=connection_str, udtf_configs=udtf_configs, table="<your table>", fetch_size=100000, num_partitions=4, column="ID", upper_bound=10000, lower_bound=0 ) # Pull data from target table with parallelism using predicates df_table_predicates = session.read.jdbc( url=connection_str, udtf_configs=udtf_configs, table="<your table>", fetch_size=100000, predicates = [ "ID < 3", "ID >= 3" ] )
Verwenden von JDBC zum Einlesen von Daten aus einer gespeicherten Prozedur¶
Hochladen der JDBC-Treiber jar-Datei mit Snowsight in einen Snowflake-Stagingbereich
Klicken Sie in Snowsight auf Catalog -> Database Explorer
Klicken Sie in der linken Suchleiste von Datenbanken auf [your database name] -> [your schema name] -> stages -> [your stage name].
Klicken Sie auf die Schaltfläche “+File” oben rechts auf der Stagingbereichsseite.
Konfigurieren Sie Geheimnis, Netzwerkregel und Integration für den externen Zugriff.
-- Configure a secret to allow egress to the source endpoint CREATE OR REPLACE SECRET <your secret> TYPE = PASSWORD USERNAME = '<your username>' PASSWORD = '<your password>'; -- Configure a network rule to allow egress to the source endpoint CREATE OR REPLACE NETWORK RULE <your network rule> TYPE = HOST_PORT MODE = EGRESS VALUE_LIST = ('<your host>:<your port>'); -- Configure an external access integration CREATE OR REPLACE EXTERNAL ACCESS INTEGRATION <your integration> ALLOWED_NETWORK_RULES = (<your network rule>) ALLOWED_AUTHENTICATION_SECRETS = (<your secret>) ENABLED = true;
Ziehen Sie Daten aus dem Ziel mit Snowpark JDBC aus einer gespeicherten Prozedur.
CREATE OR REPLACE PROCEDURE sp_jdbc() RETURNS STRING LANGUAGE PYTHON RUNTIME_VERSION = '3.10' PACKAGES = ('snowflake-snowpark-python') HANDLER = 'run' AS $$ import time def run(session): connection_str=f"jdbc:<your dbms>://<your host>:<your port>/<your db>" udtf_configs = { "external_access_integration": "<your integration>", "secret": "<your secret>", "imports": ["<your stage path to jdbc jar file>"] } # Call jdbc to pull data from target table df_table = session.read.jdbc( url=connection_str, udtf_configs=udtf_configs, table="<your table>", ) # Call jdbc to pull data from target query df_query = session.read.jdbc( url=connection_str, udtf_configs=udtf_configs, query="select * from <your table>", ) # Pull data from target table with parallelism using partition column df_table_partition_column = session.read.jdbc( url=connection_str, udtf_configs=udtf_configs, table="<your table>", fetch_size=100000, num_partitions=4, column="ID", upper_bound=10000, lower_bound=0 ) # Pull data from target table with parallelism using predicates df_table_predicates = session.read.jdbc( url=connection_str, udtf_configs=udtf_configs, table="<your table>", fetch_size=100000, predicates = [ "ID < 3", "ID >= 3" ] ) df_table.write.save_as_table("snowflake_table", mode="overwrite") return f"success" $$ ; call sp_jdbc(); select * from snowflake_table ;
Verwenden von JDBC zum Erfassen von Daten von einem Snowflake Notebook¶
Hochladen der JDBC-Treiber jar-Datei mit Snowsight in einen Snowflake-Stagingbereich
Klicken Sie in Snowsight auf Catalog -> Database Explorer
Klicken Sie in der linken Suchleiste von Datenbanken auf [your database name] -> [your schema name] -> stages -> [your stage name].
Klicken Sie auf die Schaltfläche “+File” oben rechts auf der Stagingbereichsseite.
Konfigurieren Sie Geheimnis, Netzwerkregel und Integration für den externen Zugriff.
-- Configure a secret to allow egress to the source endpoint CREATE OR REPLACE SECRET <your secret> TYPE = PASSWORD USERNAME = '<your username>' PASSWORD = '<your password>'; -- Configure a network rule to allow egress to the source endpoint CREATE OR REPLACE NETWORK RULE <your network rule> TYPE = HOST_PORT MODE = EGRESS VALUE_LIST = ('<your host>:<your port>'); -- Configure an external access integration CREATE OR REPLACE EXTERNAL ACCESS INTEGRATION <your integration> ALLOWED_NETWORK_RULES = (<your network rule>) ALLOWED_AUTHENTICATION_SECRETS = (<your secret>) ENABLED = true;
Ziehen Sie Daten aus dem Ziel mit Snowpark JDBC aus einem Snowflake Notebook.
import snowflake.snowpark.context session = snowflake.snowpark.context.get_active_session() connection_str=f"jdbc:<your dbms>://<your host>:<your port>/<your db>" udtf_configs = { "external_access_integration": "<your integration>", "secret": "<your secret>", "imports": ["<your stage path to jdbc jar file>"] } # Call jdbc to pull data from target table df_table = session.read.jdbc( url=connection_str, udtf_configs=udtf_configs, table="<your table>", ) # Call jdbc to pull data from target query df_query = session.read.jdbc( url=connection_str, udtf_configs=udtf_configs, query="select * from <your table>", ) # Pull data from target table with parallelism using partition column df_table_partition_column = session.read.jdbc( url=connection_str, udtf_configs=udtf_configs, table="<your table>", fetch_size=100000, num_partitions=4, column="ID", upper_bound=10000, lower_bound=0 ) # Pull data from target table with parallelism using predicates df_table_predicates = session.read.jdbc( url=connection_str, udtf_configs=udtf_configs, table="<your table>", fetch_size=100000, predicates = [ "ID < 3", "ID >= 3" ] )
Quellenablaufverfolgung¶
Quellenablaufverfolgung bei Verwendung von Snowpark JDBC zum Verbinden mit MySQL¶
Fügen Sie ein Snowpark-Tag in Ihre Funktion zum Erstellen einer Verbindung ein:
connection_str="jdbc:mysql://<your host>:<your port>/<your db>?applicationName=snowflake-snowpark-python" udtf_configs = { "external_access_integration": "<your integration>", "secret": "<your secret>", "imports": ["<your stage path to jdbc jar file>"] } # Call dbapi to pull data from target table df_table = session.read.jdbc( url=connection_str, udtf_configs=udtf_configs, table="<your table>", )
Führen Sie folgenden SQL-Code in Ihrer Datenquelle aus, um Abfragen aus Snowpark zu erfassen, die noch aktiv sind:
SELECT * FROM performance_schema.events_statements_history_long WHERE THREAD_ID = ( SELECT THREAD_ID, NAME FROM performance_schema.threads WHERE NAME LIKE '%snowflake-snowpark-python%'; )
Quellenablaufverfolgung bei Verwendung von Snowpark JDBC zum Verbinden mit SQL Server¶
Fügen Sie ein Snowpark-Tag in Ihre Funktion zum Erstellen einer Verbindung ein:
connection_str="jdbc:mssql://<your host>:<your port>/<your db>?applicationName=snowflake-snowpark-python" udtf_configs = { "external_access_integration": "<your integration>", "secret": "<your secret>", "imports": ["<your stage path to jdbc jar file>"] } # Call dbapi to pull data from target table df_table = session.read.jdbc( url=connection_str, udtf_configs=udtf_configs, table="<your table>", )
Führen Sie folgenden SQL-Code in Ihrer Datenquelle aus, um Abfragen aus Snowpark zu erfassen, die noch aktiv sind:
SELECT s.session_id, s.program_name, r.status, t.text AS sql_text FROM sys.dm_exec_sessions s JOIN sys.dm_exec_requests r ON s.session_id = r.session_id CROSS APPLY sys.dm_exec_sql_text(r.sql_handle) AS t WHERE s.program_name = 'snowflake-snowpark-python';
Quellenablaufverfolgung bei Verwendung von Snowpark JDBC zum Verbinden mit PostgresSQL¶
Fügen Sie ein Snowpark-Tag in Ihre Funktion zum Erstellen einer Verbindung ein:
connection_str="jdbc:postgres://<your host>:<your port>/<your db>?applicationName=snowflake-snowpark-python" udtf_configs = { "external_access_integration": "<your integration>", "secret": "<your secret>", "imports": ["<your stage path to jdbc jar file>"] } # Call dbapi to pull data from target table df_table = session.read.jdbc( url=connection_str, udtf_configs=udtf_configs, table="<your table>", )
Führen Sie folgenden SQL-Code in Ihrer Datenquelle aus, um Abfragen aus Snowpark zu erfassen, die noch aktiv sind:
SELECT pid, usename AS username, datname AS database, application_name, client_addr, state, query_start, query FROM pg_stat_activity WHERE application_name = 'snowflake-snowpark-python';
Quellenablaufverfolgung bei Verwendung von Snowpark JDBC zum Verbinden mit Oracle¶
Fügen Sie ein Snowpark-Tag in Ihre Funktion zum Erstellen einer Verbindung ein:
connection_str="jdbc:oracle://<your host>:<your port>/<your db>?applicationName=snowflake-snowpark-python" udtf_configs = { "external_access_integration": "<your integration>", "secret": "<your secret>", "imports": ["<your stage path to jdbc jar file>"] } # Call dbapi to pull data from target table df_table = session.read.jdbc( url=connection_str, udtf_configs=udtf_configs, table="<your table>", )
Führen Sie folgenden SQL-Code in Ihrer Datenquelle aus, um Abfragen aus Snowpark zu erfassen, die noch aktiv sind:
SELECT sid, serial#, username, program, module, action, client_identifier, client_info, osuser, machine FROM v$session WHERE program = 'snowflake-snowpark-python';
Gemeinsame DBMS Unterstützung von Typen¶
Im Folgenden finden Sie eine geprüfte Liste von Datentypen verschiedener DBMS-Systeme. Wenn Ihre Quelldaten andere Datentypen enthalten, versucht Snowpark Python JDBC, sie den bestmöglichen Snowflake-Datentypen zuzuordnen oder greift auf Zeichenfolgen zurück.
Oracle¶
INTEGER
NUMBER
BINARY_FLOAT
BINARY_DOUBLE
VARCHAR2
CHAR
CLOB
NCHAR
NVARCHAR2
NCLOB
DATE
TIMESTAMP
TIMESTAMP WITH TIME ZONE
TIMESTAMP WITH LOCAL TIME ZONE
RAW
PostgresSQL¶
BIGINT
BIGSERIAL
BIT
BIT-VARYING
BOOLEAN
BOX
BYTEA
CHAR
VARCHAR
CIDR
CIRCLE
DATE
DOUBLE PRECISION
INET
INTEGER
INTERVAL
JSON
JSONB
LINE
LSEG
MACADDR
POINT
POLYGON
REAL
SMALLINT
SMALLSERIAL
SERIAL
TEXT
TIME
TIMESTAMP
TIMESTAMPTZ
TSQUERY
TSVECTOR
TXID_SNAPSHOT
UUID
XML
MySQL¶
INT
DECIMAL
INT
TINYINT
SMALLINT
MEDIUMINT
BIGINT
YEAR
FLOAT
DOUBLE
CHAR
VARCHAR
TINYTEXT
TEXT
MEDIUMTEXT
LONGTEXT
ENUM
SET
BIT
BINARY
VARBINARY
TINYBLOB
BLOB
MEDIUMBLOB
LONGBLOB
DATE
DATETIME
TIMESTAMP
TIME
JSON
SQL Server¶
INT
BIGINT
INT
SMALLINT
TINYINT
BIT
DECIMAL
NUMERIC
MONEY
SMALLMONEY
FLOAT
REAL
DATE
TIME
DATETIME
DATETIME2
SMALLDATETIME
CHAR
VARCHAR
VARCHAR(MAX)
TEXT
NCHAR
NVARCHAR
NVARCHAR(MAX)
NTEXT
BINARY
VARBINARY
VARBINARY(MAX)
IMAGE
UNIQUEIDENTIFIER
TIMESTAMP
Databricks¶
Verbinden mit Databricks über Snowpark Python JDBC wird derzeit nicht unterstützt.