Snowpark Python JDBC 사용

Snowpark Python JDBC를 통해 Snowpark Python 사용자는 프로그래밍 방식으로 외부 데이터베이스에서 Snowflake로 데이터를 가져올 수 있습니다. 이를 통해 JDBC 드라이버를 사용하여 외부 데이터베이스에 연결할 수 있습니다.

이러한 APIs를 사용하면 데이터를 Snowflake 테이블로 원활하게 가져오고, :doc:`Snowpark DataFrames</developer-guide/snowpark/python/working-with-dataframes>`를 이용해 고급 분석용으로 변환할 수 있습니다.

`JDBC<https://docs.snowflake.com/en/developer-guide/snowpark/reference/python/latest/snowpark/api/snowflake.snowpark.DataFrameReader.jdbc>`_는 Spark JDBC API와 유사한 방식으로 사용할 수 있습니다. 대부분의 매개 변수는 더 나은 패리티를 얻기 위해 동일하거나 유사하게 설계되었습니다. Snowpark Python JDBC와 Spark JDBC API의 비교에 대한 자세한 내용은 다음 테이블을 참조하세요.

Snowpark JDBC 매개 변수

매개 변수

Snowpark Python JDBC

url

JDBC 드라이버를 통해 외부 데이터 소스에 연결하는 데 사용되는 연결 문자열

udtf_configs

UDTF 생성에 필요한 구성이 포함된 사전

properties

JDBC 연결을 설정하는 동안 필요한 키-값 페어가 포함된 사전

table

소스 데이터베이스의 테이블

query

데이터를 읽기 위해 하위 쿼리로 래핑된 SQL 쿼리

column

병렬로 읽기 위한 분할 열

lower_bound

분할의 하한

upper_bound

분할의 상한

num_partitions

병렬 처리를 위한 파티션의 수

query_timeout

SQL 실행에 대한 제한 시간(초 단위)

fetch_size

왕복당 가져오는 행의 수

custom_schema

외부 데이터베이스에서 데이터를 가져오기 위한 사용자 지정 스키마

predicates

WHERE 절 파티션의 조건 목록

session_init_statement

세션 초기화 시 SQL 또는 PL/SQL 문 실행

병렬 처리 이해하기

Snowpark Python JDBC에는 현재 한 가지 형식의 기본 수집 메커니즘이 있습니다.

UDTF 수집

모든 워크로드는 Snowflake 서버에서 실행됩니다. Snowpark는 Java UDTF를 생성하고 병렬로 호출하여 데이터를 Snowflake 임시 테이블로 수집합니다. 따라서 이 기능에는 udtf_configs 매개 변수가 필요합니다.

Snowpark Python JDBC에는 수집을 병렬화하고 가속화하는 두 가지 방법이 있습니다.

파티션 열

이 방법은 사용자가 :code:`jdbc()`를 호출할 때 4개의 매개 변수를 기준으로 소스 데이터를 여러 파티션으로 나눕니다.

  • column

  • lower_bound

  • upper_bound

  • num_partitions

이 4가지 매개 변수는 동시에 설정해야 하며 :code:`column`은 숫자 또는 날짜 유형이어야 합니다.

조건자

이 메서드는 WHERE 절에 포함하기에 적합한 표현식 목록인 매개 변수 조건자를 기준으로 소스 데이터를 파티션으로 나눕니다. 여기서 각 표현식은 파티션을 정의합니다. 조건자는 보다 유연한 파티션 분할 방법을 제공합니다. 예를 들어, 부울 또는 숫자가 아닌 열에서 파티션을 나눌 수 있습니다.

Snowpark Python JDBC를 사용하여 파티션 내에서 병렬 처리 수준을 조정할 수도 있습니다.

Fetch_size

API가 파티션 내에서 :code:`fetch_size`로 정의된 청크의 행을 가져옵니다. 이러한 행은 가져올 때 병렬로 Snowflake에 작성되므로 읽기와 쓰기가 겹치고 처리량이 극대화될 수 있습니다.

JDBC를 사용하여 외부 데이터 소스에서 데이터 수집하기

JDBC를 사용하여 Snowpark 클라이언트에서 데이터 수집하기

  1. Snowpark 또는 Snowsight를 사용하여 JDBC 드라이버 jar 파일을 Snowflake 스테이지로 업로드

    • Snowpark를 사용하여 업로드합니다.

      Snowpark에서 세션을 생성한 후 다음 코드를 실행합니다.

      session.file.put("<your directory>/<your file name>", "@<your stage name>/<stage path>")
      
      Copy
    • 다음 단계에 설명된 대로 Snowsight를 사용하여 업로드합니다.

      1. Snowsight에서 Catalog -> :ui:`Database Explorer`를 클릭합니다.

      2. 데이터베이스의 왼쪽 검색 창에서 [your database name] -> [your schema name] -> stages -> :ui:`[your stage name]`을 클릭합니다.

      3. 스테이지 페이지의 오른쪽 상단에 있는 “+File” 버튼을 클릭합니다.

  2. 시크릿, 네트워크 규칙 및 외부 액세스 통합을 구성합니다.

    -- Configure a secret to allow egress to the source endpoint
    CREATE OR REPLACE SECRET <your secret>
        TYPE = PASSWORD
        USERNAME = '<your username>'
        PASSWORD = '<your password>';
    
    -- Configure a network rule to allow egress to the source endpoint
    CREATE OR REPLACE NETWORK RULE <your network rule>
      TYPE = HOST_PORT
      MODE = EGRESS
      VALUE_LIST = ('<your host>:<your port>');
    
    -- Configure an external access integration
    CREATE OR REPLACE EXTERNAL ACCESS INTEGRATION <your integration>
      ALLOWED_NETWORK_RULES = (<your network rule>)
      ALLOWED_AUTHENTICATION_SECRETS = (<your secret>)
      ENABLED = true;
    
    Copy
  3. Snowpark 클라이언트에서 Snowpark JDBC를 사용하여 대상에서 데이터를 가져옵니다.

    connection_str=f"jdbc:<your dbms>://<your host>:<your port>/<your db>"
    udtf_configs = {
        "external_access_integration": "<your integration>",
        "secret": "<your secret>",
        "imports": ["<your stage path to jdbc jar file>"]
    }
    
    # Call jdbc to pull data from target table
    df_table = session.read.jdbc(
            url=connection_str,
            udtf_configs=udtf_configs,
            table="<your table>",
        )
    
    # Call jdbc to pull data from target query
    df_query = session.read.jdbc(
            url=connection_str,
            udtf_configs=udtf_configs,
            query="select * from <your table>",
        )
    
    # Pull data from target table with parallelism using partition column
    df_table_partition_column = session.read.jdbc(
            url=connection_str,
            udtf_configs=udtf_configs,
            table="<your table>",
            fetch_size=100000,
            num_partitions=4,
            column="ID",
            upper_bound=10000,
            lower_bound=0
        )
    
    # Pull data from target table with parallelism using predicates
    df_table_predicates = session.read.jdbc(
            url=connection_str,
            udtf_configs=udtf_configs,
            table="<your table>",
            fetch_size=100000,
            predicates = [
                "ID < 3",
                "ID >= 3"
            ]
        )
    
    Copy

JDBC를 사용하여 저장 프로시저에서 데이터 수집하기

  1. Snowsight를 사용하여 JDBC 드라이버 jar 파일을 Snowflake 스테이지로 업로드

    • Snowsight에서 Catalog -> :ui:`Database Explorer`를 클릭합니다.

    • 데이터베이스의 왼쪽 검색 창에서 [your database name] -> [your schema name] -> stages -> :ui:`[your stage name]`을 클릭합니다.

    • 스테이지 페이지의 오른쪽 상단에 있는 “+File” 버튼을 클릭합니다.

  2. 시크릿, 네트워크 규칙 및 외부 액세스 통합을 구성합니다.

    -- Configure a secret to allow egress to the source endpoint
    CREATE OR REPLACE SECRET <your secret>
        TYPE = PASSWORD
        USERNAME = '<your username>'
        PASSWORD = '<your password>';
    
    -- Configure a network rule to allow egress to the source endpoint
    CREATE OR REPLACE NETWORK RULE <your network rule>
      TYPE = HOST_PORT
      MODE = EGRESS
      VALUE_LIST = ('<your host>:<your port>');
    
    -- Configure an external access integration
    CREATE OR REPLACE EXTERNAL ACCESS INTEGRATION <your integration>
      ALLOWED_NETWORK_RULES = (<your network rule>)
      ALLOWED_AUTHENTICATION_SECRETS = (<your secret>)
      ENABLED = true;
    
    Copy
  3. 저장 프로시저에서 Snowpark JDBC를 사용하여 대상에서 데이터를 가져옵니다.

    CREATE OR REPLACE PROCEDURE sp_jdbc()
    RETURNS STRING
    LANGUAGE PYTHON
    RUNTIME_VERSION = '3.10'
    PACKAGES = ('snowflake-snowpark-python')
    HANDLER = 'run'
    AS
    $$
    import time
    def run(session):
        connection_str=f"jdbc:<your dbms>://<your host>:<your port>/<your db>"
        udtf_configs = {
            "external_access_integration": "<your integration>",
            "secret": "<your secret>",
            "imports": ["<your stage path to jdbc jar file>"]
        }
    
        # Call jdbc to pull data from target table
        df_table = session.read.jdbc(
                url=connection_str,
                udtf_configs=udtf_configs,
                table="<your table>",
            )
    
        # Call jdbc to pull data from target query
        df_query = session.read.jdbc(
                url=connection_str,
                udtf_configs=udtf_configs,
                query="select * from <your table>",
            )
    
        # Pull data from target table with parallelism using partition column
        df_table_partition_column = session.read.jdbc(
                url=connection_str,
                udtf_configs=udtf_configs,
                table="<your table>",
                fetch_size=100000,
                num_partitions=4,
                column="ID",
                upper_bound=10000,
                lower_bound=0
            )
    
        # Pull data from target table with parallelism using predicates
        df_table_predicates = session.read.jdbc(
                url=connection_str,
                udtf_configs=udtf_configs,
                table="<your table>",
                fetch_size=100000,
                predicates = [
                    "ID < 3",
                    "ID >= 3"
                ]
            )
        df_table.write.save_as_table("snowflake_table", mode="overwrite")
        return f"success"
    
    $$
    ;
    
    call sp_jdbc();
    select * from snowflake_table ;
    
    Copy

JDBC를 사용하여 Snowflake 노트북에서 데이터 수집하기

  1. Snowsight를 사용하여 JDBC 드라이버 jar 파일을 Snowflake 스테이지로 업로드

    • Snowsight에서 Catalog -> :ui:`Database Explorer`를 클릭합니다.

    • 데이터베이스의 왼쪽 검색 창에서 [your database name] -> [your schema name] -> stages -> :ui:`[your stage name]`을 클릭합니다.

    • 스테이지 페이지의 오른쪽 상단에 있는 “+File” 버튼을 클릭합니다.

  2. 시크릿, 네트워크 규칙 및 외부 액세스 통합을 구성합니다.

    -- Configure a secret to allow egress to the source endpoint
    CREATE OR REPLACE SECRET <your secret>
        TYPE = PASSWORD
        USERNAME = '<your username>'
        PASSWORD = '<your password>';
    
    -- Configure a network rule to allow egress to the source endpoint
    CREATE OR REPLACE NETWORK RULE <your network rule>
      TYPE = HOST_PORT
      MODE = EGRESS
      VALUE_LIST = ('<your host>:<your port>');
    
    -- Configure an external access integration
    CREATE OR REPLACE EXTERNAL ACCESS INTEGRATION <your integration>
      ALLOWED_NETWORK_RULES = (<your network rule>)
      ALLOWED_AUTHENTICATION_SECRETS = (<your secret>)
      ENABLED = true;
    
    Copy
  3. Snowflake 노트북에서 Snowpark JDBC를 사용하여 대상에서 데이터를 가져옵니다.

    import snowflake.snowpark.context
    session = snowflake.snowpark.context.get_active_session()
    connection_str=f"jdbc:<your dbms>://<your host>:<your port>/<your db>"
    udtf_configs = {
            "external_access_integration": "<your integration>",
            "secret": "<your secret>",
            "imports": ["<your stage path to jdbc jar file>"]
        }
    
    # Call jdbc to pull data from target table
    df_table = session.read.jdbc(
            url=connection_str,
            udtf_configs=udtf_configs,
            table="<your table>",
        )
    
    # Call jdbc to pull data from target query
    df_query = session.read.jdbc(
            url=connection_str,
            udtf_configs=udtf_configs,
            query="select * from <your table>",
        )
    
    # Pull data from target table with parallelism using partition column
    df_table_partition_column = session.read.jdbc(
            url=connection_str,
            udtf_configs=udtf_configs,
            table="<your table>",
            fetch_size=100000,
            num_partitions=4,
            column="ID",
            upper_bound=10000,
            lower_bound=0
        )
    
    # Pull data from target table with parallelism using predicates
    df_table_predicates = session.read.jdbc(
            url=connection_str,
            udtf_configs=udtf_configs,
            table="<your table>",
            fetch_size=100000,
            predicates = [
                "ID < 3",
                "ID >= 3"
            ]
        )
    
    Copy

소스 추적

Snowpark JDBC를 사용하여 MySQL에 연결할 때의 소스 추적

  1. create connection 함수에 Snowpark의 태그를 포함합니다.

    connection_str="jdbc:mysql://<your host>:<your port>/<your db>?applicationName=snowflake-snowpark-python"
    udtf_configs = {
        "external_access_integration": "<your integration>",
        "secret": "<your secret>",
        "imports": ["<your stage path to jdbc jar file>"]
    }
    
    # Call dbapi to pull data from target table
    df_table = session.read.jdbc(
            url=connection_str,
            udtf_configs=udtf_configs,
            table="<your table>",
        )
    
    Copy
  2. 데이터 소스에서 다음 SQL을 실행하여 Snowpark에서 여전히 라이브 상태인 쿼리를 캡처합니다.

    SELECT *
    FROM performance_schema.events_statements_history_long
    WHERE THREAD_ID = (
      SELECT THREAD_ID, NAME FROM performance_schema.threads WHERE NAME LIKE '%snowflake-snowpark-python%';
    )
    
    Copy

Snowpark JDBC를 사용하여 SQL Server에 연결할 때의 소스 추적

  1. create connection 함수에 Snowpark의 태그를 포함합니다.

    connection_str="jdbc:mssql://<your host>:<your port>/<your db>?applicationName=snowflake-snowpark-python"
    udtf_configs = {
    "external_access_integration": "<your integration>",
    "secret": "<your secret>",
    "imports": ["<your stage path to jdbc jar file>"]
    }
    # Call dbapi to pull data from target table
    df_table = session.read.jdbc(
          url=connection_str,
          udtf_configs=udtf_configs,
          table="<your table>",
      )
    
    Copy
  2. 데이터 소스에서 다음 SQL을 실행하여 Snowpark에서 여전히 라이브 상태인 쿼리를 캡처합니다.

    SELECT
      s.session_id,
      s.program_name,
      r.status,
      t.text AS sql_text
    FROM sys.dm_exec_sessions s
    JOIN sys.dm_exec_requests r ON s.session_id = r.session_id
    CROSS APPLY sys.dm_exec_sql_text(r.sql_handle) AS t
    WHERE s.program_name = 'snowflake-snowpark-python';
    
    Copy

Snowpark JDBC를 사용하여 PostgresSQL에 연결할 때의 소스 추적

  1. create connection 함수에 Snowpark의 태그를 포함합니다.

    connection_str="jdbc:postgres://<your host>:<your port>/<your db>?applicationName=snowflake-snowpark-python"
    udtf_configs = {
    "external_access_integration": "<your integration>",
    "secret": "<your secret>",
    "imports": ["<your stage path to jdbc jar file>"]
    }
    
    # Call dbapi to pull data from target table
    df_table = session.read.jdbc(
            url=connection_str,
            udtf_configs=udtf_configs,
            table="<your table>",
        )
    
    Copy
  2. 데이터 소스에서 다음 SQL을 실행하여 Snowpark에서 여전히 라이브 상태인 쿼리를 캡처합니다.

    SELECT
      pid,
      usename AS username,
      datname AS database,
      application_name,
      client_addr,
      state,
      query_start,
      query
    FROM
      pg_stat_activity
    WHERE
      application_name = 'snowflake-snowpark-python';
    
    Copy

Snowpark JDBC를 사용하여 Oracle에 연결할 때의 소스 추적

  1. create connection 함수에 Snowpark의 태그를 포함합니다.

    connection_str="jdbc:oracle://<your host>:<your port>/<your db>?applicationName=snowflake-snowpark-python"
    udtf_configs = {
    "external_access_integration": "<your integration>",
    "secret": "<your secret>",
    "imports": ["<your stage path to jdbc jar file>"]
    }
    # Call dbapi to pull data from target table
    df_table = session.read.jdbc(
            url=connection_str,
            udtf_configs=udtf_configs,
            table="<your table>",
        )
    
    Copy
  2. 데이터 소스에서 다음 SQL을 실행하여 Snowpark에서 여전히 라이브 상태인 쿼리를 캡처합니다.

    SELECT
      sid,
      serial#,
      username,
      program,
      module,
      action,
      client_identifier,
      client_info,
      osuser,
      machine
    FROM v$session
    WHERE program = 'snowflake-snowpark-python';
    
    Copy

공통 DBMS 및 타입 지원

다음은 다양한 DBMS 시스템의 인증된 데이터 타입 목록입니다. 소스 데이터에 다른 데이터 타입이 포함된 경우 Snowpark Python JDBC는 이를 최선의 방식으로 Snowflake 데이터 타입에 매핑하거나 문자열로 대체하려고 시도합니다.

Oracle

  • INTEGER

  • NUMBER

  • BINARY_FLOAT

  • BINARY_DOUBLE

  • VARCHAR2

  • CHAR

  • CLOB

  • NCHAR

  • NVARCHAR2

  • NCLOB

  • DATE

  • TIMESTAMP

  • TIMESTAMP WITH TIME ZONE

  • TIMESTAMP WITH LOCAL TIME ZONE

  • RAW

PostgresSQL

  • BIGINT

  • BIGSERIAL

  • BIT

  • BIT VARYING

  • BOOLEAN

  • BOX

  • BYTEA

  • CHAR

  • VARCHAR

  • CIDR

  • CIRCLE

  • DATE

  • DOUBLE PRECISION

  • INET

  • INTEGER

  • INTERVAL

  • JSON

  • JSONB

  • LINE

  • LSEG

  • MACADDR

  • POINT

  • POLYGON

  • REAL

  • SMALLINT

  • SMALLSERIAL

  • SERIAL

  • TEXT

  • TIME

  • TIMESTAMP

  • TIMESTAMPTZ

  • TSQUERY

  • TSVECTOR

  • TXID_SNAPSHOT

  • UUID

  • XML

MySQL

  • INT

  • DECIMAL

  • INT

  • TINYINT

  • SMALLINT

  • MEDIUMINT

  • BIGINT

  • YEAR

  • FLOAT

  • DOUBLE

  • CHAR

  • VARCHAR

  • TINYTEXT

  • TEXT

  • MEDIUMTEXT

  • LONGTEXT

  • ENUM

  • SET

  • BIT

  • BINARY

  • VARBINARY

  • TINYBLOB

  • BLOB

  • MEDIUMBLOB

  • LONGBLOB

  • DATE

  • DATETIME

  • TIMESTAMP

  • TIME

  • JSON

SQL 서버

  • INT

  • BIGINT

  • INT

  • SMALLINT

  • TINYINT

  • BIT

  • DECIMAL

  • NUMERIC

  • MONEY

  • SMALLMONEY

  • FLOAT

  • REAL

  • DATE

  • TIME

  • DATETIME

  • DATETIME2

  • SMALLDATETIME

  • CHAR

  • VARCHAR

  • VARCHAR(MAX)

  • TEXT

  • NCHAR

  • NVARCHAR

  • NVARCHAR(MAX)

  • NTEXT

  • BINARY

  • VARBINARY

  • VARBINARY(MAX)

  • IMAGE

  • UNIQUEIDENTIFIER

  • TIMESTAMP

Databricks

Snowpark Python JDBC를 사용하여 Databricks에 연결하는 것은 현재 지원되지 않습니다.