Snowpark Pythonを使用した外部データソースからのデータの読み取り DB-API¶
Snowpark Python DB-APIを使用すると、Snowpark Pythonユーザーは、外部データベースからSnowflakeにデータをプログラムでプルできます。説明には次が含まれます。
** Python DB-API サポート:** Pythonの標準の DB-API2.0ドライバーを使用した外部データベースに接続します。
合理化された設定:
pip
を使用して必要なドライバーをインストールするため、追加の依存関係を管理する必要がありません。
これらの APIsのデータをシームレスに Snowpark DataFrames テーブルに取り込み、Snowflakeを使用して高度な分析のための変換することができます。
Snowpark Pythonの使用 DB-API¶
その `DB-API<https://docs.snowflake.com/en/developer-guide/snowpark/reference/python/latest/snowpark/api/snowflake.snowpark.DataFrameReader.dbapi>`_ は `Spark JDBCAPI<https://spark.apache.org/docs/3.5.4/sql-data-sources-jdbc.html>`_ と同様の方法で使用できます。ほとんどのパラメーターは、より安全性を高めるために、同一または類似するように設計されています。同時に、Snowparkは直感的な命名規則を持つPythonファーストの設計を強調し、 JDBC固有の構成を回避します。このため、 Python開発者は慣れ親しんだエクスペリエンスを提供します。Snowpark Pythonを DB-APISpark で JDBCAPIの比較する詳細情報は DB-APIパラメーター: を参照してください。
DB-APIパラメーター:¶
パラメーター |
Snowpark DB-API |
---|---|
|
PythonをDB-API 接続を作成する関数 。 |
|
ソースデータベースのテーブルを指定します。 |
|
SQL クエリは、データを読み取るためのサブクエリとしてラップされました。 |
|
並列読み取りのパーティション列。 |
|
パーティション分割の下限。 |
|
パーティション分割の上限。 |
|
並列処理のパーティションの数。 |
|
SQL 実行のタイムアウト(秒単位)。 |
|
ラウンドトリップごとに取得された行数。 |
|
外部データベースからデータをプルするためのカスタムスキーマ。 |
|
外部データベースからのデータの並行フェッチおよびプルのワーカー数。 |
|
WHERE 句のパーティションの条件リスト。 |
|
セッション初期化時の SQL または PL/SQL ステートメントを実行します。 |
|
パフォーマンスを向上させるための UDTF Snowflake を使用してワークロードを実行する。 |
|
アップロードする前に単一のParquetファイルにマージするフェッチされたバッチの数。 |
並列処理の理解¶
Snowpark Python DB-API ユーザー入力に基づいて、2つの独立した形式の並列処理を使用します。
パーティションベースの並列処理
ユーザーがパーティション情報(例:
column
,lower_bound
,upper_bound
,num_partitions
)または 述語を使用すると、Snowflakeはクエリを複数のパーティションに分割します。これらの は、マルチプロセッシングを使用して並列処理され、各ワーカーは独自のパーティションを取得して書き込みます。各パーティション内のフェッチサイズベースの並列処理
パーティション内では、 API は :code:`fetch_size`で定義されたチャンクの行をフェッチします。これらの行は、フェッチされるときに並列にSnowflakeに書き込まれます。読み取りと書き込みにより重複が発生し、スループットが最大化されます。
これらの2つの形式の並列処理は独立して動作します。パーティション分割も fetch_size
が指定されている場合、関数はSnowflakeに書き込む前に、ソーステーブル全体をメモリにロードします。そのため、メモリ使用量が増加し、大規模なデータセットのパフォーマンスが低下する可能性があります。
SQL Server¶
DB-API を使用して、Snowparkクライアントから SQL サーバーに接続します¶
SnowparkのSQL サーバーに接続するには 、次の3つのパッケージが必要です。
Snowpark: Snowflake-snowpark-python[pandas]
SQL サーバー ODBC ドライバー: Microsoft ODBC ドライバー SQL サーバー用。ドライバーをインストールすることにより、Microsoftの EULA に同意するものとします。
オープンソースpyodbcライブラリ。 Pyodbc
以下は、 SQLSnowparkクライアントおよびストアドプロシージャからのサーバーに接続するために必要なコード例です。
Python SQL ドライバーをインストールする
/bin/bash -c "$(curl -fsSL https://raw.githubusercontent.com/Homebrew/install/master/install.sh)" brew tap microsoft/mssql-release https://github.com/Microsoft/homebrew-mssql-release brew update HOMEBREW_ACCEPT_EULA=Y brew install msodbcsql18 mssql-tools18
snowflake-snowpark-python[pandas]
およびpyodbc
をインストールpip install snowflake-snowpark-python[pandas] pip install pyodbc
SQL サーバーへの接続を作成するファクトリメソッドを定義する
def create_sql_server_connection(): import pyodbc HOST = "mssql_host" PORT = "mssql_port" USERNAME = "mssql_username" PASSWORD = "mssql_password" DATABASE = "mssql_db" connection_str = ( "DRIVER={{ODBC Driver 18 for SQL Server}};" "SERVER={HOST},{PORT};" "DATABASE={DATABASE};" "UID={USERNAME};" "PWD={PASSWORD};" ) connection = pyodbc.connect(connection_str) return connection # Call dbapi to pull data from mssql_table df = session.read.dbapi( create_sql_server_connection, table="mssql_table")
DB-API を使用して、ストアドプロシージャから SQL サーバーに接続します¶
外部アクセス統合を構成します。これは、Snowflakeがソースエンドポイントに接続できるようにするために必要です。
注釈
PrivateLink は、特に機密情報を扱う場合に、安全にデータを転送することをお勧めします。Snowflakeアカウントに必要な があることを確認します。 PrivateLink 権限が有効になり、 PrivateLink 機能はSnowflake Notebook環境で構成され、アクティブになっています。
シークレット、ソースエンドポイントへのイグレスを許可するネットワークルール、および外部アクセス統合を構成します。
CREATE OR REPLACE SECRET mssql_secret TYPE = PASSWORD USERNAME = 'mssql_username' PASSWORD = 'mssql_password'; -- Configure a network rule to allow egress to the source endpoint CREATE OR REPLACE NETWORK RULE mssql_network_rule MODE = EGRESS TYPE = HOST_PORT VALUE_LIST = ('mssql_host:mssql_port'); -- Configure an external access integration CREATE OR REPLACE EXTERNAL ACCESS INTEGRATION mssql_access_integration ALLOWED_NETWORK_RULES = (mssql_network_rule) ALLOWED_AUTHENTICATION_SECRETS = (mssql_secret) ENABLED = true; -- Create or replace a Python stored procedure CREATE OR REPLACE PROCEDURE sp_mssql_dbapi() RETURNS TABLE() LANGUAGE PYTHON RUNTIME_VERSION='3.11' HANDLER='run' PACKAGES=('snowflake-snowpark-python', 'pyodbc', 'msodbcsql') EXTERNAL_ACCESS_INTEGRATIONS = (mssql_access_integration) SECRETS = ('cred' = mssql_secret ) AS $$ # Get user name and password from mssql_secret import _snowflake username_password_object = _snowflake.get_username_password('cred') USER = username_password_object.username PASSWORD = username_password_object.password # Define a method to connect to SQL server_hostname from snowflake.snowpark import Session def create_sql_server_connection(): import pyodbc host = "mssql_host" port = mssql_port username = USER password = PASSWORD database = "mssql_database" connection_str = ( "DRIVER={{ODBC Driver 18 for SQL Server}};" "SERVER={host},{port};" "DATABASE={database};" "UID={username};" "PWD={password};" ) connection = pyodbc.connect(connection_str) return connection def run(session: Session): df = session.read.dbapi( create_sql_server_connection, table="mssql_table" ) return df $$; CALL sp_mssql_dbapi();
Oracle¶
SnowparkからOracleに接続するには、次の2つのパッケージが必要です。
Snowpark: Snowflake-snowpark-python[pandas]
オープンソースのoracleedbライブラリ。 Oracledb
以下は、Snowparkクライアント、ストアドプロシージャ、および Snowflake Notebooks からOracleに接続するために必要なコード例です。
DB-API を使用して、SnowparkクライアントからOracleに接続します¶
snowflake-snowpark-python[pandas]
およびoracledb
をインストールpip install snowflake-snowpark-python[pandas] pip install oradb
Oracleからデータを取得するには、DB-API を使用し、Oracleへの接続を作成するためのファクトリメソッドを定義します
def create_oracle_db_connection(): import oracledb HOST = "myhost" PORT = "myport" SERVICE_NAME = "myservice" USER = "myuser" PASSWORD = "mypassword" DSN = "{HOST}:{PORT}/{SERVICE_NAME}" connection = oracledb.connect( user=USER, password=PASSWORD, dsn=DSN ) return connection # Call dbapi to pull data from mytable df = session.read.dbapi( create_oracle_db_connection, table="mytable")
DB-API を使用しようして、ストアドプロシージャからOracleに接続します¶
Snowflakeがソースエンドポイントに接続できるようにするには、外部アクセス統合が必要です。
注釈
PrivateLink は、特に機密情報を扱う場合に、安全にデータを転送することをお勧めします。Snowflakeアカウントに必要な があることを確認します。 PrivateLink 権限が有効になり、 PrivateLink 機能はSnowflake Notebook環境で構成され、アクティブになっています。
シークレットを設定し、ソースエンドポイントへのエグレスと外部アクセス統合を許可するネットワークルールを構成します。
-- Configure the secret, a network rule to allow egress to the source endpoint and external access integration. CREATE OR REPLACE SECRET ora_secret TYPE = PASSWORD USERNAME = 'ora_username' PASSWORD = 'ora_password'; -- Configure a network rule to allow egress to the source endpoint CREATE OR REPLACE NETWORK RULE ora_network_rule MODE = EGRESS TYPE = HOST_PORT VALUE_LIST = ('ora_host:ora_port'); -- Configure an external access integration CREATE OR REPLACE EXTERNAL ACCESS INTEGRATION ora_access_integration ALLOWED_NETWORK_RULES = (ora_network_rule) ALLOWED_AUTHENTICATION_SECRETS = (ora_secret) ENABLED = true;
Snowpark Python DB-API を使用して、PythonストアドプロシージャでOracleからデータを取得します¶
CREATE OR REPLACE PROCEDURE sp_ora_dbapi()
RETURNS TABLE()
LANGUAGE PYTHON
RUNTIME_VERSION='3.11'
HANDLER='run'
PACKAGES=('snowflake-snowpark-python', 'oracledb')
EXTERNAL_ACCESS_INTEGRATIONS = (ora_access_integration)
SECRETS = ('cred' = ora_secret )
AS $$
# Get user name and password from ora_secret
import _snowflake
username_password_object = _snowflake.get_username_password('cred')
USER = username_password_object.username
PASSWORD = username_password_object.password
# Define the factory method for creating a connection to Oracle
from snowflake.snowpark import Session
def create_oracle_db_connection():
import oracledb
host = "ora_host"
port = "ora_port"
service_name = "ora_service"
user = USER
password = PASSWORD
DSN = "{host}:{port}/{service_name}"
connection = oracledb.connect(
user=USER,
password=PASSWORD,
dsn=DSN
)
return connection
def run(session: Session):
df = session.read.dbapi(
create_ora_connection,
table="ora_table"
)
return df
$$;
CALL sp_ora_dbapi();
DB-APIを使用して、Snowflake NotebookからOracleに接続します¶
ノートブックパッケージから
snowflake-snowpark-python
およびoracledb
を選択してください。シークレットを設定し、ソースエンドポイントへのエグレスと外部アクセス統合を許可するネットワークルールを構成します。
CREATE OR REPLACE SECRET ora_secret TYPE = PASSWORD USERNAME = 'ora_username' PASSWORD = 'ora_password'; ALTER NOTEBOOK mynotebook SET SECRETS = ('snowflake-secret-object' = ora_secret); -- Configure a network rule to allow egress to the source endpoint CREATE OR REPLACE NETWORK RULE ora_network_rule MODE = EGRESS TYPE = HOST_PORT VALUE_LIST = ('ora_host:ora_port'); -- Configure an external access integration. CREATE OR REPLACE EXTERNAL ACCESS INTEGRATION ora_access_integration ALLOWED_NETWORK_RULES = (ora_network_rule) ALLOWED_AUTHENTICATION_SECRETS = (ora_secret) ENABLED = true;
Snowflake Notebooks の外部アクセスの設定 ノートブックセッションを再起動します。
Snowpark Python DB-API を使用して、Snowflake NotebookのPythonセルでOracleからデータを取得します¶
# Get user name and password from ora_secret import _snowflake username_password_object = _snowflake.get_username_password('snowflake-secret-object') USER = username_password_object.username PASSWORD = username_password_object.password import snowflake.snowpark.context session = snowflake.snowpark.context.get_active_session() # Define the factory method for creating a connection to Oracle def create_oracle_db_connection(): import oracledb host = "ora_host" port = "ora_port" service_name = "ora_service" user = USER password = PASSWORD DSN = "{host}:{port}/{service_name}" connection = oracledb.connect( user=USER, password=PASSWORD, dsn=DSN ) return connection # Use dbapi to read data from ora_table df_ora = session.read.dbapi( create_oracle_db_connection, table='ora_table' ) # Save data into sf_table df_ora.write.mode("overwrite").save_as_table('sf_table')
PostgreSQL¶
PostgreSQL に接続するには、Snowparkからは、次の2つのパッケージが必要です:
Snowpark: Snowflake-snowpark-python[pandas]
オープンソースのpyscopeg2ライブラリ。 pyairports
以下は、Snowparkクライアント、ストアドプロシージャ、およびSnowflake Notebooksから PostgreSQL に接続するために必要なコード例です。
DB-API を使用して、Snowparkクライアントから PostgreSQL に接続します¶
psycopg2
をインストールするpip install psycopg2
PostgreSQLへの接続を作成するファクトリメソッドを定義します
def create_pg_connection(): import psycopg2 connection = psycopg2.connect( host="pg_host", port=pg_port, dbname="pg_dbname", user="pg_user", password="pg_password", ) return connection # Call dbapi to pull data from pg_table df = session.read.dbapi( create_pg_connection, table="pg_table")
DB-API を使用して、ストアドプロシージャから PostgreSQL に接続します¶
外部アクセス統合を構成します。これは、Snowflakeがソースエンドポイントに接続できるようにするために必要です。
注釈
PrivateLink は、特に機密情報を扱う場合に、安全にデータを転送することをお勧めします。Snowflakeアカウントに必要な があることを確認します。 PrivateLink 権限が有効になり、 PrivateLink 機能はSnowflake Notebook環境で構成され、アクティブになっています。
シークレットを設定し、ソースエンドポイントへのエグレスと外部アクセス統合を許可するネットワークルールを構成します。
CREATE OR REPLACE SECRET pg_secret
TYPE = PASSWORD
USERNAME = 'pg_username'
PASSWORD = 'pg_password';
-- Configure a network rule.
CREATE OR REPLACE NETWORK RULE pg_network_rule
MODE = EGRESS
TYPE = HOST_PORT
VALUE_LIST = ('pg_host:pg_port');
-- Configure an external access integration.
CREATE OR REPLACE EXTERNAL ACCESS INTEGRATION pg_access_integration
ALLOWED_NETWORK_RULES = (pg_network_rule)
ALLOWED_AUTHENTICATION_SECRETS = (pg_secret)
ENABLED = true;
Snowpark Python DB-API を使用して、Pythonストアドプロシージャ内で PostgreSQL からデータを取得します
CREATE OR REPLACE PROCEDURE sp_pg_dbapi()
RETURNS TABLE()
LANGUAGE PYTHON
RUNTIME_VERSION='3.11'
HANDLER='run'
PACKAGES=('snowflake-snowpark-python', 'psycopg2')
EXTERNAL_ACCESS_INTEGRATIONS = (pg_access_integration)
SECRETS = ('cred' = pg_secret )
AS $$
# Get user name and password from pg_secret
import _snowflake
username_password_object = _snowflake.get_username_password('cred')
USER = username_password_object.username
PASSWORD = username_password_object.password
# Define the factory method for creating a connection to PostgreSQL
from snowflake.snowpark import Session
def create_pg_connection():
import psycopg2
connection = psycopg2.connect(
host="pg_host",
port=pg_port,
dbname="pg_dbname",
user=USER,
password=PASSWORD,
)
return connection
def run(session: Session):
df = session.read.dbapi(
create_pg_connection,
table="pg_table"
)
return df
$$;
CALL sp_pg_dbapi();
DB-API を使用して、Snowflake Notebookから PostgreSQL に接続します¶
Snowflake Notebookパッケージ から
snowflake-snowpark-python
およびpsycopg2
選択してください外部アクセス統合を構成します。これは、Snowflakeがソースエンドポイントに接続できるようにするために必要です。
注釈
PrivateLink は、特に機密情報を扱う場合に、安全にデータを転送することをお勧めします。Snowflakeアカウントに必要な があることを確認します。 PrivateLink 権限が有効になり、 PrivateLink 機能はSnowflake Notebook環境で構成され、アクティブになっています。
シークレットを設定し、ソースエンドポイントへのエグレスと外部アクセス統合を許可するネットワークルールを構成します。
-- Configure the secret
CREATE OR REPLACE SECRET pg_secret
TYPE = PASSWORD
USERNAME = 'pg_username'
PASSWORD = 'pg_password';
ALTER NOTEBOOK pg_notebook SET SECRETS = ('snowflake-secret-object' = pg_secret);
-- Configure the network rule to allow egress to the source endpoint
CREATE OR REPLACE NETWORK RULE pg_network_rule
MODE = EGRESS
TYPE = HOST_PORT
VALUE_LIST = ('pg_host:pg_port');
-- Configure external access integration
CREATE OR REPLACE EXTERNAL ACCESS INTEGRATION pg_access_integration
ALLOWED_NETWORK_RULES = (pg_network_rule)
ALLOWED_AUTHENTICATION_SECRETS = (pg_secret)
ENABLED = true;
Snowflake Notebooks の外部アクセスの設定 ノートブックセッションを再起動します。
# Get the user name and password from :code:`pg_secret` import _snowflake username_password_object = _snowflake.get_username_password('snowflake-secret-object') USER = username_password_object.username PASSWORD = username_password_object.password import snowflake.snowpark.context session = snowflake.snowpark.context.get_active_session() # Define the factory method for creating a connection to PostgreSQL def create_pg_connection(): import psycopg2 connection = psycopg2.connect( host="pg_host", port=pg_port, dbname="pg_dbname", user=USER, password=PASSWORD, ) return connection # Use dbapi to read and save data from pg_table df = session.read.dbapi( create_pg_connection, table="pg_table" ) # Save data into sf_table df.write.mode("overwrite").save_as_table('sf_table')
MySQL¶
MySQL に接続するには、Snowparkからは、次の2つのパッケージが必要です:
Snowpark: Snowflake-snowpark-python[pandas]
オープンソースのpymysqlライブラリ:PyMySQL
以下は、Snowparkクライアント、ストアドプロシージャ、およびSnowflake NotebookからMySQL に接続するために必要なコード例です。
DB-API を使用して、Snowparkクライアントから MySQL に接続します¶
pymysqlをインストールする
pip install snowflake-snowpark-python[pandas] pip install pymysql
MySQLへの接続を作成するファクトリメソッドを定義します
def create_mysql_connection():
import pymysql
connection = pymysql.connect(
host="mysql_host",
port=mysql_port,
database="mysql_db",
user="mysql_user",
password="mysql_password"
)
return connection
# Call dbapi to pull data from mysql_table
df = session.read.dbapi(
create_mysql_connection,
table="mysql_table"
)
DB-API を使用して、ストアドプロシージャから MySQL に接続します¶
外部アクセス統合を構成します。これは、Snowflakeがソースエンドポイントに接続できるようにするために必要です。
注釈
PrivateLink は、特に機密情報を扱う場合に、安全にデータを転送することをお勧めします。Snowflakeアカウントに必要な があることを確認します。 PrivateLink 権限が有効になり、 PrivateLink 機能はSnowflake Notebook環境で構成され、アクティブになっています。
シークレットを設定し、ソースエンドポイントへのエグレスと外部アクセス統合を許可するネットワークルールを構成します。
CREATE OR REPLACE SECRET mysql_secret
TYPE = PASSWORD
USERNAME = 'mysql_username'
PASSWORD = 'mysql_password';
-- Configure a network rule.
CREATE OR REPLACE NETWORK RULE mysql_network_rule
MODE = EGRESS
TYPE = HOST_PORT
VALUE_LIST = ('mysql_host:mysql_port');
-- Configure an external access integration
CREATE OR REPLACE EXTERNAL ACCESS INTEGRATION mysql_access_integration
ALLOWED_NETWORK_RULES = (mysql_network_rule)
ALLOWED_AUTHENTICATION_SECRETS = (mysql_secret)
ENABLED = true;
Snowpark Python DB-API を使用して、Pythonストアドプロシージャ内で MySQL からデータを取得します
CREATE OR REPLACE PROCEDURE sp_mysql_dbapi()
RETURNS TABLE()
LANGUAGE PYTHON
RUNTIME_VERSION='3.11'
HANDLER='run'
PACKAGES=('snowflake-snowpark-python', 'pymysql')
EXTERNAL_ACCESS_INTEGRATIONS = (mysql_access_integration)
SECRETS = ('cred' = mysql_secret )
AS $$
# Get user name and password from mysql_secret
import _snowflake
username_password_object = _snowflake.get_username_password('cred')
USER = username_password_object.username
PASSWORD = username_password_object.password
# Define the factory method for creating a connection to MySQL
from snowflake.snowpark import session
def create_mysql_connection():
import pymysql
connection = pymysql.connect(
host="mysql_host",
port=mysql_port,
dbname="mysql_dbname",
user=USER,
password=PASSWORD,
)
return connection
def run(session: Session):
df = session.read.dbapi(
create_mysql_connection,
table="mysql_table"
)
return df
$$;
CALL sp_mysql_dbapi();
DB-API を使用して、Snowflake Notebookから MySQL に接続します¶
snowflake-snowpark-python
およびpymysql
を Snowflake Notebook パッケージから選択します。外部アクセス統合を構成します。これは、Snowflakeがソースエンドポイントに接続できるようにするために必要です。
注釈
PrivateLink は、特に機密情報を扱う場合に、安全にデータを転送することをお勧めします。Snowflakeアカウントに必要な があることを確認します。 PrivateLink 権限が有効になり、 PrivateLink 機能はSnowflake Notebook環境で構成され、アクティブになっています。
シークレットを構成し、Snowflake Notebook に追加します。
CREATE OR REPLACE SECRET mysql_secret
TYPE = PASSWORD
USERNAME = 'mysql_username'
PASSWORD = 'mysql_password';
ALTER NOTEBOOK mynotebook SET SECRETS = ('snowflake-secret-object' = mysql_secret);
ネットワークルールと外部アクセス統合を構成します。
CREATE OR REPLACE NETWORK RULE mysql_network_rule
MODE = EGRESS
TYPE = HOST_PORT
VALUE_LIST = ('mysql_host:mysql_port');
CREATE OR REPLACE EXTERNAL ACCESS INTEGRATION mysql_access_integration
ALLOWED_NETWORK_RULES = (mysql_network_rule)
ALLOWED_AUTHENTICATION_SECRETS = (mysql_secret)
ENABLED = true;
Snowflake Notebooks の外部アクセスの設定 ノートブックセッションを再起動します。
# Get user name and password from mysql_secret
import _snowflake
username_password_object = _snowflake.get_username_password('snowflake-secret-object')
USER = username_password_object.username
PASSWORD = username_password_object.password
import snowflake.snowpark.context
session = snowflake.snowpark.context.get_active_session()
# Define the factory method for creating a connection to MySQL
def create_mysql_connection():
import pymysql
connection = pymysql.connect(
host="mysql_host",
port=mysql_port,
dbname="mysql_dbname",
user=USER,
password=PASSWORD,
)
return connection
# Call dbapi to pull data from mysql_table
df = session.read.dbapi(
create_mysql_connection,
table="mysql_table")
# Save data into sf_table
df.write.mode("overwrite").save_as_table('sf_table')
Databricks¶
SnowparkからDatabricksに接続するには、次の2つのパッケージが必要です。
Snowpark: Snowflake-snowpark-python[pandas]
オープンソースのpyscopeg2ライブラリ: databricks-sql-connector
以下は、Snowparkクライアント、ストアドプロシージャ、Snowflake NotebookからDatabricksに接続するために必要なコード例です。
DB-APIを使用して、SnowparkクライアントからDatabricksに接続します¶
databricks-sql-connector
をインストール:
pip install snowflake-snowpark-python[pandas]
pip install databricks-sql-connector
Databricksへの接続を作成するファクトリメソッドを定義する
def create_dbx_connection():
import databricks.sql
connection = databricks.sql.connect(
server_hostname=HOST,
http_path=PATH,
access_token=ACCESS_TOKEN
)
return connection
#Call dbapi to pull data from mytable
df = session.read.dbapi(
create_dbx_connection,
table="dbx_table")
DB-API を使用して、ストアドプロシージャからDatabricksに接続します¶
Snowflake Notebookパッケージ から
snowflake-snowpark-python
およびpymysql
選択してくださいSnowflakeがソースエンドポイントに接続できるようにするには、外部アクセス統合が必要です。
注釈
PrivateLink は、特に機密情報を扱う場合に、安全にデータを転送することをお勧めします。Snowflakeアカウントに必要な があることを確認します。 PrivateLink 権限が有効になり、 PrivateLink 機能はSnowflake Notebook環境で構成され、アクティブになっています。
シークレットを設定し、ソースエンドポイントへのエグレスと外部アクセス統合を許可するネットワークルールを構成します。
CREATE OR REPLACE SECRET dbx_secret
TYPE = GENERIC_STRING
SECRET_STRING = 'dbx_access_token';
-- Configure a network rule
CREATE OR REPLACE NETWORK RULE dbx_network_rule
MODE = EGRESS
TYPE = HOST_PORT
VALUE_LIST = ('dbx_host:dbx_port');
-- Configure an external access integration
CREATE OR REPLACE EXTERNAL ACCESS INTEGRATION dbx_access_integration
ALLOWED_NETWORK_RULES = (dbx_network_rule)
ALLOWED_AUTHENTICATION_SECRETS = (dbx_secret)
ENABLED = true;
Snowpark Python DB-API を使用して、PythonストアドプロシージャでDatabricksからデータを取得します
CREATE OR REPLACE PROCEDURE sp_dbx_dbapi() RETURNS TABLE() LANGUAGE PYTHON RUNTIME_VERSION='3.11' HANDLER='run' PACKAGES=('snowflake-snowpark-python', 'databricks-sql-connector') EXTERNAL_ACCESS_INTEGRATIONS = (dbx_access_integration) SECRETS = ('cred' = dbx_secret ) AS $$ # Get user name and password from dbx_secret import _snowflake ACCESS_TOKEN = _snowflake.get_generic_secret_string('cred') from snowflake.snowpark import Session # define the method for creating a connection to Databricks def create_dbx_connection(): import databricks.sql connection = databricks.sql.connect( server_hostname="dbx_host", http_path="dbx_path", access_token=ACCESS_TOKEN, ) return connection def run(session: Session): df = session.read.dbapi( create_dbx_connection, table="dbx_table" ) return df $$; CALL sp_dbx_dbapi();
DB-API を使用して、Snowflake NotebookからDatabricksに接続します¶
Snowflake Notebookパッケージ から
snowflake-snowpark-python
およびpymysql
選択してください外部アクセス統合を構成します。これは、Snowflakeがソースエンドポイントに接続できるようにするために必要です。
注釈
PrivateLink は、特に機密情報を扱う場合に、安全にデータを転送することをお勧めします。Snowflakeアカウントに必要な があることを確認します。 PrivateLink 権限が有効になり、 PrivateLink 機能はSnowflake Notebook環境で構成され、アクティブになっています。
シークレットを構成し、Snowflake Notebook に追加します。
CREATE OR REPLACE SECRET dbx_secret TYPE = GENERIC_STRING SECRET_STRING = 'dbx_access_token'; ALTER NOTEBOOK mynotebook SET SECRETS = ('snowflake-secret-object' = dbx_secret);
構成
CREATE OR REPLACE NETWORK RULE dbx_network_rule MODE = EGRESS TYPE = HOST_PORT VALUE_LIST = ('dbx_host:dbx_port'); CREATE OR REPLACE EXTERNAL ACCESS INTEGRATION dbx_access_integration ALLOWED_NETWORK_RULES = (dbx_network_rule) ALLOWED_AUTHENTICATION_SECRETS = (dbx_secret) ENABLED = true;
Snowflake Notebooks の外部アクセスの設定 ノートブックセッションを再起動します。
# Get user name and password from dbx_secret import _snowflake ACCESS_TOKEN = _snowflake.get_generic_secret_string('cred') import snowflake.snowpark.context session = snowflake.snowpark.context.get_active_session() # Define the factory method for creating a connection to Databricks def create_dbx_connection(): import databricks.sql connection = databricks.sql.connect( server_hostname="dbx_host", http_path="dbx_path", access_token=ACCESS_TOKEN, ) return connection # use dbapi to read data from dbx_table df = session.read.dbapi( create_dbx_connection, table="dbx_table" ) # save data into sf_table df.write.mode("overwrite").save_as_table('sf_table')
制限事項¶
ドライバー¶
Snowpark Python DB-API はPython DB-API2.0準拠のドライバー(例: pyodbc
, Oracledb
)のみをサポートしています。 JDBC ドライバーはこのリリースではサポートされていません。