Einrichtung von JDBC-Datenquellen für Snowpark Connect for Spark

Dieser Abschnitt enthält eine Anleitung und Beispielcode zum Lesen von Daten aus externen Datenbanken und zum Schreiben von Daten in externe Datenbanken (z. B.MySQL undPostgreSQL) über das Snowpark Connect-Feature für JDBC-Datenquellen. Er deckt sowohl die clientseitige Einrichtung als auch die Einrichtung von Snowflake Notebook ab.

Teil 1: Clientseitige Einrichtung (MySQL )

Diese Einrichtung ist erforderlich, wenn Sie Snowpark Connect von einer lokalen Clientanwendung aus ausführen, z. B. einem Python-Skript oder einer IDE.

Voraussetzungen

  1. Java Runtime Environment (JRE) / Java Development Kit (JDK):

    • Installieren Sie eine JRE oder ein JDK. Die Architektur (z. B. 64-Bit) Ihrer Java-Installation muss mit der Architektur Ihrer Python-Installation übereinstimmen.

    • Beispielquelle für die Installation: Adoptium Temurin Releases (bei Verwendung von Java 11).

  2. Festlegen der ``JAVA_HOME``-Umgebungsvariablen:

    • Konfigurieren Sie die JAVA_HOME-Umgebungsvariable, die auf das Stammverzeichnis Ihrer Java-Installation verweist.

    • Beispiel (macOS/Linux):

export JAVA_HOME=/path/to/your/jdk/home
  1. Festlegen der ``CLASSPATH``-Umgebungsvariablen:

    • Fügen Sie den Pfad zu der spezifischen .jar-Datei des JDBC-Treibers Ihrer Datenbank in die CLASSPATH-Umgebungsvariable ein. Dadurch kann die Java-Umgebung den erforderlichen Treiber finden.

    • Beispiel (für den MySQL-Treiber):

export CLASSPATH=$CLASSPATH:/path/to/your/driver/mysql-connector-j-9.2.0.jar

Beispiel für Clientcode (lesen aus MySQL)

Dieses Beispiel zeigt, wie eine Tabelle mit spark_session.read.jdbc() aus einer MySQL-Datenbank gelesen wird.

from pyspark.sql import Row

# Adjust the URL for your server host, port, and database name
MYSQL_JDBC_URL = "jdbc:mysql://localhost/test_db"

# Ensure this driver name matches your version of the JDBC driver
MYSQL_JDBC_DRIVER = "com.mysql.cj.jdbc.Driver"

def test_jdbc_read_from_mysql(self, spark_session):
    # This code snippet uses the Snowpark Connect Spark session
    jdbc_df = spark_session.read.jdbc(
        MYSQL_JDBC_URL,
        "my_schema.my_table",  # Specify your table name in MySQL
        properties={
            "user": "root",           # Your MySQL user name
            "password": "****",       # Your password for MySQL
            "driver": MYSQL_JDBC_DRIVER,
        },
    ).collect()

    # After reading via JDBC, the data is loaded into a temporary table in Snowflake.
    # You can now perform any standard DataFrame operations supported by Snowpark Connect.

Beispiel für Clientcode (schreiben inMySQL)

Dieses Beispiel zeigt, wie Daten mit spark_session.write.jdbc() in eine MySQL-Datenbank geschrieben werden.

from pyspark.sql import Row

# Adjust the URL for your server host, port, and database name
MYSQL_JDBC_URL = "jdbc:mysql://localhost/test_db"

# Ensure this driver name matches your version of the JDBC driver
MYSQL_JDBC_DRIVER = "com.mysql.cj.jdbc.Driver"

def test_jdbc_write_overwrite_to_mysql(self, spark_session):
    # This code snippet uses the Snowpark Connect Spark session
    jdbc_df = spark_session.createDataFrame(
        [
            Row(a=1, b=2.0, c="test1"),
            Row(a=2, b=3.0, c="test2"),
            Row(a=4, b=5.0, c="test3"),
        ]
    )

    jdbc_df.write.jdbc(
        MYSQL_JDBC_URL,
        "my_schema.my_table2",  # Specify your table name in MySQL
        mode="overwrite",
        properties={
            "user": "root",        # Your MySQL user name
            "password": "****",    # Your password for MySQL
            "driver": MYSQL_JDBC_DRIVER,
        },
    )

Teil 2: Einrichtung von Snowflake Warehouse Notebook (PostgreSQL)

Diese Einrichtung wird verwendet, wenn Sie Snowpark Connect direkt innerhalb einer Snowflake Notebook-Umgebung ausführen.

Einrichtungsschritte

  • Hinzufügen des ``snowpark-connect``-Pakets:

    • Stellen Sie sicher, dass das snowflake-snowpark-connect-Paket zu Ihrer Notebook-Umgebung hinzugefügt wird.

Hinzufügen des snowflake-snowpark-connect-Pakets zu Snowflake Notebook
  • Herunter- und Hochladen des JDBC-Treibers:

    • Laden Sie die entsprechende .jar-Datei des JDBC-Treibers für Ihre externe Datenbank (z. B. PostgreSQL JDBC-Treiber _) herunter.

    • Laden Sie die heruntergeladene .jar-Datei direkt in Ihre Notebook-Umgebung hoch.

  • Externe Integrationen aktivieren (Netzwerkregel und -Integration):

    • Snowflake benötigt eine Integration für den externen Zugriff, damit das Notebook mit externen Netzwerkstandorten kommunizieren kann. Sie müssen eine Netzwerkregel für den Host und den Port Ihrer externen Datenbank definieren.

Konfigurieren der Einstellungen für Netzwerkregeln in Snowflake Notebook
Hochladen der JAR-Datei des JDBC-Treibers in Snowflake Notebook
-- 1. Create a Network Rule for the external database host and port
CREATE OR REPLACE NETWORK RULE JDBC_READ_NETWORK_RULE
  MODE = EGRESS
  TYPE = HOST_PORT
  VALUE_LIST = ('hh-pgsql-public.ebi.ac.uk:5432'); -- REPLACE with your host:port

-- 2. Create the External Access Integration using the new Network Rule
CREATE OR REPLACE EXTERNAL ACCESS INTEGRATION JDBC_READ_ACCESS_INTEGRATION
  ALLOWED_NETWORK_RULES = (JDBC_READ_NETWORK_RULE)
  ENABLED = true;

-- NOTE: This integration must be referenced/activated within your notebook's settings.

Beispielcode für ein Warehouse Notebook (lesen aus PostgreSQL)

Dieses Beispiel zeigt den notwendigen Python-Code, um die Sitzung zu initialisieren, den Treiber zu laden und Daten aus PostgreSQL zu lesen.

from snowflake import snowpark_connect
import jpype

# Initialize the Spark session for Snowpark Connect
spark = snowpark_connect.server.init_spark_session()
df = spark.sql("show schemas").limit(2)
df.show()

# Add the uploaded JDBC driver JAR to the Java Classpath using jpype
# Adjust the path to match the name of the JAR file you uploaded
jpype.addClassPath('/tmp/appRoot/postgresql-42.7.8.jar')

# Using public PostgreSQL DB as an example: https://rnacentral.org/help/public-database
jdbc_df = spark.read.jdbc(
    # Adjust this URL as per your server host, port, and database
    "jdbc:postgresql://hh-pgsql-public.ebi.ac.uk:5432/pfmegrnargs",
    "",  # Empty string for table name when providing a custom query
    properties={
        "user": "reader",                # Your PostgreSQL user name
        "password": "***",               # Your password for PostgreSQL
        "driver": "org.postgresql.Driver",
        # Use the "query" property for a custom SQL statement
        "query": """SELECT
  upi,     -- RNAcentral URS identifier
  taxid,   -- NCBI taxid
  ac       -- external accession
FROM xref
WHERE ac IN ('OTTHUMT00000106564.1', 'OTTHUMT00000416802.1')"""
    },
)

jdbc_df.show()

Beispielcode für Warehouse Notebook (schreiben in PostgreSQL)

Dieses Beispiel zeigt den notwendigen Python-Code, um die Sitzung zu initialisieren, den Treiber zu laden und Daten in PostgreSQL zu schreiben.

from snowflake import snowpark_connect
from pyspark.sql import Row
import jpype

# Initialize the Spark session for Snowpark Connect
spark = snowpark_connect.server.init_spark_session()
df = spark.sql("show schemas").limit(2)
df.show()

# Add the uploaded JDBC driver JAR to the Java Classpath using jpype
# Adjust the path to match the name of the JAR file you uploaded
jpype.addClassPath('/tmp/appRoot/postgresql-42.7.8.jar')

# Create dataframe
jdbc_df = spark.createDataFrame(
    [
        Row(a=1, b=2.0, c="test1"),
        Row(a=2, b=3.0, c="test2"),
        Row(a=4, b=5.0, c="test3"),
    ]
)

# Using public PostgreSQL DB as an example: https://rnacentral.org/help/public-database
jdbc_df.write.jdbc(
    # Adjust this URL as per your server host, port, and database
    "jdbc:postgresql://hh-pgsql-public.ebi.ac.uk:5432/pfmegrnargs",
    "public.my_table2",  # Specify your table name in PostgreSQL
    mode="overwrite",
    properties={
        "user": "writer",                # Your PostgreSQL user name
        "password": "***",               # Your password for PostgreSQL
        "driver": "org.postgresql.Driver",
    },
)

Teil 3: Einrichten von Snowflake Workspace Notebook (PostgreSQL)

Diese Einrichtung wird verwendet, wenn Sie Snowpark Connect direkt innerhalb einer Snowflake Workspace Notebook-Umgebung ausführen.

Einrichtungsschritte

  • Das snowpark-connect-Paket ist standardmäßig in Workspace Notebook enthalten.

  • Herunter- und Hochladen des JDBC-Treibers:

    • Laden Sie die entsprechende .jar-Datei des JDBC-Treibers für Ihre externe Datenbank (z. B. PostgreSQL JDBC-Treiber _) herunter.

    • Laden Sie die heruntergeladene .jar-Datei direkt in Ihre Notebook-Umgebung hoch.

Hochladen des JDBC-Treibers in Snowflake Workspace Notebook
  • Erstellen der externen Integration:

-- 1. Create a Network Rule for the external database host and port
CREATE OR REPLACE NETWORK RULE JDBC_READ_NETWORK_RULE
  MODE = EGRESS
  TYPE = HOST_PORT
  VALUE_LIST = ('hh-pgsql-public.ebi.ac.uk:5432'); -- REPLACE with your host:port

-- 2. Create the External Access Integration using the new Network Rule
CREATE OR REPLACE EXTERNAL ACCESS INTEGRATION JDBC_READ_ACCESS_INTEGRATION
  ALLOWED_NETWORK_RULES = (JDBC_READ_NETWORK_RULE)
  ENABLED = true;

-- NOTE: This integration must be referenced/activated within your notebook's settings.
  • Externe Integrationen aktivieren (Netzwerkregel und -Integration):

    • Snowflake benötigt eine Integration für den externen Zugriff, damit das Notebook mit externen Netzwerkstandorten kommunizieren kann. Sie müssen eine Netzwerkregel für den Host und den Port Ihrer externen Datenbank definieren.

    Aktivieren der Integration für den externen Zugriff in den Einstellungen von Workspace Notebook

Beispielcode für Workspace Notebook (lesen aus PostgreSQL)

Dieses Beispiel zeigt den notwendigen Python-Code, um die Sitzung zu initialisieren, den Treiber zu laden und Daten aus PostgreSQL zu lesen.

from snowflake import snowpark_connect
import jpype
import os

# Initialize the Spark session for Snowpark Connect
spark = snowpark_connect.server.init_spark_session()
df = spark.sql("show schemas").limit(2)
df.show()

# Add the uploaded JDBC driver JAR to the Java Classpath using jpype
# Adjust the path to match the name of the JAR file you uploaded
# Copy the driver to /tmp directory
os.system("cp ./postgresql-42.7.8.jar /tmp/postgresql-42.7.8.jar")
jpype.addClassPath('/tmp/postgresql-42.7.8.jar')

# Using public PostgreSQL DB as an example: https://rnacentral.org/help/public-database
jdbc_df = spark.read.jdbc(
    # Adjust this URL as per your server host, port, and database
    "jdbc:postgresql://hh-pgsql-public.ebi.ac.uk:5432/pfmegrnargs",
    "",  # Empty string for table name when providing a custom query
    properties={
        "user": "reader",                # Your PostgreSQL user name
        "password": "***",               # Your password for PostgreSQL
        "driver": "org.postgresql.Driver",
        # Use the "query" property for a custom SQL statement
        "query": """SELECT
  upi,     -- RNAcentral URS identifier
  taxid,   -- NCBI taxid
  ac       -- external accession
FROM xref
WHERE ac IN ('OTTHUMT00000106564.1', 'OTTHUMT00000416802.1')"""
    },
)

jdbc_df.show()

Beispielcode für Workspace Notebook (schreiben in PostgreSQL)

Dieses Beispiel zeigt den notwendigen Python-Code, um die Sitzung zu initialisieren, den Treiber zu laden und Daten in PostgreSQL zu schreiben.

from snowflake import snowpark_connect
from pyspark.sql import Row
import jpype
import os

# Initialize the Spark session for Snowpark Connect
spark = snowpark_connect.server.init_spark_session()
df = spark.sql("show schemas").limit(2)
df.show()

# Add the uploaded JDBC driver JAR to the Java Classpath using jpype
# Adjust the path to match the name of the JAR file you uploaded
# Copy the driver to /tmp directory
os.system("cp ./postgresql-42.7.8.jar /tmp/postgresql-42.7.8.jar")
jpype.addClassPath('/tmp/postgresql-42.7.8.jar')

# Create dataframe
jdbc_df = spark.createDataFrame(
    [
        Row(a=1, b=2.0, c="test1"),
        Row(a=2, b=3.0, c="test2"),
        Row(a=4, b=5.0, c="test3"),
    ]
)

# Using public PostgreSQL DB as an example: https://rnacentral.org/help/public-database
jdbc_df.write.jdbc(
    # Adjust this URL as per your server host, port, and database
    "jdbc:postgresql://hh-pgsql-public.ebi.ac.uk:5432/pfmegrnargs",
    "public.my_table2",  # Specify your table name in PostgreSQL
    mode="overwrite",
    properties={
        "user": "writer",                # Your PostgreSQL user name
        "password": "***",               # Your password for PostgreSQL
        "driver": "org.postgresql.Driver",
    },
)

Unterstützte Datenquellen

  • SQL Server

  • MySQL

  • PostgreSQL