Using the Snowpark Python DB-API
With the Snowpark Python DB-API, Snowpark Python users can programmatically pull data from external databases into Snowflake. The DB-API includes:
Using these APIs, you can seamlessly ingest data into Snowflake tables and transform it for advanced analytics by using :doc:`Snowpark DataFrames</developer-guide/snowpark/python/working-with-dataframes>`.
The `DB-API <https://docs.snowflake.com/en/developer-guide/snowpark/reference/python/latest/snowpark/api/snowflake.snowpark.DataFrameReader.dbapi>`_ can be used in a way similar to the `Spark JDBC API <https://spark.apache.org/docs/3.5.4/sql-data-sources-jdbc.html>`_. Most parameters are identical or similar by design, for better parity. At the same time, Snowpark emphasizes a Python-first design that uses intuitive naming conventions and avoids JDBC-specific configuration, which provides a familiar experience for Python developers. For a comparison of the Spark JDBC API and the Snowpark Python DB-API, see the following table.
Understanding parallelism
The Snowpark Python DB-API has two underlying forms of ingestion mechanisms:
- Local ingestion
In local ingestion, Snowpark first fetches data from the external source into the local environment where the dbapi() function is called, and converts it into Parquet files. Next, Snowpark uploads these Parquet files to a temporary Snowflake stage and copies them from the stage into a temporary table.
- UDTF ingestion
In UDTF ingestion, all of the workload runs on Snowflake servers. Snowpark first creates and runs a UDTF, which then ingests data directly into Snowflake and stores it in a temporary table.
The Snowpark Python DB-API also has two ways to parallelize and accelerate ingestion:
- Partition column
This method divides source data into multiple partitions based on four parameters when users call dbapi():
column
lower_bound
upper_bound
num_partitions
These four parameters must be set together, and :code:`column` must be of a numeric or date type.
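For intuition, the way these four parameters split a numeric range can be sketched as follows. This is only an illustrative approximation (the boundary arithmetic mirrors the Spark JDBC convention; the exact partitioning logic inside dbapi() is an implementation detail):

```python
def partition_predicates(column, lower_bound, upper_bound, num_partitions):
    """Illustrative only: split [lower_bound, upper_bound) into one
    WHERE-clause expression per partition. The first partition also
    catches rows below lower_bound or with NULL values, and the last
    catches rows at or above the final boundary, so no rows are lost."""
    stride = (upper_bound - lower_bound) / num_partitions
    predicates = []
    for i in range(num_partitions):
        lo = lower_bound + int(i * stride)
        hi = lower_bound + int((i + 1) * stride)
        if i == 0:
            predicates.append(f"{column} < {hi} OR {column} IS NULL")
        elif i == num_partitions - 1:
            predicates.append(f"{column} >= {lo}")
        else:
            predicates.append(f"{column} >= {lo} AND {column} < {hi}")
    return predicates

print(partition_predicates("ID", 0, 10000, 4))
```

Note that lower_bound and upper_bound only tune the stride; under this convention, rows outside the range still land in the first or last partition rather than being filtered out.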
- Predicates
This method divides source data into partitions based on parameter predicates, which are a list of expressions suitable for inclusion
in WHERE clauses, where each expression defines a partition. Predicates provide a more flexible way of dividing partitions; for example,
you can divide partitions on Boolean or non-numeric columns.
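Conceptually, each predicate string becomes the WHERE clause of one independent source query, so the predicates should be mutually exclusive and collectively cover the table; overlapping predicates duplicate rows, and gaps drop them. A minimal sketch (the table and column names are hypothetical):

```python
# Hypothetical partitioning of a table on a non-numeric column
predicates = [
    "region = 'EMEA'",
    "region <> 'EMEA' OR region IS NULL",  # covers everything else, incl. NULLs
]

# Conceptually, one independent query per partition; these can run in parallel
queries = [f"SELECT * FROM target_table WHERE {p}" for p in predicates]
for q in queries:
    print(q)
```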
The Snowpark Python DB-API also allows the adjustment of parallelism level within a partition:
- fetch_size
Within a partition, the API fetches rows in chunks defined by fetch_size. These rows are written to Snowflake in parallel as they are
fetched, which allows reading and writing to overlap and maximizes throughput.
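The chunked-fetch pattern behind fetch_size can be illustrated with any DB-API 2.0 cursor; here sqlite3 stands in for the external source. The sketch does not reproduce the read/write overlap Snowpark performs internally; it only shows how fetchmany() bounds the number of rows held in memory per chunk:

```python
import sqlite3

def stream_rows(connection, sql, fetch_size):
    """Yield rows in chunks of at most fetch_size rows, so each chunk
    can be written out while the next one is being fetched."""
    cursor = connection.cursor()
    cursor.execute(sql)
    while True:
        rows = cursor.fetchmany(fetch_size)
        if not rows:
            break
        yield rows

# Stand-in source database with 10 rows
connection = sqlite3.connect(":memory:")
connection.execute("CREATE TABLE t (id INTEGER)")
connection.executemany("INSERT INTO t VALUES (?)", [(i,) for i in range(10)])

chunks = list(stream_rows(connection, "SELECT id FROM t", fetch_size=4))
print([len(chunk) for chunk in chunks])  # chunk sizes: [4, 4, 2]
```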
By combining the ingestion mechanisms and parallelism methods listed above, the Snowpark Python DB-API supports four ingestion patterns:
Local ingestion with a partition column
df_local_par_column = session.read.dbapi(
create_connection,
table="target_table",
fetch_size=100000,
num_partitions=4,
column="ID", # Replace with the column you want to partition on
upper_bound=10000,
lower_bound=0
)
Local ingestion with predicates
df_local_predicates = session.read.dbapi(
create_connection,
table="target_table",
fetch_size=100000,
predicates=[
"ID < 3",
"ID >= 3"
]
)
UDTF ingestion with a partition column
udtf_configs = {
"external_access_integration": "<your external access integration>"
}
df_udtf_par_column = session.read.dbapi(
create_connection,
table="target_table",
udtf_configs=udtf_configs,
fetch_size=100000,
num_partitions=4,
column="ID", # Replace with the column you want to partition on
upper_bound=10000,
lower_bound=0
)
UDTF ingestion with predicates
udtf_configs = {
"external_access_integration": "<your external access integration>"
}
df_udtf_predicates = session.read.dbapi(
create_connection,
table="target_table",
udtf_configs=udtf_configs,
fetch_size=100000,
predicates=[
"ID < 3",
"ID >= 3"
]
)
SQL Server
Connecting to SQL Server from Snowpark requires the following three packages:
The following code examples show how to connect to SQL Server from a Snowpark client and from a stored procedure.
Use the DB-API to connect to SQL Server from a Snowpark client
Install the Python SQL driver as follows:
/bin/bash -c "$(curl -fsSL https://raw.githubusercontent.com/Homebrew/install/master/install.sh)"
brew tap microsoft/mssql-release https://github.com/Microsoft/homebrew-mssql-release
brew update
HOMEBREW_ACCEPT_EULA=Y brew install msodbcsql mssql-tools
Install :code:`snowflake-snowpark-python[pandas]` and :code:`pyodbc` as follows:
pip install snowflake-snowpark-python[pandas]
pip install pyodbc
Define the factory method for connecting to SQL Server as follows:
def create_sql_server_connection():
import pyodbc
SERVER = "<your host name>"
PORT = <your port>
UID = "<your user name>"
PWD = "<your password>"
DATABASE = "<your database name>"
connection_str = (
f"DRIVER={{ODBC Driver 18 for SQL Server}};"
f"SERVER={SERVER},{PORT};"
f"UID={UID};"
f"PWD={PWD};"
f"DATABASE={DATABASE};"
"TrustServerCertificate=yes;"
"Encrypt=yes;"
# Optional to identify source of queries
"APP=snowflake-snowpark-python;"
)
connection = pyodbc.connect(connection_str)
return connection
# Feel free to combine local/udtf ingestion and partition column/predicates as
# stated in the understanding parallelism section
# Call dbapi to pull data from target table
df = session.read.dbapi(
create_sql_server_connection,
table="target_table"
)
# Call dbapi to pull data from target query
df_query = session.read.dbapi(
create_sql_server_connection,
query="select * from target_table"
)
# Pull data from target table with parallelism using partition column
df_local_par_column = session.read.dbapi(
create_sql_server_connection,
table="target_table",
fetch_size=100000,
num_partitions=4,
column="ID", # Replace with the column you want to partition on
upper_bound=10000,
lower_bound=0
)
udtf_configs = {
"external_access_integration": "<your external access integration>"
}
# Pull data from target table with udtf ingestion with parallelism using predicates
df_udtf_predicates = session.read.dbapi(
create_sql_server_connection,
table="target_table",
udtf_configs=udtf_configs,
fetch_size=100000,
predicates=[
"ID < 3",
"ID >= 3"
]
)
Use the DB-API to connect to SQL Server from a stored procedure
Configure the external access integration (EAI) required to allow Snowflake to connect to the source endpoint.
Note
:ref:`PrivateLink<label-aws_privatelink_connect>` is recommended for transferring data securely, especially when handling sensitive information. Verify that the required PrivateLink privileges are enabled for your Snowflake account, and that the PrivateLink feature is configured and enabled for your Snowflake Notebook environment.
Configure the secret, a network rule to allow egress to the source endpoint, and the EAI.
-- Configure a secret to allow egress to the source endpoint
CREATE OR REPLACE SECRET mssql_secret
TYPE = PASSWORD
USERNAME = 'mssql_username'
PASSWORD = 'mssql_password';
-- Configure a network rule to allow egress to the source endpoint
CREATE OR REPLACE NETWORK RULE mssql_network_rule
MODE = EGRESS
TYPE = HOST_PORT
VALUE_LIST = ('mssql_host:mssql_port');
-- Configure an external access integration
CREATE OR REPLACE EXTERNAL ACCESS INTEGRATION mssql_access_integration
ALLOWED_NETWORK_RULES = (mssql_network_rule)
ALLOWED_AUTHENTICATION_SECRETS = (mssql_secret)
ENABLED = true;
Use the DB-API to pull data from SQL Server in a Python stored procedure, as follows:
CREATE OR REPLACE PROCEDURE sp_mssql_dbapi()
RETURNS TABLE()
LANGUAGE PYTHON
RUNTIME_VERSION='3.11'
HANDLER='run'
PACKAGES=('snowflake-snowpark-python', 'pyodbc', 'msodbcsql')
EXTERNAL_ACCESS_INTEGRATIONS = (mssql_access_integration)
SECRETS = ('cred' = mssql_secret )
AS $$
# Get user name and password from mssql_secret
import _snowflake
username_password_object = _snowflake.get_username_password('cred')
USER = username_password_object.username
PASSWORD = username_password_object.password
# Define the factory method for creating a connection to SQL Server
from snowflake.snowpark import Session
def create_sql_server_connection():
import pyodbc
host = "<your host>"
port = <your port>
username = USER
password = PASSWORD
database = "<your database name>"
connection_str = (
f"DRIVER={{ODBC Driver 18 for SQL Server}};"
f"SERVER={host},{port};"
f"DATABASE={database};"
f"UID={username};"
f"PWD={password};"
"TrustServerCertificate=yes;"
"Encrypt=yes;"
# Optional to identify source of queries
"APP=snowflake-snowpark-python;"
)
connection = pyodbc.connect(connection_str)
return connection
def run(session: Session):
# Feel free to combine local/udtf ingestion and partition column/predicates
# as stated in the understanding parallelism section
# Call dbapi to pull data from target table
df = session.read.dbapi(
create_sql_server_connection,
table="target_table"
)
# Call dbapi to pull data from target query
df_query = session.read.dbapi(
create_sql_server_connection,
query="select * from target_table"
)
# Pull data from target table with parallelism using partition column
df_local_par_column = session.read.dbapi(
create_sql_server_connection,
table="target_table",
fetch_size=100000,
num_partitions=4,
column="ID", # Replace with the column you want to partition on
upper_bound=10000,
lower_bound=0
)
udtf_configs = {
"external_access_integration": "<your external access integration>"
}
# Pull data from target table with udtf ingestion with parallelism using predicates
df_udtf_predicates = session.read.dbapi(
create_sql_server_connection,
table="target_table",
udtf_configs=udtf_configs,
fetch_size=100000,
predicates=[
"ID < 3",
"ID >= 3"
]
)
return df
$$;
CALL sp_mssql_dbapi();
Use the DB-API to connect to SQL Server from a Snowflake notebook
Select :code:`snowflake-snowpark-python` and :code:`pyodbc` from the :doc:`Snowflake Notebook packages</user-guide/ui-snowsight/notebooks-import-packages>`.
In the Files pane, open the file environment.yml, and under Dependencies, add the following line of code after other entries:
Configure the secret, a network rule to allow egress to the source endpoint, and the EAI.
-- Configure a secret to allow egress to the source endpoint
CREATE OR REPLACE SECRET mssql_secret
TYPE = PASSWORD
USERNAME = 'mssql_username'
PASSWORD = 'mssql_password';
ALTER NOTEBOOK mynotebook SET SECRETS = ('snowflake-secret-object' = mssql_secret);
-- Configure a network rule to allow egress to the source endpoint
CREATE OR REPLACE NETWORK RULE mssql_network_rule
MODE = EGRESS
TYPE = HOST_PORT
VALUE_LIST = ('mssql_host:mssql_port');
-- Configure an external access integration
CREATE OR REPLACE EXTERNAL ACCESS INTEGRATION mssql_access_integration
ALLOWED_NETWORK_RULES = (mssql_network_rule)
ALLOWED_AUTHENTICATION_SECRETS = (mssql_secret)
ENABLED = true;
Complete the steps in :doc:`/user-guide/ui-snowsight/notebooks-external-access`, and then restart the notebook session.
Use the DB-API to pull data from SQL Server into a Python cell in a Snowflake notebook:
# Get user name and password from mssql_secret
import _snowflake
username_password_object = _snowflake.get_username_password('snowflake-secret-object')
USER = username_password_object.username
PASSWORD = username_password_object.password
import snowflake.snowpark.context
session = snowflake.snowpark.context.get_active_session()
def create_sql_server_connection():
import pyodbc
SERVER = "<your host name>"
UID = USER
PWD = PASSWORD
DATABASE = "<your database name>"
connection_str = (
f"DRIVER={{ODBC Driver 18 for SQL Server}};"
f"SERVER={SERVER};"
f"UID={UID};"
f"PWD={PWD};"
f"DATABASE={DATABASE};"
"TrustServerCertificate=yes;"
"Encrypt=yes;"
# Optional to identify source of queries
"APP=snowflake-snowpark-python;"
)
connection = pyodbc.connect(connection_str)
return connection
# Feel free to combine local/udtf ingestion and partition column/predicates as
# stated in the understanding parallelism section
# Call dbapi to pull data from target table
df = session.read.dbapi(
create_sql_server_connection,
table="target_table"
)
# Call dbapi to pull data from target query
df_query = session.read.dbapi(
create_sql_server_connection,
query="select * from target_table"
)
# Pull data from target table with parallelism using partition column
df_local_par_column = session.read.dbapi(
create_sql_server_connection,
table="target_table",
fetch_size=100000,
num_partitions=4,
column="ID", # Replace with the column you want to partition on
upper_bound=10000,
lower_bound=0
)
udtf_configs = {
"external_access_integration": "<your external access integration>"
}
# Pull data from target table with udtf ingestion with parallelism using predicates
df_udtf_predicates = session.read.dbapi(
create_sql_server_connection,
table="target_table",
udtf_configs=udtf_configs,
fetch_size=100000,
predicates=[
"ID < 3",
"ID >= 3"
]
)
# Save data into sf_table
df.write.mode("overwrite").save_as_table('sf_table')
Source tracing when using the DB-API to connect to SQL Server
Include the Snowpark tag in the create connection function:
def create_sql_server_connection():
import pyodbc
SERVER = "<your host name>"
PORT = <your port>
UID = "<your user name>"
PWD = "<your password>"
DATABASE = "<your database name>"
connection_str = (
f"DRIVER={{ODBC Driver 18 for SQL Server}};"
f"SERVER={SERVER},{PORT};"
f"UID={UID};"
f"PWD={PWD};"
f"DATABASE={DATABASE};"
"TrustServerCertificate=yes;"
"Encrypt=yes;"
# include this parameter for source tracing
"APP=snowflake-snowpark-python;"
)
connection = pyodbc.connect(connection_str)
return connection
Run the following SQL on the data source to capture queries from Snowpark that are still live:
SELECT
s.session_id,
s.program_name,
r.status,
t.text AS sql_text
FROM sys.dm_exec_sessions s
JOIN sys.dm_exec_requests r ON s.session_id = r.session_id
CROSS APPLY sys.dm_exec_sql_text(r.sql_handle) AS t
WHERE s.program_name = 'snowflake-snowpark-python';
Oracle
Connecting to Oracle from Snowpark requires the following two packages:
The following code examples show how to connect to Oracle from a Snowpark client, a stored procedure, and a Snowflake notebook.
Use the DB-API to connect to Oracle from a Snowpark client
Install :code:`snowflake-snowpark-python[pandas]` and :code:`oracledb` as follows:
pip install snowflake-snowpark-python[pandas]
pip install oracledb
Define the factory method for connecting to Oracle, and use the DB-API to pull data from Oracle, as follows:
def create_oracle_db_connection():
import oracledb
HOST = "<your host>"
PORT = <your port>
SERVICE_NAME = "<your service name>"
USER = "<your user name>"
PASSWORD = "<your password>"
DSN = f"{HOST}:{PORT}/{SERVICE_NAME}"
connection = oracledb.connect(
user=USER,
password=PASSWORD,
dsn=DSN
)
# Optional: include this parameter for source tracing
connection.clientinfo = "snowflake-snowpark-python"
return connection
# Feel free to combine local/udtf ingestion and partition column/predicates as
# stated in the understanding parallelism section
# Call dbapi to pull data from target table
df = session.read.dbapi(
create_oracle_db_connection,
table="target_table"
)
# Call dbapi to pull data from target query
df_query = session.read.dbapi(
create_oracle_db_connection,
query="select * from target_table"
)
# Pull data from target table with parallelism using partition column
df_local_par_column = session.read.dbapi(
create_oracle_db_connection,
table="target_table",
fetch_size=100000,
num_partitions=4,
column="ID", # Replace with the column you want to partition on
upper_bound=10000,
lower_bound=0
)
udtf_configs = {
"external_access_integration": "<your external access integration>"
}
# Pull data from target table with udtf ingestion with parallelism using predicates
df_udtf_predicates = session.read.dbapi(
create_oracle_db_connection,
table="target_table",
udtf_configs=udtf_configs,
fetch_size=100000,
predicates=[
"ID < 3",
"ID >= 3"
]
)
Use the DB-API to connect to Oracle from a stored procedure
Configure the external access integration (EAI) required to allow Snowflake to connect to the source endpoint.
Note
:ref:`PrivateLink<label-aws_privatelink_connect>` is recommended for transferring data securely, especially when handling sensitive information. Verify that the required PrivateLink privileges are enabled for your Snowflake account, and that the PrivateLink feature is configured and enabled for your Snowflake Notebook environment.
Configure the secret, a network rule to allow egress to the source endpoint, and the EAI.
-- Configure the secret, a network rule to allow egress to the source endpoint, and EAI:
CREATE OR REPLACE SECRET ora_secret
TYPE = PASSWORD
USERNAME = 'ora_username'
PASSWORD = 'ora_password';
-- configure a network rule to allow egress to the source endpoint
CREATE OR REPLACE NETWORK RULE ora_network_rule
MODE = EGRESS
TYPE = HOST_PORT
VALUE_LIST = ('ora_host:ora_port');
-- configure an external access integration
CREATE OR REPLACE EXTERNAL ACCESS INTEGRATION ora_access_integration
ALLOWED_NETWORK_RULES = (ora_network_rule)
ALLOWED_AUTHENTICATION_SECRETS = (ora_secret)
ENABLED = true;
Use the Snowpark Python DB-API to pull data from Oracle in a Python stored procedure:
CREATE OR REPLACE PROCEDURE sp_ora_dbapi()
RETURNS TABLE()
LANGUAGE PYTHON
RUNTIME_VERSION='3.11'
HANDLER='run'
PACKAGES=('snowflake-snowpark-python', 'oracledb')
EXTERNAL_ACCESS_INTEGRATIONS = (ora_access_integration)
SECRETS = ('cred' = ora_secret )
AS $$
# Get user name and password from ora_secret
import _snowflake
username_password_object = _snowflake.get_username_password('cred')
USER = username_password_object.username
PASSWORD = username_password_object.password
# Define the factory method for creating a connection to Oracle
from snowflake.snowpark import Session
def create_oracle_db_connection():
import oracledb
host = "ora_host"
port = "ora_port"
service_name = "ora_service"
user = USER
password = PASSWORD
DSN = f"{host}:{port}/{service_name}"
connection = oracledb.connect(
user=USER,
password=PASSWORD,
dsn=DSN
)
# Optional: include this parameter for source tracing
connection.clientinfo = "snowflake-snowpark-python"
return connection
def run(session: Session):
# Feel free to combine local/udtf ingestion and partition column/predicates
# as stated in the understanding parallelism section
# Call dbapi to pull data from target table
df = session.read.dbapi(
create_oracle_db_connection,
table="target_table"
)
# Call dbapi to pull data from target query
df_query = session.read.dbapi(
create_oracle_db_connection,
query="select * from target_table"
)
# Pull data from target table with parallelism using partition column
df_local_par_column = session.read.dbapi(
create_oracle_db_connection,
table="target_table",
fetch_size=100000,
num_partitions=4,
column="ID", # Replace with the column you want to partition on
upper_bound=10000,
lower_bound=0
)
udtf_configs = {
"external_access_integration": "<your external access integration>"
}
# Pull data from target table with udtf ingestion with parallelism using predicates
df_udtf_predicates = session.read.dbapi(
create_oracle_db_connection,
table="target_table",
udtf_configs=udtf_configs,
fetch_size=100000,
predicates=[
"ID < 3",
"ID >= 3"
]
)
return df
$$;
CALL sp_ora_dbapi();
Use the DB-API to connect to Oracle from a Snowflake notebook
Select :code:`snowflake-snowpark-python` and :code:`oracledb` from the :doc:`Snowflake Notebook packages</user-guide/ui-snowsight/notebooks-import-packages>`.
Configure the external access integration (EAI) required to allow Snowflake to connect to the source endpoint.
Note
:ref:`PrivateLink<label-aws_privatelink_connect>` is recommended for transferring data securely, especially when handling sensitive information. Verify that the required PrivateLink privileges are enabled for your Snowflake account, and that the PrivateLink feature is configured and enabled for your Snowflake Notebook environment.
Configure the secret, a network rule to allow egress to the source endpoint, and the EAI.
-- Configure the secret, a network rule to allow egress to the source endpoint, and EAI:
CREATE OR REPLACE SECRET ora_secret
TYPE = PASSWORD
USERNAME = 'ora_username'
PASSWORD = 'ora_password';
ALTER NOTEBOOK mynotebook SET SECRETS = ('snowflake-secret-object' = ora_secret);
-- configure a network rule to allow egress to the source endpoint
CREATE OR REPLACE NETWORK RULE ora_network_rule
MODE = EGRESS
TYPE = HOST_PORT
VALUE_LIST = ('ora_host:ora_port');
-- configure an external access integration
CREATE OR REPLACE EXTERNAL ACCESS INTEGRATION ora_access_integration
ALLOWED_NETWORK_RULES = (ora_network_rule)
ALLOWED_AUTHENTICATION_SECRETS = (ora_secret)
ENABLED = true;
Complete the steps in :doc:`/user-guide/ui-snowsight/notebooks-external-access`, and then restart the notebook session.
Use the DB-API to pull data from Oracle into a Python cell in a Snowflake notebook:
# Get user name and password from ora_secret
import _snowflake
username_password_object = _snowflake.get_username_password('snowflake-secret-object')
USER = username_password_object.username
PASSWORD = username_password_object.password
import snowflake.snowpark.context
session = snowflake.snowpark.context.get_active_session()
# Define the factory method for creating a connection to Oracle
def create_oracle_db_connection():
import oracledb
host = "ora_host"
port = "ora_port"
service_name = "ora_service"
user = USER
password = PASSWORD
DSN = f"{host}:{port}/{service_name}"
connection = oracledb.connect(
user=USER,
password=PASSWORD,
dsn=DSN,
)
# Optional: include this parameter for source tracing
connection.clientinfo = "snowflake-snowpark-python"
return connection
# Feel free to combine local/udtf ingestion and partition column/predicates as
# stated in the understanding parallelism section
# Call dbapi to pull data from target table
df = session.read.dbapi(
create_oracle_db_connection,
table="target_table"
)
# Call dbapi to pull data from target query
df_query = session.read.dbapi(
create_oracle_db_connection,
query="select * from target_table"
)
# Pull data from target table with parallelism using partition column
df_local_par_column = session.read.dbapi(
create_oracle_db_connection,
table="target_table",
fetch_size=100000,
num_partitions=4,
column="ID", # Replace with the column you want to partition on
upper_bound=10000,
lower_bound=0
)
udtf_configs = {
"external_access_integration": "<your external access integration>"
}
# Pull data from target table with udtf ingestion with parallelism using predicates
df_udtf_predicates = session.read.dbapi(
create_oracle_db_connection,
table="target_table",
udtf_configs=udtf_configs,
fetch_size=100000,
predicates=[
"ID < 3",
"ID >= 3"
]
)
# Save data into sf_table
df.write.mode("overwrite").save_as_table('sf_table')
Source tracing when using the DB-API to connect to Oracle
Include the Snowpark tag in the create connection function:
def create_oracle_db_connection():
import oracledb
HOST = "myhost"
PORT = "myport"
SERVICE_NAME = "myservice"
USER = "myuser"
PASSWORD = "mypassword"
DSN = f"{HOST}:{PORT}/{SERVICE_NAME}"
connection = oracledb.connect(
user=USER,
password=PASSWORD,
dsn=DSN,
)
# include this parameter for source tracing
connection.clientinfo = "snowflake-snowpark-python"
return connection
Run the following SQL on the data source to capture queries from Snowpark that are still live:
SELECT
s.sid,
s.serial#,
s.username,
s.module,
q.sql_id,
q.sql_text,
q.last_active_time
FROM
v$session s
JOIN v$sql q ON s.sql_id = q.sql_id
WHERE
s.client_info = 'snowflake-snowpark-python';
PostgreSQL
Connecting to PostgreSQL from Snowpark requires the following two packages:
The following code examples show how to connect to PostgreSQL from a Snowpark client, a stored procedure, and a Snowflake notebook.
Use the DB-API to connect to PostgreSQL from a Snowpark client
Install :code:`snowflake-snowpark-python[pandas]` and :code:`psycopg2` as follows:
pip install snowflake-snowpark-python[pandas]
pip install psycopg2
Define the factory method for connecting to PostgreSQL as follows:
def create_pg_connection():
import psycopg2
connection = psycopg2.connect(
host="pg_host",
port=pg_port,
dbname="pg_dbname",
user="pg_user",
password="pg_password",
# Optional: include this parameter for source tracing
application_name="snowflake-snowpark-python"
)
return connection
# Feel free to combine local/udtf ingestion and partition column/predicates as
# stated in the understanding parallelism section
# Call dbapi to pull data from target table
df = session.read.dbapi(
create_pg_connection,
table="target_table"
)
# Call dbapi to pull data from target query
df_query = session.read.dbapi(
create_pg_connection,
query="select * from target_table"
)
# Pull data from target table with parallelism using partition column
df_local_par_column = session.read.dbapi(
create_pg_connection,
table="target_table",
fetch_size=100000,
num_partitions=4,
column="ID", # Replace with the column you want to partition on
upper_bound=10000,
lower_bound=0
)
udtf_configs = {
"external_access_integration": "<your external access integration>"
}
# Pull data from target table with udtf ingestion with parallelism using predicates
df_udtf_predicates = session.read.dbapi(
create_pg_connection,
table="target_table",
udtf_configs=udtf_configs,
fetch_size=100000,
predicates=[
"ID < 3",
"ID >= 3"
]
)
Use the DB-API to connect to PostgreSQL from a stored procedure
Configure the external access integration (EAI) required to allow Snowflake to connect to the source endpoint.
Note
:ref:`PrivateLink<label-aws_privatelink_connect>` is recommended for transferring data securely, especially when handling sensitive information. Verify that the required PrivateLink privileges are enabled for your Snowflake account, and that the PrivateLink feature is configured and enabled for your Snowflake Notebook environment.
Configure the secret, a network rule to allow egress to the source endpoint, and the EAI.
-- configure a secret
CREATE OR REPLACE SECRET pg_secret
TYPE = PASSWORD
USERNAME = 'pg_username'
PASSWORD = 'pg_password';
-- configure a network rule.
CREATE OR REPLACE NETWORK RULE pg_network_rule
MODE = EGRESS
TYPE = HOST_PORT
VALUE_LIST = ('pg_host:pg_port');
-- configure an external access integration.
CREATE OR REPLACE EXTERNAL ACCESS INTEGRATION pg_access_integration
ALLOWED_NETWORK_RULES = (pg_network_rule)
ALLOWED_AUTHENTICATION_SECRETS = (pg_secret)
ENABLED = true;
Use the Snowpark Python DB-API to pull data from PostgreSQL in a Python stored procedure:
CREATE OR REPLACE PROCEDURE sp_pg_dbapi()
RETURNS TABLE()
LANGUAGE PYTHON
RUNTIME_VERSION='3.11'
HANDLER='run'
PACKAGES=('snowflake-snowpark-python', 'psycopg2')
EXTERNAL_ACCESS_INTEGRATIONS = (pg_access_integration)
SECRETS = ('cred' = pg_secret )
AS $$
# Get user name and password from pg_secret
import _snowflake
username_password_object = _snowflake.get_username_password('cred')
USER = username_password_object.username
PASSWORD = username_password_object.password
# Define the factory method for creating a connection to PostgreSQL
from snowflake.snowpark import Session
def create_pg_connection():
import psycopg2
connection = psycopg2.connect(
host="pg_host",
port=pg_port,
dbname="pg_dbname",
user=USER,
password=PASSWORD,
# Optional: include this parameter for source tracing
application_name="snowflake-snowpark-python"
)
return connection
def run(session: Session):
# Feel free to combine local/udtf ingestion and partition column/predicates
# as stated in the understanding parallelism section
# Call dbapi to pull data from target table
df = session.read.dbapi(
create_pg_connection,
table="target_table"
)
# Call dbapi to pull data from target query
df_query = session.read.dbapi(
create_pg_connection,
query="select * from target_table"
)
# Pull data from target table with parallelism using partition column
df_local_par_column = session.read.dbapi(
create_pg_connection,
table="target_table",
fetch_size=100000,
num_partitions=4,
column="ID", # Replace with the column you want to partition on
upper_bound=10000,
lower_bound=0
)
udtf_configs = {
"external_access_integration": "<your external access integration>"
}
# Pull data from target table with udtf ingestion with parallelism using predicates
df_udtf_predicates = session.read.dbapi(
create_pg_connection,
table="target_table",
udtf_configs=udtf_configs,
fetch_size=100000,
predicates=[
"ID < 3",
"ID >= 3"
]
)
return df
$$;
CALL sp_pg_dbapi();
Use the DB-API to connect to PostgreSQL from a Snowflake notebook
Select :code:`snowflake-snowpark-python` and :code:`psycopg2` from the :doc:`Snowflake Notebook packages</user-guide/ui-snowsight/notebooks-import-packages>`.
Configure the external access integration (EAI) required to allow Snowflake to connect to the source endpoint.
Note
:ref:`PrivateLink<label-aws_privatelink_connect>` is recommended for transferring data securely, especially when handling sensitive information. Verify that the required PrivateLink privileges are enabled for your Snowflake account, and that the PrivateLink feature is configured and enabled for your Snowflake Notebook environment.
Configure the secret, a network rule to allow egress to the source endpoint, and the EAI.
-- Configure the secret
CREATE OR REPLACE SECRET pg_secret
TYPE = PASSWORD
USERNAME = 'pg_username'
PASSWORD = 'pg_password';
ALTER NOTEBOOK pg_notebook SET SECRETS = ('snowflake-secret-object' = pg_secret);
-- Configure the network rule to allow egress to the source endpoint
CREATE OR REPLACE NETWORK RULE pg_network_rule
MODE = EGRESS
TYPE = HOST_PORT
VALUE_LIST = ('pg_host:pg_port');
-- Configure external access integration
CREATE OR REPLACE EXTERNAL ACCESS INTEGRATION pg_access_integration
ALLOWED_NETWORK_RULES = (pg_network_rule)
ALLOWED_AUTHENTICATION_SECRETS = (pg_secret)
ENABLED = true;
Complete the steps in :doc:`/user-guide/ui-snowsight/notebooks-external-access`, and then restart the notebook session.
Use the DB-API to pull data from PostgreSQL into a Python cell in a Snowflake notebook:
# Get the user name and password from pg_secret
import _snowflake
username_password_object = _snowflake.get_username_password('snowflake-secret-object')
USER = username_password_object.username
PASSWORD = username_password_object.password
import snowflake.snowpark.context
session = snowflake.snowpark.context.get_active_session()
# Define the factory method for creating a connection to PostgreSQL
def create_pg_connection():
import psycopg2
connection = psycopg2.connect(
host="pg_host",
port=pg_port,
dbname="pg_dbname",
user=USER,
password=PASSWORD,
# Optional: include this parameter for source tracing
application_name="snowflake-snowpark-python"
)
return connection
# Feel free to combine local/udtf ingestion and partition column/predicates as
# stated in the understanding parallelism section
# Call dbapi to pull data from target table
df = session.read.dbapi(
create_pg_connection,
table="target_table"
)
# Call dbapi to pull data from target query
df_query = session.read.dbapi(
create_pg_connection,
query="select * from target_table"
)
# Pull data from target table with parallelism using partition column
df_local_par_column = session.read.dbapi(
create_pg_connection,
table="target_table",
fetch_size=100000,
num_partitions=4,
column="ID", # Replace with the column you want to partition on
upper_bound=10000,
lower_bound=0
)
udtf_configs = {
"external_access_integration": "<your external access integration>"
}
# Pull data from target table with udtf ingestion with parallelism using predicates
df_udtf_predicates = session.read.dbapi(
create_pg_connection,
table="target_table",
udtf_configs=udtf_configs,
fetch_size=100000,
predicates=[
"ID < 3",
"ID >= 3"
]
)
# Save data into sf_table
df.write.mode("overwrite").save_as_table('sf_table')
Source tracing when using the DB-API to connect to PostgreSQL
Include the Snowpark tag in the create connection function:
def create_pg_connection():
import psycopg2
connection = psycopg2.connect(
host="pg_host",
port=pg_port,
dbname="pg_dbname",
user="pg_user",
password="pg_password",
# Include this parameter for source tracing
application_name="snowflake-snowpark-python"
)
return connection
Run the following SQL on the data source to capture queries from Snowpark that are still live:
SELECT
pid,
usename AS username,
datname AS database,
application_name,
client_addr,
state,
query_start,
query
FROM
pg_stat_activity
WHERE
application_name = 'snowflake-snowpark-python';
MySQL
Connecting to MySQL from Snowpark requires the following two packages:
The following code examples show how to connect to MySQL from a Snowpark client, a stored procedure, and a Snowflake notebook.
Use the DB-API to connect to MySQL from a Snowpark client
Install :code:`snowflake-snowpark-python[pandas]` and :code:`pymysql` as follows:
pip install snowflake-snowpark-python[pandas]
pip install pymysql
Define the factory method for connecting to MySQL as follows:
def create_mysql_connection():
import pymysql
connection = pymysql.connect(
host="mysql_host",
port=mysql_port,
database="mysql_db",
user="mysql_user",
password="mysql_password",
# Optional: include this parameter for source tracing
init_command="SET @program_name='snowflake-snowpark-python';"
)
return connection
# Feel free to combine local/udtf ingestion and partition column/predicates as
# stated in the understanding parallelism section
# Call dbapi to pull data from target table
df = session.read.dbapi(
create_mysql_connection,
table="target_table"
)
# Call dbapi to pull data from target query
df_query = session.read.dbapi(
create_mysql_connection,
query="select * from target_table"
)
# Pull data from target table with parallelism using partition column
df_local_par_column = session.read.dbapi(
create_mysql_connection,
table="target_table",
fetch_size=100000,
num_partitions=4,
column="ID", # Replace with the column you want to partition on
upper_bound=10000,
lower_bound=0
)
udtf_configs = {
"external_access_integration": "<your external access integration>"
}
# Pull data from target table with udtf ingestion with parallelism using predicates
df_udtf_predicates = session.read.dbapi(
create_mysql_connection,
table="target_table",
udtf_configs=udtf_configs,
fetch_size=100000,
predicates=[
"ID < 3",
"ID >= 3"
]
)
Use the DB-API to connect to MySQL from a stored procedure
Configure the external access integration (EAI) required to allow Snowflake to connect to the source endpoint.
Note
:ref:`PrivateLink<label-aws_privatelink_connect>` is recommended for transferring data securely, especially when handling sensitive information. Verify that the required PrivateLink privileges are enabled for your Snowflake account, and that the PrivateLink feature is configured and enabled for your Snowflake Notebook environment.
Configure the secret, a network rule to allow egress to the source endpoint, and the EAI.
CREATE OR REPLACE SECRET mysql_secret
  TYPE = PASSWORD
  USERNAME = 'mysql_username'
  PASSWORD = 'mysql_password';

-- Configure a network rule
CREATE OR REPLACE NETWORK RULE mysql_network_rule
  MODE = EGRESS
  TYPE = HOST_PORT
  VALUE_LIST = ('mysql_host:mysql_port');

-- Configure an external access integration
CREATE OR REPLACE EXTERNAL ACCESS INTEGRATION mysql_access_integration
  ALLOWED_NETWORK_RULES = (mysql_network_rule)
  ALLOWED_AUTHENTICATION_SECRETS = (mysql_secret)
  ENABLED = true;
Use the Snowpark Python DB-API to pull data from MySQL in a Python stored procedure, as follows:
CREATE OR REPLACE PROCEDURE sp_mysql_dbapi()
  RETURNS TABLE()
  LANGUAGE PYTHON
  RUNTIME_VERSION = '3.11'
  HANDLER = 'run'
  PACKAGES = ('snowflake-snowpark-python', 'pymysql')
  EXTERNAL_ACCESS_INTEGRATIONS = (mysql_access_integration)
  SECRETS = ('cred' = mysql_secret)
AS $$
# Get the user name and password from mysql_secret
import _snowflake

username_password_object = _snowflake.get_username_password('cred')
USER = username_password_object.username
PASSWORD = username_password_object.password

from snowflake.snowpark import Session

# Define the factory method for creating a connection to MySQL
def create_mysql_connection():
    import pymysql

    connection = pymysql.connect(
        host="mysql_host",
        port=mysql_port,
        database="mysql_db",
        user=USER,
        password=PASSWORD,
        # Optional: include this parameter for source tracing
        init_command="SET @program_name='snowflake-snowpark-python';"
    )
    return connection

# Use the Snowpark Python DB-API to pull data from MySQL in a Python stored procedure
def run(session: Session):
    # Feel free to combine local/udtf ingestion and partition column/predicates
    # as stated in the understanding parallelism section

    # Call dbapi to pull data from the target table
    df = session.read.dbapi(
        create_mysql_connection,
        table="target_table"
    )

    # Call dbapi to pull data from the target query
    df_query = session.read.dbapi(
        create_mysql_connection,
        query="select * from target_table"
    )

    # Pull data from the target table with parallelism using a partition column
    df_local_par_column = session.read.dbapi(
        create_mysql_connection,
        table="target_table",
        fetch_size=100000,
        num_partitions=4,
        column="ID",  # replace with the column you want to partition on
        upper_bound=10000,
        lower_bound=0
    )

    udtf_configs = {
        "external_access_integration": "<your external access integration>"
    }

    # Pull data from the target table with UDTF ingestion, parallelized using predicates
    df_udtf_predicates = session.read.dbapi(
        create_mysql_connection,
        table="target_table",
        udtf_configs=udtf_configs,
        fetch_size=100000,
        predicates=[
            "ID < 3",
            "ID >= 3"
        ]
    )
    return df
$$;
CALL sp_mysql_dbapi();
Use the DB-API to connect to MySQL from a Snowflake notebook
In :doc:`Snowflake Notebook packages</user-guide/ui-snowsight/notebooks-import-packages>`, select :code:`snowflake-snowpark-python` and :code:`pymysql`.
Configure the external access integration (EAI) required to allow Snowflake to connect to the source endpoint.
Note
:ref:`PrivateLink<label-aws_privatelink_connect>` is recommended for secure data transfer, especially when handling sensitive information. Make sure that the required PrivateLink privileges are enabled on your Snowflake account, and that the PrivateLink features are configured and enabled for your Snowflake Notebook environment.
Configure a secret, a network rule that allows egress to the source endpoint, and the EAI.
CREATE OR REPLACE SECRET mysql_secret
  TYPE = PASSWORD
  USERNAME = 'mysql_username'
  PASSWORD = 'mysql_password';

ALTER NOTEBOOK mynotebook SET SECRETS = ('snowflake-secret-object' = mysql_secret);

-- Configure a network rule
CREATE OR REPLACE NETWORK RULE mysql_network_rule
  MODE = EGRESS
  TYPE = HOST_PORT
  VALUE_LIST = ('mysql_host:mysql_port');

-- Configure an EAI
CREATE OR REPLACE EXTERNAL ACCESS INTEGRATION mysql_access_integration
  ALLOWED_NETWORK_RULES = (mysql_network_rule)
  ALLOWED_AUTHENTICATION_SECRETS = (mysql_secret)
  ENABLED = true;
Complete :doc:`/user-guide/ui-snowsight/notebooks-external-access`, and then restart the notebook session.
Use the DB-API to pull data from MySQL into a Python cell in your Snowflake notebook:
# Get the user name and password from mysql_secret
import _snowflake

username_password_object = _snowflake.get_username_password('snowflake-secret-object')
USER = username_password_object.username
PASSWORD = username_password_object.password

import snowflake.snowpark.context

session = snowflake.snowpark.context.get_active_session()

# Define the factory method for creating a connection to MySQL
def create_mysql_connection():
    import pymysql

    connection = pymysql.connect(
        host="mysql_host",
        port=mysql_port,
        database="mysql_db",
        user=USER,
        password=PASSWORD,
        # Optional: include this parameter for source tracing
        init_command="SET @program_name='snowflake-snowpark-python';"
    )
    return connection

# Feel free to combine local/udtf ingestion and partition column/predicates as
# stated in the understanding parallelism section

# Call dbapi to pull data from the target table
df = session.read.dbapi(
    create_mysql_connection,
    table="target_table"
)

# Call dbapi to pull data from the target query
df_query = session.read.dbapi(
    create_mysql_connection,
    query="select * from target_table"
)

# Pull data from the target table with parallelism using a partition column
df_local_par_column = session.read.dbapi(
    create_mysql_connection,
    table="target_table",
    fetch_size=100000,
    num_partitions=4,
    column="ID",  # replace with the column you want to partition on
    upper_bound=10000,
    lower_bound=0
)

udtf_configs = {
    "external_access_integration": "<your external access integration>"
}

# Pull data from the target table with UDTF ingestion, parallelized using predicates
df_udtf_predicates = session.read.dbapi(
    create_mysql_connection,
    table="target_table",
    udtf_configs=udtf_configs,
    fetch_size=100000,
    predicates=[
        "ID < 3",
        "ID >= 3"
    ]
)

# Save the data into sf_table
df.write.mode("overwrite").save_as_table('sf_table')
Source tracing when using the DB-API to connect to MySQL
Include the Snowpark tag in your create-connection function:
def create_mysql_connection():
    import pymysql

    connection = pymysql.connect(
        host="mysql_host",
        port=mysql_port,
        database="mysql_db",
        user="mysql_user",
        password="mysql_password",
        # Include this parameter for source tracing
        init_command="SET @program_name='snowflake-snowpark-python';"
    )
    return connection
Run the following SQL on the data source to capture the queries issued by Snowpark:
SELECT *
FROM performance_schema.events_statements_history_long
WHERE THREAD_ID = (
    SELECT THREAD_ID
    FROM performance_schema.events_statements_history_long
    WHERE SQL_TEXT = "SET @program_name='snowflake-snowpark-python'"
    ORDER BY EVENT_ID DESC
    LIMIT 1
);
Databricks
Connecting to Databricks from Snowpark requires two packages: :code:`snowflake-snowpark-python` and :code:`databricks-sql-connector`.
The following code examples show how to connect to Databricks from a Snowpark client, a stored procedure, and a Snowflake notebook.
Use the DB-API to connect to Databricks from a Snowpark client
Install databricks-sql-connector as follows:
pip install snowflake-snowpark-python[pandas]
pip install databricks-sql-connector
Define a factory method for creating a connection to Databricks, as follows:
def create_dbx_connection():
    import databricks.sql

    connection = databricks.sql.connect(
        server_hostname=HOST,
        http_path=PATH,
        access_token=ACCESS_TOKEN
    )
    return connection

# Feel free to combine local/udtf ingestion and partition column/predicates as
# stated in the understanding parallelism section

# Call dbapi to pull data from the target table
df = session.read.dbapi(
    create_dbx_connection,
    table="target_table"
)

# Call dbapi to pull data from the target query
df_query = session.read.dbapi(
    create_dbx_connection,
    query="select * from target_table"
)

# Pull data from the target table with parallelism using a partition column
df_local_par_column = session.read.dbapi(
    create_dbx_connection,
    table="target_table",
    fetch_size=100000,
    num_partitions=4,
    column="ID",  # replace with the column you want to partition on
    upper_bound=10000,
    lower_bound=0
)

udtf_configs = {
    "external_access_integration": "<your external access integration>"
}

# Pull data from the target table with UDTF ingestion, parallelized using predicates
df_udtf_predicates = session.read.dbapi(
    create_dbx_connection,
    table="target_table",
    udtf_configs=udtf_configs,
    fetch_size=100000,
    predicates=[
        "ID < 3",
        "ID >= 3"
    ]
)
Use the DB-API to connect to Databricks from a stored procedure
Configure the external access integration (EAI) required to allow Snowflake to connect to the source endpoint.
Note
:ref:`PrivateLink<label-aws_privatelink_connect>` is recommended for secure data transfer, especially when handling sensitive information. Make sure that the required PrivateLink privileges are enabled on your Snowflake account, and that the PrivateLink features are configured and enabled for your environment.
Configure a secret, a network rule that allows egress to the source endpoint, and the EAI.
CREATE OR REPLACE SECRET dbx_secret
  TYPE = GENERIC_STRING
  SECRET_STRING = 'dbx_access_token';

CREATE OR REPLACE NETWORK RULE dbx_network_rule
  MODE = EGRESS
  TYPE = HOST_PORT
  VALUE_LIST = ('dbx_host:dbx_port');

CREATE OR REPLACE EXTERNAL ACCESS INTEGRATION dbx_access_integration
  ALLOWED_NETWORK_RULES = (dbx_network_rule)
  ALLOWED_AUTHENTICATION_SECRETS = (dbx_secret)
  ENABLED = true;
Use the Snowpark Python DB-API to pull data from Databricks in a Python stored procedure, as follows:
CREATE OR REPLACE PROCEDURE sp_dbx_dbapi()
  RETURNS TABLE()
  LANGUAGE PYTHON
  RUNTIME_VERSION = '3.11'
  HANDLER = 'run'
  PACKAGES = ('snowflake-snowpark-python', 'databricks-sql-connector')
  EXTERNAL_ACCESS_INTEGRATIONS = (dbx_access_integration)
  SECRETS = ('cred' = dbx_secret)
AS $$
# Get the access token from dbx_secret
import _snowflake

ACCESS_TOKEN = _snowflake.get_generic_secret_string('cred')

from snowflake.snowpark import Session

# Define the factory method for creating a connection to Databricks
def create_dbx_connection():
    import databricks.sql

    connection = databricks.sql.connect(
        server_hostname="dbx_host",
        http_path="dbx_path",
        access_token=ACCESS_TOKEN,
    )
    return connection

# Use the Snowpark Python DB-API to pull data from Databricks in a Python stored procedure
def run(session: Session):
    # Feel free to combine local/udtf ingestion and partition column/predicates
    # as stated in the understanding parallelism section

    # Call dbapi to pull data from the target table
    df = session.read.dbapi(
        create_dbx_connection,
        table="target_table"
    )

    # Call dbapi to pull data from the target query
    df_query = session.read.dbapi(
        create_dbx_connection,
        query="select * from target_table"
    )

    # Pull data from the target table with parallelism using a partition column
    df_local_par_column = session.read.dbapi(
        create_dbx_connection,
        table="target_table",
        fetch_size=100000,
        num_partitions=4,
        column="ID",  # replace with the column you want to partition on
        upper_bound=10000,
        lower_bound=0
    )

    udtf_configs = {
        "external_access_integration": "<your external access integration>"
    }

    # Pull data from the target table with UDTF ingestion, parallelized using predicates
    df_udtf_predicates = session.read.dbapi(
        create_dbx_connection,
        table="target_table",
        udtf_configs=udtf_configs,
        fetch_size=100000,
        predicates=[
            "ID < 3",
            "ID >= 3"
        ]
    )
    return df
$$;
CALL sp_dbx_dbapi();
Use the DB-API to connect to Databricks from a Snowflake notebook
In :doc:`Snowflake Notebook packages</user-guide/ui-snowsight/notebooks-import-packages>`, select :code:`snowflake-snowpark-python` and :code:`databricks-sql-connector`.
Configure the external access integration (EAI) required to allow Snowflake to connect to the source endpoint.
Note
:ref:`PrivateLink<label-aws_privatelink_connect>` is recommended for secure data transfer, especially when handling sensitive information. Make sure that the required PrivateLink privileges are enabled on your Snowflake account, and that the PrivateLink features are configured and enabled for your Snowflake Notebook environment.
Configure a secret, a network rule that allows egress to the source endpoint, and the EAI.
CREATE OR REPLACE SECRET dbx_secret
  TYPE = GENERIC_STRING
  SECRET_STRING = 'dbx_access_token';

ALTER NOTEBOOK mynotebook SET SECRETS = ('snowflake-secret-object' = dbx_secret);

CREATE OR REPLACE NETWORK RULE dbx_network_rule
  MODE = EGRESS
  TYPE = HOST_PORT
  VALUE_LIST = ('dbx_host:dbx_port');

CREATE OR REPLACE EXTERNAL ACCESS INTEGRATION dbx_access_integration
  ALLOWED_NETWORK_RULES = (dbx_network_rule)
  ALLOWED_AUTHENTICATION_SECRETS = (dbx_secret)
  ENABLED = true;
Complete :doc:`/user-guide/ui-snowsight/notebooks-external-access`, and then restart the notebook session.
Use the DB-API to pull data from Databricks into a Python cell in your Snowflake notebook:
# Get the access token from dbx_secret
import _snowflake

ACCESS_TOKEN = _snowflake.get_generic_secret_string('snowflake-secret-object')

import snowflake.snowpark.context

session = snowflake.snowpark.context.get_active_session()

# Define the factory method for creating a connection to Databricks
def create_dbx_connection():
    import databricks.sql

    connection = databricks.sql.connect(
        server_hostname="dbx_host",
        http_path="dbx_path",
        access_token=ACCESS_TOKEN,
    )
    return connection

# Feel free to combine local/udtf ingestion and partition column/predicates as
# stated in the understanding parallelism section

# Call dbapi to pull data from the target table
df = session.read.dbapi(
    create_dbx_connection,
    table="target_table"
)

# Call dbapi to pull data from the target query
df_query = session.read.dbapi(
    create_dbx_connection,
    query="select * from target_table"
)

# Pull data from the target table with parallelism using a partition column
df_local_par_column = session.read.dbapi(
    create_dbx_connection,
    table="target_table",
    fetch_size=100000,
    num_partitions=4,
    column="ID",  # replace with the column you want to partition on
    upper_bound=10000,
    lower_bound=0
)

udtf_configs = {
    "external_access_integration": "<your external access integration>"
}

# Pull data from the target table with UDTF ingestion, parallelized using predicates
df_udtf_predicates = session.read.dbapi(
    create_dbx_connection,
    table="target_table",
    udtf_configs=udtf_configs,
    fetch_size=100000,
    predicates=[
        "ID < 3",
        "ID >= 3"
    ]
)

# Save the data into sf_table
df.write.mode("overwrite").save_as_table('sf_table')
Source tracing when using the DB-API to connect to Databricks
Include the Snowpark tag in your create-connection function:
def create_dbx_connection():
    import databricks.sql

    connection = databricks.sql.connect(
        server_hostname=HOST,
        http_path=PATH,
        access_token=ACCESS_TOKEN,
        # Include this parameter for source tracing
        user_agent_entry="snowflake-snowpark-python"
    )
    return connection
In the Databricks console, go to the query history and search for queries whose source is :code:`snowflake-snowpark-python`.
Limitations
The Snowpark Python DB-API supports only Python DB-API 2.0 compliant drivers (for example, pyodbc and oracledb). JDBC drivers are not supported in this release.
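Because the DB-API accepts any PEP 249 (Python DB-API 2.0) compliant driver, the connection factory you pass to dbapi() only needs to return an object exposing the standard Connection/Cursor interface (connect(), cursor(), execute(), fetchall()). The following sketch uses the standard-library sqlite3 module purely to illustrate that interface shape; in practice you would return a pyodbc, oracledb, or pymysql connection instead.

```python
import sqlite3


def create_connection():
    # A factory must return a new PEP 249 Connection each time it is called;
    # sqlite3 stands in here only to show the interface contract.
    return sqlite3.connect(":memory:")


conn = create_connection()
cur = conn.cursor()          # PEP 249: Connection.cursor()
cur.execute("SELECT 1 + 1")  # PEP 249: Cursor.execute()
print(cur.fetchall())        # → [(2,)]
conn.close()
```

A JDBC driver exposes none of these Python-level methods, which is why only DB-API 2.0 drivers work with this release.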