Using the Snowpark Python DB-API

The Snowpark Python DB-API lets Snowpark Python users programmatically pull data from external databases into Snowflake. It includes the following:

  • Python DB-API support: Connect to external databases using Python’s standard DB-API 2.0 drivers.

  • Streamlined setup: Use :code:`pip` to install the necessary drivers, with no need to manage additional dependencies.

With these APIs, you can seamlessly pull data into Snowflake tables and transform it using :doc:`Snowpark DataFrames</developer-guide/snowpark/python/working-with-dataframes>` for advanced analytics.
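As a minimal sketch of what a DB-API 2.0 driver provides: the examples on this page pass a zero-argument factory function that returns an open connection. The block below uses Python's built-in sqlite3 module purely as a stand-in for an external database driver. That substitution, the `create_connection` name, and the `products` table are assumptions made for illustration.

```python
import sqlite3


def create_connection():
    """Zero-argument factory that returns a DB-API 2.0 connection.

    sqlite3 stands in for an external driver such as pyodbc, oracledb,
    or psycopg2 (illustrative assumption).
    """
    connection = sqlite3.connect(":memory:")
    connection.execute("CREATE TABLE products (id INTEGER, name TEXT)")
    connection.executemany(
        "INSERT INTO products VALUES (?, ?)",
        [(1, "widget"), (2, "gadget")],
    )
    connection.commit()
    return connection


# Any DB-API 2.0 connection supports the same cursor workflow.
connection = create_connection()
cursor = connection.cursor()
cursor.execute("SELECT id, name FROM products ORDER BY id")
rows = cursor.fetchall()
connection.close()
```

With a real driver, a factory of this shape is what gets passed as the first argument to :code:`session.read.dbapi()`.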

The DB-API can be used in much the same way as the Spark JDBC API. Most parameters are designed to be identical or similar for better parity. At the same time, Snowpark emphasizes a Python-first design with intuitive naming conventions that avoid JDBC-specific configurations, which gives Python developers a familiar experience. The following table describes the Snowpark Python DB-API parameters:

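DB-API parameters

Parameter                Snowpark Python DB-API
-----------------------  ------------------------------------------------------------
create_connection        Function to create a Python DB-API connection.
table                    Specifies the table in the source database.
query                    SQL query wrapped as a subquery for reading data.
column                   Partitioning column for parallel reads.
lower_bound              Lower bound for partitioning.
upper_bound              Upper bound for partitioning.
num_partitions           Number of partitions for parallelism.
query_timeout            Timeout for SQL execution, in seconds.
fetch_size               Number of rows fetched per round trip.
custom_schema            Custom schema for pulling data from external databases.
max_workers              Number of workers for pulling data from external databases in parallel.
predicates               List of conditions for WHERE clause partitions.
session_init_statement   Executes a SQL or PL/SQL statement at session initialization.
udtf_configs             Executes the workload using a Snowflake UDTF for better performance.
fetch_merge_count        Number of fetched batches to be merged into a single Parquet file before it is uploaded.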

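Understanding parallelism

The Snowpark Python DB-API has two underlying ingestion mechanisms.

Local ingestion

In local ingestion, Snowpark first fetches data from the external source into the local environment where the dbapi() function is called and converts it to Parquet files. Next, Snowpark uploads these Parquet files to a temporary Snowflake stage and copies them from the stage into a temporary table.

UDTF ingestion

In UDTF ingestion, all workloads run on the Snowflake server. Snowpark first creates and executes a UDTF, and the UDTF ingests the data directly into Snowflake and stores it in a temporary table.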

Snowpark Python DB-API also has two ways to parallelize and accelerate ingestion.

Partition column

This method divides the source data into a number of partitions based on four parameters that you set when you call :code:`dbapi()`:

  • column

  • lower_bound

  • upper_bound

  • num_partitions

These four parameters must be set together, and :code:`column` must be a numeric or date type.
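The following sketch illustrates one way a numeric range could be split into `num_partitions` non-overlapping WHERE clauses. It follows the stride-based approach familiar from the Spark JDBC reader. This is an assumption made for illustration and is not necessarily how Snowpark computes partitions internally. The `partition_predicates` helper is hypothetical.

```python
def partition_predicates(column, lower_bound, upper_bound, num_partitions):
    """Split [lower_bound, upper_bound) into WHERE clauses, one per partition.

    Illustrative only: rows below lower_bound (and NULLs) fall into the
    first partition, rows at or above the last boundary into the last one,
    so no row is filtered out.
    """
    if num_partitions < 1:
        raise ValueError("num_partitions must be at least 1")
    if lower_bound >= upper_bound:
        raise ValueError("lower_bound must be less than upper_bound")
    if num_partitions == 1:
        return ["1 = 1"]

    # Integer stride between consecutive partition boundaries.
    stride = max((upper_bound - lower_bound) // num_partitions, 1)
    boundaries = [lower_bound + i * stride for i in range(1, num_partitions)]

    predicates = [f"{column} < {boundaries[0]} OR {column} IS NULL"]
    for low, high in zip(boundaries, boundaries[1:]):
        predicates.append(f"{column} >= {low} AND {column} < {high}")
    predicates.append(f"{column} >= {boundaries[-1]}")
    return predicates


for predicate in partition_predicates("ID", 0, 10000, 4):
    print(predicate)
```

In this sketch the bounds only decide where the partition boundaries fall. They do not filter rows, which is why the first and last clauses are open-ended.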

Predicates

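This method divides the source data into partitions based on the predicates parameter, a list of expressions suitable for inclusion in WHERE clauses, where each expression defines one partition. Predicates provide a more flexible way of dividing partitions. For example, you can divide partitions on Boolean or non-numeric columns.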
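Because each predicate defines one partition, it helps to check that a predicate list covers every row exactly once before using it. The sketch below counts rows per predicate. It uses sqlite3 as a stand-in source database, and the `orders` table and `rows_per_predicate` helper are hypothetical names introduced for illustration.

```python
import sqlite3


def rows_per_predicate(connection, table, predicates):
    """Return a mapping of predicate -> number of matching rows."""
    cursor = connection.cursor()
    counts = {}
    for predicate in predicates:
        cursor.execute(f"SELECT COUNT(*) FROM {table} WHERE {predicate}")
        counts[predicate] = cursor.fetchone()[0]
    return counts


# Demo data: partition on a non-numeric column that also contains a NULL.
connection = sqlite3.connect(":memory:")
connection.execute("CREATE TABLE orders (id INTEGER, region TEXT)")
connection.executemany(
    "INSERT INTO orders VALUES (?, ?)",
    [(1, "EMEA"), (2, "APAC"), (3, "EMEA"), (4, "AMER"), (5, None)],
)

predicates = [
    "region = 'EMEA'",
    "region = 'APAC'",
    # Catch-all partition: everything else, including NULL regions.
    "region NOT IN ('EMEA', 'APAC') OR region IS NULL",
]
counts = rows_per_predicate(connection, "orders", predicates)
total_rows = connection.execute("SELECT COUNT(*) FROM orders").fetchone()[0]
connection.close()
```

If the per-predicate counts do not add up to the table's row count, the predicates either overlap or leave rows out.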

The Snowpark Python DB-API also lets you adjust the level of parallelism within a partition.

fetch_size

Within a partition, the API fetches rows in chunks defined by fetch_size. These rows are written to Snowflake in parallel as they are fetched, allowing reading and writing to overlap and maximize throughput.
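Chunked fetching can be sketched with the DB-API's standard `fetchmany()` call. The example below only illustrates reading rows in chunks of `fetch_size`. It does not reproduce how Snowpark overlaps reading and writing. sqlite3 is a stand-in driver, and the `iter_batches` helper and `numbers` table are hypothetical.

```python
import sqlite3


def iter_batches(cursor, fetch_size):
    """Yield lists of at most fetch_size rows until the cursor is exhausted."""
    while True:
        batch = cursor.fetchmany(fetch_size)
        if not batch:
            break
        yield batch


connection = sqlite3.connect(":memory:")
connection.execute("CREATE TABLE numbers (n INTEGER)")
connection.executemany(
    "INSERT INTO numbers VALUES (?)", [(i,) for i in range(10)]
)

cursor = connection.cursor()
cursor.execute("SELECT n FROM numbers ORDER BY n")
# Ten rows read four at a time arrive as chunks of 4, 4, and 2 rows.
batch_sizes = [len(batch) for batch in iter_batches(cursor, fetch_size=4)]
connection.close()
```

A larger `fetch_size` means fewer round trips to the source database at the cost of more memory per chunk.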

Snowflake combines the ingestion mechanisms and parallelization methods above to provide the following four ways to ingest data:

  • Local ingestion with a partition column

    df_local_par_column = session.read.dbapi(
        create_connection,
        table="target_table",
        fetch_size=100000,
        num_partitions=4,
        column="ID",  # Swap with the column you want your partition based on
        upper_bound=10000,
        lower_bound=0
    )
    
  • Local ingestion with predicates

    df_local_predicates = session.read.dbapi(
        create_connection,
        table="target_table",
        fetch_size=100000,
        predicates=[
            "ID < 3",
            "ID >= 3"
        ]
    )
    
  • UDTF ingestion with a partition column

    udtf_configs = {
        "external_access_integration": "<your external access integration>"
    }
    df_udtf_par_column = session.read.dbapi(
        create_connection,
        table="target_table",
        udtf_configs=udtf_configs,
        fetch_size=100000,
        num_partitions=4,
        column="ID",  # Swap with the column you want your partition based on
        upper_bound=10000,
        lower_bound=0
    )
    
  • UDTF ingestion with predicates

    udtf_configs = {
        "external_access_integration": "<your external access integration>"
    }
    
    df_udtf_predicates = session.read.dbapi(
        create_connection,
        table="target_table",
        udtf_configs=udtf_configs,
        fetch_size=100000,
        predicates=[
            "ID < 3",
            "ID >= 3"
        ]
    )
    

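SQL Server

To connect to SQL Server from Snowpark, you need the following three packages:

  • snowflake-snowpark-python

  • pyodbc

  • msodbcsql (the Microsoft ODBC Driver for SQL Server)

The following code examples show how to connect to SQL Server from a Snowpark client, a stored procedure, and a Snowflake notebook.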

Use the DB-API to connect to SQL Server from a Snowpark client

  1. Install the Microsoft ODBC driver for SQL Server. The following commands use Homebrew on macOS:

    /bin/bash -c "$(curl -fsSL https://raw.githubusercontent.com/Homebrew/install/master/install.sh)"
    brew tap microsoft/mssql-release https://github.com/Microsoft/homebrew-mssql-release
    brew update
    HOMEBREW_ACCEPT_EULA=Y brew install msodbcsql18 mssql-tools18
    
  2. Install snowflake-snowpark-python[pandas] and pyodbc:

    pip install snowflake-snowpark-python[pandas]
    pip install pyodbc
    
  3. Define the factory method for creating a connection to SQL Server:

    def create_sql_server_connection():
        import pyodbc
        SERVER = "<your host name>"
        PORT = <your port>
        UID = "<your user name>"
        PWD = "<your password>"
        DATABASE = "<your database name>"
        connection_str = (
            f"DRIVER={{ODBC Driver 18 for SQL Server}};"
            # SQL Server expects host and port separated by a comma
            f"SERVER={SERVER},{PORT};"
            f"UID={UID};"
            f"PWD={PWD};"
            f"DATABASE={DATABASE};"
            "TrustServerCertificate=yes;"
            "Encrypt=yes;"
            # Optional to identify source of queries
            "APP=snowflake-snowpark-python;"
        )
        connection = pyodbc.connect(connection_str)
        return connection
    
    # Feel free to combine local/udtf ingestion and partition column/predicates as
    # stated in the understanding parallelism section
    
    # Call dbapi to pull data from target table
    
    df = session.read.dbapi(
        create_sql_server_connection,
        table="target_table"
    )
    
    # Call dbapi to pull data from target query
    
    df_query = session.read.dbapi(
        create_sql_server_connection,
        query="select * from target_table"
    )
    
    # Pull data from target table with parallelism using partition column
    
    df_local_par_column = session.read.dbapi(
        create_sql_server_connection,
        table="target_table",
        fetch_size=100000,
        num_partitions=4,
        column="ID",  # Swap with the column you want your partition based on
        upper_bound=10000,
        lower_bound=0
    )
    
    udtf_configs = {
        "external_access_integration": "<your external access integration>"
    }
    
    # Pull data from target table with udtf ingestion with parallelism using predicates
    
    df_udtf_predicates = session.read.dbapi(
        create_sql_server_connection,
        table="target_table",
        udtf_configs=udtf_configs,
        fetch_size=100000,
        predicates=[
            "ID < 3",
            "ID >= 3"
        ]
    )
    
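Python concatenates adjacent string literals, so a single missing `;` in a hand-written connection string silently merges two keywords. One way to avoid that is to build the string from a dictionary, as sketched below. The `build_sql_server_connection_string` helper is hypothetical and not part of pyodbc or Snowpark. It also does not escape values that contain `;` or braces.

```python
def build_sql_server_connection_string(
    server, port, database, uid, pwd, app="snowflake-snowpark-python"
):
    """Assemble an ODBC connection string with every keyword terminated by ';'."""
    options = {
        "DRIVER": "{ODBC Driver 18 for SQL Server}",
        # SQL Server expects host and port separated by a comma.
        "SERVER": f"{server},{port}",
        "DATABASE": database,
        "UID": uid,
        "PWD": pwd,
        "TrustServerCertificate": "yes",
        "Encrypt": "yes",
        # Optional: identifies the source of queries on the server side.
        "APP": app,
    }
    return "".join(f"{key}={value};" for key, value in options.items())


connection_str = build_sql_server_connection_string(
    "myhost", 1433, "mydb", "me", "secret"
)
print(connection_str)
```

The resulting string can then be passed to `pyodbc.connect()` inside the connection factory.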

Use the DB-API to connect to SQL Server from a stored procedure

  1. Configure an external access integration (EAI), which is required to allow Snowflake to connect to the source endpoint.

    Note

    PrivateLink is recommended for secure data transfer, especially when you’re dealing with sensitive information. Ensure that your Snowflake account has the necessary PrivateLink privileges enabled and that the PrivateLink feature is configured and active in your Snowflake Notebook environment.

  2. Configure the secret, a network rule to allow egress to the source endpoint, and EAI:

    -- Configure a secret to allow egress to the source endpoint
    
    CREATE OR REPLACE SECRET mssql_secret
    TYPE = PASSWORD
    USERNAME = 'mssql_username'
    PASSWORD = 'mssql_password';
    
    -- Configure a network rule to allow egress to the source endpoint
    
    CREATE OR REPLACE NETWORK RULE mssql_network_rule
    MODE = EGRESS
    TYPE = HOST_PORT
    VALUE_LIST = ('mssql_host:mssql_port');
    
    -- Configure an external access integration
    
    CREATE OR REPLACE EXTERNAL ACCESS INTEGRATION mssql_access_integration
    ALLOWED_NETWORK_RULES = (mssql_network_rule)
    ALLOWED_AUTHENTICATION_SECRETS = (mssql_secret)
    ENABLED = true;
    
  3. Use the DB-API to pull data from SQL Server in a Python stored procedure:

    CREATE OR REPLACE PROCEDURE sp_mssql_dbapi()
        RETURNS TABLE()
        LANGUAGE PYTHON
        RUNTIME_VERSION='3.11'
        HANDLER='run'
        PACKAGES=('snowflake-snowpark-python', 'pyodbc', 'msodbcsql')
        EXTERNAL_ACCESS_INTEGRATIONS = (mssql_access_integration)
        SECRETS = ('cred' = mssql_secret )
    AS $$
    
    # Get user name and password from mssql_secret
    
    import _snowflake
    username_password_object = _snowflake.get_username_password('cred')
    USER = username_password_object.username
    PASSWORD = username_password_object.password
    
    # Define the factory method for creating a connection to SQL Server
    from snowflake.snowpark import Session
    def create_sql_server_connection():
        import pyodbc
    
        host = "<your host>"
        port = <your port>
        username = USER
        password = PASSWORD
        database = "<your database name>"
        connection_str = (
            f"DRIVER={{ODBC Driver 18 for SQL Server}};"
            f"SERVER={host},{port};"
            f"DATABASE={database};"
            f"UID={username};"
            f"PWD={password};"
            "TrustServerCertificate=yes;"
            "Encrypt=yes;"
            # Optional to identify source of queries
            "APP=snowflake-snowpark-python;"
        )
    
        connection = pyodbc.connect(connection_str)
        return connection
    
    def run(session: Session):
        # Feel free to combine local/udtf ingestion and partition column/predicates
        # as stated in the understanding parallelism section
    
        # Call dbapi to pull data from target table
    
        df = session.read.dbapi(
            create_sql_server_connection,
            table="target_table"
        )
    
        # Call dbapi to pull data from target query
    
        df_query = session.read.dbapi(
            create_sql_server_connection,
            query="select * from target_table"
        )
    
        # Pull data from target table with parallelism using partition column
    
        df_local_par_column = session.read.dbapi(
            create_sql_server_connection,
            table="target_table",
            fetch_size=100000,
            num_partitions=4,
            column="ID",  # swap with the column you want your partition based on
            upper_bound=10000,
            lower_bound=0
        )
    
        udtf_configs = {
            "external_access_integration": "<your external access integration>"
        }
    
        # Pull data from target table with udtf ingestion with parallelism using predicates
    
        df_udtf_predicates = session.read.dbapi(
            create_sql_server_connection,
            table="target_table",
            udtf_configs=udtf_configs,
            fetch_size=100000,
            predicates=[
                "ID < 3",
                "ID >= 3"
            ]
        )
    
        return df
    $$;
    
    CALL sp_mssql_dbapi();
    
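The network rule and external access integration statements in step 2 follow the same pattern for every source database, differing only in a name prefix and the endpoint. The sketch below generates them from those inputs. The `external_access_statements` helper is hypothetical, and it assumes the secret named `<prefix>_secret` is created separately so that no password appears in generated text.

```python
def external_access_statements(prefix, host, port):
    """Return SQL for a network rule and an external access integration.

    Assumes a secret named <prefix>_secret already exists.
    """
    if not prefix.isidentifier():
        raise ValueError("prefix must be a simple identifier")
    if "'" in host:
        raise ValueError("host must not contain a single quote")
    port = int(port)
    if not 0 < port < 65536:
        raise ValueError("port must be between 1 and 65535")

    rule = f"{prefix}_network_rule"
    secret = f"{prefix}_secret"
    integration = f"{prefix}_access_integration"
    return [
        (
            f"CREATE OR REPLACE NETWORK RULE {rule} "
            f"MODE = EGRESS TYPE = HOST_PORT "
            f"VALUE_LIST = ('{host}:{port}')"
        ),
        (
            f"CREATE OR REPLACE EXTERNAL ACCESS INTEGRATION {integration} "
            f"ALLOWED_NETWORK_RULES = ({rule}) "
            f"ALLOWED_AUTHENTICATION_SECRETS = ({secret}) "
            f"ENABLED = true"
        ),
    ]


statements = external_access_statements("mssql", "mssql_host", 1433)
for statement in statements:
    print(statement)
```

Each statement could then be run from a Snowpark session, for example with `session.sql(statement).collect()`, by a role that has the required privileges.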

Use the DB-API to connect to SQL Server from a Snowflake notebook

  1. From Snowflake Notebook packages, select snowflake-snowpark-python and pyodbc.

  2. In the Files tab on the left, open the environment.yml file and add the following line after the other entries under dependencies:

    - msodbcsql18
    
  3. Configure the secret, a network rule to allow egress to the source endpoint, and external access integration:

    -- Configure a secret to allow egress to the source endpoint
    
    CREATE OR REPLACE SECRET mssql_secret
    TYPE = PASSWORD
    USERNAME = 'mssql_username'
    PASSWORD = 'mssql_password';
    
    ALTER NOTEBOOK mynotebook SET SECRETS = ('snowflake-secret-object' = mssql_secret);
    
    -- Configure a network rule to allow egress to the source endpoint
    
    CREATE OR REPLACE NETWORK RULE mssql_network_rule
    MODE = EGRESS
    TYPE = HOST_PORT
    VALUE_LIST = ('mssql_host:mssql_port');
    
    -- Configure an external access integration
    
    CREATE OR REPLACE EXTERNAL ACCESS INTEGRATION mssql_access_integration
    ALLOWED_NETWORK_RULES = (mssql_network_rule)
    ALLOWED_AUTHENTICATION_SECRETS = (mssql_secret)
    ENABLED = true;
    
  4. Set up external access for Snowflake Notebooks, and then restart the notebook session.

  5. Use the DB-API to pull data from SQL Server in a Python cell of a Snowflake notebook:

    # Get user name and password from mssql_secret
    
    import _snowflake
    username_password_object = _snowflake.get_username_password('snowflake-secret-object')
    USER = username_password_object.username
    PASSWORD = username_password_object.password
    
    import snowflake.snowpark.context
    session = snowflake.snowpark.context.get_active_session()
    
    def create_sql_server_connection():
        import pyodbc
        SERVER = "<your host name>,<your port>"
        UID = USER  # User name read from mssql_secret above
        PWD = PASSWORD  # Password read from mssql_secret above
        DATABASE = "test_query_history"
        connection_str = (
            f"DRIVER={{ODBC Driver 18 for SQL Server}};"
            f"SERVER={SERVER};"
            f"UID={UID};"
            f"PWD={PWD};"
            f"DATABASE={DATABASE};"
            "TrustServerCertificate=yes;"
            "Encrypt=yes;"
            # Optional to identify source of queries
            "APP=snowflake-snowpark-python;"
        )
        connection = pyodbc.connect(connection_str)
        return connection
    
    # Feel free to combine local/udtf ingestion and partition column/predicates as
    # stated in the understanding parallelism section
    
    # Call dbapi to pull data from target table
    
    df = session.read.dbapi(
        create_sql_server_connection,
        table="target_table"
    )
    
    # Call dbapi to pull data from target query
    
    df_query = session.read.dbapi(
        create_sql_server_connection,
        query="select * from target_table"
    )
    
    # Pull data from target table with parallelism using partition column
    
    df_local_par_column = session.read.dbapi(
        create_sql_server_connection,
        table="target_table",
        fetch_size=100000,
        num_partitions=4,
        column="ID",  # swap with the column you want your partition based on
        upper_bound=10000,
        lower_bound=0
    )
    
    udtf_configs = {
        "external_access_integration": "<your external access integration>"
    }
    
    # Pull data from target table with udtf ingestion with parallelism using predicates
    
    df_udtf_predicates = session.read.dbapi(
        create_sql_server_connection,
        table="target_table",
        udtf_configs=udtf_configs,
        fetch_size=100000,
        predicates=[
            "ID < 3",
            "ID >= 3"
        ]
    )
    
    # Save data into sf_table
    df.write.mode("overwrite").save_as_table('sf_table')
    

Source tracing when using DB-API to connect to SQL Server

  1. Include a Snowpark tag in your create connection function:

    def create_sql_server_connection():
        import pyodbc
        SERVER = "<your host name>"
        PORT = <your port>
        UID = "<your user name>"
        PWD = "<your password>"
        DATABASE = "<your database name>"
        connection_str = (
            f"DRIVER={{ODBC Driver 18 for SQL Server}};"
            # SQL Server expects host and port separated by a comma
            f"SERVER={SERVER},{PORT};"
            f"UID={UID};"
            f"PWD={PWD};"
            f"DATABASE={DATABASE};"
            "TrustServerCertificate=yes;"
            "Encrypt=yes;"
            # include this parameter for source tracing
            "APP=snowflake-snowpark-python;"
        )
        connection = pyodbc.connect(connection_str)
        return connection
    
  2. Run the following SQL in your data source to capture queries from Snowpark that are still live:

    SELECT
        s.session_id,
        s.program_name,
        r.status,
        t.text AS sql_text
    FROM sys.dm_exec_sessions s
    JOIN sys.dm_exec_requests r ON s.session_id = r.session_id
    CROSS APPLY sys.dm_exec_sql_text(r.sql_handle) AS t
    WHERE s.program_name = 'snowflake-snowpark-python';
    

Oracle

To connect to Oracle from Snowpark, you need the following two packages:

  • snowflake-snowpark-python

  • oracledb

The following code examples show how to connect to Oracle from a Snowpark client, stored procedures, and a Snowflake notebook.

Use the DB-API to connect to Oracle from a Snowpark client

  1. Install snowflake-snowpark-python[pandas] and oracledb:

    pip install snowflake-snowpark-python[pandas]
    pip install oracledb
    
  2. Use the DB-API to pull data from Oracle and define the factory method for creating a connection to Oracle:

    def create_oracle_db_connection():
        import oracledb
        HOST = "<your host>"
        PORT = <your port>
        SERVICE_NAME = "<your service name>"
        USER = "<your user name>"
        PASSWORD = "<your password>"
        DSN = f"{HOST}:{PORT}/{SERVICE_NAME}"
        connection = oracledb.connect(
            user=USER,
            password=PASSWORD,
            dsn=DSN
        )
        # Optional: include this parameter for source tracing
        connection.clientinfo = "snowflake-snowpark-python"
        return connection
    
    # Feel free to combine local/udtf ingestion and partition column/predicates as
    # stated in the understanding parallelism section
    
    # Call dbapi to pull data from target table
    
    df = session.read.dbapi(
        create_oracle_db_connection,
        table="target_table"
    )
    
    # Call dbapi to pull data from target query
    
    df_query = session.read.dbapi(
        create_oracle_db_connection,
        query="select * from target_table"
    )
    
    # Pull data from target table with parallelism using partition column
    
    df_local_par_column = session.read.dbapi(
        create_oracle_db_connection,
        table="target_table",
        fetch_size=100000,
        num_partitions=4,
        column="ID",  # swap with the column you want your partition based on
        upper_bound=10000,
        lower_bound=0
    )
    
    udtf_configs = {
        "external_access_integration": "<your external access integration>"
    }
    
    # Pull data from target table with udtf ingestion with parallelism using predicates
    
    df_udtf_predicates = session.read.dbapi(
        create_oracle_db_connection,
        table="target_table",
        udtf_configs=udtf_configs,
        fetch_size=100000,
        predicates=[
            "ID < 3",
            "ID >= 3"
        ]
    )
    

Use the DB-API to connect to Oracle from a stored procedure

  1. Configure an external access integration (EAI), which is required to allow Snowflake to connect to the source endpoint.

    Note

    PrivateLink is recommended for secure data transfer, especially when you’re dealing with sensitive information. Ensure that your Snowflake account has the necessary PrivateLink privileges enabled and that the PrivateLink feature is configured and active in your Snowflake Notebook environment.

  2. Configure the secret, a network rule to allow egress to the source endpoint, and EAI:

    -- Configure a secret
    
    CREATE OR REPLACE SECRET ora_secret
    TYPE = PASSWORD
    USERNAME = 'ora_username'
    PASSWORD = 'ora_password';
    
    -- configure a network rule to allow egress to the source endpoint
    
    CREATE OR REPLACE NETWORK RULE ora_network_rule
    MODE = EGRESS
    TYPE = HOST_PORT
    VALUE_LIST = ('ora_host:ora_port');
    
    -- configure an external access integration
    
    CREATE OR REPLACE EXTERNAL ACCESS INTEGRATION ora_access_integration
    ALLOWED_NETWORK_RULES = (ora_network_rule)
    ALLOWED_AUTHENTICATION_SECRETS = (ora_secret)
    ENABLED = true;
    
  3. Use Snowpark Python DB-API to pull data from Oracle in a Python stored procedure:

    CREATE OR REPLACE PROCEDURE sp_ora_dbapi()
        RETURNS TABLE()
        LANGUAGE PYTHON
        RUNTIME_VERSION='3.11'
        HANDLER='run'
        PACKAGES=('snowflake-snowpark-python', 'oracledb')
        EXTERNAL_ACCESS_INTEGRATIONS = (ora_access_integration)
        SECRETS = ('cred' = ora_secret )
    AS $$
    
    # Get user name and password from ora_secret
    import _snowflake
    username_password_object = _snowflake.get_username_password('cred')
    USER = username_password_object.username
    PASSWORD = username_password_object.password
    
    # Define the factory method for creating a connection to Oracle
    
    from snowflake.snowpark import Session
    
    def create_oracle_db_connection():
        import oracledb
        host = "ora_host"
        port = "ora_port"
        service_name = "ora_service"
        user = USER
        password = PASSWORD
        DSN = f"{host}:{port}/{service_name}"
        connection = oracledb.connect(
            user=USER,
            password=PASSWORD,
            dsn=DSN
        )
        # Optional: include this parameter for source tracing
        connection.clientinfo = "snowflake-snowpark-python"
        return connection
    
    def run(session: Session):
        # Feel free to combine local/udtf ingestion and partition column/predicates
        # as stated in the understanding parallelism section
    
        # Call dbapi to pull data from target table
    
        df = session.read.dbapi(
            create_oracle_db_connection,
            table="target_table"
        )
    
        # Call dbapi to pull data from target query
    
        df_query = session.read.dbapi(
            create_oracle_db_connection,
            query="select * from target_table"
        )
    
        # Pull data from target table with parallelism using partition column
    
        df_local_par_column = session.read.dbapi(
            create_oracle_db_connection,
            table="target_table",
            fetch_size=100000,
            num_partitions=4,
            column="ID",  # swap with the column you want your partition based on
            upper_bound=10000,
            lower_bound=0
        )
    
        udtf_configs = {
            "external_access_integration": "<your external access integration>"
        }
    
        # Pull data from target table with udtf ingestion with parallelism using predicates
    
        df_udtf_predicates = session.read.dbapi(
            create_oracle_db_connection,
            table="target_table",
            udtf_configs=udtf_configs,
            fetch_size=100000,
            predicates=[
                "ID < 3",
                "ID >= 3"
            ]
        )
        return df
    $$;
    
    CALL sp_ora_dbapi();
    

Use the DB-API to connect to Oracle from a Snowflake notebook

  1. From Snowflake Notebook packages, select snowflake-snowpark-python and oracledb.

  2. Configure an external access integration (EAI), which is required to allow Snowflake to connect to the source endpoint.

    Note

    PrivateLink is recommended for secure data transfer, especially when you’re dealing with sensitive information. Ensure that your Snowflake account has the necessary PrivateLink privileges enabled and that the PrivateLink feature is configured and active in your Snowflake Notebook environment.

  3. Configure the secret, a network rule, and EAI to allow egress to the source endpoint:

    -- Configure a secret
    CREATE OR REPLACE SECRET ora_secret
        TYPE = PASSWORD
        USERNAME = 'ora_username'
        PASSWORD = 'ora_password';
    ALTER NOTEBOOK mynotebook SET SECRETS = ('snowflake-secret-object' = ora_secret);
    
    -- Configure a network rule to allow egress to the source endpoint
    
    CREATE OR REPLACE NETWORK RULE ora_network_rule
        MODE = EGRESS
        TYPE = HOST_PORT
        VALUE_LIST = ('ora_host:ora_port');
    
    -- Configure an external access integration
    
    CREATE OR REPLACE EXTERNAL ACCESS INTEGRATION ora_access_integration
        ALLOWED_NETWORK_RULES = (ora_network_rule)
        ALLOWED_AUTHENTICATION_SECRETS = (ora_secret)
        ENABLED = true;
    
  4. Set up external access for Snowflake Notebooks, and then restart the notebook session.

  5. Use the DB-API to pull data from Oracle in a Python cell of a Snowflake notebook:

    # Get user name and password from ora_secret
    
    import _snowflake
    username_password_object = _snowflake.get_username_password('snowflake-secret-object')
    USER = username_password_object.username
    PASSWORD = username_password_object.password
    
    import snowflake.snowpark.context
    session = snowflake.snowpark.context.get_active_session()
    
    # Define the factory method for creating a connection to Oracle
    
    def create_oracle_db_connection():
        import oracledb
        host = "ora_host"
        port = "ora_port"
        service_name = "ora_service"
        user = USER
        password = PASSWORD
        DSN = f"{host}:{port}/{service_name}"
        connection = oracledb.connect(
            user=USER,
            password=PASSWORD,
            dsn=DSN,
        )
        # Optional: include this parameter for source tracing
        connection.clientinfo = "snowflake-snowpark-python"
        return connection
    
    # Feel free to combine local/udtf ingestion and partition column/predicates as
    # stated in the understanding parallelism section
    
    # Call dbapi to pull data from target table
    
    df = session.read.dbapi(
        create_oracle_db_connection,
        table="target_table"
    )
    
    # Call dbapi to pull data from target query
    
    df_query = session.read.dbapi(
        create_oracle_db_connection,
        query="select * from target_table"
    )
    
    # Pull data from target table with parallelism using partition column
    
    df_local_par_column = session.read.dbapi(
        create_oracle_db_connection,
        table="target_table",
        fetch_size=100000,
        num_partitions=4,
        column="ID",  # swap with the column you want your partition based on
        upper_bound=10000,
        lower_bound=0
    )
    
    udtf_configs = {
        "external_access_integration": "<your external access integration>"
    }
    
    # Pull data from target table with udtf ingestion with parallelism using predicates
    
    df_udtf_predicates = session.read.dbapi(
        create_oracle_db_connection,
        table="target_table",
        udtf_configs=udtf_configs,
        fetch_size=100000,
        predicates=[
            "ID < 3",
            "ID >= 3"
        ]
    )
    
    # Save data into sf_table
    
    df.write.mode("overwrite").save_as_table('sf_table')
    

Source tracing when using DB-API to connect to Oracle

  1. Include a Snowpark tag in your create connection function:

    def create_oracle_db_connection():
        import oracledb
        HOST = "myhost"
        PORT = "myport"
        SERVICE_NAME = "myservice"
        USER = "myuser"
        PASSWORD = "mypassword"
        DSN = f"{HOST}:{PORT}/{SERVICE_NAME}"
        connection = oracledb.connect(
            user=USER,
            password=PASSWORD,
            dsn=DSN,
        )
        # include this parameter for source tracing
        connection.clientinfo = "snowflake-snowpark-python"
        return connection
    
  2. Run the following SQL in your data source to capture queries from Snowpark that are still live:

    SELECT
        s.sid,
        s.serial#,
        s.username,
        s.module,
        q.sql_id,
        q.sql_text,
        q.last_active_time
    FROM
        v$session s
        JOIN v$sql q ON s.sql_id = q.sql_id
    WHERE
        s.client_info = 'snowflake-snowpark-python'
    

PostgreSQL

To connect to PostgreSQL from Snowpark, you need the following two packages:

  • snowflake-snowpark-python

  • psycopg2

The following code examples show how to connect to PostgreSQL from a Snowpark client, stored procedures, and a Snowflake notebook.

Use the DB-API to connect to PostgreSQL from a Snowpark client

  1. Install psycopg2:

    pip install psycopg2
    
  2. Define the factory method for creating a connection to PostgreSQL:

    def create_pg_connection():
        import psycopg2
        connection = psycopg2.connect(
            host="pg_host",
            port=pg_port,
            dbname="pg_dbname",
            user="pg_user",
            password="pg_password",
            # Optional: include this parameter for source tracing
            application_name="snowflake-snowpark-python"
        )
        return connection
    
    # Feel free to combine local/udtf ingestion and partition column/predicates as
    # stated in the understanding parallelism section
    
    # Call dbapi to pull data from target table
    
    df = session.read.dbapi(
        create_pg_connection,
        table="target_table"
    )
    
    # Call dbapi to pull data from target query
    
    df_query = session.read.dbapi(
        create_pg_connection,
        query="select * from target_table"
    )
    
    # Pull data from target table with parallelism using partition column
    
    df_local_par_column = session.read.dbapi(
        create_pg_connection,
        table="target_table",
        fetch_size=100000,
        num_partitions=4,
        column="ID",  # Swap with the column you want your partition based on
        upper_bound=10000,
        lower_bound=0
    )
    
    udtf_configs = {
        "external_access_integration": "<your external access integration>"
    }
    
    # Pull data from target table with udtf ingestion with parallelism using predicates
    
    df_udtf_predicates = session.read.dbapi(
        create_pg_connection,
        table="target_table",
        udtf_configs=udtf_configs,
        fetch_size=100000,
        predicates=[
            "ID < 3",
            "ID >= 3"
        ]
    )
    
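The connection factories on this page hard-code their connection parameters. When those parameters come from configuration instead, a small closure can bind them to a driver's `connect` function and still yield the zero-argument factory used in the examples above. The `make_connection_factory` helper below is hypothetical. sqlite3 stands in for psycopg2 so the sketch runs without a PostgreSQL server.

```python
import sqlite3


def make_connection_factory(connect, **connect_kwargs):
    """Bind keyword arguments to a DB-API connect function.

    Returns a zero-argument callable that opens a new connection each
    time it is called.
    """
    def create_connection():
        return connect(**connect_kwargs)

    return create_connection


# Stand-in demo: sqlite3.connect accepts the database path as a keyword.
create_demo_connection = make_connection_factory(
    sqlite3.connect, database=":memory:"
)
connection = create_demo_connection()
result = connection.execute("SELECT 1").fetchone()
connection.close()
```

With psycopg2 installed, the same helper could be called as `make_connection_factory(psycopg2.connect, host="pg_host", dbname="pg_dbname", user="pg_user", password="pg_password", application_name="snowflake-snowpark-python")`.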

Use the DB-API to connect to PostgreSQL from a stored procedure

  1. Configure an external access integration (EAI), which is required to allow Snowflake to connect to the source endpoint.

    Note

    PrivateLink is recommended for secure data transfer, especially when you’re dealing with sensitive information. Ensure that your Snowflake account has the necessary PrivateLink privileges enabled and that the PrivateLink feature is configured and active in your Snowflake Notebook environment.

  2. Configure the secret, a network rule to allow egress to the source endpoint, and EAI:

    -- configure a secret
    
    CREATE OR REPLACE SECRET pg_secret
        TYPE = PASSWORD
        USERNAME = 'pg_username'
        PASSWORD = 'pg_password';
    
    -- configure a network rule.
    
    CREATE OR REPLACE NETWORK RULE pg_network_rule
        MODE = EGRESS
        TYPE = HOST_PORT
        VALUE_LIST = ('pg_host:pg_port');
    
    -- configure an external access integration.
    
    CREATE OR REPLACE EXTERNAL ACCESS INTEGRATION pg_access_integration
        ALLOWED_NETWORK_RULES = (pg_network_rule)
        ALLOWED_AUTHENTICATION_SECRETS = (pg_secret)
        ENABLED = true;
    
  3. Use Snowpark Python DB-API to pull data from PostgreSQL in a Python stored procedure:

    CREATE OR REPLACE PROCEDURE sp_pg_dbapi()
        RETURNS TABLE()
        LANGUAGE PYTHON
        RUNTIME_VERSION='3.11'
        HANDLER='run'
        PACKAGES=('snowflake-snowpark-python', 'psycopg2')
        EXTERNAL_ACCESS_INTEGRATIONS = (pg_access_integration)
        SECRETS = ('cred' = pg_secret )
    AS $$
    
    # Get user name and password from pg_secret
    
    import _snowflake
    username_password_object = _snowflake.get_username_password('cred')
    USER = username_password_object.username
    PASSWORD = username_password_object.password
    
    # Define the factory method for creating a connection to PostgreSQL
    
    from snowflake.snowpark import Session
    
    def create_pg_connection():
        import psycopg2
        connection = psycopg2.connect(
            host="pg_host",
            port=pg_port,
            dbname="pg_dbname",
            user=USER,
            password=PASSWORD,
            # Optional: include this parameter for source tracing
            application_name="snowflake-snowpark-python"
        )
        return connection
    
    def run(session: Session):
    
        # Feel free to combine local/udtf ingestion and partition column/predicates
        # as stated in the understanding parallelism section
    
        # Call dbapi to pull data from target table
    
        df = session.read.dbapi(
            create_pg_connection,
            table="target_table"
        )
    
        # Call dbapi to pull data from target query
    
        df_query = session.read.dbapi(
            create_pg_connection,
            query="select * from target_table"
        )
    
        # Pull data from target table with parallelism using partition column
    
        df_local_par_column = session.read.dbapi(
            create_pg_connection,
            table="target_table",
            fetch_size=100000,
            num_partitions=4,
            column="ID",  # replace with the column to partition on
            upper_bound=10000,
            lower_bound=0
        )
    
        udtf_configs = {
            "external_access_integration": "<your external access integration>"
        }
    
        # Pull data from target table with udtf ingestion with parallelism using predicates
    
        df_udtf_predicates = session.read.dbapi(
            create_pg_connection,
            table="target_table",
            udtf_configs=udtf_configs,
            fetch_size=100000,
            predicates=[
                "ID < 3",
                "ID >= 3"
            ]
        )
        return df
    
    $$;
    CALL sp_pg_dbapi();
    

Using DB-API to connect to PostgreSQL from a Snowflake notebook

  1. From Snowflake Notebook packages, select snowflake-snowpark-python and psycopg2.

  2. Configure an external access integration (EAI), which is required to allow Snowflake to connect to the source endpoint.

    Note

    PrivateLink is recommended for secure data transfer, especially when you’re dealing with sensitive information. Ensure that your Snowflake account has the necessary PrivateLink privileges enabled and that the PrivateLink feature is configured and active in your Snowflake Notebook environment.

  3. Configure the secret, a network rule to allow egress to the source endpoint, and EAI:

    -- Configure the secret
    
    CREATE OR REPLACE SECRET pg_secret
        TYPE = PASSWORD
        USERNAME = 'pg_username'
        PASSWORD = 'pg_password';
    
    ALTER NOTEBOOK pg_notebook SET SECRETS = ('snowflake-secret-object' = pg_secret);
    
    -- Configure the network rule to allow egress to the source endpoint
    
    CREATE OR REPLACE NETWORK RULE pg_network_rule
        MODE = EGRESS
        TYPE = HOST_PORT
        VALUE_LIST = ('pg_host:pg_port');
    
    -- Configure external access integration
    
    CREATE OR REPLACE EXTERNAL ACCESS INTEGRATION pg_access_integration
        ALLOWED_NETWORK_RULES = (pg_network_rule)
        ALLOWED_AUTHENTICATION_SECRETS = (pg_secret)
        ENABLED = true;
    
  4. Set up external access for Snowflake Notebooks, and then restart the notebook session.

  5. Use the DB-API to pull data from PostgreSQL in a Python cell of a Snowflake notebook:

    # Get the user name and password from pg_secret
    
    import _snowflake
    username_password_object = _snowflake.get_username_password('snowflake-secret-object')
    USER = username_password_object.username
    PASSWORD = username_password_object.password
    
    import snowflake.snowpark.context
    session = snowflake.snowpark.context.get_active_session()
    
    # Define the factory method for creating a connection to PostgreSQL
    
    def create_pg_connection():
        import psycopg2
        connection = psycopg2.connect(
            host="pg_host",
            port=pg_port,
            dbname="pg_dbname",
            user=USER,
            password=PASSWORD,
            # Optional: include this parameter for source tracing
            application_name="snowflake-snowpark-python"
        )
        return connection
    
    # Feel free to combine local/udtf ingestion and partition column/predicates as
    # stated in the understanding parallelism section
    
    # Call dbapi to pull data from target table
    
    df = session.read.dbapi(
        create_pg_connection,
        table="target_table"
    )
    
    # Call dbapi to pull data from target query
    
    df_query = session.read.dbapi(
        create_pg_connection,
        query="select * from target_table"
    )
    
    # Pull data from target table with parallelism using partition column
    
    df_local_par_column = session.read.dbapi(
        create_pg_connection,
        table="target_table",
        fetch_size=100000,
        num_partitions=4,
        column="ID",  # replace with the column to partition on
        upper_bound=10000,
        lower_bound=0
    )
    
    udtf_configs = {
        "external_access_integration": "<your external access integration>"
    }
    
    # Pull data from target table with udtf ingestion with parallelism using predicates
    
    df_udtf_predicates = session.read.dbapi(
        create_pg_connection,
        table="target_table",
        udtf_configs=udtf_configs,
        fetch_size=100000,
        predicates=[
            "ID < 3",
            "ID >= 3"
        ]
    )
    
    # Save data into sf_table
    
    df.write.mode("overwrite").save_as_table('sf_table')

Source tracing when using DB-API to connect to PostgreSQL

  1. Include a Snowpark tag in your create connection function:

    def create_pg_connection():
        import psycopg2
        connection = psycopg2.connect(
            host="pg_host",
            port=pg_port,
            dbname="pg_dbname",
            user="pg_user",
            password="pg_password",
            # Include this parameter for source tracing
            application_name="snowflake-snowpark-python"
        )
        return connection
    
  2. Run the following SQL in your data source to capture queries from Snowpark that are still live:

    SELECT
        pid,
        usename AS username,
        datname AS database,
        application_name,
        client_addr,
        state,
        query_start,
        query
    FROM
        pg_stat_activity
    WHERE
        application_name = 'snowflake-snowpark-python';
    
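
The tracing query can also be run from Python through any DB-API 2.0 connection, for example one created by the same kind of factory function used in step 1. The following is a minimal sketch; `fetch_snowpark_activity` is a hypothetical helper name, and the shortened column list is an illustrative choice.

```python
from typing import Any, Callable, List, Tuple

# A reduced version of the tracing query shown above.
TRACE_SQL = (
    "SELECT pid, application_name, state, query "
    "FROM pg_stat_activity "
    "WHERE application_name = 'snowflake-snowpark-python'"
)


def fetch_snowpark_activity(create_connection: Callable[[], Any]) -> List[Tuple]:
    """Run the tracing query on a fresh DB-API connection and return all rows."""
    connection = create_connection()
    try:
        cursor = connection.cursor()
        cursor.execute(TRACE_SQL)
        return cursor.fetchall()
    finally:
        # Always release the connection, even if the query fails.
        connection.close()
```

Calling `fetch_snowpark_activity(create_pg_connection)` would return one row per session that was opened with the `application_name` tag. Note that the helper's own connection carries the same tag if it is created by the tagged factory.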

MySQL

To connect to MySQL from Snowpark, you need two packages: snowflake-snowpark-python[pandas] and pymysql.

The following code examples show how to connect to MySQL from a Snowpark client, stored procedures, and a Snowflake notebook.

Use the DB-API to connect to MySQL from a Snowpark client

  1. Install pymysql:

    pip install snowflake-snowpark-python[pandas]
    pip install pymysql
    
  2. Define the factory method for creating a connection to MySQL:

    def create_mysql_connection():
        import pymysql
        connection = pymysql.connect(
            host="mysql_host",
            port=mysql_port,
            database="mysql_db",
            user="mysql_user",
            password="mysql_password",
            # Optional: include this parameter for source tracing
            init_command="SET @program_name='snowflake-snowpark-python';"
        )
        return connection
    
    # Feel free to combine local/udtf ingestion and partition column/predicates as
    # stated in the understanding parallelism section
    
    # Call dbapi to pull data from target table
    
    df = session.read.dbapi(
        create_mysql_connection,
        table="target_table"
    )
    
    # Call dbapi to pull data from target query
    
    df_query = session.read.dbapi(
        create_mysql_connection,
        query="select * from target_table"
    )
    
    # Pull data from target table with parallelism using partition column
    
    df_local_par_column = session.read.dbapi(
        create_mysql_connection,
        table="target_table",
        fetch_size=100000,
        num_partitions=4,
        column="ID",  # replace with the column to partition on
        upper_bound=10000,
        lower_bound=0
    )
    
    udtf_configs = {
        "external_access_integration": "<your external access integration>"
    }
    
    # Pull data from target table with udtf ingestion with parallelism using predicates
    
    df_udtf_predicates = session.read.dbapi(
        create_mysql_connection,
        table="target_table",
        udtf_configs=udtf_configs,
        fetch_size=100000,
        predicates=[
            "ID < 3",
            "ID >= 3"
        ]
    )
    
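
To build intuition for how `column`, `lower_bound`, `upper_bound`, and `num_partitions` relate, the sketch below divides a numeric range into equal strides, in the style of the Spark JDBC reader. It is an illustration only and is not Snowpark's implementation; `stride_predicates` is a hypothetical name, and the exact boundaries Snowpark generates may differ.

```python
from typing import List


def stride_predicates(column: str, lower_bound: int, upper_bound: int,
                      num_partitions: int) -> List[str]:
    """Illustrative only: cut the range between the bounds into equal strides."""
    if num_partitions < 2:
        raise ValueError("num_partitions must be at least 2 for this sketch")
    stride = (upper_bound - lower_bound) // num_partitions
    if stride < 1:
        raise ValueError("range is too narrow for the requested partitions")
    # Interior cut points; the first and last partitions are left open-ended
    # so rows outside the bounds are still read.
    cuts = [lower_bound + stride * i for i in range(1, num_partitions)]
    predicates = [f"{column} < {cuts[0]}"]
    for low, high in zip(cuts, cuts[1:]):
        predicates.append(f"{column} >= {low} AND {column} < {high}")
    predicates.append(f"{column} >= {cuts[-1]}")
    return predicates


# Same values as the partition-column example above.
for predicate in stride_predicates("ID", 0, 10000, 4):
    print(predicate)
```

In this model the bounds only set the stride and do not filter rows, so a partition column whose values are spread evenly between the bounds gives the most balanced partitions.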

Using DB-API to connect to MySQL from a stored procedure

  1. Configure an external access integration (EAI), which is required to allow Snowflake to connect to the source endpoint.

    Note

    PrivateLink is recommended for secure data transfer, especially when you’re dealing with sensitive information. Ensure that your Snowflake account has the necessary PrivateLink privileges enabled and that the PrivateLink feature is configured and active in your Snowflake Notebook environment.

  2. Configure the secret, a network rule to allow egress to the source endpoint, and EAI:

    CREATE OR REPLACE SECRET mysql_secret
        TYPE = PASSWORD
        USERNAME = 'mysql_username'
        PASSWORD = 'mysql_password';
    
    -- configure a network rule.
    
    CREATE OR REPLACE NETWORK RULE mysql_network_rule
        MODE = EGRESS
        TYPE = HOST_PORT
        VALUE_LIST = ('mysql_host:mysql_port');
    
    -- configure an external access integration
    
    CREATE OR REPLACE EXTERNAL ACCESS INTEGRATION mysql_access_integration
        ALLOWED_NETWORK_RULES = (mysql_network_rule)
        ALLOWED_AUTHENTICATION_SECRETS = (mysql_secret)
        ENABLED = true;
    
  3. Use the Snowpark Python DB-API to pull data from MySQL in a Python stored procedure:

    CREATE OR REPLACE PROCEDURE sp_mysql_dbapi()
        RETURNS TABLE()
        LANGUAGE PYTHON
        RUNTIME_VERSION='3.11'
        HANDLER='run'
        PACKAGES=('snowflake-snowpark-python', 'pymysql')
        EXTERNAL_ACCESS_INTEGRATIONS = (mysql_access_integration)
        SECRETS = ('cred' = mysql_secret )
    AS $$
    
    # Get user name and password from mysql_secret
    
    import _snowflake
    username_password_object = _snowflake.get_username_password('cred')
    USER = username_password_object.username
    PASSWORD = username_password_object.password
    
    # Define the factory method for creating a connection to MySQL
    
    from snowflake.snowpark import Session
    
    def create_mysql_connection():
        import pymysql
        connection = pymysql.connect(
            host="mysql_host",
            port=mysql_port,
            database="mysql_dbname",
            user=USER,
            password=PASSWORD,
            # Optional: include this parameter for source tracing
            init_command="SET @program_name='snowflake-snowpark-python';"
        )
        return connection
    
    # Using Snowpark Python DB-API to pull data from MySQL in a Python stored procedure.
    
    def run(session: Session):
        # Feel free to combine local/udtf ingestion and partition column/predicates
        # as stated in the understanding parallelism section
    
        # Call dbapi to pull data from target table
    
        df = session.read.dbapi(
            create_mysql_connection,
            table="target_table"
        )
    
        # Call dbapi to pull data from target query
    
        df_query = session.read.dbapi(
            create_mysql_connection,
            query="select * from target_table"
        )
    
        # Pull data from target table with parallelism using partition column
    
        df_local_par_column = session.read.dbapi(
            create_mysql_connection,
            table="target_table",
            fetch_size=100000,
            num_partitions=4,
            column="ID",  # replace with the column to partition on
            upper_bound=10000,
            lower_bound=0
        )
    
        udtf_configs = {
            "external_access_integration": "<your external access integration>"
        }
    
        # Pull data from target table with udtf ingestion with parallelism using predicates
    
        df_udtf_predicates = session.read.dbapi(
            create_mysql_connection,
            table="target_table",
            udtf_configs=udtf_configs,
            fetch_size=100000,
            predicates=[
                "ID < 3",
                "ID >= 3"
            ]
        )
        return df
    $$;
    
    CALL sp_mysql_dbapi();
    

Using DB-API to connect to MySQL from a Snowflake notebook

  1. From Snowflake Notebook packages, select snowflake-snowpark-python and pymysql.

  2. Configure an external access integration (EAI), which is required to allow Snowflake to connect to the source endpoint.

    Note

    PrivateLink is recommended for secure data transfer, especially when you’re dealing with sensitive information. Ensure that your Snowflake account has the necessary PrivateLink privileges enabled and that the PrivateLink feature is configured and active in your Snowflake Notebook environment.

  3. Configure the secret, a network rule to allow egress to the source endpoint, and EAI:

    CREATE OR REPLACE SECRET mysql_secret
        TYPE = PASSWORD
        USERNAME = 'mysql_username'
        PASSWORD = 'mysql_password';
    
    ALTER NOTEBOOK mynotebook SET SECRETS = ('snowflake-secret-object' = mysql_secret);
    
    -- configure a network rule.
    CREATE OR REPLACE NETWORK RULE mysql_network_rule
        MODE = EGRESS
        TYPE = HOST_PORT
        VALUE_LIST = ('mysql_host:mysql_port');
    
    -- configure an EAI
    CREATE OR REPLACE EXTERNAL ACCESS INTEGRATION mysql_access_integration
        ALLOWED_NETWORK_RULES = (mysql_network_rule)
        ALLOWED_AUTHENTICATION_SECRETS = (mysql_secret)
        ENABLED = true;
    
  4. Set up external access for Snowflake Notebooks, and then restart the notebook session.

  5. Use the DB-API to pull data from MySQL in a Python cell of a Snowflake notebook:

    # Get user name and password from mysql_secret
    import _snowflake
    username_password_object = _snowflake.get_username_password('snowflake-secret-object')
    USER = username_password_object.username
    PASSWORD = username_password_object.password
    
    import snowflake.snowpark.context
    session = snowflake.snowpark.context.get_active_session()
    
    # Define the factory method for creating a connection to MySQL
    
    def create_mysql_connection():
        import pymysql
        connection = pymysql.connect(
            host="mysql_host",
            port=mysql_port,
            database="mysql_dbname",
            user=USER,
            password=PASSWORD,
            # Optional: include this parameter for source tracing
            init_command="SET @program_name='snowflake-snowpark-python';"
        )
        return connection
    
    # Feel free to combine local/udtf ingestion and partition column/predicates as
    # stated in the understanding parallelism section
    
    # Call dbapi to pull data from target table
    
    df = session.read.dbapi(
        create_mysql_connection,
        table="target_table"
    )
    
    # Call dbapi to pull data from target query
    
    df_query = session.read.dbapi(
        create_mysql_connection,
        query="select * from target_table"
    )
    
    # Pull data from target table with parallelism using partition column
    
    df_local_par_column = session.read.dbapi(
        create_mysql_connection,
        table="target_table",
        fetch_size=100000,
        num_partitions=4,
        column="ID",  # replace with the column to partition on
        upper_bound=10000,
        lower_bound=0
    )
    
    udtf_configs = {
        "external_access_integration": "<your external access integration>"
    }
    
    # Pull data from target table with udtf ingestion with parallelism using predicates
    
    df_udtf_predicates = session.read.dbapi(
        create_mysql_connection,
        table="target_table",
        udtf_configs=udtf_configs,
        fetch_size=100000,
        predicates=[
            "ID < 3",
            "ID >= 3"
        ]
    )
    
    # Save data into sf_table
    
    df.write.mode("overwrite").save_as_table('sf_table')
    

Source tracing when using DB-API to connect to MySQL

  1. Include a Snowpark tag in your create connection function:

    def create_mysql_connection():
        import pymysql
        connection = pymysql.connect(
            host="mysql_host",
            port=mysql_port,
            database="mysql_db",
            user="mysql_user",
            password="mysql_password",
            # include this parameter for source tracing
            init_command="SET @program_name='snowflake-snowpark-python';"
        )
        return connection
    
  2. Run the following SQL in your data source to capture queries from Snowpark:

    SELECT *
    FROM performance_schema.events_statements_history_long
    WHERE THREAD_ID = (
        SELECT THREAD_ID
        FROM performance_schema.events_statements_history_long
        WHERE SQL_TEXT = "SET @program_name='snowflake-snowpark-python'"
        ORDER BY EVENT_ID DESC
        LIMIT 1
    )
    

Databricks

To connect to Databricks from Snowpark, you need two packages: snowflake-snowpark-python[pandas] and databricks-sql-connector.

The following code examples show how to connect to Databricks from a Snowpark client, stored procedures, and a Snowflake notebook.

Using DB-API to connect to Databricks from a Snowpark client

  1. Install databricks-sql-connector:

    pip install snowflake-snowpark-python[pandas]
    pip install databricks-sql-connector
    
  2. Define the factory method for creating a connection to Databricks:

    def create_dbx_connection():
        import databricks.sql
        connection = databricks.sql.connect(
            server_hostname=HOST,
            http_path=PATH,
            access_token=ACCESS_TOKEN
        )
        return connection
    
    # Feel free to combine local/udtf ingestion and partition column/predicates as
    # stated in the understanding parallelism section
    
    # Call dbapi to pull data from target table
    
    df = session.read.dbapi(
        create_dbx_connection,
        table="target_table"
    )
    
    # Call dbapi to pull data from target query
    
    df_query = session.read.dbapi(
        create_dbx_connection,
        query="select * from target_table"
    )
    
    # Pull data from target table with parallelism using partition column
    
    df_local_par_column = session.read.dbapi(
        create_dbx_connection,
        table="target_table",
        fetch_size=100000,
        num_partitions=4,
        column="ID",  # replace with the column to partition on
        upper_bound=10000,
        lower_bound=0
    )
    
    udtf_configs = {
        "external_access_integration": "<your external access integration>"
    }
    
    # Pull data from target table with udtf ingestion with parallelism using predicates
    
    df_udtf_predicates = session.read.dbapi(
        create_dbx_connection,
        table="target_table",
        udtf_configs=udtf_configs,
        fetch_size=100000,
        predicates=[
            "ID < 3",
            "ID >= 3"
        ]
    )
    
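
The factory in the example above reads `HOST`, `PATH`, and `ACCESS_TOKEN` from module-level names. Because `dbapi` is passed a function that takes no arguments, one way to avoid globals is to bind the connection settings in a closure. The following is a minimal sketch of that pattern; `make_connection_factory` is a hypothetical helper, and `sqlite3` stands in for `databricks.sql` only so that the example runs without a Databricks workspace.

```python
import sqlite3
from typing import Any, Callable


def make_connection_factory(connect: Callable[..., Any],
                            **connect_kwargs: Any) -> Callable[[], Any]:
    """Bind connection settings and return a zero-argument factory."""

    def create_connection() -> Any:
        # A new connection is opened on every call.
        return connect(**connect_kwargs)

    return create_connection


# Stand-in driver; with Databricks this might instead be
# make_connection_factory(databricks.sql.connect, server_hostname=..., ...).
create_demo_connection = make_connection_factory(sqlite3.connect, database=":memory:")

connection = create_demo_connection()
print(connection.execute("SELECT 1").fetchone())
connection.close()
```

This page does not confirm whether a closure-built factory behaves the same way under UDTF ingestion, where the function runs on Snowflake rather than locally, so test that case before relying on it.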

Using DB-API to connect to Databricks from a stored procedure

  1. Configure an external access integration (EAI), which is required to allow Snowflake to connect to the source endpoint.

    Note

    PrivateLink is recommended for secure data transfer, especially when you’re dealing with sensitive information. Ensure that your Snowflake account has the necessary PrivateLink privileges enabled and that the PrivateLink feature is configured and active in your Snowflake Notebook environment.

  2. Configure the secret, a network rule to allow egress to the source endpoint, and EAI:

    CREATE OR REPLACE SECRET dbx_secret
        TYPE = GENERIC_STRING
        SECRET_STRING = 'dbx_access_token';
    
    CREATE OR REPLACE NETWORK RULE dbx_network_rule
        MODE = EGRESS
        TYPE = HOST_PORT
        VALUE_LIST = ('dbx_host:dbx_port');
    
    CREATE OR REPLACE EXTERNAL ACCESS INTEGRATION dbx_access_integration
        ALLOWED_NETWORK_RULES = (dbx_network_rule)
        ALLOWED_AUTHENTICATION_SECRETS = (dbx_secret)
        ENABLED = true;
    
  3. Use the Snowpark Python DB-API to pull data from Databricks in a Python stored procedure:

    CREATE OR REPLACE PROCEDURE sp_dbx_dbapi()
        RETURNS TABLE()
        LANGUAGE PYTHON
        RUNTIME_VERSION='3.11'
        HANDLER='run'
        PACKAGES=('snowflake-snowpark-python', 'databricks-sql-connector')
        EXTERNAL_ACCESS_INTEGRATIONS = (dbx_access_integration)
        SECRETS = ('cred' = dbx_secret )
    AS $$
    
    # Get the access token from dbx_secret
    
    import _snowflake
    ACCESS_TOKEN = _snowflake.get_generic_secret_string('cred')
    
    from snowflake.snowpark import Session
    
    # Define the method for creating a connection to Databricks
    def create_dbx_connection():
        import databricks.sql
        connection = databricks.sql.connect(
            server_hostname="dbx_host",
            http_path="dbx_path",
            access_token=ACCESS_TOKEN,
        )
        return connection
    
    # Use the Snowpark Python DB-API to pull data from Databricks in a Python stored procedure.
    
    def run(session: Session):
        # Feel free to combine local/udtf ingestion and partition column/predicates
        # as stated in the understanding parallelism section
    
        # Call dbapi to pull data from target table
    
        df = session.read.dbapi(
            create_dbx_connection,
            table="target_table"
        )
    
        # Call dbapi to pull data from target query
    
        df_query = session.read.dbapi(
            create_dbx_connection,
            query="select * from target_table"
        )
    
        # Pull data from target table with parallelism using partition column
    
        df_local_par_column = session.read.dbapi(
            create_dbx_connection,
            table="target_table",
            fetch_size=100000,
            num_partitions=4,
            column="ID",  # replace with the column to partition on
            upper_bound=10000,
            lower_bound=0
        )
    
        udtf_configs = {
            "external_access_integration": "<your external access integration>"
        }
    
        # Pull data from target table with udtf ingestion with parallelism using predicates
    
        df_udtf_predicates = session.read.dbapi(
            create_dbx_connection,
            table="target_table",
            udtf_configs=udtf_configs,
            fetch_size=100000,
            predicates=[
                "ID < 3",
                "ID >= 3"
            ]
        )
        return df
    
    $$;
    
    CALL sp_dbx_dbapi();
    

Using DB-API to connect to Databricks from a Snowflake notebook

  1. From Snowflake Notebook packages, select snowflake-snowpark-python and databricks-sql-connector.

  2. Configure an external access integration (EAI), which is required to allow Snowflake to connect to the source endpoint.

    Note

    PrivateLink is recommended for secure data transfer, especially when you’re dealing with sensitive information. Ensure that your Snowflake account has the necessary PrivateLink privileges enabled and that the PrivateLink feature is configured and active in your Snowflake Notebook environment.

  3. Configure the secret, a network rule to allow egress to the source endpoint, and EAI:

    CREATE OR REPLACE SECRET dbx_secret
        TYPE = GENERIC_STRING
        SECRET_STRING = 'dbx_access_token';
    
    ALTER NOTEBOOK mynotebook SET SECRETS = ('snowflake-secret-object' = dbx_secret);
    
    CREATE OR REPLACE NETWORK RULE dbx_network_rule
        MODE = EGRESS
        TYPE = HOST_PORT
        VALUE_LIST = ('dbx_host:dbx_port');
    
    CREATE OR REPLACE EXTERNAL ACCESS INTEGRATION dbx_access_integration
        ALLOWED_NETWORK_RULES = (dbx_network_rule)
        ALLOWED_AUTHENTICATION_SECRETS = (dbx_secret)
        ENABLED = true;
    
  4. Set up external access for Snowflake Notebooks, and then restart the notebook session.

  5. Use the DB-API to pull data from Databricks in a Python cell of a Snowflake notebook:

    # Get the access token from dbx_secret, using the secret name
    # assigned to the notebook with ALTER NOTEBOOK ... SET SECRETS
    
    import _snowflake
    ACCESS_TOKEN = _snowflake.get_generic_secret_string('snowflake-secret-object')
    
    import snowflake.snowpark.context
    session = snowflake.snowpark.context.get_active_session()
    
    # Define the factory method for creating a connection to Databricks
    
    def create_dbx_connection():
        import databricks.sql
        connection = databricks.sql.connect(
            server_hostname="dbx_host",
            http_path="dbx_path",
            access_token=ACCESS_TOKEN,
        )
        return connection
    
    # Feel free to combine local/udtf ingestion and partition column/predicates as
    # stated in the understanding parallelism section
    
    # Call dbapi to pull data from target table
    
    df = session.read.dbapi(
        create_dbx_connection,
        table="target_table"
    )
    
    # Call dbapi to pull data from target query
    
    df_query = session.read.dbapi(
        create_dbx_connection,
        query="select * from target_table"
    )
    
    # Pull data from target table with parallelism using partition column
    
    df_local_par_column = session.read.dbapi(
        create_dbx_connection,
        table="target_table",
        fetch_size=100000,
        num_partitions=4,
        column="ID",  # replace with the column to partition on
        upper_bound=10000,
        lower_bound=0
    )
    
    udtf_configs = {
        "external_access_integration": "<your external access integration>"
    }
    
    # Pull data from target table with udtf ingestion with parallelism using predicates
    
    df_udtf_predicates = session.read.dbapi(
        create_dbx_connection,
        table="target_table",
        udtf_configs=udtf_configs,
        fetch_size=100000,
        predicates=[
            "ID < 3",
            "ID >= 3"
        ]
    )
    
    # Save data into sf_table
    
    df.write.mode("overwrite").save_as_table('sf_table')
    

Source tracing when using DB-API to connect to Databricks

  1. Include a Snowpark tag in your create connection function:

    def create_dbx_connection():
        import databricks.sql
        connection = databricks.sql.connect(
            server_hostname=HOST,
            http_path=PATH,
            access_token=ACCESS_TOKEN,
            # include this parameter for source tracing
            user_agent_entry="snowflake-snowpark-python"
        )
        return connection
    
  2. Go to the query history in the Databricks console and search for queries whose source is snowflake-snowpark-python.

Limitations

The Snowpark Python DB-API supports only Python DB-API 2.0–compliant drivers (for example, pyodbc or oracledb). JDBC drivers are not supported in this release.
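
A quick way to see whether a driver module advertises DB-API 2.0 support is to inspect the module-level attributes that PEP 249 requires. The following is a minimal sketch; `dbapi2_problems` is a hypothetical helper and not part of Snowpark, and `sqlite3` is used only because it ships with Python. Passing this check does not guarantee that Snowpark supports the driver.

```python
import sqlite3
from types import ModuleType
from typing import List

VALID_PARAMSTYLES = ("qmark", "numeric", "named", "format", "pyformat")


def dbapi2_problems(module: ModuleType) -> List[str]:
    """List the PEP 249 module-level requirements that `module` fails."""
    problems = []
    if getattr(module, "apilevel", None) != "2.0":
        problems.append("apilevel is not '2.0'")
    if getattr(module, "threadsafety", None) not in (0, 1, 2, 3):
        problems.append("threadsafety is missing or invalid")
    if getattr(module, "paramstyle", None) not in VALID_PARAMSTYLES:
        problems.append("paramstyle is missing or invalid")
    if not callable(getattr(module, "connect", None)):
        problems.append("connect() is missing")
    return problems


# An empty list means the module declares all four required attributes.
print(dbapi2_problems(sqlite3))
```

The same function can be called with any imported driver module, for example `psycopg2` or `pymysql`, before using it in a connection factory.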