Snowpark Connect for Spark 용 JDBC 데이터 소스 설정

이 섹션에서는 Snowpark Connect JDBC 데이터 소스 기능을 사용하여 외부 데이터베이스에서 데이터를 읽고 외부 데이터베이스(예: MySQL 및 PostgreSQL)에 데이터를 쓰는 방법에 대한 가이드 및 샘플 코드를 제공합니다. 클라이언트 측 설정과 Snowflake Notebook 설정을 모두 다룹니다.

파트 1: 클라이언트 측 설정(MySQL)

이 설정은 Python 스크립트 또는 IDE와 같은 로컬 클라이언트 애플리케이션에서 Snowpark Connect를 실행할 때 필요합니다.

전제 조건

  1. Java 런타임 환경(JRE)/Java 개발 키트(JDK):

    • JRE 또는 JDK를 설치합니다. Java 설치의 아키텍처(예: 64비트)는 Python 설치의 아키텍처와 반드시 일치해야 합니다.

    • 설치의 소스 예제: `Adoptium Temurin 릴리스 <https://adoptium.net/temurin/releases/?version=11>`_(Java 11을 사용하는 경우).

  2. **JAVA_HOME 환경 변수 설정:

    • Java 설치의 루트 디렉터리를 지정하도록 JAVA_HOME 환경 변수를 구성합니다.

    • 예제(macOS/Linux):

export JAVA_HOME=/path/to/your/jdk/home
  1. **CLASSPATH 환경 변수 설정:

    • 특정 데이터베이스의 JDBC 드라이버 .jar 파일에 대한 경로를 CLASSPATH 환경 변수에 추가합니다. 이를 통해 Java 환경에서 필요한 드라이버를 찾을 수 있습니다.

    • 예제(MySQL 드라이버의 경우):

export CLASSPATH=$CLASSPATH:/path/to/your/driver/mysql-connector-j-9.2.0.jar

샘플 클라이언트 코드(MySQL에서 읽기)

이 예제에서는 ``spark_session.read.jdbc()``를 사용하여 MySQL 데이터베이스에서 테이블을 읽는 방법을 보여줍니다.

from pyspark.sql import Row

# Adjust the URL for your server host, port, and database name
MYSQL_JDBC_URL = "jdbc:mysql://localhost/test_db"

# Ensure this driver name matches your version of the JDBC driver
MYSQL_JDBC_DRIVER = "com.mysql.cj.jdbc.Driver"

def test_jdbc_read_from_mysql(self, spark_session):
    # This code snippet uses the Snowpark Connect Spark session
    jdbc_df = spark_session.read.jdbc(
        MYSQL_JDBC_URL,
        "my_schema.my_table",  # Specify your table name in MySQL
        properties={
            "user": "root",           # Your MySQL user name
            "password": "****",       # Your password for MySQL
            "driver": MYSQL_JDBC_DRIVER,
        },
    ).collect()

    # After reading via JDBC, the data is loaded into a temporary table in Snowflake.
    # You can now perform any standard DataFrame operations supported by Snowpark Connect.

샘플 클라이언트 코드(MySQL에 쓰기)

이 예제에서는 ``spark_session.write.jdbc()``를 사용하여 MySQL 데이터베이스에 데이터를 쓰는 방법을 보여줍니다.

from pyspark.sql import Row

# Adjust the URL for your server host, port, and database name
MYSQL_JDBC_URL = "jdbc:mysql://localhost/test_db"

# Ensure this driver name matches your version of the JDBC driver
MYSQL_JDBC_DRIVER = "com.mysql.cj.jdbc.Driver"

def test_jdbc_write_overwrite_to_mysql(self, spark_session):
    # This code snippet uses the Snowpark Connect Spark session
    jdbc_df = spark_session.createDataFrame(
        [
            Row(a=1, b=2.0, c="test1"),
            Row(a=2, b=3.0, c="test2"),
            Row(a=4, b=5.0, c="test3"),
        ]
    )

    jdbc_df.write.jdbc(
        MYSQL_JDBC_URL,
        "my_schema.my_table2",  # Specify your table name in MySQL
        mode="overwrite",
        properties={
            "user": "root",        # Your MySQL user name
            "password": "****",    # Your password for MySQL
            "driver": MYSQL_JDBC_DRIVER,
        },
    )

파트 2: Snowflake Warehouse Notebook 설정(PostgreSQL)

이 설정은 Snowflake Notebook 환경 내에서 Snowpark Connect를 직접 실행할 때 사용됩니다.

설정 단계

  • ``snowpark-connect`` 패키지 추가:

    • snowflake-snowpark-connect 패키지가 노트북 환경에 추가되어 있는지 확인합니다.

Snowflake Notebook에 snowflake-snowpark-connect 패키지 추가하기
  • JDBC 드라이버 다운로드 및 업로드:

    • 외부 데이터베이스에 적절한 JDBC 드라이버 .jar 파일을 다운로드합니다(예: PostgreSQL JDBC 드라이버).

    • 다운로드한 .jar 파일을 노트북 환경에 직접 업로드합니다.

  • 외부 통합 활성화(네트워크 규칙 및 통합):

    • Snowflake에는 노트북이 외부 네트워크 위치와 통신할 수 있도록 **외부 액세스 통합**이 필요합니다. 외부 데이터베이스의 호스트 및 포트에 대한 **네트워크 규칙**을 정의해야 합니다.

Snowflake Notebook에서 네트워크 규칙 설정 구성하기
Snowflake Notebook에서 JDBC 드라이버 JAR 파일 업로드하기
-- 1. Create a Network Rule for the external database host and port
CREATE OR REPLACE NETWORK RULE JDBC_READ_NETWORK_RULE
  MODE = EGRESS
  TYPE = HOST_PORT
  VALUE_LIST = ('hh-pgsql-public.ebi.ac.uk:5432'); -- REPLACE with your host:port

-- 2. Create the External Access Integration using the new Network Rule
CREATE OR REPLACE EXTERNAL ACCESS INTEGRATION JDBC_READ_ACCESS_INTEGRATION
  ALLOWED_NETWORK_RULES = (JDBC_READ_NETWORK_RULE)
  ENABLED = true;

-- NOTE: This integration must be referenced/activated within your notebook's settings.

샘플 웨어하우스 노트북 코드(PostgreSQL에서 읽기)

이 예제에서는 세션을 초기화하고, 드라이버를 로드하고, PostgreSQL에서 데이터를 읽는 데 필요한 Python 코드를 보여줍니다.

from snowflake import snowpark_connect
import jpype

# Initialize the Spark session for Snowpark Connect
spark = snowpark_connect.server.init_spark_session()
df = spark.sql("show schemas").limit(2)
df.show()

# Add the uploaded JDBC driver JAR to the Java Classpath using jpype
# Adjust the path to match the name of the JAR file you uploaded
jpype.addClassPath('/tmp/appRoot/postgresql-42.7.8.jar')

# Using public PostgreSQL DB as an example: https://rnacentral.org/help/public-database
jdbc_df = spark.read.jdbc(
    # Adjust this URL as per your server host, port, and database
    "jdbc:postgresql://hh-pgsql-public.ebi.ac.uk:5432/pfmegrnargs",
    "",  # Empty string for table name when providing a custom query
    properties={
        "user": "reader",                # Your PostgreSQL user name
        "password": "***",               # Your password for PostgreSQL
        "driver": "org.postgresql.Driver",
        # Use the "query" property for a custom SQL statement
        "query": """SELECT
  upi,     -- RNAcentral URS identifier
  taxid,   -- NCBI taxid
  ac       -- external accession
FROM xref
WHERE ac IN ('OTTHUMT00000106564.1', 'OTTHUMT00000416802.1')"""
    },
)

jdbc_df.show()

샘플 웨어하우스 노트북 코드(PostgreSQL에 쓰기)

이 예제에서는 세션을 초기화하고, 드라이버를 로드하고, PostgreSQL에 데이터를 쓰는 데 필요한 Python 코드를 보여줍니다.

from snowflake import snowpark_connect
from pyspark.sql import Row
import jpype

# Initialize the Spark session for Snowpark Connect
spark = snowpark_connect.server.init_spark_session()
df = spark.sql("show schemas").limit(2)
df.show()

# Add the uploaded JDBC driver JAR to the Java Classpath using jpype
# Adjust the path to match the name of the JAR file you uploaded
jpype.addClassPath('/tmp/appRoot/postgresql-42.7.8.jar')

# Create dataframe
jdbc_df = spark.createDataFrame(
    [
        Row(a=1, b=2.0, c="test1"),
        Row(a=2, b=3.0, c="test2"),
        Row(a=4, b=5.0, c="test3"),
    ]
)

# Using public PostgreSQL DB as an example: https://rnacentral.org/help/public-database
jdbc_df.write.jdbc(
    # Adjust this URL as per your server host, port, and database
    "jdbc:postgresql://hh-pgsql-public.ebi.ac.uk:5432/pfmegrnargs",
    "public.my_table2",  # Specify your table name in PostgreSQL
    mode="overwrite",
    properties={
        "user": "writer",                # Your PostgreSQL user name
        "password": "***",               # Your password for PostgreSQL
        "driver": "org.postgresql.Driver",
    },
)

파트 3: Snowflake 작업 공간 노트북 설정(PostgreSQL)

이 설정은 Snowflake 작업 공간 노트북 환경 내에서 Snowpark Connect를 직접 실행할 때 사용됩니다.

설정 단계

  • snowpark-connect 패키지는 기본적으로 작업 공간 노트북에 포함됩니다.

  • JDBC 드라이버 다운로드 및 업로드:

    • 외부 데이터베이스에 적절한 JDBC 드라이버 .jar 파일을 다운로드합니다(예: PostgreSQL JDBC 드라이버).

    • 다운로드한 .jar 파일을 노트북 환경에 직접 업로드합니다.

Snowflake 작업 공간 노트북에서 JDBC 드라이버 업로드하기
  • 외부 통합 생성:

-- 1. Create a Network Rule for the external database host and port
CREATE OR REPLACE NETWORK RULE JDBC_READ_NETWORK_RULE
  MODE = EGRESS
  TYPE = HOST_PORT
  VALUE_LIST = ('hh-pgsql-public.ebi.ac.uk:5432'); -- REPLACE with your host:port

-- 2. Create the External Access Integration using the new Network Rule
CREATE OR REPLACE EXTERNAL ACCESS INTEGRATION JDBC_READ_ACCESS_INTEGRATION
  ALLOWED_NETWORK_RULES = (JDBC_READ_NETWORK_RULE)
  ENABLED = true;

-- NOTE: This integration must be referenced/activated within your notebook's settings.
  • 외부 통합 활성화(네트워크 규칙 및 통합):

    • Snowflake에는 노트북이 외부 네트워크 위치와 통신할 수 있도록 **외부 액세스 통합**이 필요합니다. 외부 데이터베이스의 호스트 및 포트에 대한 **네트워크 규칙**을 정의해야 합니다.

    작업 공간 노트북 설정에서 외부 액세스 통합 활성화하기

샘플 작업 공간 노트북 코드(PostgreSQL에서 읽기)

이 예제에서는 세션을 초기화하고, 드라이버를 로드하고, PostgreSQL에서 데이터를 읽는 데 필요한 Python 코드를 보여줍니다.

from snowflake import snowpark_connect
import jpype
import os

# Initialize the Spark session for Snowpark Connect
spark = snowpark_connect.server.init_spark_session()
df = spark.sql("show schemas").limit(2)
df.show()

# Add the uploaded JDBC driver JAR to the Java Classpath using jpype
# Adjust the path to match the name of the JAR file you uploaded
# Copy the driver to /tmp directory
os.system("cp ./postgresql-42.7.8.jar /tmp/postgresql-42.7.8.jar")
jpype.addClassPath('/tmp/postgresql-42.7.8.jar')

# Using public PostgreSQL DB as an example: https://rnacentral.org/help/public-database
jdbc_df = spark.read.jdbc(
    # Adjust this URL as per your server host, port, and database
    "jdbc:postgresql://hh-pgsql-public.ebi.ac.uk:5432/pfmegrnargs",
    "",  # Empty string for table name when providing a custom query
    properties={
        "user": "reader",                # Your PostgreSQL user name
        "password": "***",               # Your password for PostgreSQL
        "driver": "org.postgresql.Driver",
        # Use the "query" property for a custom SQL statement
        "query": """SELECT
  upi,     -- RNAcentral URS identifier
  taxid,   -- NCBI taxid
  ac       -- external accession
FROM xref
WHERE ac IN ('OTTHUMT00000106564.1', 'OTTHUMT00000416802.1')"""
    },
)

jdbc_df.show()

샘플 작업 공간 노트북 코드(PostgreSQL에서 쓰기)

이 예제에서는 세션을 초기화하고, 드라이버를 로드하고, PostgreSQL에 데이터를 쓰는 데 필요한 Python 코드를 보여줍니다.

from snowflake import snowpark_connect
from pyspark.sql import Row
import jpype
import os

# Initialize the Spark session for Snowpark Connect
spark = snowpark_connect.server.init_spark_session()
df = spark.sql("show schemas").limit(2)
df.show()

# Add the uploaded JDBC driver JAR to the Java Classpath using jpype
# Adjust the path to match the name of the JAR file you uploaded
# Copy the driver to /tmp directory
os.system("cp ./postgresql-42.7.8.jar /tmp/postgresql-42.7.8.jar")
jpype.addClassPath('/tmp/postgresql-42.7.8.jar')

# Create dataframe
jdbc_df = spark.createDataFrame(
    [
        Row(a=1, b=2.0, c="test1"),
        Row(a=2, b=3.0, c="test2"),
        Row(a=4, b=5.0, c="test3"),
    ]
)

# Using public PostgreSQL DB as an example: https://rnacentral.org/help/public-database
jdbc_df.write.jdbc(
    # Adjust this URL as per your server host, port, and database
    "jdbc:postgresql://hh-pgsql-public.ebi.ac.uk:5432/pfmegrnargs",
    "public.my_table2",  # Specify your table name in PostgreSQL
    mode="overwrite",
    properties={
        "user": "writer",                # Your PostgreSQL user name
        "password": "***",               # Your password for PostgreSQL
        "driver": "org.postgresql.Driver",
    },
)

지원되는 데이터 소스

  • SQL 서버

  • MySQL

  • PostgreSQL