Snowpark Pythonを使用した外部データソースからのデータの読み取り DB-API

Snowpark Python DB-APIを使用すると、Snowpark Pythonユーザーは、外部データベースからSnowflakeにデータをプログラムでプルできます。説明には次が含まれます。

  • ** Python DB-API サポート:** Pythonの標準の DB-API2.0ドライバーを使用した外部データベースに接続します。

  • 合理化された設定: pip を使用して必要なドライバーをインストールするため、追加の依存関係を管理する必要がありません。

これらの APIsのデータをシームレスに Snowpark DataFrames テーブルに取り込み、Snowflakeを使用して高度な分析のための変換することができます。

Snowpark Pythonの使用 DB-API

その `DB-API<https://docs.snowflake.com/en/developer-guide/snowpark/reference/python/latest/snowpark/api/snowflake.snowpark.DataFrameReader.dbapi>`_`Spark JDBCAPI<https://spark.apache.org/docs/3.5.4/sql-data-sources-jdbc.html>`_ と同様の方法で使用できます。ほとんどのパラメーターは、より安全性を高めるために、同一または類似するように設計されています。同時に、Snowparkは直感的な命名規則を持つPythonファーストの設計を強調し、 JDBC固有の構成を回避します。このため、 Python開発者は慣れ親しんだエクスペリエンスを提供します。Snowpark Pythonを DB-APISpark で JDBCAPIの比較する詳細情報は DB-APIパラメーター: を参照してください。

DB-APIパラメーター:

パラメーター

Snowpark DB-API

create_connection

PythonをDB-API 接続を作成する関数 。

table

ソースデータベースのテーブルを指定します。

query

SQL クエリは、データを読み取るためのサブクエリとしてラップされました。

column

並列読み取りのパーティション列。

lower_bound

パーティション分割の下限。

upper_bound

パーティション分割の上限。

num_partitions

並列処理のパーティションの数。

query_timeout

SQL 実行のタイムアウト(秒単位)。

fetch_size

ラウンドトリップごとに取得された行数。

custom_schema

外部データベースからデータをプルするためのカスタムスキーマ。

max_workers

外部データベースからのデータの並行フェッチおよびプルのワーカー数。

predicates

WHERE 句のパーティションの条件リスト。

session_init_statement

セッション初期化時の SQL または PL/SQL ステートメントを実行します。

udtf_configs

パフォーマンスを向上させるための UDTF Snowflake を使用してワークロードを実行する。

fetch_merge_count

アップロードする前に単一のParquetファイルにマージするフェッチされたバッチの数。

並列処理の理解

Snowpark Python DB-API ユーザー入力に基づいて、2つの独立した形式の並列処理を使用します。

  • パーティションベースの並列処理

    ユーザーがパーティション情報(例: column, lower_bound, upper_bound, num_partitions)または 述語を使用すると、Snowflakeはクエリを複数のパーティションに分割します。これらの は、マルチプロセッシングを使用して並列処理され、各ワーカーは独自のパーティションを取得して書き込みます。

  • 各パーティション内のフェッチサイズベースの並列処理

    パーティション内では、 API は :code:`fetch_size`で定義されたチャンクの行をフェッチします。これらの行は、フェッチされるときに並列にSnowflakeに書き込まれます。読み取りと書き込みにより重複が発生し、スループットが最大化されます。

これらの2つの形式の並列処理は独立して動作します。パーティション分割も fetch_size が指定されている場合、関数はSnowflakeに書き込む前に、ソーステーブル全体をメモリにロードします。そのため、メモリ使用量が増加し、大規模なデータセットのパフォーマンスが低下する可能性があります。

SQL Server

DB-API を使用して、Snowparkクライアントから SQL サーバーに接続します

SnowparkのSQL サーバーに接続するには 、次の3つのパッケージが必要です。

以下は、 SQLSnowparkクライアントおよびストアドプロシージャからのサーバーに接続するために必要なコード例です。

  • Python SQL ドライバーをインストールする

/bin/bash -c "$(curl -fsSL https://raw.githubusercontent.com/Homebrew/install/master/install.sh)"
  brew tap microsoft/mssql-release https://github.com/Microsoft/homebrew-mssql-release
  brew update
  HOMEBREW_ACCEPT_EULA=Y brew install msodbcsql18 mssql-tools18
Copy
  • snowflake-snowpark-python[pandas] および pyodbc をインストール

pip install snowflake-snowpark-python[pandas]
pip install pyodbc
Copy
  • SQL サーバーへの接続を作成するファクトリメソッドを定義する

def create_sql_server_connection():
    import pyodbc
    HOST = "mssql_host"
    PORT = "mssql_port"
    USERNAME = "mssql_username"
    PASSWORD = "mssql_password"
    DATABASE = "mssql_db"
    connection_str = (
        "DRIVER={{ODBC Driver 18 for SQL Server}};"
        "SERVER={HOST},{PORT};"
        "DATABASE={DATABASE};"
        "UID={USERNAME};"
        "PWD={PASSWORD};"
    )
    connection = pyodbc.connect(connection_str)
    return connection


# Call dbapi to pull data from mssql_table

df = session.read.dbapi(
  create_sql_server_connection,
    table="mssql_table")
Copy

DB-API を使用して、ストアドプロシージャから SQL サーバーに接続します

  • 外部アクセス統合を構成します。これは、Snowflakeがソースエンドポイントに接続できるようにするために必要です。

    注釈

    PrivateLink は、特に機密情報を扱う場合に、安全にデータを転送することをお勧めします。Snowflakeアカウントに必要な があることを確認します。 PrivateLink 権限が有効になり、 PrivateLink 機能はSnowflake Notebook環境で構成され、アクティブになっています。

  • シークレット、ソースエンドポイントへのイグレスを許可するネットワークルール、および外部アクセス統合を構成します。

    CREATE OR REPLACE SECRET mssql_secret
      TYPE = PASSWORD
      USERNAME = 'mssql_username'
      PASSWORD = 'mssql_password';
    
    -- Configure a network rule to allow egress to the source endpoint
    
    CREATE OR REPLACE NETWORK RULE mssql_network_rule
      MODE = EGRESS
      TYPE = HOST_PORT
      VALUE_LIST = ('mssql_host:mssql_port');
    
    -- Configure an external access integration
    
    CREATE OR REPLACE EXTERNAL ACCESS INTEGRATION mssql_access_integration
      ALLOWED_NETWORK_RULES = (mssql_network_rule)
      ALLOWED_AUTHENTICATION_SECRETS = (mssql_secret)
      ENABLED = true;
    
    -- Create or replace a Python stored procedure
    
    CREATE OR REPLACE PROCEDURE sp_mssql_dbapi()
      RETURNS TABLE()
      LANGUAGE PYTHON
      RUNTIME_VERSION='3.11'
      HANDLER='run'
      PACKAGES=('snowflake-snowpark-python', 'pyodbc', 'msodbcsql')
            EXTERNAL_ACCESS_INTEGRATIONS = (mssql_access_integration)
            SECRETS = ('cred' = mssql_secret )
    
    AS $$
    
    # Get user name and password from mssql_secret
    
    import _snowflake
    username_password_object = _snowflake.get_username_password('cred')
    USER = username_password_object.username
    PASSWORD = username_password_object.password
    
    # Define a method to connect to SQL server_hostname
    from snowflake.snowpark import Session
    def create_sql_server_connection():
        import pyodbc
    
        host = "mssql_host"
        port = mssql_port
        username = USER
        password = PASSWORD
        database = "mssql_database"
        connection_str = (
          "DRIVER={{ODBC Driver 18 for SQL Server}};"
          "SERVER={host},{port};"
          "DATABASE={database};"
          "UID={username};"
          "PWD={password};"
        )
    
        connection = pyodbc.connect(connection_str)
        return connection
    
    def run(session: Session):
        df = session.read.dbapi(
            create_sql_server_connection,
            table="mssql_table"
        )
        return df
    $$;
    
    CALL sp_mssql_dbapi();
    
    Copy

Oracle

SnowparkからOracleに接続するには、次の2つのパッケージが必要です。

以下は、Snowparkクライアント、ストアドプロシージャ、および Snowflake Notebooks からOracleに接続するために必要なコード例です。

DB-API を使用して、SnowparkクライアントからOracleに接続します

  • snowflake-snowpark-python[pandas] および oracledb をインストール

    pip install snowflake-snowpark-python[pandas]
    pip install oradb
    
    Copy
  • Oracleからデータを取得するには、DB-API を使用し、Oracleへの接続を作成するためのファクトリメソッドを定義します

    def create_oracle_db_connection():
        import oracledb
        HOST = "myhost"
        PORT = "myport"
        SERVICE_NAME = "myservice"
        USER = "myuser"
        PASSWORD = "mypassword"
        DSN = "{HOST}:{PORT}/{SERVICE_NAME}"
        connection = oracledb.connect(
            user=USER,
            password=PASSWORD,
            dsn=DSN
        )
        return connection
    
    
    # Call dbapi to pull data from mytable
    
    df = session.read.dbapi(
      create_oracle_db_connection,
      table="mytable")
    
    Copy

DB-API を使用しようして、ストアドプロシージャからOracleに接続します

Snowflakeがソースエンドポイントに接続できるようにするには、外部アクセス統合が必要です。

注釈

PrivateLink は、特に機密情報を扱う場合に、安全にデータを転送することをお勧めします。Snowflakeアカウントに必要な があることを確認します。 PrivateLink 権限が有効になり、 PrivateLink 機能はSnowflake Notebook環境で構成され、アクティブになっています。

  • シークレットを設定し、ソースエンドポイントへのエグレスと外部アクセス統合を許可するネットワークルールを構成します。

    -- Configure the secret, a network rule to allow egress to the source endpoint and external access integration.
    
    CREATE OR REPLACE SECRET ora_secret
      TYPE = PASSWORD
      USERNAME = 'ora_username'
      PASSWORD = 'ora_password';
    
    -- Configure a network rule to allow egress to the source endpoint
    
    CREATE OR REPLACE NETWORK RULE ora_network_rule
      MODE = EGRESS
      TYPE = HOST_PORT
      VALUE_LIST = ('ora_host:ora_port');
    
    -- Configure an external access integration
    
    CREATE OR REPLACE EXTERNAL ACCESS INTEGRATION ora_access_integration
      ALLOWED_NETWORK_RULES = (ora_network_rule)
      ALLOWED_AUTHENTICATION_SECRETS = (ora_secret)
      ENABLED = true;
    
    Copy

Snowpark Python DB-API を使用して、PythonストアドプロシージャでOracleからデータを取得します

CREATE OR REPLACE PROCEDURE sp_ora_dbapi()
  RETURNS TABLE()
  LANGUAGE PYTHON
  RUNTIME_VERSION='3.11'
  HANDLER='run'
  PACKAGES=('snowflake-snowpark-python', 'oracledb')
  EXTERNAL_ACCESS_INTEGRATIONS = (ora_access_integration)
  SECRETS = ('cred' = ora_secret )
AS $$

# Get user name and password from ora_secret
import _snowflake
username_password_object = _snowflake.get_username_password('cred')
USER = username_password_object.username
PASSWORD = username_password_object.password

# Define the factory method for creating a connection to Oracle

from snowflake.snowpark import Session

def create_oracle_db_connection():
    import oracledb
    host = "ora_host"
    port = "ora_port"
    service_name = "ora_service"
    user = USER
    password = PASSWORD
    DSN = "{host}:{port}/{service_name}"
    connection = oracledb.connect(
        user=USER,
        password=PASSWORD,
        dsn=DSN
    )
    return connection

def run(session: Session):
    df = session.read.dbapi(
        create_ora_connection,
        table="ora_table"
    )
    return df
$$;

CALL sp_ora_dbapi();
Copy

DB-APIを使用して、Snowflake NotebookからOracleに接続します

  • ノートブックパッケージから snowflake-snowpark-python および oracledb を選択してください。

  • シークレットを設定し、ソースエンドポイントへのエグレスと外部アクセス統合を許可するネットワークルールを構成します。

    CREATE OR REPLACE SECRET ora_secret
      TYPE = PASSWORD
      USERNAME = 'ora_username'
      PASSWORD = 'ora_password';
      ALTER NOTEBOOK mynotebook SET SECRETS = ('snowflake-secret-object' = ora_secret);
    
    -- Configure a network rule to allow egress to the source endpoint
    
    CREATE OR REPLACE NETWORK RULE ora_network_rule
      MODE = EGRESS
      TYPE = HOST_PORT
      VALUE_LIST = ('ora_host:ora_port');
    
    -- Configure an external access integration.
    
    CREATE OR REPLACE EXTERNAL ACCESS INTEGRATION ora_access_integration
      ALLOWED_NETWORK_RULES = (ora_network_rule)
      ALLOWED_AUTHENTICATION_SECRETS = (ora_secret)
      ENABLED = true;
    
    Copy
  • Snowflake Notebooks の外部アクセスの設定 ノートブックセッションを再起動します。

Snowpark Python DB-API を使用して、Snowflake NotebookのPythonセルでOracleからデータを取得します

# Get user name and password from ora_secret

import _snowflake
username_password_object = _snowflake.get_username_password('snowflake-secret-object')
USER = username_password_object.username
PASSWORD = username_password_object.password

import snowflake.snowpark.context
session = snowflake.snowpark.context.get_active_session()

# Define the factory method for creating a connection to Oracle

def create_oracle_db_connection():
    import oracledb
    host = "ora_host"
    port = "ora_port"
    service_name = "ora_service"
    user = USER
    password = PASSWORD
    DSN = "{host}:{port}/{service_name}"
    connection = oracledb.connect(
        user=USER,
        password=PASSWORD,
        dsn=DSN
    )
    return connection

# Use dbapi to read data from ora_table

df_ora = session.read.dbapi(
  create_oracle_db_connection,
  table='ora_table'
)

# Save data into sf_table

df_ora.write.mode("overwrite").save_as_table('sf_table')
Copy

PostgreSQL

PostgreSQL に接続するには、Snowparkからは、次の2つのパッケージが必要です:

以下は、Snowparkクライアント、ストアドプロシージャ、およびSnowflake Notebooksから PostgreSQL に接続するために必要なコード例です。

DB-API を使用して、Snowparkクライアントから PostgreSQL に接続します

  • psycopg2 をインストールする

    pip install psycopg2
    
    Copy
  • PostgreSQLへの接続を作成するファクトリメソッドを定義します

    def create_pg_connection():
        import psycopg2
        connection = psycopg2.connect(
            host="pg_host",
            port=pg_port,
            dbname="pg_dbname",
            user="pg_user",
            password="pg_password",
        )
        return connection
    
    
    # Call dbapi to pull data from pg_table
    
    df = session.read.dbapi(
      create_pg_connection,
      table="pg_table")
    
    Copy

DB-API を使用して、ストアドプロシージャから PostgreSQL に接続します

  • 外部アクセス統合を構成します。これは、Snowflakeがソースエンドポイントに接続できるようにするために必要です。

    注釈

    PrivateLink は、特に機密情報を扱う場合に、安全にデータを転送することをお勧めします。Snowflakeアカウントに必要な があることを確認します。 PrivateLink 権限が有効になり、 PrivateLink 機能はSnowflake Notebook環境で構成され、アクティブになっています。

  • シークレットを設定し、ソースエンドポイントへのエグレスと外部アクセス統合を許可するネットワークルールを構成します。

CREATE OR REPLACE SECRET pg_secret
  TYPE = PASSWORD
  USERNAME = 'pg_username'
  PASSWORD = 'pg_password';

-- Configure a network rule.

CREATE OR REPLACE NETWORK RULE pg_network_rule
  MODE = EGRESS
  TYPE = HOST_PORT
  VALUE_LIST = ('pg_host:pg_port');

-- Configure an external access integration.

CREATE OR REPLACE EXTERNAL ACCESS INTEGRATION pg_access_integration
  ALLOWED_NETWORK_RULES = (pg_network_rule)
  ALLOWED_AUTHENTICATION_SECRETS = (pg_secret)
  ENABLED = true;
Copy
  • Snowpark Python DB-API を使用して、Pythonストアドプロシージャ内で PostgreSQL からデータを取得します

CREATE OR REPLACE PROCEDURE sp_pg_dbapi()
RETURNS TABLE()
LANGUAGE PYTHON
RUNTIME_VERSION='3.11'
HANDLER='run'
PACKAGES=('snowflake-snowpark-python', 'psycopg2')
EXTERNAL_ACCESS_INTEGRATIONS = (pg_access_integration)
SECRETS = ('cred' = pg_secret )
AS $$

# Get user name and password from pg_secret

import _snowflake
username_password_object = _snowflake.get_username_password('cred')
USER = username_password_object.username
PASSWORD = username_password_object.password

# Define the factory method for creating a connection to PostgreSQL

from snowflake.snowpark import Session

def create_pg_connection():
    import psycopg2
    connection = psycopg2.connect(
        host="pg_host",
        port=pg_port,
        dbname="pg_dbname",
        user=USER,
        password=PASSWORD,
    )
    return connection

def run(session: Session):
    df = session.read.dbapi(
        create_pg_connection,
        table="pg_table"
    )
    return df

$$;
CALL sp_pg_dbapi();
Copy

DB-API を使用して、Snowflake Notebookから PostgreSQL に接続します

  • Snowflake Notebookパッケージ から snowflake-snowpark-python および psycopg2 選択してください

  • 外部アクセス統合を構成します。これは、Snowflakeがソースエンドポイントに接続できるようにするために必要です。

    注釈

    PrivateLink は、特に機密情報を扱う場合に、安全にデータを転送することをお勧めします。Snowflakeアカウントに必要な があることを確認します。 PrivateLink 権限が有効になり、 PrivateLink 機能はSnowflake Notebook環境で構成され、アクティブになっています。

  • シークレットを設定し、ソースエンドポイントへのエグレスと外部アクセス統合を許可するネットワークルールを構成します。

-- Configure the secret

CREATE OR REPLACE SECRET pg_secret
  TYPE = PASSWORD
  USERNAME = 'pg_username'
  PASSWORD = 'pg_password';
  ALTER NOTEBOOK pg_notebook SET SECRETS = ('snowflake-secret-object' = pg_secret);

 -- Configure the network rule to allow egress to the source endpoint

 CREATE OR REPLACE NETWORK RULE pg_network_rule
    MODE = EGRESS
    TYPE = HOST_PORT
    VALUE_LIST = ('pg_host:pg_port');

 -- Configure external access integration

 CREATE OR REPLACE EXTERNAL ACCESS INTEGRATION pg_access_integration
    ALLOWED_NETWORK_RULES = (pg_network_rule)
    ALLOWED_AUTHENTICATION_SECRETS = (pg_secret)
    ENABLED = true;
Copy
  • Snowflake Notebooks の外部アクセスの設定 ノートブックセッションを再起動します。

    # Get the user name and password from :code:`pg_secret`
    
    import _snowflake
    username_password_object = _snowflake.get_username_password('snowflake-secret-object')
    USER = username_password_object.username
    PASSWORD = username_password_object.password
    
    import snowflake.snowpark.context
    session = snowflake.snowpark.context.get_active_session()
    
    # Define the factory method for creating a connection to PostgreSQL
    
    def create_pg_connection():
        import psycopg2
        connection = psycopg2.connect(
            host="pg_host",
            port=pg_port,
            dbname="pg_dbname",
            user=USER,
            password=PASSWORD,
        )
        return connection
    
    # Use dbapi to read and save data from pg_table
    
    df = session.read.dbapi(
        create_pg_connection,
        table="pg_table"
    )
    
    # Save data into sf_table
    
    df.write.mode("overwrite").save_as_table('sf_table')
    
    Copy

MySQL

MySQL に接続するには、Snowparkからは、次の2つのパッケージが必要です:

以下は、Snowparkクライアント、ストアドプロシージャ、およびSnowflake NotebookからMySQL に接続するために必要なコード例です。

DB-API を使用して、Snowparkクライアントから MySQL に接続します

  • pymysqlをインストールする

pip install snowflake-snowpark-python[pandas]
pip install pymysql
Copy
  • MySQLへの接続を作成するファクトリメソッドを定義します

def create_mysql_connection():
    import pymysql
    connection = pymysql.connect(
        host="mysql_host",
        port=mysql_port,
        database="mysql_db",
        user="mysql_user",
        password="mysql_password"
    )
    return connection


# Call dbapi to pull data from mysql_table

df = session.read.dbapi(
    create_mysql_connection,
    table="mysql_table"
)
Copy

DB-API を使用して、ストアドプロシージャから MySQL に接続します

  • 外部アクセス統合を構成します。これは、Snowflakeがソースエンドポイントに接続できるようにするために必要です。

    注釈

    PrivateLink は、特に機密情報を扱う場合に、安全にデータを転送することをお勧めします。Snowflakeアカウントに必要な があることを確認します。 PrivateLink 権限が有効になり、 PrivateLink 機能はSnowflake Notebook環境で構成され、アクティブになっています。

  • シークレットを設定し、ソースエンドポイントへのエグレスと外部アクセス統合を許可するネットワークルールを構成します。

CREATE OR REPLACE SECRET mysql_secret
  TYPE = PASSWORD
  USERNAME = 'mysql_username'
  PASSWORD = 'mysql_password';

-- Configure a network rule.

CREATE OR REPLACE NETWORK RULE mysql_network_rule
  MODE = EGRESS
  TYPE = HOST_PORT
  VALUE_LIST = ('mysql_host:mysql_port');

-- Configure an external access integration

CREATE OR REPLACE EXTERNAL ACCESS INTEGRATION mysql_access_integration
  ALLOWED_NETWORK_RULES = (mysql_network_rule)
  ALLOWED_AUTHENTICATION_SECRETS = (mysql_secret)
  ENABLED = true;
Copy
  • Snowpark Python DB-API を使用して、Pythonストアドプロシージャ内で MySQL からデータを取得します

CREATE OR REPLACE PROCEDURE sp_mysql_dbapi()
RETURNS TABLE()
LANGUAGE PYTHON
RUNTIME_VERSION='3.11'
HANDLER='run'
PACKAGES=('snowflake-snowpark-python', 'pymysql')
EXTERNAL_ACCESS_INTEGRATIONS = (mysql_access_integration)
SECRETS = ('cred' = mysql_secret )
AS $$

# Get user name and password from mysql_secret

import _snowflake
    username_password_object = _snowflake.get_username_password('cred')
    USER = username_password_object.username
    PASSWORD = username_password_object.password

# Define the factory method for creating a connection to MySQL

from snowflake.snowpark import session

def create_mysql_connection():
    import pymysql
    connection = pymysql.connect(
        host="mysql_host",
        port=mysql_port,
        dbname="mysql_dbname",
        user=USER,
        password=PASSWORD,
    )
    return connection

def run(session: Session):
    df = session.read.dbapi(
        create_mysql_connection,
        table="mysql_table"
    )
    return df
$$;

CALL sp_mysql_dbapi();
Copy

DB-API を使用して、Snowflake Notebookから MySQL に接続します

  • snowflake-snowpark-python および pymysqlSnowflake Notebook パッケージから選択します。

  • 外部アクセス統合を構成します。これは、Snowflakeがソースエンドポイントに接続できるようにするために必要です。

    注釈

    PrivateLink は、特に機密情報を扱う場合に、安全にデータを転送することをお勧めします。Snowflakeアカウントに必要な があることを確認します。 PrivateLink 権限が有効になり、 PrivateLink 機能はSnowflake Notebook環境で構成され、アクティブになっています。

  • シークレットを構成し、Snowflake Notebook に追加します。

CREATE OR REPLACE SECRET mysql_secret
  TYPE = PASSWORD
  USERNAME = 'mysql_username'
  PASSWORD = 'mysql_password';

  ALTER NOTEBOOK mynotebook SET SECRETS = ('snowflake-secret-object' = mysql_secret);
Copy
  • ネットワークルールと外部アクセス統合を構成します。

CREATE OR REPLACE NETWORK RULE mysql_network_rule
  MODE = EGRESS
  TYPE = HOST_PORT
  VALUE_LIST = ('mysql_host:mysql_port');

CREATE OR REPLACE EXTERNAL ACCESS INTEGRATION mysql_access_integration
  ALLOWED_NETWORK_RULES = (mysql_network_rule)
  ALLOWED_AUTHENTICATION_SECRETS = (mysql_secret)
  ENABLED = true;
Copy
# Get user name and password from mysql_secret
import _snowflake
username_password_object = _snowflake.get_username_password('snowflake-secret-object')
USER = username_password_object.username
PASSWORD = username_password_object.password

import snowflake.snowpark.context
session = snowflake.snowpark.context.get_active_session()

# Define the factory method for creating a connection to MySQL

def create_mysql_connection():
    import pymysql
    connection = pymysql.connect(
        host="mysql_host",
        port=mysql_port,
        dbname="mysql_dbname",
        user=USER,
        password=PASSWORD,
    )
    return connection

# Call dbapi to pull data from mysql_table

df = session.read.dbapi(
    create_mysql_connection,
    table="mysql_table")

# Save data into sf_table

df.write.mode("overwrite").save_as_table('sf_table')
Copy

Databricks

SnowparkからDatabricksに接続するには、次の2つのパッケージが必要です。

以下は、Snowparkクライアント、ストアドプロシージャ、Snowflake NotebookからDatabricksに接続するために必要なコード例です。

DB-APIを使用して、SnowparkクライアントからDatabricksに接続します

  • databricks-sql-connector をインストール:

pip install snowflake-snowpark-python[pandas]
pip install databricks-sql-connector
Copy
  • Databricksへの接続を作成するファクトリメソッドを定義する

def create_dbx_connection():
    import databricks.sql
    connection = databricks.sql.connect(
        server_hostname=HOST,
        http_path=PATH,
        access_token=ACCESS_TOKEN
    )
    return connection


#Call dbapi to pull data from mytable

df = session.read.dbapi(
    create_dbx_connection,
    table="dbx_table")
Copy

DB-API を使用して、ストアドプロシージャからDatabricksに接続します

  • Snowflake Notebookパッケージ から snowflake-snowpark-python および pymysql 選択してください

  • Snowflakeがソースエンドポイントに接続できるようにするには、外部アクセス統合が必要です。

    注釈

    PrivateLink は、特に機密情報を扱う場合に、安全にデータを転送することをお勧めします。Snowflakeアカウントに必要な があることを確認します。 PrivateLink 権限が有効になり、 PrivateLink 機能はSnowflake Notebook環境で構成され、アクティブになっています。

  • シークレットを設定し、ソースエンドポイントへのエグレスと外部アクセス統合を許可するネットワークルールを構成します。

CREATE OR REPLACE SECRET dbx_secret
  TYPE = GENERIC_STRING
  SECRET_STRING = 'dbx_access_token';

-- Configure a network rule

CREATE OR REPLACE NETWORK RULE dbx_network_rule
  MODE = EGRESS
  TYPE = HOST_PORT
  VALUE_LIST = ('dbx_host:dbx_port');

--  Configure an external access integration

CREATE OR REPLACE EXTERNAL ACCESS INTEGRATION dbx_access_integration
  ALLOWED_NETWORK_RULES = (dbx_network_rule)
  ALLOWED_AUTHENTICATION_SECRETS = (dbx_secret)
  ENABLED = true;
Copy
  • Snowpark Python DB-API を使用して、PythonストアドプロシージャでDatabricksからデータを取得します

CREATE OR REPLACE PROCEDURE sp_dbx_dbapi()
  RETURNS TABLE()
  LANGUAGE PYTHON
  RUNTIME_VERSION='3.11'
  HANDLER='run'
  PACKAGES=('snowflake-snowpark-python', 'databricks-sql-connector')
  EXTERNAL_ACCESS_INTEGRATIONS = (dbx_access_integration)
  SECRETS = ('cred' = dbx_secret )
AS $$

# Get user name and password from dbx_secret

import _snowflake
ACCESS_TOKEN = _snowflake.get_generic_secret_string('cred')

from snowflake.snowpark import Session

# define the method for creating a connection to Databricks
def create_dbx_connection():
    import databricks.sql
    connection = databricks.sql.connect(
        server_hostname="dbx_host",
        http_path="dbx_path",
        access_token=ACCESS_TOKEN,
    )
    return connection

def run(session: Session):
    df = session.read.dbapi(
        create_dbx_connection,
        table="dbx_table"
    )
    return df

$$;

CALL sp_dbx_dbapi();
Copy

DB-API を使用して、Snowflake NotebookからDatabricksに接続します

  • Snowflake Notebookパッケージ から snowflake-snowpark-python および pymysql 選択してください

  • 外部アクセス統合を構成します。これは、Snowflakeがソースエンドポイントに接続できるようにするために必要です。

    注釈

    PrivateLink は、特に機密情報を扱う場合に、安全にデータを転送することをお勧めします。Snowflakeアカウントに必要な があることを確認します。 PrivateLink 権限が有効になり、 PrivateLink 機能はSnowflake Notebook環境で構成され、アクティブになっています。

  • シークレットを構成し、Snowflake Notebook に追加します。

    CREATE OR REPLACE SECRET dbx_secret
      TYPE = GENERIC_STRING
      SECRET_STRING = 'dbx_access_token';
      ALTER NOTEBOOK mynotebook SET SECRETS = ('snowflake-secret-object' = dbx_secret);
    
    Copy
  • 構成

    CREATE OR REPLACE NETWORK RULE dbx_network_rule
      MODE = EGRESS
      TYPE = HOST_PORT
      VALUE_LIST = ('dbx_host:dbx_port');
    
    CREATE OR REPLACE EXTERNAL ACCESS INTEGRATION dbx_access_integration
      ALLOWED_NETWORK_RULES = (dbx_network_rule)
      ALLOWED_AUTHENTICATION_SECRETS = (dbx_secret)
      ENABLED = true;
    
    Copy
  • Snowflake Notebooks の外部アクセスの設定 ノートブックセッションを再起動します。

    # Get user name and password from dbx_secret
    
    import _snowflake
    ACCESS_TOKEN = _snowflake.get_generic_secret_string('cred')
    
    import snowflake.snowpark.context
    session = snowflake.snowpark.context.get_active_session()
    
    # Define the factory method for creating a connection to Databricks
    
    def create_dbx_connection():
        import databricks.sql
        connection = databricks.sql.connect(
            server_hostname="dbx_host",
            http_path="dbx_path",
            access_token=ACCESS_TOKEN,
        )
        return connection
    
    # use dbapi to read data from dbx_table
    
    df = session.read.dbapi(
        create_dbx_connection,
        table="dbx_table"
    )
    
    # save data into sf_table
    
    df.write.mode("overwrite").save_as_table('sf_table')
    
    Copy

制限事項

ドライバー

Snowpark Python DB-API はPython DB-API2.0準拠のドライバー(例: pyodbc, Oracledb)のみをサポートしています。 JDBC ドライバーはこのリリースではサポートされていません。