Snowpark Python DB-API를 사용하여 외부 데이터 소스에서 데이터 읽기

Snowpark Python DB-API를 통해 Snowpark Python 사용자는 외부 데이터베이스에서 Snowflake로 데이터를 프로그래밍 방식으로 가져올 수 있습니다. 다음 내용이 포함됩니다.

  • Python DB-API지원: Python의 표준 DB-API 2.0 드라이버를 사용하여 외부 데이터베이스에 연결합니다.

  • 간소화된 설정: :code:`pip`를 사용하여 추가 종속성을 관리하지 않고도 필요한 드라이버를 설치할 수 있습니다.

이러한 APIs를 사용하면 데이터를 Snowflake 테이블로 원활하게 가져오고, :doc:`Snowpark DataFrames</developer-guide/snowpark/python/working-with-dataframes>`를 이용해 고급 분석용으로 변환할 수 있습니다.

Snowpark Python DB-API 사용하기

DB-API<https://docs.snowflake.com/en/developer-guide/snowpark/reference/python/latest/snowpark/api/snowflake.snowpark.DataFrameReader.dbapi>`_는 `Spark JDBCAPI<https://spark.apache.org/docs/3.5.4/sql-data-sources-jdbc.html>`_와 유사한 방식으로 사용할 있습니다. 대부분의 매개 변수는 나은 패리티를 얻기 위해 동일하거나 유사하게 설계되었습니다. 동시에 Snowpark는 JDBC 전용 구성을 지양하고 직관적인 명명 규칙을 사용하는 Python 우선 설계를 강조합니다. 이를 통해 Python 개발자에게 익숙한 환경을 제공합니다. Spark JDBC API와 Snowpark Python DB-API를 비교하는 자세한 내용은 :ref:`label-DB-API-vs-spark 섹션을 참조하세요.

DB-API 매개 변수

매개 변수

Snowpark DB-API

create_connection

Python DB-API 연결을 만드는 함수입니다.

table

소스 데이터베이스의 테이블을 지정합니다.

query

데이터를 읽기 위해 하위 쿼리로 래핑된 SQL 쿼리입니다.

column

병렬로 읽기 위한 분할 열입니다.

lower_bound

분할의 하한입니다.

upper_bound

분할의 상한입니다.

num_partitions

병렬 처리하기 위한 파티션의 수입니다.

query_timeout

SQL 실행 시간 제한(초)입니다.

fetch_size

왕복당 가져오는 행의 수입니다.

custom_schema

외부 데이터베이스에서 데이터를 가져오기 위한 사용자 지정 스키마입니다.

max_workers

외부 데이터베이스에서 데이터를 병렬로 가져오기 위한 작업자 수입니다.

predicates

WHERE 절 파티션의 조건 목록입니다.

session_init_statement

세션 초기화 시 SQL 또는 PL/SQL 문을 실행합니다.

udtf_configs

더 나은 성능을 얻기 위해 Snowflake UDTF를 사용하여 워크로드 실행합니다.

fetch_merge_count

업로드하기 전에 단일 Parquet 파일로 병합할 가져온 배치의 수입니다.

병렬 처리 이해하기

Snowpark Python DB-API는 사용자 입력 정보를 기반으로 두 가지 독립적인 병렬 처리 형식을 사용합니다.

  • 파티션 기반 병렬 처리

    사용자가 분할 정보(예: column, lower_bound, upper_bound, num_partitions) 또는 조건자를 지정하면 Snowflake에서 쿼리를 여러 파티션으로 분할합니다. 이러한 작업은 다중 처리를 사용해 병렬로 처리되며, 각 작업자가 자체 파티션을 가져오고 작성합니다.

  • 각 파티션 내 가져오기 크기 기반의 병렬 처리

    API가 파티션 내에서 :code:`fetch_size`로 정의된 청크의 행을 가져옵니다. 이러한 행은 가져올 때 병렬로 Snowflake에 작성되므로 읽기와 쓰기가 겹치고 처리량이 극대화될 수 있습니다.

이 두 가지 병렬 처리 방식은 독립적으로 작동합니다. 분할 또는 :code:`fetch_size`가 지정되지 않은 경우, 이 함수는 전체 소스 테이블을 메모리에 로드한 후 Snowflake에 작성합니다. 이로 인해 대규모 데이터 세트의 경우 메모리 사용량이 증가하고 성능이 저하될 수 있습니다.

SQL 서버

DB-API를 사용하여 Snowpark 클라이언트에서 SQL 서버에 연결하기

Snowpark에서 SQL 서버에 연결하려면 다음의 세 가지 패키지가 필요합니다.

아래는 Snowpark 클라이언트 및 저장 프로시저에서 SQL에 연결하는 데 필요한 코드 예제입니다.

  • Python SQL 드라이버 설치하기

/bin/bash -c "$(curl -fsSL https://raw.githubusercontent.com/Homebrew/install/master/install.sh)"
  brew tap microsoft/mssql-release https://github.com/Microsoft/homebrew-mssql-release
  brew update
  HOMEBREW_ACCEPT_EULA=Y brew install msodbcsql18 mssql-tools18
Copy
  • snowflake-snowpark-python[pandas]pyodbc 설치하기

pip install snowflake-snowpark-python[pandas]
pip install pyodbc
Copy
  • SQL 서버에 연결하기 위한 팩토리 메서드 정의하기

def create_sql_server_connection():
    import pyodbc
    HOST = "mssql_host"
    PORT = "mssql_port"
    USERNAME = "mssql_username"
    PASSWORD = "mssql_password"
    DATABASE = "mssql_db"
    connection_str = (
        "DRIVER={{ODBC Driver 18 for SQL Server}};"
        "SERVER={HOST},{PORT};"
        "DATABASE={DATABASE};"
        "UID={USERNAME};"
        "PWD={PASSWORD};"
    )
    connection = pyodbc.connect(connection_str)
    return connection


# Call dbapi to pull data from mssql_table

df = session.read.dbapi(
  create_sql_server_connection,
    table="mssql_table")
Copy

DB-API를 사용하여 저장 프로시저에서 SQL 서버에 연결하기

  • Snowflake를 소스 엔드포인트에 연결할 수 있도록 하는 데 필요한 외부 액세스 통합을 구성합니다.

    참고

    :ref:`PrivateLink<label-aws_privatelink_connect>`는 특히 민감한 정보를 처리할 때 데이터를 안전하게 전송하기 위해 권장됩니다. Snowflake 계정에 필요한 PrivateLink 권한이 활성화되어 있는지, PrivateLink 기능이 구성되고 Snowflake Notebook 환경에 활성화되어 있는지 확인합니다.

  • 시크릿, 소스 엔드포인트로의 송신을 허용하는 네트워크 규칙, 외부 액세스 통합을 구성합니다.

    CREATE OR REPLACE SECRET mssql_secret
      TYPE = PASSWORD
      USERNAME = 'mssql_username'
      PASSWORD = 'mssql_password';
    
    -- Configure a network rule to allow egress to the source endpoint
    
    CREATE OR REPLACE NETWORK RULE mssql_network_rule
      MODE = EGRESS
      TYPE = HOST_PORT
      VALUE_LIST = ('mssql_host:mssql_port');
    
    -- Configure an external access integration
    
    CREATE OR REPLACE EXTERNAL ACCESS INTEGRATION mssql_access_integration
      ALLOWED_NETWORK_RULES = (mssql_network_rule)
      ALLOWED_AUTHENTICATION_SECRETS = (mssql_secret)
      ENABLED = true;
    
    -- Create or replace a Python stored procedure
    
    CREATE OR REPLACE PROCEDURE sp_mssql_dbapi()
      RETURNS TABLE()
      LANGUAGE PYTHON
      RUNTIME_VERSION='3.11'
      HANDLER='run'
      PACKAGES=('snowflake-snowpark-python', 'pyodbc', 'msodbcsql')
            EXTERNAL_ACCESS_INTEGRATIONS = (mssql_access_integration)
            SECRETS = ('cred' = mssql_secret )
    
    AS $$
    
    # Get user name and password from mssql_secret
    
    import _snowflake
    username_password_object = _snowflake.get_username_password('cred')
    USER = username_password_object.username
    PASSWORD = username_password_object.password
    
    # Define a method to connect to SQL server_hostname
    from snowflake.snowpark import Session
    def create_sql_server_connection():
        import pyodbc
    
        host = "mssql_host"
        port = mssql_port
        username = USER
        password = PASSWORD
        database = "mssql_database"
        connection_str = (
          "DRIVER={{ODBC Driver 18 for SQL Server}};"
          "SERVER={host},{port};"
          "DATABASE={database};"
          "UID={username};"
          "PWD={password};"
        )
    
        connection = pyodbc.connect(connection_str)
        return connection
    
    def run(session: Session):
        df = session.read.dbapi(
            create_sql_server_connection,
            table="mssql_table"
        )
        return df
    $$;
    
    CALL sp_mssql_dbapi();
    
    Copy

Oracle

Snowpark에서 Oracle에 연결하려면 다음의 두 패키지가 필요합니다.

다음은 Snowpark 클라이언트, 저장 프로시저 및 |sf-notebooks|에서 Oracle에 연결하는 데 필요한 코드 예제입니다.

DB-API를 사용하여 Snowpark 클라이언트에서 Oracle에 연결하기

  • snowflake-snowpark-python[pandas]oracledb 설치하기

    pip install snowflake-snowpark-python[pandas]
    pip install oradb
    
    Copy
  • DB-API를 사용하여 Oracle에서 데이터를 가져오고 Oracle에 연결하기 위한 팩토리 메서드 정의하기

    def create_oracle_db_connection():
        import oracledb
        HOST = "myhost"
        PORT = "myport"
        SERVICE_NAME = "myservice"
        USER = "myuser"
        PASSWORD = "mypassword"
        DSN = "{HOST}:{PORT}/{SERVICE_NAME}"
        connection = oracledb.connect(
            user=USER,
            password=PASSWORD,
            dsn=DSN
        )
        return connection
    
    
    # Call dbapi to pull data from mytable
    
    df = session.read.dbapi(
      create_oracle_db_connection,
      table="mytable")
    
    Copy

DB-API를 사용하여 저장 프로시저에서 Oracle에 연결하기

Snowflake를 소스 엔드포인트에 연결할 수 있도록 하려면 외부 액세스 통합이 필요합니다.

참고

:ref:`PrivateLink<label-aws_privatelink_connect>`는 특히 민감한 정보를 처리할 때 데이터를 안전하게 전송하기 위해 권장됩니다. Snowflake 계정에 필요한 PrivateLink 권한이 활성화되어 있는지, PrivateLink 기능이 구성되고 Snowflake Notebook 환경에 활성화되어 있는지 확인합니다.

  • 시크릿, 소스 엔드포인트로의 송신을 허용하는 네트워크 규칙 및 외부 액세스 통합을 구성합니다.

    -- Configure the secret, a network rule to allow egress to the source endpoint and external access integration.
    
    CREATE OR REPLACE SECRET ora_secret
      TYPE = PASSWORD
      USERNAME = 'ora_username'
      PASSWORD = 'ora_password';
    
    -- Configure a network rule to allow egress to the source endpoint
    
    CREATE OR REPLACE NETWORK RULE ora_network_rule
      MODE = EGRESS
      TYPE = HOST_PORT
      VALUE_LIST = ('ora_host:ora_port');
    
    -- Configure an external access integration
    
    CREATE OR REPLACE EXTERNAL ACCESS INTEGRATION ora_access_integration
      ALLOWED_NETWORK_RULES = (ora_network_rule)
      ALLOWED_AUTHENTICATION_SECRETS = (ora_secret)
      ENABLED = true;
    
    Copy

Snowpark Python DB-API를 사용하여 Python 저장 프로시저의 Oracle에서 데이터 가져오기

CREATE OR REPLACE PROCEDURE sp_ora_dbapi()
  RETURNS TABLE()
  LANGUAGE PYTHON
  RUNTIME_VERSION='3.11'
  HANDLER='run'
  PACKAGES=('snowflake-snowpark-python', 'oracledb')
  EXTERNAL_ACCESS_INTEGRATIONS = (ora_access_integration)
  SECRETS = ('cred' = ora_secret )
AS $$

# Get user name and password from ora_secret
import _snowflake
username_password_object = _snowflake.get_username_password('cred')
USER = username_password_object.username
PASSWORD = username_password_object.password

# Define the factory method for creating a connection to Oracle

from snowflake.snowpark import Session

def create_oracle_db_connection():
    import oracledb
    host = "ora_host"
    port = "ora_port"
    service_name = "ora_service"
    user = USER
    password = PASSWORD
    DSN = "{host}:{port}/{service_name}"
    connection = oracledb.connect(
        user=USER,
        password=PASSWORD,
        dsn=DSN
    )
    return connection

def run(session: Session):
    df = session.read.dbapi(
        create_ora_connection,
        table="ora_table"
    )
    return df
$$;

CALL sp_ora_dbapi();
Copy

DB-API를 사용하여 Snowflake Notebook에서 Oracle에 연결하기

  • Notebook 패키지에서 snowflake-snowpark-python 및 :code:`oracledb`를 선택합니다.

  • 시크릿, 소스 엔드포인트로의 송신을 허용하는 네트워크 규칙 및 외부 액세스 통합을 구성합니다.

    CREATE OR REPLACE SECRET ora_secret
      TYPE = PASSWORD
      USERNAME = 'ora_username'
      PASSWORD = 'ora_password';
      ALTER NOTEBOOK mynotebook SET SECRETS = ('snowflake-secret-object' = ora_secret);
    
    -- Configure a network rule to allow egress to the source endpoint
    
    CREATE OR REPLACE NETWORK RULE ora_network_rule
      MODE = EGRESS
      TYPE = HOST_PORT
      VALUE_LIST = ('ora_host:ora_port');
    
    -- Configure an external access integration.
    
    CREATE OR REPLACE EXTERNAL ACCESS INTEGRATION ora_access_integration
      ALLOWED_NETWORK_RULES = (ora_network_rule)
      ALLOWED_AUTHENTICATION_SECRETS = (ora_secret)
      ENABLED = true;
    
    Copy
  • :doc:`/user-guide/ui-snowsight/notebooks-external-access`를 클릭한 후 Notebook 세션을 다시 시작합니다.

Snowpark Python DB-API를 사용하여 Snowflake Notebook Python 셀의 Oracle에서 데이터 가져오기

# Get user name and password from ora_secret

import _snowflake
username_password_object = _snowflake.get_username_password('snowflake-secret-object')
USER = username_password_object.username
PASSWORD = username_password_object.password

import snowflake.snowpark.context
session = snowflake.snowpark.context.get_active_session()

# Define the factory method for creating a connection to Oracle

def create_oracle_db_connection():
    import oracledb
    host = "ora_host"
    port = "ora_port"
    service_name = "ora_service"
    user = USER
    password = PASSWORD
    DSN = "{host}:{port}/{service_name}"
    connection = oracledb.connect(
        user=USER,
        password=PASSWORD,
        dsn=DSN
    )
    return connection

# Use dbapi to read data from ora_table

df_ora = session.read.dbapi(
  create_oracle_db_connection,
  table='ora_table'
)

# Save data into sf_table

df_ora.write.mode("overwrite").save_as_table('sf_table')
Copy

PostgreSQL

Snowpark에서 PostgreSQL에 연결하려면 다음의 두 패키지가 필요합니다.

다음은 Snowpark 클라이언트, 저장 프로시저, Snowflake Notebooks에서 PostgreSQL에 연결하는 데 필요한 코드 예제입니다.

DB-API를 사용하여 Snowpark 클라이언트에서 PostgreSQL에 연결하기

  • psycopg2 설치하기

    pip install psycopg2
    
    Copy
  • PostgreSQL에 연결하기 위한 팩토리 메서드 정의하기

    def create_pg_connection():
        import psycopg2
        connection = psycopg2.connect(
            host="pg_host",
            port=pg_port,
            dbname="pg_dbname",
            user="pg_user",
            password="pg_password",
        )
        return connection
    
    
    # Call dbapi to pull data from pg_table
    
    df = session.read.dbapi(
      create_pg_connection,
      table="pg_table")
    
    Copy

DB-API를 사용하여 저장 프로시저에서 PostgreSQL에 연결하기

  • Snowflake를 소스 엔드포인트에 연결할 수 있도록 하는 데 필요한 외부 액세스 통합을 구성합니다.

    참고

    :ref:`PrivateLink<label-aws_privatelink_connect>`는 특히 민감한 정보를 처리할 때 데이터를 안전하게 전송하기 위해 권장됩니다. Snowflake 계정에 필요한 PrivateLink 권한이 활성화되어 있는지, PrivateLink 기능이 구성되고 Snowflake Notebook 환경에 활성화되어 있는지 확인합니다.

  • 시크릿, 소스 엔드포인트로의 송신을 허용하는 네트워크 규칙 및 외부 액세스 통합을 구성합니다.

CREATE OR REPLACE SECRET pg_secret
  TYPE = PASSWORD
  USERNAME = 'pg_username'
  PASSWORD = 'pg_password';

-- Configure a network rule.

CREATE OR REPLACE NETWORK RULE pg_network_rule
  MODE = EGRESS
  TYPE = HOST_PORT
  VALUE_LIST = ('pg_host:pg_port');

-- Configure an external access integration.

CREATE OR REPLACE EXTERNAL ACCESS INTEGRATION pg_access_integration
  ALLOWED_NETWORK_RULES = (pg_network_rule)
  ALLOWED_AUTHENTICATION_SECRETS = (pg_secret)
  ENABLED = true;
Copy
  • Snowpark Python DB-API를 사용하여 Python 저장 프로시저의 PostgreSQL에서 데이터 가져오기

CREATE OR REPLACE PROCEDURE sp_pg_dbapi()
RETURNS TABLE()
LANGUAGE PYTHON
RUNTIME_VERSION='3.11'
HANDLER='run'
PACKAGES=('snowflake-snowpark-python', 'psycopg2')
EXTERNAL_ACCESS_INTEGRATIONS = (pg_access_integration)
SECRETS = ('cred' = pg_secret )
AS $$

# Get user name and password from pg_secret

import _snowflake
username_password_object = _snowflake.get_username_password('cred')
USER = username_password_object.username
PASSWORD = username_password_object.password

# Define the factory method for creating a connection to PostgreSQL

from snowflake.snowpark import Session

def create_pg_connection():
    import psycopg2
    connection = psycopg2.connect(
        host="pg_host",
        port=pg_port,
        dbname="pg_dbname",
        user=USER,
        password=PASSWORD,
    )
    return connection

def run(session: Session):
    df = session.read.dbapi(
        create_pg_connection,
        table="pg_table"
    )
    return df

$$;
CALL sp_pg_dbapi();
Copy

DB-API를 사용하여 Snowflake Notebook에서 PostgreSQL에 연결하기

  • Snowflake Notebook 패키지</user-guide/ui-snowsight/notebooks-import-packages>`에서 :code:`snowflake-snowpark-python 및 :code:`psycopg2`를 선택합니다.

  • Snowflake를 소스 엔드포인트에 연결할 수 있도록 하는 데 필요한 외부 액세스 통합을 구성합니다.

    참고

    :ref:`PrivateLink<label-aws_privatelink_connect>`는 특히 민감한 정보를 처리할 때 데이터를 안전하게 전송하기 위해 권장됩니다. Snowflake 계정에 필요한 PrivateLink 권한이 활성화되어 있는지, PrivateLink 기능이 구성되고 Snowflake Notebook 환경에 활성화되어 있는지 확인합니다.

  • 시크릿, 소스 엔드포인트로의 송신을 허용하는 네트워크 규칙 및 외부 액세스 통합을 구성합니다.

-- Configure the secret

CREATE OR REPLACE SECRET pg_secret
  TYPE = PASSWORD
  USERNAME = 'pg_username'
  PASSWORD = 'pg_password';
  ALTER NOTEBOOK pg_notebook SET SECRETS = ('snowflake-secret-object' = pg_secret);

 -- Configure the network rule to allow egress to the source endpoint

 CREATE OR REPLACE NETWORK RULE pg_network_rule
    MODE = EGRESS
    TYPE = HOST_PORT
    VALUE_LIST = ('pg_host:pg_port');

 -- Configure external access integration

 CREATE OR REPLACE EXTERNAL ACCESS INTEGRATION pg_access_integration
    ALLOWED_NETWORK_RULES = (pg_network_rule)
    ALLOWED_AUTHENTICATION_SECRETS = (pg_secret)
    ENABLED = true;
Copy
  • :doc:`/user-guide/ui-snowsight/notebooks-external-access`를 클릭한 후 Notebook 세션을 다시 시작합니다.

    # Get the user name and password from :code:`pg_secret`
    
    import _snowflake
    username_password_object = _snowflake.get_username_password('snowflake-secret-object')
    USER = username_password_object.username
    PASSWORD = username_password_object.password
    
    import snowflake.snowpark.context
    session = snowflake.snowpark.context.get_active_session()
    
    # Define the factory method for creating a connection to PostgreSQL
    
    def create_pg_connection():
        import psycopg2
        connection = psycopg2.connect(
            host="pg_host",
            port=pg_port,
            dbname="pg_dbname",
            user=USER,
            password=PASSWORD,
        )
        return connection
    
    # Use dbapi to read and save data from pg_table
    
    df = session.read.dbapi(
        create_pg_connection,
        table="pg_table"
    )
    
    # Save data into sf_table
    
    df.write.mode("overwrite").save_as_table('sf_table')
    
    Copy

MySQL

Snowpark에서 MySQL에 연결하려면 다음의 두 패키지가 필요합니다.

다음은 Snowpark 클라이언트, 저장 프로시저, Snowflake Notebook에서 MySQL에 연결하는 데 필요한 코드 예제입니다.

DB-API를 사용하여 Snowpark 클라이언트에서 MySQL에 연결하기

  • pymysql 설치하기

pip install snowflake-snowpark-python[pandas]
pip install pymysql
Copy
  • MySQL에 연결하기 위한 팩토리 메서드 정의하기

def create_mysql_connection():
    import pymysql
    connection = pymysql.connect(
        host="mysql_host",
        port=mysql_port,
        database="mysql_db",
        user="mysql_user",
        password="mysql_password"
    )
    return connection


# Call dbapi to pull data from mysql_table

df = session.read.dbapi(
    create_mysql_connection,
    table="mysql_table"
)
Copy

DB-API를 사용하여 저장 프로시저에서 MySQL에 연결하기

  • Snowflake를 소스 엔드포인트에 연결할 수 있도록 하는 데 필요한 외부 액세스 통합을 구성합니다.

    참고

    :ref:`PrivateLink<label-aws_privatelink_connect>`는 특히 민감한 정보를 처리할 때 데이터를 안전하게 전송하기 위해 권장됩니다. Snowflake 계정에 필요한 PrivateLink 권한이 활성화되어 있는지, PrivateLink 기능이 구성되고 Snowflake Notebook 환경에 활성화되어 있는지 확인합니다.

  • 시크릿, 소스 엔드포인트로의 송신을 허용하는 네트워크 규칙 및 외부 액세스 통합을 구성합니다.

CREATE OR REPLACE SECRET mysql_secret
  TYPE = PASSWORD
  USERNAME = 'mysql_username'
  PASSWORD = 'mysql_password';

-- Configure a network rule.

CREATE OR REPLACE NETWORK RULE mysql_network_rule
  MODE = EGRESS
  TYPE = HOST_PORT
  VALUE_LIST = ('mysql_host:mysql_port');

-- Configure an external access integration

CREATE OR REPLACE EXTERNAL ACCESS INTEGRATION mysql_access_integration
  ALLOWED_NETWORK_RULES = (mysql_network_rule)
  ALLOWED_AUTHENTICATION_SECRETS = (mysql_secret)
  ENABLED = true;
Copy
  • Snowpark Python DB-API를 사용하여 Python 저장 프로시저의 MySQL에서 데이터를 가져옵니다.

CREATE OR REPLACE PROCEDURE sp_mysql_dbapi()
RETURNS TABLE()
LANGUAGE PYTHON
RUNTIME_VERSION='3.11'
HANDLER='run'
PACKAGES=('snowflake-snowpark-python', 'pymysql')
EXTERNAL_ACCESS_INTEGRATIONS = (mysql_access_integration)
SECRETS = ('cred' = mysql_secret )
AS $$

# Get user name and password from mysql_secret

import _snowflake
    username_password_object = _snowflake.get_username_password('cred')
    USER = username_password_object.username
    PASSWORD = username_password_object.password

# Define the factory method for creating a connection to MySQL

from snowflake.snowpark import session

def create_mysql_connection():
    import pymysql
    connection = pymysql.connect(
        host="mysql_host",
        port=mysql_port,
        dbname="mysql_dbname",
        user=USER,
        password=PASSWORD,
    )
    return connection

def run(session: Session):
    df = session.read.dbapi(
        create_mysql_connection,
        table="mysql_table"
    )
    return df
$$;

CALL sp_mysql_dbapi();
Copy

DB-API를 사용하여 Snowflake Notebook에서 MySQL에 연결하기

  • Snowflake Notebook 패키지에서 snowflake-snowpark-python 및 :code:`pymysql`을 선택합니다.

  • Snowflake를 소스 엔드포인트에 연결할 수 있도록 하는 데 필요한 외부 액세스 통합을 구성합니다.

    참고

    :ref:`PrivateLink<label-aws_privatelink_connect>`는 특히 민감한 정보를 처리할 때 데이터를 안전하게 전송하기 위해 권장됩니다. Snowflake 계정에 필요한 PrivateLink 권한이 활성화되어 있는지, PrivateLink 기능이 구성되고 Snowflake Notebook 환경에 활성화되어 있는지 확인합니다.

  • 시크릿을 구성하고 :doc:`Snowflake Notebook</user-guide/ui-snowsight/notebooks>`에 추가합니다.

CREATE OR REPLACE SECRET mysql_secret
  TYPE = PASSWORD
  USERNAME = 'mysql_username'
  PASSWORD = 'mysql_password';

  ALTER NOTEBOOK mynotebook SET SECRETS = ('snowflake-secret-object' = mysql_secret);
Copy
  • 네트워크 규칙 및 외부 액세스 통합을 구성합니다.

CREATE OR REPLACE NETWORK RULE mysql_network_rule
  MODE = EGRESS
  TYPE = HOST_PORT
  VALUE_LIST = ('mysql_host:mysql_port');

CREATE OR REPLACE EXTERNAL ACCESS INTEGRATION mysql_access_integration
  ALLOWED_NETWORK_RULES = (mysql_network_rule)
  ALLOWED_AUTHENTICATION_SECRETS = (mysql_secret)
  ENABLED = true;
Copy
  • :doc:`/user-guide/ui-snowsight/notebooks-external-access`를 클릭한 후 Notebook 세션을 다시 시작합니다.

# Get user name and password from mysql_secret
import _snowflake
username_password_object = _snowflake.get_username_password('snowflake-secret-object')
USER = username_password_object.username
PASSWORD = username_password_object.password

import snowflake.snowpark.context
session = snowflake.snowpark.context.get_active_session()

# Define the factory method for creating a connection to MySQL

def create_mysql_connection():
    import pymysql
    connection = pymysql.connect(
        host="mysql_host",
        port=mysql_port,
        dbname="mysql_dbname",
        user=USER,
        password=PASSWORD,
    )
    return connection

# Call dbapi to pull data from mysql_table

df = session.read.dbapi(
    create_mysql_connection,
    table="mysql_table")

# Save data into sf_table

df.write.mode("overwrite").save_as_table('sf_table')
Copy

Databricks

Snowpark에서 Databricks에 연결하려면 다음의 두 패키지가 필요합니다.

다음은 Snowpark 클라이언트, 저장 프로시저 및 Snowflake Notebook에서 Databricks에 연결하는 데 필요한 코드 예제입니다.

DB-API를 사용하여 Snowpark 클라이언트에서 Databricks에 연결하기

  • databricks-sql-connector 설치하기:

pip install snowflake-snowpark-python[pandas]
pip install databricks-sql-connector
Copy
  • Databricks에 연결하기 위한 팩토리 메서드 정의하기

def create_dbx_connection():
    import databricks.sql
    connection = databricks.sql.connect(
        server_hostname=HOST,
        http_path=PATH,
        access_token=ACCESS_TOKEN
    )
    return connection


#Call dbapi to pull data from mytable

df = session.read.dbapi(
    create_dbx_connection,
    table="dbx_table")
Copy

DB-API를 사용하여 저장 프로시저에서 Databricks에 연결하기

  • Snowflake Notebook 패키지</user-guide/ui-snowsight/notebooks-import-packages>`에서 :code:`snowflake-snowpark-python 및 :code:`pymysql`를 선택합니다.

  • Snowflake를 소스 엔드포인트에 연결할 수 있도록 하려면 외부 액세스 통합이 필요합니다.

    참고

    :ref:`PrivateLink<label-aws_privatelink_connect>`는 특히 민감한 정보를 처리할 때 데이터를 안전하게 전송하기 위해 권장됩니다. Snowflake 계정에 필요한 PrivateLink 권한이 활성화되어 있는지, PrivateLink 기능이 구성되고 Snowflake Notebook 환경에 활성화되어 있는지 확인합니다.

  • 시크릿, 소스 엔드포인트로의 송신을 허용하는 네트워크 규칙 및 외부 액세스 통합을 구성합니다.

CREATE OR REPLACE SECRET dbx_secret
  TYPE = GENERIC_STRING
  SECRET_STRING = 'dbx_access_token';

-- Configure a network rule

CREATE OR REPLACE NETWORK RULE dbx_network_rule
  MODE = EGRESS
  TYPE = HOST_PORT
  VALUE_LIST = ('dbx_host:dbx_port');

--  Configure an external access integration

CREATE OR REPLACE EXTERNAL ACCESS INTEGRATION dbx_access_integration
  ALLOWED_NETWORK_RULES = (dbx_network_rule)
  ALLOWED_AUTHENTICATION_SECRETS = (dbx_secret)
  ENABLED = true;
Copy
  • Snowpark Python DB-API를 사용하여 Python 저장 프로시저의 Databricks에서 데이터 가져오기

CREATE OR REPLACE PROCEDURE sp_dbx_dbapi()
  RETURNS TABLE()
  LANGUAGE PYTHON
  RUNTIME_VERSION='3.11'
  HANDLER='run'
  PACKAGES=('snowflake-snowpark-python', 'databricks-sql-connector')
  EXTERNAL_ACCESS_INTEGRATIONS = (dbx_access_integration)
  SECRETS = ('cred' = dbx_secret )
AS $$

# Get user name and password from dbx_secret

import _snowflake
ACCESS_TOKEN = _snowflake.get_generic_secret_string('cred')

from snowflake.snowpark import Session

# define the method for creating a connection to Databricks
def create_dbx_connection():
    import databricks.sql
    connection = databricks.sql.connect(
        server_hostname="dbx_host",
        http_path="dbx_path",
        access_token=ACCESS_TOKEN,
    )
    return connection

def run(session: Session):
    df = session.read.dbapi(
        create_dbx_connection,
        table="dbx_table"
    )
    return df

$$;

CALL sp_dbx_dbapi();
Copy

DB-API를 사용하여 Snowflake Notebook에서 Databricks에 연결하기

  • Snowflake Notebook 패키지</user-guide/ui-snowsight/notebooks-import-packages>`에서 :code:`snowflake-snowpark-python 및 :code:`pymysql`를 선택합니다.

  • Snowflake를 소스 엔드포인트에 연결할 수 있도록 하는 데 필요한 외부 액세스 통합을 구성합니다.

    참고

    :ref:`PrivateLink<label-aws_privatelink_connect>`는 특히 민감한 정보를 처리할 때 데이터를 안전하게 전송하기 위해 권장됩니다. Snowflake 계정에 필요한 PrivateLink 권한이 활성화되어 있는지, PrivateLink 기능이 구성되고 Snowflake Notebook 환경에 활성화되어 있는지 확인합니다.

  • 시크릿을 구성하고 :doc:`Snowflake Notebook</user-guide/ui-snowsight/notebooks>`에 추가합니다.

    CREATE OR REPLACE SECRET dbx_secret
      TYPE = GENERIC_STRING
      SECRET_STRING = 'dbx_access_token';
      ALTER NOTEBOOK mynotebook SET SECRETS = ('snowflake-secret-object' = dbx_secret);
    
    Copy
  • 구성

    CREATE OR REPLACE NETWORK RULE dbx_network_rule
      MODE = EGRESS
      TYPE = HOST_PORT
      VALUE_LIST = ('dbx_host:dbx_port');
    
    CREATE OR REPLACE EXTERNAL ACCESS INTEGRATION dbx_access_integration
      ALLOWED_NETWORK_RULES = (dbx_network_rule)
      ALLOWED_AUTHENTICATION_SECRETS = (dbx_secret)
      ENABLED = true;
    
    Copy
  • :doc:`/user-guide/ui-snowsight/notebooks-external-access`를 클릭한 후 Notebook 세션을 다시 시작합니다.

    # Get user name and password from dbx_secret
    
    import _snowflake
    ACCESS_TOKEN = _snowflake.get_generic_secret_string('cred')
    
    import snowflake.snowpark.context
    session = snowflake.snowpark.context.get_active_session()
    
    # Define the factory method for creating a connection to Databricks
    
    def create_dbx_connection():
        import databricks.sql
        connection = databricks.sql.connect(
            server_hostname="dbx_host",
            http_path="dbx_path",
            access_token=ACCESS_TOKEN,
        )
        return connection
    
    # use dbapi to read data from dbx_table
    
    df = session.read.dbapi(
        create_dbx_connection,
        table="dbx_table"
    )
    
    # save data into sf_table
    
    df.write.mode("overwrite").save_as_table('sf_table')
    
    Copy

제한 사항

드라이버

Snowpark Python DB-API는 Python DB-API 2.0 호환 드라이버(예: pyodbc, oracledb)만 지원합니다. 이 릴리스에서는 JDBC 드라이버를 지원하지 않습니다.