Verwenden der Snowpark Python-JDBC

Mit der Snowpark Python-JDBC können Snowpark Python-Benutzer programmgesteuert Daten aus externen Datenbanken in Snowflake abrufen. Auf diese Weise können Sie über JDBC-Treiber eine Verbindung zu externen Datenbanken herstellen.

Mit diesen APIs können Sie Daten nahtlos in Snowflake-Tabellen ziehen und mit Snowpark-DataFrames für erweiterte Analysen umwandeln.

`JDBC <https://docs.snowflake.com/en/developer-guide/snowpark/reference/python/latest/snowpark/api/snowflake.snowpark.DataFrameReader.jdbc> `_ kann auf ähnliche Weise wie die Spark JDBC-API verwendet werden. Die meisten Parameter sind so konzipiert, dass sie für eine bessere Parität identisch oder ähnlich sind. Weitere Informationen zum Vergleich der Snowpark Python-JDBC mit der Spark-JDBC-API finden Sie in der folgenden Tabelle:

Snowpark-JDBC-Parameter

Parameter

Snowpark Python-JDBC

url

Eine Verbindungszeichenfolge, die verwendet wird, um über den JDBC-Treiber die Verbindung zur externen Datenquelle herzustellen

udtf_configs

Ein Dictionary mit den notwendigen Konfigurationen für die UDTF-Erstellung

properties

Ein Dictionary mit dem Schlüssel-Wert-Paar, das beim Einrichten der JDBC-Verbindung benötigt wird

table

Tabelle in der Quelldatenbank

query

SQL-Abfrage, die als Unterabfrage zum Lesen von Daten eingeschlossen ist

column

Partitionierungsspalte für parallele Lesevorgänge

lower_bound

Untere Grenze für die Partitionierung

upper_bound

Obere Grenze für die Partitionierung

num_partitions

Anzahl der Partitionen für Parallelität

query_timeout

Die Timeout-Dauer für die SQL-Ausführung, gemessen in Sekunden

fetch_size

Anzahl der Zeilen, die pro Roundtrip abgerufen wurden

custom_schema

Kundenspezifisches Schema zum Abrufen von Daten aus externen Datenbanken

predicates

Auflistung der Bedingungen für Partitionen mit WHERE-Klausel

session_init_statement

Führt eine SQL- oder PL/SQL-Anweisung bei der Initialisierung der Sitzung aus

Erläuterungen zur Parallelität

Snowpark Python-JDBC hat derzeit eine Form des zugrunde liegenden Datenaufnahmemechanismus:

UDTF-Datenaufnahme

Alle Workloads werden auf dem Snowflake-Server ausgeführt. Snowpark erstellt eine Java-UDTF und ruft sie parallel auf, um Daten in einer temporären Snowflake-Tabelle zu erfassen. Somit ist der Parameter udtf_configs für dieses Feature erforderlich.

Die Snowpark Python-JDBC bietet zwei Möglichkeiten, die Datenaufnahme zu parallelisieren und zu beschleunigen:

Partitionsspalte

Diese Methode unterteilt die Quelldaten anhand von vier Parametern in eine Anzahl von Partitionen, wenn Benutzende jdbc() aufrufen:

  • column

  • lower_bound

  • upper_bound

  • num_partitions

Diese vier Parameter müssen gleichzeitig und eingestellt werden. column muss numerisch oder vom Typ date sein.

Prädikate

Diese Methode unterteilt die Quelldaten in Partitionen auf der Grundlage von Parameterprädikaten, die eine Liste von Ausdrücken sind, die für die Aufnahme in WHERE-Klauseln geeignet sind, wobei jeder Ausdruck eine Partition definiert. Prädikate bieten eine flexiblere Möglichkeit zum Aufteilen von Partitionen. Sie können beispielsweise Partitionen auf booleschen oder nicht-numerischen Spalten dividieren.

Die Snowpark Python-JDBC ermöglicht auch die Anpassung des Parallelitätsgrads innerhalb einer Partition.

Fetch_size

Innerhalb einer Partition ruft die API Zeilen in Blöcken ab, die durch fetch_size definiert sind. Diese Zeilen werden beim Abrufen parallel in Snowflake geschrieben, sodass sich Lesen und Schreiben überschneiden und der Durchsatz maximiert werden kann.

Verwenden von JDBC zum Erfassen von Daten aus externen Datenquellen

Verwenden von JDBC zum Erfassen von Daten von einem Snowpark-Client

  1. Hochladen der JDBC-Treiber jar-Datei mit Snowpark oder Snowsight in einen Snowflake-Stagingbereich

    • Laden Sie mit Snowpark hoch.

      Führen Sie in Snowpark nach dem Erstellen einer Sitzung den folgenden Code aus:

      session.file.put("<your directory>/<your file name>", "@<your stage name>/<stage path>")
      
      Copy
    • Laden Sie mit Snowsight hoch, wie in den folgenden Schritten beschrieben.

      1. Klicken Sie in Snowsight auf Catalog -> Database Explorer.

      2. Klicken Sie in der linken Suchleiste von Datenbanken auf [your database name] -> [your schema name] -> stages -> [your stage name].

      3. Klicken Sie auf die Schaltfläche “+File” oben rechts auf der Stagingbereichsseite.

  2. Konfigurieren Sie das Geheimnis, die Netzwerkregel und die Integration für den externen Zugriff.

    -- Configure a secret to allow egress to the source endpoint
    CREATE OR REPLACE SECRET <your secret>
        TYPE = PASSWORD
        USERNAME = '<your username>'
        PASSWORD = '<your password>';
    
    -- Configure a network rule to allow egress to the source endpoint
    CREATE OR REPLACE NETWORK RULE <your network rule>
      TYPE = HOST_PORT
      MODE = EGRESS
      VALUE_LIST = ('<your host>:<your port>');
    
    -- Configure an external access integration
    CREATE OR REPLACE EXTERNAL ACCESS INTEGRATION <your integration>
      ALLOWED_NETWORK_RULES = (<your network rule>)
      ALLOWED_AUTHENTICATION_SECRETS = (<your secret>)
      ENABLED = true;
    
    Copy
  3. Rufen sie Daten aus dem Ziel mit Snowpark JDBC von einem Snowpark-Client ab.

    connection_str=f"jdbc:<your dbms>://<your host>:<your port>/<your db>"
    udtf_configs = {
        "external_access_integration": "<your integration>",
        "secret": "<your secret>",
        "imports": ["<your stage path to jdbc jar file>"]
    }
    
    # Call jdbc to pull data from target table
    df_table = session.read.jdbc(
            url=connection_str,
            udtf_configs=udtf_configs,
            table="<your table>",
        )
    
    # Call jdbc to pull data from target query
    df_query = session.read.jdbc(
            url=connection_str,
            udtf_configs=udtf_configs,
            query="select * from <your table>",
        )
    
    # Pull data from target table with parallelism using partition column
    df_table_partition_column = session.read.jdbc(
            url=connection_str,
            udtf_configs=udtf_configs,
            table="<your table>",
            fetch_size=100000,
            num_partitions=4,
            column="ID",
            upper_bound=10000,
            lower_bound=0
        )
    
    # Pull data from target table with parallelism using predicates
    df_table_predicates = session.read.jdbc(
            url=connection_str,
            udtf_configs=udtf_configs,
            table="<your table>",
            fetch_size=100000,
            predicates = [
                "ID < 3",
                "ID >= 3"
            ]
        )
    
    Copy

Verwenden von JDBC zum Einlesen von Daten aus einer gespeicherten Prozedur

  1. Hochladen der JDBC-Treiber jar-Datei mit Snowsight in einen Snowflake-Stagingbereich

    • Klicken Sie in Snowsight auf Catalog -> Database Explorer

    • Klicken Sie in der linken Suchleiste von Datenbanken auf [your database name] -> [your schema name] -> stages -> [your stage name].

    • Klicken Sie auf die Schaltfläche “+File” oben rechts auf der Stagingbereichsseite.

  2. Konfigurieren Sie Geheimnis, Netzwerkregel und Integration für den externen Zugriff.

    -- Configure a secret to allow egress to the source endpoint
    CREATE OR REPLACE SECRET <your secret>
        TYPE = PASSWORD
        USERNAME = '<your username>'
        PASSWORD = '<your password>';
    
    -- Configure a network rule to allow egress to the source endpoint
    CREATE OR REPLACE NETWORK RULE <your network rule>
      TYPE = HOST_PORT
      MODE = EGRESS
      VALUE_LIST = ('<your host>:<your port>');
    
    -- Configure an external access integration
    CREATE OR REPLACE EXTERNAL ACCESS INTEGRATION <your integration>
      ALLOWED_NETWORK_RULES = (<your network rule>)
      ALLOWED_AUTHENTICATION_SECRETS = (<your secret>)
      ENABLED = true;
    
    Copy
  3. Ziehen Sie Daten aus dem Ziel mit Snowpark JDBC aus einer gespeicherten Prozedur.

    CREATE OR REPLACE PROCEDURE sp_jdbc()
    RETURNS STRING
    LANGUAGE PYTHON
    RUNTIME_VERSION = '3.10'
    PACKAGES = ('snowflake-snowpark-python')
    HANDLER = 'run'
    AS
    $$
    import time
    def run(session):
        connection_str=f"jdbc:<your dbms>://<your host>:<your port>/<your db>"
        udtf_configs = {
            "external_access_integration": "<your integration>",
            "secret": "<your secret>",
            "imports": ["<your stage path to jdbc jar file>"]
        }
    
        # Call jdbc to pull data from target table
        df_table = session.read.jdbc(
                url=connection_str,
                udtf_configs=udtf_configs,
                table="<your table>",
            )
    
        # Call jdbc to pull data from target query
        df_query = session.read.jdbc(
                url=connection_str,
                udtf_configs=udtf_configs,
                query="select * from <your table>",
            )
    
        # Pull data from target table with parallelism using partition column
        df_table_partition_column = session.read.jdbc(
                url=connection_str,
                udtf_configs=udtf_configs,
                table="<your table>",
                fetch_size=100000,
                num_partitions=4,
                column="ID",
                upper_bound=10000,
                lower_bound=0
            )
    
        # Pull data from target table with parallelism using predicates
        df_table_predicates = session.read.jdbc(
                url=connection_str,
                udtf_configs=udtf_configs,
                table="<your table>",
                fetch_size=100000,
                predicates = [
                    "ID < 3",
                    "ID >= 3"
                ]
            )
        df_table.write.save_as_table("snowflake_table", mode="overwrite")
        return f"success"
    
    $$
    ;
    
    call sp_jdbc();
    select * from snowflake_table ;
    
    Copy

Verwenden von JDBC zum Erfassen von Daten von einem Snowflake Notebook

  1. Hochladen der JDBC-Treiber jar-Datei mit Snowsight in einen Snowflake-Stagingbereich

    • Klicken Sie in Snowsight auf Catalog -> Database Explorer

    • Klicken Sie in der linken Suchleiste von Datenbanken auf [your database name] -> [your schema name] -> stages -> [your stage name].

    • Klicken Sie auf die Schaltfläche “+File” oben rechts auf der Stagingbereichsseite.

  2. Konfigurieren Sie Geheimnis, Netzwerkregel und Integration für den externen Zugriff.

    -- Configure a secret to allow egress to the source endpoint
    CREATE OR REPLACE SECRET <your secret>
        TYPE = PASSWORD
        USERNAME = '<your username>'
        PASSWORD = '<your password>';
    
    -- Configure a network rule to allow egress to the source endpoint
    CREATE OR REPLACE NETWORK RULE <your network rule>
      TYPE = HOST_PORT
      MODE = EGRESS
      VALUE_LIST = ('<your host>:<your port>');
    
    -- Configure an external access integration
    CREATE OR REPLACE EXTERNAL ACCESS INTEGRATION <your integration>
      ALLOWED_NETWORK_RULES = (<your network rule>)
      ALLOWED_AUTHENTICATION_SECRETS = (<your secret>)
      ENABLED = true;
    
    Copy
  3. Ziehen Sie Daten aus dem Ziel mit Snowpark JDBC aus einem Snowflake Notebook.

    import snowflake.snowpark.context
    session = snowflake.snowpark.context.get_active_session()
    connection_str=f"jdbc:<your dbms>://<your host>:<your port>/<your db>"
    udtf_configs = {
            "external_access_integration": "<your integration>",
            "secret": "<your secret>",
            "imports": ["<your stage path to jdbc jar file>"]
        }
    
    # Call jdbc to pull data from target table
    df_table = session.read.jdbc(
            url=connection_str,
            udtf_configs=udtf_configs,
            table="<your table>",
        )
    
    # Call jdbc to pull data from target query
    df_query = session.read.jdbc(
            url=connection_str,
            udtf_configs=udtf_configs,
            query="select * from <your table>",
        )
    
    # Pull data from target table with parallelism using partition column
    df_table_partition_column = session.read.jdbc(
            url=connection_str,
            udtf_configs=udtf_configs,
            table="<your table>",
            fetch_size=100000,
            num_partitions=4,
            column="ID",
            upper_bound=10000,
            lower_bound=0
        )
    
    # Pull data from target table with parallelism using predicates
    df_table_predicates = session.read.jdbc(
            url=connection_str,
            udtf_configs=udtf_configs,
            table="<your table>",
            fetch_size=100000,
            predicates = [
                "ID < 3",
                "ID >= 3"
            ]
        )
    
    Copy

Quellenablaufverfolgung

Quellenablaufverfolgung bei Verwendung von Snowpark JDBC zum Verbinden mit MySQL

  1. Fügen Sie ein Snowpark-Tag in Ihre Funktion zum Erstellen einer Verbindung ein:

    connection_str="jdbc:mysql://<your host>:<your port>/<your db>?applicationName=snowflake-snowpark-python"
    udtf_configs = {
        "external_access_integration": "<your integration>",
        "secret": "<your secret>",
        "imports": ["<your stage path to jdbc jar file>"]
    }
    
    # Call dbapi to pull data from target table
    df_table = session.read.jdbc(
            url=connection_str,
            udtf_configs=udtf_configs,
            table="<your table>",
        )
    
    Copy
  2. Führen Sie folgenden SQL-Code in Ihrer Datenquelle aus, um Abfragen aus Snowpark zu erfassen, die noch aktiv sind:

    SELECT *
    FROM performance_schema.events_statements_history_long
    WHERE THREAD_ID = (
      SELECT THREAD_ID, NAME FROM performance_schema.threads WHERE NAME LIKE '%snowflake-snowpark-python%';
    )
    
    Copy

Quellenablaufverfolgung bei Verwendung von Snowpark JDBC zum Verbinden mit SQL Server

  1. Fügen Sie ein Snowpark-Tag in Ihre Funktion zum Erstellen einer Verbindung ein:

    connection_str="jdbc:mssql://<your host>:<your port>/<your db>?applicationName=snowflake-snowpark-python"
    udtf_configs = {
    "external_access_integration": "<your integration>",
    "secret": "<your secret>",
    "imports": ["<your stage path to jdbc jar file>"]
    }
    # Call dbapi to pull data from target table
    df_table = session.read.jdbc(
          url=connection_str,
          udtf_configs=udtf_configs,
          table="<your table>",
      )
    
    Copy
  2. Führen Sie folgenden SQL-Code in Ihrer Datenquelle aus, um Abfragen aus Snowpark zu erfassen, die noch aktiv sind:

    SELECT
      s.session_id,
      s.program_name,
      r.status,
      t.text AS sql_text
    FROM sys.dm_exec_sessions s
    JOIN sys.dm_exec_requests r ON s.session_id = r.session_id
    CROSS APPLY sys.dm_exec_sql_text(r.sql_handle) AS t
    WHERE s.program_name = 'snowflake-snowpark-python';
    
    Copy

Quellenablaufverfolgung bei Verwendung von Snowpark JDBC zum Verbinden mit PostgresSQL

  1. Fügen Sie ein Snowpark-Tag in Ihre Funktion zum Erstellen einer Verbindung ein:

    connection_str="jdbc:postgres://<your host>:<your port>/<your db>?applicationName=snowflake-snowpark-python"
    udtf_configs = {
    "external_access_integration": "<your integration>",
    "secret": "<your secret>",
    "imports": ["<your stage path to jdbc jar file>"]
    }
    
    # Call dbapi to pull data from target table
    df_table = session.read.jdbc(
            url=connection_str,
            udtf_configs=udtf_configs,
            table="<your table>",
        )
    
    Copy
  2. Führen Sie folgenden SQL-Code in Ihrer Datenquelle aus, um Abfragen aus Snowpark zu erfassen, die noch aktiv sind:

    SELECT
      pid,
      usename AS username,
      datname AS database,
      application_name,
      client_addr,
      state,
      query_start,
      query
    FROM
      pg_stat_activity
    WHERE
      application_name = 'snowflake-snowpark-python';
    
    Copy

Quellenablaufverfolgung bei Verwendung von Snowpark JDBC zum Verbinden mit Oracle

  1. Fügen Sie ein Snowpark-Tag in Ihre Funktion zum Erstellen einer Verbindung ein:

    connection_str="jdbc:oracle://<your host>:<your port>/<your db>?applicationName=snowflake-snowpark-python"
    udtf_configs = {
    "external_access_integration": "<your integration>",
    "secret": "<your secret>",
    "imports": ["<your stage path to jdbc jar file>"]
    }
    # Call dbapi to pull data from target table
    df_table = session.read.jdbc(
            url=connection_str,
            udtf_configs=udtf_configs,
            table="<your table>",
        )
    
    Copy
  2. Führen Sie folgenden SQL-Code in Ihrer Datenquelle aus, um Abfragen aus Snowpark zu erfassen, die noch aktiv sind:

    SELECT
      sid,
      serial#,
      username,
      program,
      module,
      action,
      client_identifier,
      client_info,
      osuser,
      machine
    FROM v$session
    WHERE program = 'snowflake-snowpark-python';
    
    Copy

Gemeinsame DBMS Unterstützung von Typen

Im Folgenden finden Sie eine geprüfte Liste von Datentypen verschiedener DBMS-Systeme. Wenn Ihre Quelldaten andere Datentypen enthalten, versucht Snowpark Python JDBC, sie den bestmöglichen Snowflake-Datentypen zuzuordnen oder greift auf Zeichenfolgen zurück.

Oracle

  • INTEGER

  • NUMBER

  • BINARY_FLOAT

  • BINARY_DOUBLE

  • VARCHAR2

  • CHAR

  • CLOB

  • NCHAR

  • NVARCHAR2

  • NCLOB

  • DATE

  • TIMESTAMP

  • TIMESTAMP WITH TIME ZONE

  • TIMESTAMP WITH LOCAL TIME ZONE

  • RAW

PostgresSQL

  • BIGINT

  • BIGSERIAL

  • BIT

  • BIT-VARYING

  • BOOLEAN

  • BOX

  • BYTEA

  • CHAR

  • VARCHAR

  • CIDR

  • CIRCLE

  • DATE

  • DOUBLE PRECISION

  • INET

  • INTEGER

  • INTERVAL

  • JSON

  • JSONB

  • LINE

  • LSEG

  • MACADDR

  • POINT

  • POLYGON

  • REAL

  • SMALLINT

  • SMALLSERIAL

  • SERIAL

  • TEXT

  • TIME

  • TIMESTAMP

  • TIMESTAMPTZ

  • TSQUERY

  • TSVECTOR

  • TXID_SNAPSHOT

  • UUID

  • XML

MySQL

  • INT

  • DECIMAL

  • INT

  • TINYINT

  • SMALLINT

  • MEDIUMINT

  • BIGINT

  • YEAR

  • FLOAT

  • DOUBLE

  • CHAR

  • VARCHAR

  • TINYTEXT

  • TEXT

  • MEDIUMTEXT

  • LONGTEXT

  • ENUM

  • SET

  • BIT

  • BINARY

  • VARBINARY

  • TINYBLOB

  • BLOB

  • MEDIUMBLOB

  • LONGBLOB

  • DATE

  • DATETIME

  • TIMESTAMP

  • TIME

  • JSON

SQL Server

  • INT

  • BIGINT

  • INT

  • SMALLINT

  • TINYINT

  • BIT

  • DECIMAL

  • NUMERIC

  • MONEY

  • SMALLMONEY

  • FLOAT

  • REAL

  • DATE

  • TIME

  • DATETIME

  • DATETIME2

  • SMALLDATETIME

  • CHAR

  • VARCHAR

  • VARCHAR(MAX)

  • TEXT

  • NCHAR

  • NVARCHAR

  • NVARCHAR(MAX)

  • NTEXT

  • BINARY

  • VARBINARY

  • VARBINARY(MAX)

  • IMAGE

  • UNIQUEIDENTIFIER

  • TIMESTAMP

Databricks

Verbinden mit Databricks über Snowpark Python JDBC wird derzeit nicht unterstützt.