Lesen von Daten aus externen Datenquellen mit Snowpark Python DB-API

Mit Snowpark Python DB-API können Snowpark Python-Benutzer programmgesteuert Daten aus externen Datenbanken in Snowflake abrufen. Der Abschnitt umfasst folgende Themen:

  • Python- DB-API-Unterstützung: Stellen Sie eine Verbindung mit externen Datenbanken über Standard-DB-API 2.0-Treiber von Python her.

  • Optimierte Einrichtung: Verwenden Sie pip um die erforderlichen Treiber zu installieren, ohne zusätzliche Abhängigkeiten verwalten zu müssen.

Mit diesen APIs können Sie Daten nahtlos in Snowflake-Tabellen ziehen und mit Snowpark-DataFrames für erweiterte Analysen umwandeln.

Verwenden von Snowpark Python DB-API

Die DB-API kann auf ähnliche Weise wie die Spark JDBC-API verwendet werden. Die meisten Parameter sind so konzipiert, dass sie für eine bessere Parität identisch oder ähnlich sind. Gleichzeitig legt Snowpark Wert auf ein Python-First-Design mit intuitiven Namenskonventionen, damit JDBC-spezifische Konfigurationen vermieden werden. Dies bietet Python-Entwicklern eine vertraute Erfahrung. Weitere Informationen über Vergleiche zwischen Snowpark Python DB-API und der Spark JDBC-API, siehe DB-API-Parameter.

DB-API-Parameter

Parameter

Snowpark DB-API

create_connection

Funktion zum Erstellen einer Python DB-API-Verbindung

table

Gibt die Tabelle in der Quelldatenbank an.

query

SQL-Abfrage, die als Unterabfrage zum Lesen von Daten eingeschlossen ist.

column

Partitionierungsspalte für parallele Lesevorgänge.

lower_bound

Untere Grenze für die Partitionierung.

upper_bound

Obere Grenze für die Partitionierung.

num_partitions

Anzahl der Partitionen für Parallelität.

query_timeout

Timeout für SQL-Ausführung (in Sekunden).

fetch_size

Anzahl der Zeilen, die pro Roundtrip abgerufen wurden.

custom_schema

Kundenspezifisches Schema zum Abrufen von Daten aus externen Datenbanken.

max_workers

Anzahl der Worker für das parallele Abrufen von Daten aus externen Datenbanken.

predicates

Auflistung der Bedingungen für Partitionen mit WHERE-Klausel.

session_init_statement

Führt eine SQL- oder PL/SQL-Anweisung bei der Initialisierung der Sitzung aus.

udtf_configs

Führen Sie den Workload mit einer Snowflake-UDTF-Datei aus, um eine bessere Leistung zu erzielen.

fetch_merge_count

Anzahl der abgerufenen Batches, die vor dem Hochladen in eine einzige Parquet-Datei zusammengeführt werden sollen.

Erläuterungen zur Parallelität

Snowpark Python DB-API verwendet zwei unabhängige Formen der Parallelität, die auf Benutzereingaben basieren:

  • Partitionsbasierte Parallelität

    Wenn Benutzer Partitionierungsinformationen (z. B. column, lower_bound, upper_bound, num_partitions) oder Prädikate angeben, teilt Snowflake die Abfrage in mehrere Partitionen auf. Diese werden parallel durch Multiprocessing verarbeitet, wobei jeder Worker seine eigene Partition abruft und schreibt.

  • Abrufgrößenbasierte Parallelität innerhalb jeder Partition

    Innerhalb einer Partition ruft die API Zeilen in Blöcken ab, die durch fetch_size definiert sind. Diese Zeilen werden beim Abrufen parallel in Snowflake geschrieben, sodass sich Lesen und Schreiben überschneiden und der Durchsatz maximiert werden kann.

Diese beiden Formen der Parallelität funktionieren unabhängig voneinander. Wenn weder Partitionierung noch fetch_size angegeben ist, lädt die Funktion vor dem Schreibvorgang in Snowflake die gesamte Quelltabelle in den Arbeitsspeicher. Dies kann den Verbrauch an Arbeitsspeicher erhöhen und die Leistung bei großen Datensets verringern.

SQL Server

Verwenden von DB-API zur Herstellung einer Verbindung zu SQL Server von einem Snowpark-Client aus

Um von Snowpark aus eine Verbindung zu SQL Server herzustellen, benötigen Sie die folgenden drei Pakete:

Nachfolgend finden Sie die Codebeispiele für die Verbindung von einem Snowpark-Client sowie einer gespeicherten Prozedur aus zu SQL Server herzustellen.

  • Installieren des Python-SQL-Treibers

/bin/bash -c "$(curl -fsSL https://raw.githubusercontent.com/Homebrew/install/master/install.sh)"
  brew tap microsoft/mssql-release https://github.com/Microsoft/homebrew-mssql-release
  brew update
  HOMEBREW_ACCEPT_EULA=Y brew install msodbcsql18 mssql-tools18
Copy
  • Installieren Sie snowflake-snowpark-python[pandas] und pyodbc

pip install snowflake-snowpark-python[pandas]
pip install pyodbc
Copy
  • Definieren Sie die Factory-Methode zum Herstellen der Verbindung zu SQL Server

def create_sql_server_connection():
    import pyodbc
    HOST = "mssql_host"
    PORT = "mssql_port"
    USERNAME = "mssql_username"
    PASSWORD = "mssql_password"
    DATABASE = "mssql_db"
    connection_str = (
        "DRIVER={{ODBC Driver 18 for SQL Server}};"
        "SERVER={HOST},{PORT};"
        "DATABASE={DATABASE};"
        "UID={USERNAME};"
        "PWD={PASSWORD};"
    )
    connection = pyodbc.connect(connection_str)
    return connection


# Call dbapi to pull data from mssql_table

df = session.read.dbapi(
  create_sql_server_connection,
    table="mssql_table")
Copy

Verwenden von DB-API zur Herstellung einer Verbindung zu SQL Server von einer gespeicherten Prozedur aus

  • Konfigurieren Sie die Integration für den externen Zugriff, die erforderlich ist, damit Snowflake eine Verbindung mit dem Quellendpunkt herstellen kann.

    Bemerkung

    PrivateLink wird für eine sichere Datenübertragung empfohlen, insbesondere im Umgang mit sensiblen Informationen. Stellen Sie sicher, dass für Ihr Snowflake-Konto die erforderlichen PrivateLink-Berechtigungen aktiviert sind und dass das PrivateLink-Feature in Ihrer Snowflake Notebook-Umgebung konfiguriert und aktiv ist.

  • Konfigurieren Sie das Geheimnis, eine Netzwerkregel für ausgehenden Datenverkehr zum Quellendpunkt und die Integration für den externen Zugriff.

    CREATE OR REPLACE SECRET mssql_secret
      TYPE = PASSWORD
      USERNAME = 'mssql_username'
      PASSWORD = 'mssql_password';
    
    -- Configure a network rule to allow egress to the source endpoint
    
    CREATE OR REPLACE NETWORK RULE mssql_network_rule
      MODE = EGRESS
      TYPE = HOST_PORT
      VALUE_LIST = ('mssql_host:mssql_port');
    
    -- Configure an external access integration
    
    CREATE OR REPLACE EXTERNAL ACCESS INTEGRATION mssql_access_integration
      ALLOWED_NETWORK_RULES = (mssql_network_rule)
      ALLOWED_AUTHENTICATION_SECRETS = (mssql_secret)
      ENABLED = true;
    
    -- Create or replace a Python stored procedure
    
    CREATE OR REPLACE PROCEDURE sp_mssql_dbapi()
      RETURNS TABLE()
      LANGUAGE PYTHON
      RUNTIME_VERSION='3.11'
      HANDLER='run'
      PACKAGES=('snowflake-snowpark-python', 'pyodbc', 'msodbcsql')
            EXTERNAL_ACCESS_INTEGRATIONS = (mssql_access_integration)
            SECRETS = ('cred' = mssql_secret )
    
    AS $$
    
    # Get user name and password from mssql_secret
    
    import _snowflake
    username_password_object = _snowflake.get_username_password('cred')
    USER = username_password_object.username
    PASSWORD = username_password_object.password
    
    # Define a method to connect to SQL server_hostname
    from snowflake.snowpark import Session
    def create_sql_server_connection():
        import pyodbc
    
        host = "mssql_host"
        port = mssql_port
        username = USER
        password = PASSWORD
        database = "mssql_database"
        connection_str = (
          "DRIVER={{ODBC Driver 18 for SQL Server}};"
          "SERVER={host},{port};"
          "DATABASE={database};"
          "UID={username};"
          "PWD={password};"
        )
    
        connection = pyodbc.connect(connection_str)
        return connection
    
    def run(session: Session):
        df = session.read.dbapi(
            create_sql_server_connection,
            table="mssql_table"
        )
        return df
    $$;
    
    CALL sp_mssql_dbapi();
    
    Copy

Oracle

Um von Snowpark aus eine Verbindung zu Oracle herzustellen, benötigen Sie die folgenden beiden Pakete:

Im Folgenden finden Sie die Codebeispiele zur Herstellung einer Verbindung von einem Snowpark-Client zu Oracle, gespeicherte Prozeduren und Snowflake Notebooks.

Verwenden von DB-API zur Verbindung zu Oracle von einem Snowpark-Client aus

  • Installieren Sie snowflake-snowpark-python[pandas] und oracledb

    pip install snowflake-snowpark-python[pandas]
    pip install oradb
    
    Copy
  • Verwenden Sie DB-API, um Daten aus Oracle abzurufen, und definieren Sie die Factory-Methode zum Erstellen einer Verbindung zu Oracle

    def create_oracle_db_connection():
        import oracledb
        HOST = "myhost"
        PORT = "myport"
        SERVICE_NAME = "myservice"
        USER = "myuser"
        PASSWORD = "mypassword"
        DSN = "{HOST}:{PORT}/{SERVICE_NAME}"
        connection = oracledb.connect(
            user=USER,
            password=PASSWORD,
            dsn=DSN
        )
        return connection
    
    
    # Call dbapi to pull data from mytable
    
    df = session.read.dbapi(
      create_oracle_db_connection,
      table="mytable")
    
    Copy

Verwenden von DB-API, um von einer gespeicherten Prozedur aus eine Verbindung zu Oracle herzustellen

Die Integration für den externen Zugriff ist erforderlich, damit Snowflake eine Verbindung mit dem Quellendpunkt herstellen kann.

Bemerkung

PrivateLink wird für eine sichere Datenübertragung empfohlen, insbesondere im Umgang mit sensiblen Informationen. Stellen Sie sicher, dass für Ihr Snowflake-Konto die erforderlichen PrivateLink-Berechtigungen aktiviert sind und dass das PrivateLink-Feature in Ihrer Snowflake Notebook-Umgebung konfiguriert und aktiv ist.

  • Konfigurieren Sie das Geheimnis, eine Netzwerkregel für ausgehenden Datenverkehr zum Quellendpunkt und die Integration für den externen Zugriff.

    -- Configure the secret, a network rule to allow egress to the source endpoint and external access integration.
    
    CREATE OR REPLACE SECRET ora_secret
      TYPE = PASSWORD
      USERNAME = 'ora_username'
      PASSWORD = 'ora_password';
    
    -- Configure a network rule to allow egress to the source endpoint
    
    CREATE OR REPLACE NETWORK RULE ora_network_rule
      MODE = EGRESS
      TYPE = HOST_PORT
      VALUE_LIST = ('ora_host:ora_port');
    
    -- Configure an external access integration
    
    CREATE OR REPLACE EXTERNAL ACCESS INTEGRATION ora_access_integration
      ALLOWED_NETWORK_RULES = (ora_network_rule)
      ALLOWED_AUTHENTICATION_SECRETS = (ora_secret)
      ENABLED = true;
    
    Copy

Verwenden von Snowpark Python DB-API, um Daten aus Oracle in eine gespeicherte Python-Prozedur zu ziehen

CREATE OR REPLACE PROCEDURE sp_ora_dbapi()
  RETURNS TABLE()
  LANGUAGE PYTHON
  RUNTIME_VERSION='3.11'
  HANDLER='run'
  PACKAGES=('snowflake-snowpark-python', 'oracledb')
  EXTERNAL_ACCESS_INTEGRATIONS = (ora_access_integration)
  SECRETS = ('cred' = ora_secret )
AS $$

# Get user name and password from ora_secret
import _snowflake
username_password_object = _snowflake.get_username_password('cred')
USER = username_password_object.username
PASSWORD = username_password_object.password

# Define the factory method for creating a connection to Oracle

from snowflake.snowpark import Session

def create_oracle_db_connection():
    import oracledb
    host = "ora_host"
    port = "ora_port"
    service_name = "ora_service"
    user = USER
    password = PASSWORD
    DSN = "{host}:{port}/{service_name}"
    connection = oracledb.connect(
        user=USER,
        password=PASSWORD,
        dsn=DSN
    )
    return connection

def run(session: Session):
    df = session.read.dbapi(
        create_ora_connection,
        table="ora_table"
    )
    return df
$$;

CALL sp_ora_dbapi();
Copy

Verwenden von DB-API, um von einem Snowflake-Notizbuch aus eine Verbindung zu Oracle herzustellen

  • Wählen Sie snowflake-snowpark-python und oracledb aus Notebook-Paketen aus.

  • Konfigurieren Sie das Geheimnis, eine Netzwerkregel für ausgehenden Datenverkehr zum Quellendpunkt und die Integration für den externen Zugriff.

    CREATE OR REPLACE SECRET ora_secret
      TYPE = PASSWORD
      USERNAME = 'ora_username'
      PASSWORD = 'ora_password';
      ALTER NOTEBOOK mynotebook SET SECRETS = ('snowflake-secret-object' = ora_secret);
    
    -- Configure a network rule to allow egress to the source endpoint
    
    CREATE OR REPLACE NETWORK RULE ora_network_rule
      MODE = EGRESS
      TYPE = HOST_PORT
      VALUE_LIST = ('ora_host:ora_port');
    
    -- Configure an external access integration.
    
    CREATE OR REPLACE EXTERNAL ACCESS INTEGRATION ora_access_integration
      ALLOWED_NETWORK_RULES = (ora_network_rule)
      ALLOWED_AUTHENTICATION_SECRETS = (ora_secret)
      ENABLED = true;
    
    Copy
  • Einrichten des externen Zugriffs für Snowflake Notebooks und starten Sie dann die Notebook-Sitzung neu.

Verwenden von Snowpark Python DB-API, um Daten aus Oracle in eine Python-Zelle von Snowflake Notebook zu ziehen

# Get user name and password from ora_secret

import _snowflake
username_password_object = _snowflake.get_username_password('snowflake-secret-object')
USER = username_password_object.username
PASSWORD = username_password_object.password

import snowflake.snowpark.context
session = snowflake.snowpark.context.get_active_session()

# Define the factory method for creating a connection to Oracle

def create_oracle_db_connection():
    import oracledb
    host = "ora_host"
    port = "ora_port"
    service_name = "ora_service"
    user = USER
    password = PASSWORD
    DSN = "{host}:{port}/{service_name}"
    connection = oracledb.connect(
        user=USER,
        password=PASSWORD,
        dsn=DSN
    )
    return connection

# Use dbapi to read data from ora_table

df_ora = session.read.dbapi(
  create_oracle_db_connection,
  table='ora_table'
)

# Save data into sf_table

df_ora.write.mode("overwrite").save_as_table('sf_table')
Copy

PostgreSQL

Um von Snowpark aus eine Verbindung mit PostgreSQL herzustellen, benötigen Sie die folgenden beiden Pakete:

Nachfolgend finden Sie die Codebeispiele, die für die Verbindung mit PostgreSQL von einem Snowpark-Client aus benötigt werden, sowie gespeicherten Prozeduren und Snowflake Notebooks

Verwenden von DB-API, um eine Verbindung zu PostgreSQL von einem Snowpark-Client aus herzustellen

  • psycopg2 installieren

    pip install psycopg2
    
    Copy
  • Definieren Sie die Factory-Methode zum Herstellen einer Verbindung zu PostgreSQL

    def create_pg_connection():
        import psycopg2
        connection = psycopg2.connect(
            host="pg_host",
            port=pg_port,
            dbname="pg_dbname",
            user="pg_user",
            password="pg_password",
        )
        return connection
    
    
    # Call dbapi to pull data from pg_table
    
    df = session.read.dbapi(
      create_pg_connection,
      table="pg_table")
    
    Copy

Verwenden von DB-API, um von einer gespeicherten Prozedur aus eine Verbindung zu PostgreSQL herzustellen

  • Konfigurieren Sie die Integration für den externen Zugriff, die erforderlich ist, damit Snowflake eine Verbindung mit dem Quellendpunkt herstellen kann.

    Bemerkung

    PrivateLink wird für eine sichere Datenübertragung empfohlen, insbesondere im Umgang mit sensiblen Informationen. Stellen Sie sicher, dass für Ihr Snowflake-Konto die erforderlichen PrivateLink-Berechtigungen aktiviert sind und dass das PrivateLink-Feature in Ihrer Snowflake Notebook-Umgebung konfiguriert und aktiv ist.

  • Konfigurieren Sie das Geheimnis, eine Netzwerkregel für ausgehenden Datenverkehr zum Quellendpunkt und die Integration für den externen Zugriff.

CREATE OR REPLACE SECRET pg_secret
  TYPE = PASSWORD
  USERNAME = 'pg_username'
  PASSWORD = 'pg_password';

-- Configure a network rule.

CREATE OR REPLACE NETWORK RULE pg_network_rule
  MODE = EGRESS
  TYPE = HOST_PORT
  VALUE_LIST = ('pg_host:pg_port');

-- Configure an external access integration.

CREATE OR REPLACE EXTERNAL ACCESS INTEGRATION pg_access_integration
  ALLOWED_NETWORK_RULES = (pg_network_rule)
  ALLOWED_AUTHENTICATION_SECRETS = (pg_secret)
  ENABLED = true;
Copy
  • Verwenden Sie Snowpark Python DB-API, um Daten in einer gespeicherten Python-Prozedur aus PostgreSQL abzurufen

CREATE OR REPLACE PROCEDURE sp_pg_dbapi()
RETURNS TABLE()
LANGUAGE PYTHON
RUNTIME_VERSION='3.11'
HANDLER='run'
PACKAGES=('snowflake-snowpark-python', 'psycopg2')
EXTERNAL_ACCESS_INTEGRATIONS = (pg_access_integration)
SECRETS = ('cred' = pg_secret )
AS $$

# Get user name and password from pg_secret

import _snowflake
username_password_object = _snowflake.get_username_password('cred')
USER = username_password_object.username
PASSWORD = username_password_object.password

# Define the factory method for creating a connection to PostgreSQL

from snowflake.snowpark import Session

def create_pg_connection():
    import psycopg2
    connection = psycopg2.connect(
        host="pg_host",
        port=pg_port,
        dbname="pg_dbname",
        user=USER,
        password=PASSWORD,
    )
    return connection

def run(session: Session):
    df = session.read.dbapi(
        create_pg_connection,
        table="pg_table"
    )
    return df

$$;
CALL sp_pg_dbapi();
Copy

Verwenden von DB-API, um von einem Snowflake-Notizbuch aus eine Verbindung zu PostgreSQL herzustellen

  • Wählen Sie snowflake-snowpark-python und psycopg2 aus Snowflake Notebook-Pakete aus.

  • Konfigurieren Sie die Integration für den externen Zugriff, die erforderlich ist, damit Snowflake eine Verbindung mit dem Quellendpunkt herstellen kann.

    Bemerkung

    PrivateLink wird für eine sichere Datenübertragung empfohlen, insbesondere im Umgang mit sensiblen Informationen. Stellen Sie sicher, dass für Ihr Snowflake-Konto die erforderlichen PrivateLink-Berechtigungen aktiviert sind und dass das PrivateLink-Feature in Ihrer Snowflake Notebook-Umgebung konfiguriert und aktiv ist.

  • Konfigurieren Sie das Geheimnis, eine Netzwerkregel für ausgehenden Datenverkehr zum Quellendpunkt und die Integration für den externen Zugriff.

-- Configure the secret

CREATE OR REPLACE SECRET pg_secret
  TYPE = PASSWORD
  USERNAME = 'pg_username'
  PASSWORD = 'pg_password';
  ALTER NOTEBOOK pg_notebook SET SECRETS = ('snowflake-secret-object' = pg_secret);

 -- Configure the network rule to allow egress to the source endpoint

 CREATE OR REPLACE NETWORK RULE pg_network_rule
    MODE = EGRESS
    TYPE = HOST_PORT
    VALUE_LIST = ('pg_host:pg_port');

 -- Configure external access integration

 CREATE OR REPLACE EXTERNAL ACCESS INTEGRATION pg_access_integration
    ALLOWED_NETWORK_RULES = (pg_network_rule)
    ALLOWED_AUTHENTICATION_SECRETS = (pg_secret)
    ENABLED = true;
Copy
  • Einrichten des externen Zugriffs für Snowflake Notebooks und starten Sie dann die Notebook-Sitzung neu.

    # Get the user name and password from :code:`pg_secret`
    
    import _snowflake
    username_password_object = _snowflake.get_username_password('snowflake-secret-object')
    USER = username_password_object.username
    PASSWORD = username_password_object.password
    
    import snowflake.snowpark.context
    session = snowflake.snowpark.context.get_active_session()
    
    # Define the factory method for creating a connection to PostgreSQL
    
    def create_pg_connection():
        import psycopg2
        connection = psycopg2.connect(
            host="pg_host",
            port=pg_port,
            dbname="pg_dbname",
            user=USER,
            password=PASSWORD,
        )
        return connection
    
    # Use dbapi to read and save data from pg_table
    
    df = session.read.dbapi(
        create_pg_connection,
        table="pg_table"
    )
    
    # Save data into sf_table
    
    df.write.mode("overwrite").save_as_table('sf_table')
    
    Copy

MySQL

Um von Snowpark aus eine Verbindung mit MySQL herzustellen, benötigen Sie die folgenden beiden Pakete:

Nachfolgend finden Sie die Codebeispiele, die für die Verbindung mit MySQL vom Snowpark-Client aus benötigt werden, sowie gespeicherte Prozeduren und Snowflake Notebook.

Verwenden von DB-API, um eine Verbindung zu MySQL von einem Snowpark-Client aus herzustellen

  • Installieren Sie pymysql

pip install snowflake-snowpark-python[pandas]
pip install pymysql
Copy
  • Definieren Sie die Factory-Methode zum Herstellen einer Verbindung zu MySQL

def create_mysql_connection():
    import pymysql
    connection = pymysql.connect(
        host="mysql_host",
        port=mysql_port,
        database="mysql_db",
        user="mysql_user",
        password="mysql_password"
    )
    return connection


# Call dbapi to pull data from mysql_table

df = session.read.dbapi(
    create_mysql_connection,
    table="mysql_table"
)
Copy

Verwenden von DB-API, um von einer gespeicherten Prozedur aus eine Verbindung zu MySQL herzustellen

  • Konfigurieren Sie die Integration für den externen Zugriff, die erforderlich ist, damit Snowflake eine Verbindung mit dem Quellendpunkt herstellen kann.

    Bemerkung

    PrivateLink wird für eine sichere Datenübertragung empfohlen, insbesondere im Umgang mit sensiblen Informationen. Stellen Sie sicher, dass für Ihr Snowflake-Konto die erforderlichen PrivateLink-Berechtigungen aktiviert sind und dass das PrivateLink-Feature in Ihrer Snowflake Notebook-Umgebung konfiguriert und aktiv ist.

  • Konfigurieren Sie das Geheimnis, eine Netzwerkregel für ausgehenden Datenverkehr zum Quellendpunkt und die Integration für den externen Zugriff.

CREATE OR REPLACE SECRET mysql_secret
  TYPE = PASSWORD
  USERNAME = 'mysql_username'
  PASSWORD = 'mysql_password';

-- Configure a network rule.

CREATE OR REPLACE NETWORK RULE mysql_network_rule
  MODE = EGRESS
  TYPE = HOST_PORT
  VALUE_LIST = ('mysql_host:mysql_port');

-- Configure an external access integration

CREATE OR REPLACE EXTERNAL ACCESS INTEGRATION mysql_access_integration
  ALLOWED_NETWORK_RULES = (mysql_network_rule)
  ALLOWED_AUTHENTICATION_SECRETS = (mysql_secret)
  ENABLED = true;
Copy
  • Verwenden Sie Snowpark Python DB-API, um Daten aus MySQL in einer gespeicherten Python-Prozedur abzurufen,

CREATE OR REPLACE PROCEDURE sp_mysql_dbapi()
RETURNS TABLE()
LANGUAGE PYTHON
RUNTIME_VERSION='3.11'
HANDLER='run'
PACKAGES=('snowflake-snowpark-python', 'pymysql')
EXTERNAL_ACCESS_INTEGRATIONS = (mysql_access_integration)
SECRETS = ('cred' = mysql_secret )
AS $$

# Get user name and password from mysql_secret

import _snowflake
    username_password_object = _snowflake.get_username_password('cred')
    USER = username_password_object.username
    PASSWORD = username_password_object.password

# Define the factory method for creating a connection to MySQL

from snowflake.snowpark import session

def create_mysql_connection():
    import pymysql
    connection = pymysql.connect(
        host="mysql_host",
        port=mysql_port,
        dbname="mysql_dbname",
        user=USER,
        password=PASSWORD,
    )
    return connection

def run(session: Session):
    df = session.read.dbapi(
        create_mysql_connection,
        table="mysql_table"
    )
    return df
$$;

CALL sp_mysql_dbapi();
Copy

Verwenden von DB-API, um von einem Snowflake-Notizbuch aus eine Verbindung zu MySQL herzustellen

  • Wählen Sie snowflake-snowpark-python und pymysql aus den Snowflake Notebook-Paketen aus.

  • Konfigurieren Sie die Integration für den externen Zugriff, die erforderlich ist, damit Snowflake eine Verbindung mit dem Quellendpunkt herstellen kann.

    Bemerkung

    PrivateLink wird für eine sichere Datenübertragung empfohlen, insbesondere im Umgang mit sensiblen Informationen. Stellen Sie sicher, dass für Ihr Snowflake-Konto die erforderlichen PrivateLink-Berechtigungen aktiviert sind und dass das PrivateLink-Feature in Ihrer Snowflake Notebook-Umgebung konfiguriert und aktiv ist.

  • Konfigurieren Sie das Geheimnis und fügen Sie es dem Snowflake-Notizbuch hinzu.

CREATE OR REPLACE SECRET mysql_secret
  TYPE = PASSWORD
  USERNAME = 'mysql_username'
  PASSWORD = 'mysql_password';

  ALTER NOTEBOOK mynotebook SET SECRETS = ('snowflake-secret-object' = mysql_secret);
Copy
  • Konfigurieren Sie eine Netzwerkregel und eine Integration für den externen Zugriff.

CREATE OR REPLACE NETWORK RULE mysql_network_rule
  MODE = EGRESS
  TYPE = HOST_PORT
  VALUE_LIST = ('mysql_host:mysql_port');

CREATE OR REPLACE EXTERNAL ACCESS INTEGRATION mysql_access_integration
  ALLOWED_NETWORK_RULES = (mysql_network_rule)
  ALLOWED_AUTHENTICATION_SECRETS = (mysql_secret)
  ENABLED = true;
Copy
# Get user name and password from mysql_secret
import _snowflake
username_password_object = _snowflake.get_username_password('snowflake-secret-object')
USER = username_password_object.username
PASSWORD = username_password_object.password

import snowflake.snowpark.context
session = snowflake.snowpark.context.get_active_session()

# Define the factory method for creating a connection to MySQL

def create_mysql_connection():
    import pymysql
    connection = pymysql.connect(
        host="mysql_host",
        port=mysql_port,
        dbname="mysql_dbname",
        user=USER,
        password=PASSWORD,
    )
    return connection

# Call dbapi to pull data from mysql_table

df = session.read.dbapi(
    create_mysql_connection,
    table="mysql_table")

# Save data into sf_table

df.write.mode("overwrite").save_as_table('sf_table')
Copy

Databricks

Um eine Verbindung zu Databricks von Snowpark aus herzustellen, benötigen Sie die folgenden beiden Pakete:

Im Folgenden finden Sie die Codebeispiele, die benötigt werden, um vom Snowpark-Client, gespeicherten Prozeduren und Snowflake Notebook aus eine Verbindung zu Databricks herzustellen.

Verwenden von DB-API, um von einem Snowpark-Client aus eine Verbindung zu Databricks herzustellen

  • Installieren Sie databricks-sql-connector:

pip install snowflake-snowpark-python[pandas]
pip install databricks-sql-connector
Copy
  • Definieren Sie die Factory-Methode zum Herstellen einer Verbindung zu Databricks

def create_dbx_connection():
    import databricks.sql
    connection = databricks.sql.connect(
        server_hostname=HOST,
        http_path=PATH,
        access_token=ACCESS_TOKEN
    )
    return connection


#Call dbapi to pull data from mytable

df = session.read.dbapi(
    create_dbx_connection,
    table="dbx_table")
Copy

Verwenden von DB-API, um von einer gespeicherten Prozedur aus eine Verbindung zu Databricks herzustellen

  • Wählen Sie snowflake-snowpark-python und pymysql aus Snowflake Notebook-Pakete aus.

  • Die Integration für den externen Zugriff ist erforderlich, damit Snowflake eine Verbindung mit dem Quellendpunkt herstellen kann.

    Bemerkung

    PrivateLink wird für eine sichere Datenübertragung empfohlen, insbesondere im Umgang mit sensiblen Informationen. Stellen Sie sicher, dass für Ihr Snowflake-Konto die erforderlichen PrivateLink-Berechtigungen aktiviert sind und dass das PrivateLink-Feature in Ihrer Snowflake Notebook-Umgebung konfiguriert und aktiv ist.

  • Konfigurieren Sie das Geheimnis, eine Netzwerkregel für ausgehenden Datenverkehr zum Quellendpunkt und die Integration für den externen Zugriff.

CREATE OR REPLACE SECRET dbx_secret
  TYPE = GENERIC_STRING
  SECRET_STRING = 'dbx_access_token';

-- Configure a network rule

CREATE OR REPLACE NETWORK RULE dbx_network_rule
  MODE = EGRESS
  TYPE = HOST_PORT
  VALUE_LIST = ('dbx_host:dbx_port');

--  Configure an external access integration

CREATE OR REPLACE EXTERNAL ACCESS INTEGRATION dbx_access_integration
  ALLOWED_NETWORK_RULES = (dbx_network_rule)
  ALLOWED_AUTHENTICATION_SECRETS = (dbx_secret)
  ENABLED = true;
Copy
  • Verwenden Sie Snowpark Python DB-API, um Daten aus Databricks in einer gespeicherten Python-Prozedur abzurufen

CREATE OR REPLACE PROCEDURE sp_dbx_dbapi()
  RETURNS TABLE()
  LANGUAGE PYTHON
  RUNTIME_VERSION='3.11'
  HANDLER='run'
  PACKAGES=('snowflake-snowpark-python', 'databricks-sql-connector')
  EXTERNAL_ACCESS_INTEGRATIONS = (dbx_access_integration)
  SECRETS = ('cred' = dbx_secret )
AS $$

# Get user name and password from dbx_secret

import _snowflake
ACCESS_TOKEN = _snowflake.get_generic_secret_string('cred')

from snowflake.snowpark import Session

# define the method for creating a connection to Databricks
def create_dbx_connection():
    import databricks.sql
    connection = databricks.sql.connect(
        server_hostname="dbx_host",
        http_path="dbx_path",
        access_token=ACCESS_TOKEN,
    )
    return connection

def run(session: Session):
    df = session.read.dbapi(
        create_dbx_connection,
        table="dbx_table"
    )
    return df

$$;

CALL sp_dbx_dbapi();
Copy

Verwenden von DB-API, um von einem Snowflake-Notizbuch aus eine Verbindung zu Databricks herzustellen

  • Wählen Sie snowflake-snowpark-python und pymysql aus Snowflake Notebook-Pakete aus.

  • Konfigurieren Sie die Integration für den externen Zugriff, die erforderlich ist, damit Snowflake eine Verbindung mit dem Quellendpunkt herstellen kann.

    Bemerkung

    PrivateLink wird für eine sichere Datenübertragung empfohlen, insbesondere im Umgang mit sensiblen Informationen. Stellen Sie sicher, dass für Ihr Snowflake-Konto die erforderlichen PrivateLink-Berechtigungen aktiviert sind und dass das PrivateLink-Feature in Ihrer Snowflake Notebook-Umgebung konfiguriert und aktiv ist.

  • Konfigurieren Sie das Geheimnis und fügen Sie es dem Snowflake-Notizbuch hinzu.

    CREATE OR REPLACE SECRET dbx_secret
      TYPE = GENERIC_STRING
      SECRET_STRING = 'dbx_access_token';
      ALTER NOTEBOOK mynotebook SET SECRETS = ('snowflake-secret-object' = dbx_secret);
    
    Copy
  • Konfigurieren

    CREATE OR REPLACE NETWORK RULE dbx_network_rule
      MODE = EGRESS
      TYPE = HOST_PORT
      VALUE_LIST = ('dbx_host:dbx_port');
    
    CREATE OR REPLACE EXTERNAL ACCESS INTEGRATION dbx_access_integration
      ALLOWED_NETWORK_RULES = (dbx_network_rule)
      ALLOWED_AUTHENTICATION_SECRETS = (dbx_secret)
      ENABLED = true;
    
    Copy
  • Einrichten des externen Zugriffs für Snowflake Notebooks und starten Sie dann die Notebook-Sitzung neu.

    # Get user name and password from dbx_secret
    
    import _snowflake
    ACCESS_TOKEN = _snowflake.get_generic_secret_string('cred')
    
    import snowflake.snowpark.context
    session = snowflake.snowpark.context.get_active_session()
    
    # Define the factory method for creating a connection to Databricks
    
    def create_dbx_connection():
        import databricks.sql
        connection = databricks.sql.connect(
            server_hostname="dbx_host",
            http_path="dbx_path",
            access_token=ACCESS_TOKEN,
        )
        return connection
    
    # use dbapi to read data from dbx_table
    
    df = session.read.dbapi(
        create_dbx_connection,
        table="dbx_table"
    )
    
    # save data into sf_table
    
    df.write.mode("overwrite").save_as_table('sf_table')
    
    Copy

Einschränkungen

Treiber

Snowpark Python DB-API unterstützt nur Python DB-API-2.0-kompatible Treiber (z. B. pyodbc, oracledb). JDBC-Treiber werden in diesem Release nicht unterstützt.