Snowpark Python JDBC 사용¶
Snowpark Python JDBC를 통해 Snowpark Python 사용자는 프로그래밍 방식으로 외부 데이터베이스에서 Snowflake로 데이터를 가져올 수 있습니다. 이를 통해 JDBC 드라이버를 사용하여 외부 데이터베이스에 연결할 수 있습니다.
이러한 APIs를 사용하면 데이터를 Snowflake 테이블로 원활하게 가져오고, :doc:`Snowpark DataFrames</developer-guide/snowpark/python/working-with-dataframes>`를 이용해 고급 분석용으로 변환할 수 있습니다.
`JDBC<https://docs.snowflake.com/en/developer-guide/snowpark/reference/python/latest/snowpark/api/snowflake.snowpark.DataFrameReader.jdbc>`_는 Spark JDBC API와 유사한 방식으로 사용할 수 있습니다. 대부분의 매개 변수는 더 나은 패리티를 얻기 위해 동일하거나 유사하게 설계되었습니다. Snowpark Python JDBC와 Spark JDBC API의 비교에 대한 자세한 내용은 다음 테이블을 참조하세요.
Snowpark JDBC 매개 변수¶
매개 변수 |
Snowpark Python JDBC |
|---|---|
|
JDBC 드라이버를 통해 외부 데이터 소스에 연결하는 데 사용되는 연결 문자열 |
|
UDTF 생성에 필요한 구성이 포함된 사전 |
|
JDBC 연결을 설정하는 동안 필요한 키-값 페어가 포함된 사전 |
|
소스 데이터베이스의 테이블 |
|
데이터를 읽기 위해 하위 쿼리로 래핑된 SQL 쿼리 |
|
병렬로 읽기 위한 분할 열 |
|
분할의 하한 |
|
분할의 상한 |
|
병렬 처리를 위한 파티션의 수 |
|
SQL 실행에 대한 제한 시간(초 단위) |
|
왕복당 가져오는 행의 수 |
|
외부 데이터베이스에서 데이터를 가져오기 위한 사용자 지정 스키마 |
|
WHERE 절 파티션의 조건 목록 |
|
세션 초기화 시 SQL 또는 PL/SQL 문 실행 |
병렬 처리 이해하기¶
Snowpark Python JDBC에는 현재 한 가지 형식의 기본 수집 메커니즘이 있습니다.
- UDTF 수집
모든 워크로드는 Snowflake 서버에서 실행됩니다. Snowpark는 Java UDTF를 생성하고 병렬로 호출하여 데이터를 Snowflake 임시 테이블로 수집합니다. 따라서 이 기능에는
udtf_configs매개 변수가 필요합니다.
Snowpark Python JDBC에는 수집을 병렬화하고 가속화하는 두 가지 방법이 있습니다.
- 파티션 열
이 방법은 사용자가 :code:`jdbc()`를 호출할 때 4개의 매개 변수를 기준으로 소스 데이터를 여러 파티션으로 나눕니다.
columnlower_boundupper_boundnum_partitions
이 4가지 매개 변수는 동시에 설정해야 하며 :code:`column`은 숫자 또는 날짜 유형이어야 합니다.
- 조건자
이 메서드는
WHERE절에 포함하기에 적합한 표현식 목록인 매개 변수 조건자를 기준으로 소스 데이터를 파티션으로 나눕니다. 여기서 각 표현식은 파티션을 정의합니다. 조건자는 보다 유연한 파티션 분할 방법을 제공합니다. 예를 들어, 부울 또는 숫자가 아닌 열에서 파티션을 나눌 수 있습니다.
Snowpark Python JDBC를 사용하여 파티션 내에서 병렬 처리 수준을 조정할 수도 있습니다.
- Fetch_size
API가 파티션 내에서 :code:`fetch_size`로 정의된 청크의 행을 가져옵니다. 이러한 행은 가져올 때 병렬로 Snowflake에 작성되므로 읽기와 쓰기가 겹치고 처리량이 극대화될 수 있습니다.
JDBC를 사용하여 외부 데이터 소스에서 데이터 수집하기¶
JDBC를 사용하여 Snowpark 클라이언트에서 데이터 수집하기¶
Snowpark 또는 Snowsight를 사용하여 JDBC 드라이버 jar 파일을 Snowflake 스테이지로 업로드
Snowpark를 사용하여 업로드합니다.
Snowpark에서 세션을 생성한 후 다음 코드를 실행합니다.
session.file.put("<your directory>/<your file name>", "@<your stage name>/<stage path>")
다음 단계에 설명된 대로 Snowsight를 사용하여 업로드합니다.
시크릿, 네트워크 규칙 및 외부 액세스 통합을 구성합니다.
-- Configure a secret to allow egress to the source endpoint CREATE OR REPLACE SECRET <your secret> TYPE = PASSWORD USERNAME = '<your username>' PASSWORD = '<your password>'; -- Configure a network rule to allow egress to the source endpoint CREATE OR REPLACE NETWORK RULE <your network rule> TYPE = HOST_PORT MODE = EGRESS VALUE_LIST = ('<your host>:<your port>'); -- Configure an external access integration CREATE OR REPLACE EXTERNAL ACCESS INTEGRATION <your integration> ALLOWED_NETWORK_RULES = (<your network rule>) ALLOWED_AUTHENTICATION_SECRETS = (<your secret>) ENABLED = true;
Snowpark 클라이언트에서 Snowpark JDBC를 사용하여 대상에서 데이터를 가져옵니다.
connection_str=f"jdbc:<your dbms>://<your host>:<your port>/<your db>" udtf_configs = { "external_access_integration": "<your integration>", "secret": "<your secret>", "imports": ["<your stage path to jdbc jar file>"] } # Call jdbc to pull data from target table df_table = session.read.jdbc( url=connection_str, udtf_configs=udtf_configs, table="<your table>", ) # Call jdbc to pull data from target query df_query = session.read.jdbc( url=connection_str, udtf_configs=udtf_configs, query="select * from <your table>", ) # Pull data from target table with parallelism using partition column df_table_partition_column = session.read.jdbc( url=connection_str, udtf_configs=udtf_configs, table="<your table>", fetch_size=100000, num_partitions=4, column="ID", upper_bound=10000, lower_bound=0 ) # Pull data from target table with parallelism using predicates df_table_predicates = session.read.jdbc( url=connection_str, udtf_configs=udtf_configs, table="<your table>", fetch_size=100000, predicates = [ "ID < 3", "ID >= 3" ] )
JDBC를 사용하여 저장 프로시저에서 데이터 수집하기¶
Snowsight를 사용하여 JDBC 드라이버 jar 파일을 Snowflake 스테이지로 업로드
시크릿, 네트워크 규칙 및 외부 액세스 통합을 구성합니다.
-- Configure a secret to allow egress to the source endpoint CREATE OR REPLACE SECRET <your secret> TYPE = PASSWORD USERNAME = '<your username>' PASSWORD = '<your password>'; -- Configure a network rule to allow egress to the source endpoint CREATE OR REPLACE NETWORK RULE <your network rule> TYPE = HOST_PORT MODE = EGRESS VALUE_LIST = ('<your host>:<your port>'); -- Configure an external access integration CREATE OR REPLACE EXTERNAL ACCESS INTEGRATION <your integration> ALLOWED_NETWORK_RULES = (<your network rule>) ALLOWED_AUTHENTICATION_SECRETS = (<your secret>) ENABLED = true;
저장 프로시저에서 Snowpark JDBC를 사용하여 대상에서 데이터를 가져옵니다.
CREATE OR REPLACE PROCEDURE sp_jdbc() RETURNS STRING LANGUAGE PYTHON RUNTIME_VERSION = '3.10' PACKAGES = ('snowflake-snowpark-python') HANDLER = 'run' AS $$ import time def run(session): connection_str=f"jdbc:<your dbms>://<your host>:<your port>/<your db>" udtf_configs = { "external_access_integration": "<your integration>", "secret": "<your secret>", "imports": ["<your stage path to jdbc jar file>"] } # Call jdbc to pull data from target table df_table = session.read.jdbc( url=connection_str, udtf_configs=udtf_configs, table="<your table>", ) # Call jdbc to pull data from target query df_query = session.read.jdbc( url=connection_str, udtf_configs=udtf_configs, query="select * from <your table>", ) # Pull data from target table with parallelism using partition column df_table_partition_column = session.read.jdbc( url=connection_str, udtf_configs=udtf_configs, table="<your table>", fetch_size=100000, num_partitions=4, column="ID", upper_bound=10000, lower_bound=0 ) # Pull data from target table with parallelism using predicates df_table_predicates = session.read.jdbc( url=connection_str, udtf_configs=udtf_configs, table="<your table>", fetch_size=100000, predicates = [ "ID < 3", "ID >= 3" ] ) df_table.write.save_as_table("snowflake_table", mode="overwrite") return f"success" $$ ; call sp_jdbc(); select * from snowflake_table ;
JDBC를 사용하여 Snowflake 노트북에서 데이터 수집하기¶
Snowsight를 사용하여 JDBC 드라이버 jar 파일을 Snowflake 스테이지로 업로드
시크릿, 네트워크 규칙 및 외부 액세스 통합을 구성합니다.
-- Configure a secret to allow egress to the source endpoint CREATE OR REPLACE SECRET <your secret> TYPE = PASSWORD USERNAME = '<your username>' PASSWORD = '<your password>'; -- Configure a network rule to allow egress to the source endpoint CREATE OR REPLACE NETWORK RULE <your network rule> TYPE = HOST_PORT MODE = EGRESS VALUE_LIST = ('<your host>:<your port>'); -- Configure an external access integration CREATE OR REPLACE EXTERNAL ACCESS INTEGRATION <your integration> ALLOWED_NETWORK_RULES = (<your network rule>) ALLOWED_AUTHENTICATION_SECRETS = (<your secret>) ENABLED = true;
Snowflake 노트북에서 Snowpark JDBC를 사용하여 대상에서 데이터를 가져옵니다.
import snowflake.snowpark.context session = snowflake.snowpark.context.get_active_session() connection_str=f"jdbc:<your dbms>://<your host>:<your port>/<your db>" udtf_configs = { "external_access_integration": "<your integration>", "secret": "<your secret>", "imports": ["<your stage path to jdbc jar file>"] } # Call jdbc to pull data from target table df_table = session.read.jdbc( url=connection_str, udtf_configs=udtf_configs, table="<your table>", ) # Call jdbc to pull data from target query df_query = session.read.jdbc( url=connection_str, udtf_configs=udtf_configs, query="select * from <your table>", ) # Pull data from target table with parallelism using partition column df_table_partition_column = session.read.jdbc( url=connection_str, udtf_configs=udtf_configs, table="<your table>", fetch_size=100000, num_partitions=4, column="ID", upper_bound=10000, lower_bound=0 ) # Pull data from target table with parallelism using predicates df_table_predicates = session.read.jdbc( url=connection_str, udtf_configs=udtf_configs, table="<your table>", fetch_size=100000, predicates = [ "ID < 3", "ID >= 3" ] )
소스 추적¶
Snowpark JDBC를 사용하여 MySQL에 연결할 때의 소스 추적¶
create connection 함수에 Snowpark의 태그를 포함합니다.
connection_str="jdbc:mysql://<your host>:<your port>/<your db>?applicationName=snowflake-snowpark-python" udtf_configs = { "external_access_integration": "<your integration>", "secret": "<your secret>", "imports": ["<your stage path to jdbc jar file>"] } # Call dbapi to pull data from target table df_table = session.read.jdbc( url=connection_str, udtf_configs=udtf_configs, table="<your table>", )
데이터 소스에서 다음 SQL을 실행하여 Snowpark에서 여전히 라이브 상태인 쿼리를 캡처합니다.
SELECT * FROM performance_schema.events_statements_history_long WHERE THREAD_ID = ( SELECT THREAD_ID, NAME FROM performance_schema.threads WHERE NAME LIKE '%snowflake-snowpark-python%'; )
Snowpark JDBC를 사용하여 SQL Server에 연결할 때의 소스 추적¶
create connection 함수에 Snowpark의 태그를 포함합니다.
connection_str="jdbc:mssql://<your host>:<your port>/<your db>?applicationName=snowflake-snowpark-python" udtf_configs = { "external_access_integration": "<your integration>", "secret": "<your secret>", "imports": ["<your stage path to jdbc jar file>"] } # Call dbapi to pull data from target table df_table = session.read.jdbc( url=connection_str, udtf_configs=udtf_configs, table="<your table>", )
데이터 소스에서 다음 SQL을 실행하여 Snowpark에서 여전히 라이브 상태인 쿼리를 캡처합니다.
SELECT s.session_id, s.program_name, r.status, t.text AS sql_text FROM sys.dm_exec_sessions s JOIN sys.dm_exec_requests r ON s.session_id = r.session_id CROSS APPLY sys.dm_exec_sql_text(r.sql_handle) AS t WHERE s.program_name = 'snowflake-snowpark-python';
Snowpark JDBC를 사용하여 PostgresSQL에 연결할 때의 소스 추적¶
create connection 함수에 Snowpark의 태그를 포함합니다.
connection_str="jdbc:postgres://<your host>:<your port>/<your db>?applicationName=snowflake-snowpark-python" udtf_configs = { "external_access_integration": "<your integration>", "secret": "<your secret>", "imports": ["<your stage path to jdbc jar file>"] } # Call dbapi to pull data from target table df_table = session.read.jdbc( url=connection_str, udtf_configs=udtf_configs, table="<your table>", )
데이터 소스에서 다음 SQL을 실행하여 Snowpark에서 여전히 라이브 상태인 쿼리를 캡처합니다.
SELECT pid, usename AS username, datname AS database, application_name, client_addr, state, query_start, query FROM pg_stat_activity WHERE application_name = 'snowflake-snowpark-python';
Snowpark JDBC를 사용하여 Oracle에 연결할 때의 소스 추적¶
create connection 함수에 Snowpark의 태그를 포함합니다.
connection_str="jdbc:oracle://<your host>:<your port>/<your db>?applicationName=snowflake-snowpark-python" udtf_configs = { "external_access_integration": "<your integration>", "secret": "<your secret>", "imports": ["<your stage path to jdbc jar file>"] } # Call dbapi to pull data from target table df_table = session.read.jdbc( url=connection_str, udtf_configs=udtf_configs, table="<your table>", )
데이터 소스에서 다음 SQL을 실행하여 Snowpark에서 여전히 라이브 상태인 쿼리를 캡처합니다.
SELECT sid, serial#, username, program, module, action, client_identifier, client_info, osuser, machine FROM v$session WHERE program = 'snowflake-snowpark-python';
공통 DBMS 및 타입 지원¶
다음은 다양한 DBMS 시스템의 인증된 데이터 타입 목록입니다. 소스 데이터에 다른 데이터 타입이 포함된 경우 Snowpark Python JDBC는 이를 최선의 방식으로 Snowflake 데이터 타입에 매핑하거나 문자열로 대체하려고 시도합니다.
Oracle¶
INTEGER
NUMBER
BINARY_FLOAT
BINARY_DOUBLE
VARCHAR2
CHAR
CLOB
NCHAR
NVARCHAR2
NCLOB
DATE
TIMESTAMP
TIMESTAMP WITH TIME ZONE
TIMESTAMP WITH LOCAL TIME ZONE
RAW
PostgresSQL¶
BIGINT
BIGSERIAL
BIT
BIT VARYING
BOOLEAN
BOX
BYTEA
CHAR
VARCHAR
CIDR
CIRCLE
DATE
DOUBLE PRECISION
INET
INTEGER
INTERVAL
JSON
JSONB
LINE
LSEG
MACADDR
POINT
POLYGON
REAL
SMALLINT
SMALLSERIAL
SERIAL
TEXT
TIME
TIMESTAMP
TIMESTAMPTZ
TSQUERY
TSVECTOR
TXID_SNAPSHOT
UUID
XML
MySQL¶
INT
DECIMAL
INT
TINYINT
SMALLINT
MEDIUMINT
BIGINT
YEAR
FLOAT
DOUBLE
CHAR
VARCHAR
TINYTEXT
TEXT
MEDIUMTEXT
LONGTEXT
ENUM
SET
BIT
BINARY
VARBINARY
TINYBLOB
BLOB
MEDIUMBLOB
LONGBLOB
DATE
DATETIME
TIMESTAMP
TIME
JSON
SQL 서버¶
INT
BIGINT
INT
SMALLINT
TINYINT
BIT
DECIMAL
NUMERIC
MONEY
SMALLMONEY
FLOAT
REAL
DATE
TIME
DATETIME
DATETIME2
SMALLDATETIME
CHAR
VARCHAR
VARCHAR(MAX)
TEXT
NCHAR
NVARCHAR
NVARCHAR(MAX)
NTEXT
BINARY
VARBINARY
VARBINARY(MAX)
IMAGE
UNIQUEIDENTIFIER
TIMESTAMP
Databricks¶
Snowpark Python JDBC를 사용하여 Databricks에 연결하는 것은 현재 지원되지 않습니다.