Using the Snowpark Python DB-API¶
With the Snowpark Python DB-API, Snowpark Python users can programmatically pull data from external databases into Snowflake. The DB-API includes:
Python DB-API support: Connect to external databases using standard Python DB-API 2.0 drivers.

Streamlined setup: Install the required drivers with pip; there are no additional dependencies to manage.

Seamless integration: Pull data from these APIs into Snowpark DataFrames and tables, and transform it in Snowflake for advanced analytics.

The `DB-API <https://docs.snowflake.com/en/developer-guide/snowpark/reference/python/latest/snowpark/api/snowflake.snowpark.DataFrameReader.dbapi>`_ can be used in much the same way as the `Spark JDBC API <https://spark.apache.org/docs/3.5.4/sql-data-sources-jdbc.html>`_. Most parameters are designed to be identical or similar for an easier, safer transition. At the same time, Snowpark emphasizes a Python-first design with intuitive naming conventions and avoids JDBC-specific configuration, which gives Python developers a familiar experience. For a comparison of the Snowpark Python DB-API and the Spark JDBC API, see the following table.
DB-API parameters:¶

| Description | Snowpark Python DB-API parameter |
|---|---|
| Function to create a Python DB-API connection | create_connection |
| Specifies the table in the source database | table |
| SQL query wrapped as a subquery for reading data | query |
| Partitioning column for parallel reads | column |
| Lower bound for partitioning | lower_bound |
| Upper bound for partitioning | upper_bound |
| Number of partitions for parallelism | num_partitions |
| Timeout for SQL execution (in seconds) | query_timeout |
| Number of rows fetched per round trip | fetch_size |
| Custom schema for pulling data from external databases | custom_schema |
| Number of workers for parallel fetching and pulling data from external databases | max_workers |
| List of conditions for WHERE clause partitions | predicates |
| Executes a SQL or PL/SQL statement upon session initialization | session_init_statement |
| Executes the workload using a Snowflake UDTF for better performance | udtf_configs |
| Number of fetched batches to be merged into a single Parquet file before it is uploaded | fetch_merge_count |
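As a quick orientation before the details below, the following minimal sketch shows the end-to-end flow: define a factory function that returns a DB-API 2.0 connection, then pass it to session.read.dbapi. The sketch uses Python's built-in sqlite3 module only for brevity (any DB-API 2.0 compliant driver can be substituted); the database path, table name, and Snowflake connection parameters are hypothetical placeholders.

# Minimal sketch; assumes a local SQLite database at /tmp/source.db with a
# table named SRC, and a connection_parameters dict with your Snowflake credentials
from snowflake.snowpark import Session

def create_connection():
    import sqlite3
    # Return any DB-API 2.0 connection; sqlite3 is used here only for brevity
    return sqlite3.connect("/tmp/source.db")

connection_parameters = {
    "account": "<your account>",
    "user": "<your user>",
    "password": "<your password>",
}
session = Session.builder.configs(connection_parameters).create()

# Pull the source table into a Snowpark DataFrame and materialize it in Snowflake
df = session.read.dbapi(create_connection, table="SRC")
df.write.mode("overwrite").save_as_table("sf_table")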
Understanding parallelism¶
The Snowpark Python DB-API has two underlying ingestion mechanisms:

- Local ingestion

  In local ingestion, Snowpark first fetches data from the external source into the local environment where the dbapi() function is called, and converts it to Parquet files. Next, Snowpark uploads these Parquet files to a temporary Snowflake stage and copies them from the stage into a temporary table.

- UDTF ingestion

  In UDTF ingestion, the entire workload runs on Snowflake servers. Snowpark first creates and then executes a UDTF. The UDTF ingests the data directly into Snowflake and stores it in a temporary table.
The Snowpark Python DB-API also has two ways to parallelize and accelerate ingestion:
- Partition column

  This method divides the source data into multiple partitions based on four parameters that users pass to dbapi():

  - column
  - lower_bound
  - upper_bound
  - num_partitions

  These four parameters must be set together, and column must be a numeric or date type.

- Predicates

  This method divides the source data into partitions based on the predicates parameter, a list of expressions suitable for inclusion in WHERE clauses, where each expression defines one partition. Predicates provide a more flexible way of dividing partitions; for example, you can partition on Boolean or non-numeric columns.
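For example, here is a hedged sketch of partitioning on a string column; the region column and its values are hypothetical placeholders:

# Hypothetical example: each WHERE-clause expression becomes one partition,
# so the expressions should be disjoint and, together, cover every row
df = session.read.dbapi(
    create_connection,
    table="target_table",
    predicates=[
        "region = 'EMEA'",
        "region = 'AMER'",
        "region NOT IN ('EMEA', 'AMER')"  # catch-all partition
    ]
)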
The Snowpark Python DB-API also lets you adjust the level of parallelism within a partition:

- fetch_size
Within a partition, the API fetches rows in chunks defined by fetch_size. These rows are written to Snowflake in parallel as they are fetched, which allows reading and writing to overlap and maximizes throughput.
Combining these ingestion mechanisms and partitioning methods gives four ways to ingest data:
Local ingestion with a partition column
df_local_par_column = session.read.dbapi(
    create_connection,
    table="target_table",
    fetch_size=100000,
    num_partitions=4,
    column="ID",  # Replace with the column you want to partition on
    upper_bound=10000,
    lower_bound=0
)
Local ingestion with predicates
df_local_predicates = session.read.dbapi(
    create_connection,
    table="target_table",
    fetch_size=100000,
    predicates=[
        "ID < 3",
        "ID >= 3"
    ]
)
UDTF ingestion with a partition column
udtf_configs = {
    "external_access_integration": "<your external access integration>"
}

df_udtf_par_column = session.read.dbapi(
    create_connection,
    table="target_table",
    udtf_configs=udtf_configs,
    fetch_size=100000,
    num_partitions=4,
    column="ID",  # Replace with the column you want to partition on
    upper_bound=10000,
    lower_bound=0
)
UDTF ingestion with predicates
udtf_configs = {
    "external_access_integration": "<your external access integration>"
}

df_udtf_predicates = session.read.dbapi(
    create_connection,
    table="target_table",
    udtf_configs=udtf_configs,
    fetch_size=100000,
    predicates=[
        "ID < 3",
        "ID >= 3"
    ]
)
SQL Server¶
Connecting to SQL Server from Snowpark requires the following three packages:

- Snowpark: snowflake-snowpark-python[pandas]

- SQL Server ODBC driver: Microsoft ODBC Driver for SQL Server

  By installing the driver, you agree to Microsoft's EULA.

- The open-source pyodbc library: pyodbc

The following code examples show how to connect to SQL Server from the Snowpark client, a stored procedure, and a Snowflake notebook.
Use the DB-API to connect to SQL Server from a Snowpark client¶
Install the Microsoft ODBC driver for SQL Server (macOS example using Homebrew):
/bin/bash -c "$(curl -fsSL https://raw.githubusercontent.com/Homebrew/install/master/install.sh)"
brew tap microsoft/mssql-release https://github.com/Microsoft/homebrew-mssql-release
brew update
HOMEBREW_ACCEPT_EULA=Y brew install msodbcsql mssql-tools
Install snowflake-snowpark-python[pandas] and pyodbc:

pip install snowflake-snowpark-python[pandas]
pip install pyodbc
Define a factory method that creates a connection to SQL Server:
def create_sql_server_connection():
    import pyodbc

    SERVER = "<your host name>"
    PORT = <your port>
    UID = "<your user name>"
    PWD = "<your password>"
    DATABASE = "<your database name>"
    connection_str = (
        f"DRIVER={{ODBC Driver 18 for SQL Server}};"
        f"SERVER={SERVER},{PORT};"
        f"UID={UID};"
        f"PWD={PWD};"
        f"DATABASE={DATABASE};"
        "TrustServerCertificate=yes;"
        "Encrypt=yes;"
        # Optional: identify the source of queries
        "APP=snowflake-snowpark-python;"
    )
    connection = pyodbc.connect(connection_str)
    return connection

# Feel free to combine local/udtf ingestion and partition column/predicates as
# stated in the understanding parallelism section

# Call dbapi to pull data from the target table
df = session.read.dbapi(
    create_sql_server_connection,
    table="target_table"
)

# Call dbapi to pull data from the target query
df_query = session.read.dbapi(
    create_sql_server_connection,
    query="select * from target_table"
)

# Pull data from the target table with parallelism using a partition column
df_local_par_column = session.read.dbapi(
    create_sql_server_connection,
    table="target_table",
    fetch_size=100000,
    num_partitions=4,
    column="ID",  # Replace with the column you want to partition on
    upper_bound=10000,
    lower_bound=0
)

udtf_configs = {
    "external_access_integration": "<your external access integration>"
}

# Pull data from the target table with UDTF ingestion and parallelism using predicates
df_udtf_predicates = session.read.dbapi(
    create_sql_server_connection,
    table="target_table",
    udtf_configs=udtf_configs,
    fetch_size=100000,
    predicates=[
        "ID < 3",
        "ID >= 3"
    ]
)
Use the DB-API to connect to SQL Server from a stored procedure¶
Configure an external access integration (EAI). This is required so that Snowflake can connect to the source endpoint.

Note

PrivateLink is recommended for transferring data securely, especially when handling sensitive information. Ensure that the required PrivateLink permissions are enabled for your Snowflake account and that the PrivateLink feature is configured and active in your Snowflake Notebook environment.

Configure a secret, a network rule that allows egress to the source endpoint, and an EAI:
-- Configure a secret to allow egress to the source endpoint
CREATE OR REPLACE SECRET mssql_secret
  TYPE = PASSWORD
  USERNAME = 'mssql_username'
  PASSWORD = 'mssql_password';

-- Configure a network rule to allow egress to the source endpoint
CREATE OR REPLACE NETWORK RULE mssql_network_rule
  MODE = EGRESS
  TYPE = HOST_PORT
  VALUE_LIST = ('mssql_host:mssql_port');

-- Configure an external access integration
CREATE OR REPLACE EXTERNAL ACCESS INTEGRATION mssql_access_integration
  ALLOWED_NETWORK_RULES = (mssql_network_rule)
  ALLOWED_AUTHENTICATION_SECRETS = (mssql_secret)
  ENABLED = true;
Use the DB-API to pull data from SQL Server in a Python stored procedure:
CREATE OR REPLACE PROCEDURE sp_mssql_dbapi()
  RETURNS TABLE()
  LANGUAGE PYTHON
  RUNTIME_VERSION='3.11'
  HANDLER='run'
  PACKAGES=('snowflake-snowpark-python', 'pyodbc', 'msodbcsql')
  EXTERNAL_ACCESS_INTEGRATIONS = (mssql_access_integration)
  SECRETS = ('cred' = mssql_secret)
AS $$
# Get the user name and password from mssql_secret
import _snowflake
username_password_object = _snowflake.get_username_password('cred')
USER = username_password_object.username
PASSWORD = username_password_object.password

# Define a factory method for creating a connection to SQL Server
from snowflake.snowpark import Session

def create_sql_server_connection():
    import pyodbc

    host = "<your host>"
    port = <your port>
    username = USER
    password = PASSWORD
    database = "<your database name>"
    connection_str = (
        f"DRIVER={{ODBC Driver 18 for SQL Server}};"
        f"SERVER={host},{port};"
        f"DATABASE={database};"
        f"UID={username};"
        f"PWD={password};"
        "TrustServerCertificate=yes;"
        "Encrypt=yes;"
        # Optional: identify the source of queries
        "APP=snowflake-snowpark-python;"
    )
    connection = pyodbc.connect(connection_str)
    return connection

def run(session: Session):
    # Feel free to combine local/udtf ingestion and partition column/predicates
    # as stated in the understanding parallelism section

    # Call dbapi to pull data from the target table
    df = session.read.dbapi(
        create_sql_server_connection,
        table="target_table"
    )

    # Call dbapi to pull data from the target query
    df_query = session.read.dbapi(
        create_sql_server_connection,
        query="select * from target_table"
    )

    # Pull data from the target table with parallelism using a partition column
    df_local_par_column = session.read.dbapi(
        create_sql_server_connection,
        table="target_table",
        fetch_size=100000,
        num_partitions=4,
        column="ID",  # Replace with the column you want to partition on
        upper_bound=10000,
        lower_bound=0
    )

    udtf_configs = {
        "external_access_integration": "<your external access integration>"
    }

    # Pull data from the target table with UDTF ingestion and parallelism using predicates
    df_udtf_predicates = session.read.dbapi(
        create_sql_server_connection,
        table="target_table",
        udtf_configs=udtf_configs,
        fetch_size=100000,
        predicates=[
            "ID < 3",
            "ID >= 3"
        ]
    )

    return df
$$;

CALL sp_mssql_dbapi();
Use the DB-API to connect to SQL Server from a Snowflake notebook¶
In Snowflake Notebook packages, select snowflake-snowpark-python and pyodbc.

In the Files pane, open the file environment.yml, and under Dependencies, add the following line after the other entries:

- msodbcsql
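For reference, here is a sketch of what the resulting environment.yml might look like; the surrounding entries are illustrative, and only the - msodbcsql line is the required addition:

# Illustrative environment.yml; your file may contain different entries
name: app_environment
channels:
  - snowflake
dependencies:
  - snowflake-snowpark-python
  - pyodbc
  - msodbcsql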
Configure a secret, a network rule that allows egress to the source endpoint, and an EAI:
-- Configure a secret to allow egress to the source endpoint
CREATE OR REPLACE SECRET mssql_secret
  TYPE = PASSWORD
  USERNAME = 'mssql_username'
  PASSWORD = 'mssql_password';

ALTER NOTEBOOK mynotebook SET SECRETS = ('snowflake-secret-object' = mssql_secret);

-- Configure a network rule to allow egress to the source endpoint
CREATE OR REPLACE NETWORK RULE mssql_network_rule
  MODE = EGRESS
  TYPE = HOST_PORT
  VALUE_LIST = ('mssql_host:mssql_port');

-- Configure an external access integration
CREATE OR REPLACE EXTERNAL ACCESS INTEGRATION mssql_access_integration
  ALLOWED_NETWORK_RULES = (mssql_network_rule)
  ALLOWED_AUTHENTICATION_SECRETS = (mssql_secret)
  ENABLED = true;
Set up external access for Snowflake Notebooks and restart the notebook session.

Use the DB-API to pull data from SQL Server in a Python cell of your Snowflake notebook:
# Get the user name and password from mssql_secret
import _snowflake
username_password_object = _snowflake.get_username_password('snowflake-secret-object')
USER = username_password_object.username
PASSWORD = username_password_object.password

import snowflake.snowpark.context
session = snowflake.snowpark.context.get_active_session()

def create_sql_server_connection():
    import pyodbc

    SERVER = "<your host name>"
    UID = USER
    PWD = PASSWORD
    DATABASE = "test_query_history"
    connection_str = (
        f"DRIVER={{ODBC Driver 18 for SQL Server}};"
        f"SERVER={SERVER};"
        f"UID={UID};"
        f"PWD={PWD};"
        f"DATABASE={DATABASE};"
        "TrustServerCertificate=yes;"
        "Encrypt=yes;"
        # Optional: identify the source of queries
        "APP=snowflake-snowpark-python;"
    )
    connection = pyodbc.connect(connection_str)
    return connection

# Feel free to combine local/udtf ingestion and partition column/predicates as
# stated in the understanding parallelism section

# Call dbapi to pull data from the target table
df = session.read.dbapi(
    create_sql_server_connection,
    table="target_table"
)

# Call dbapi to pull data from the target query
df_query = session.read.dbapi(
    create_sql_server_connection,
    query="select * from target_table"
)

# Pull data from the target table with parallelism using a partition column
df_local_par_column = session.read.dbapi(
    create_sql_server_connection,
    table="target_table",
    fetch_size=100000,
    num_partitions=4,
    column="ID",  # Replace with the column you want to partition on
    upper_bound=10000,
    lower_bound=0
)

udtf_configs = {
    "external_access_integration": "<your external access integration>"
}

# Pull data from the target table with UDTF ingestion and parallelism using predicates
df_udtf_predicates = session.read.dbapi(
    create_sql_server_connection,
    table="target_table",
    udtf_configs=udtf_configs,
    fetch_size=100000,
    predicates=[
        "ID < 3",
        "ID >= 3"
    ]
)

# Save the data into sf_table
df.write.mode("overwrite").save_as_table('sf_table')
Source tracing when using the DB-API to connect to SQL Server¶
Include the Snowpark tag in your connection factory function:
def create_sql_server_connection():
    import pyodbc

    SERVER = "<your host name>"
    PORT = <your port>
    UID = "<your user name>"
    PWD = "<your password>"
    DATABASE = "<your database name>"
    connection_str = (
        f"DRIVER={{ODBC Driver 18 for SQL Server}};"
        f"SERVER={SERVER},{PORT};"
        f"UID={UID};"
        f"PWD={PWD};"
        f"DATABASE={DATABASE};"
        "TrustServerCertificate=yes;"
        "Encrypt=yes;"
        # Include this parameter for source tracing
        "APP=snowflake-snowpark-python;"
    )
    connection = pyodbc.connect(connection_str)
    return connection
Run the following SQL in the data source to capture queries from Snowpark that are still active:
SELECT
    s.session_id,
    s.program_name,
    r.status,
    t.text AS sql_text
FROM sys.dm_exec_sessions s
JOIN sys.dm_exec_requests r
    ON s.session_id = r.session_id
CROSS APPLY sys.dm_exec_sql_text(r.sql_handle) AS t
WHERE s.program_name = 'snowflake-snowpark-python';
Oracle¶
Connecting to Oracle from Snowpark requires the following two packages:

- Snowpark: snowflake-snowpark-python[pandas]

- The open-source oracledb library: oracledb

The following code examples show how to connect to Oracle from the Snowpark client, a stored procedure, and a Snowflake notebook.
Use the DB-API to connect to Oracle from a Snowpark client¶
Install snowflake-snowpark-python[pandas] and oracledb:

pip install snowflake-snowpark-python[pandas]
pip install oracledb
Define a factory method that creates a connection to Oracle, and use the DB-API to pull data from Oracle:
def create_oracle_db_connection():
    import oracledb

    HOST = "<your host>"
    PORT = <your port>
    SERVICE_NAME = "<your service name>"
    USER = "<your user name>"
    PASSWORD = "<your password>"
    DSN = f"{HOST}:{PORT}/{SERVICE_NAME}"
    connection = oracledb.connect(
        user=USER,
        password=PASSWORD,
        dsn=DSN
    )
    # Optional: include this parameter for source tracing
    connection.clientinfo = "snowflake-snowpark-python"
    return connection

# Feel free to combine local/udtf ingestion and partition column/predicates as
# stated in the understanding parallelism section

# Call dbapi to pull data from the target table
df = session.read.dbapi(
    create_oracle_db_connection,
    table="target_table"
)

# Call dbapi to pull data from the target query
df_query = session.read.dbapi(
    create_oracle_db_connection,
    query="select * from target_table"
)

# Pull data from the target table with parallelism using a partition column
df_local_par_column = session.read.dbapi(
    create_oracle_db_connection,
    table="target_table",
    fetch_size=100000,
    num_partitions=4,
    column="ID",  # Replace with the column you want to partition on
    upper_bound=10000,
    lower_bound=0
)

udtf_configs = {
    "external_access_integration": "<your external access integration>"
}

# Pull data from the target table with UDTF ingestion and parallelism using predicates
df_udtf_predicates = session.read.dbapi(
    create_oracle_db_connection,
    table="target_table",
    udtf_configs=udtf_configs,
    fetch_size=100000,
    predicates=[
        "ID < 3",
        "ID >= 3"
    ]
)
Use the DB-API to connect to Oracle from a stored procedure¶
Configure an external access integration (EAI). This is required so that Snowflake can connect to the source endpoint.

Note

PrivateLink is recommended for transferring data securely, especially when handling sensitive information. Ensure that the required PrivateLink permissions are enabled for your Snowflake account and that the PrivateLink feature is configured and active in your Snowflake Notebook environment.

Configure a secret, a network rule that allows egress to the source endpoint, and an EAI:
-- Configure a secret
CREATE OR REPLACE SECRET ora_secret
  TYPE = PASSWORD
  USERNAME = 'ora_username'
  PASSWORD = 'ora_password';

-- Configure a network rule to allow egress to the source endpoint
CREATE OR REPLACE NETWORK RULE ora_network_rule
  MODE = EGRESS
  TYPE = HOST_PORT
  VALUE_LIST = ('ora_host:ora_port');

-- Configure an external access integration
CREATE OR REPLACE EXTERNAL ACCESS INTEGRATION ora_access_integration
  ALLOWED_NETWORK_RULES = (ora_network_rule)
  ALLOWED_AUTHENTICATION_SECRETS = (ora_secret)
  ENABLED = true;
Use the Snowpark Python DB-API to pull data from Oracle in a Python stored procedure:
CREATE OR REPLACE PROCEDURE sp_ora_dbapi()
  RETURNS TABLE()
  LANGUAGE PYTHON
  RUNTIME_VERSION='3.11'
  HANDLER='run'
  PACKAGES=('snowflake-snowpark-python', 'oracledb')
  EXTERNAL_ACCESS_INTEGRATIONS = (ora_access_integration)
  SECRETS = ('cred' = ora_secret)
AS $$
# Get the user name and password from ora_secret
import _snowflake
username_password_object = _snowflake.get_username_password('cred')
USER = username_password_object.username
PASSWORD = username_password_object.password

# Define the factory method for creating a connection to Oracle
from snowflake.snowpark import Session

def create_oracle_db_connection():
    import oracledb

    host = "ora_host"
    port = "ora_port"
    service_name = "ora_service"
    DSN = f"{host}:{port}/{service_name}"
    connection = oracledb.connect(
        user=USER,
        password=PASSWORD,
        dsn=DSN
    )
    # Optional: include this parameter for source tracing
    connection.clientinfo = "snowflake-snowpark-python"
    return connection

def run(session: Session):
    # Feel free to combine local/udtf ingestion and partition column/predicates
    # as stated in the understanding parallelism section

    # Call dbapi to pull data from the target table
    df = session.read.dbapi(
        create_oracle_db_connection,
        table="target_table"
    )

    # Call dbapi to pull data from the target query
    df_query = session.read.dbapi(
        create_oracle_db_connection,
        query="select * from target_table"
    )

    # Pull data from the target table with parallelism using a partition column
    df_local_par_column = session.read.dbapi(
        create_oracle_db_connection,
        table="target_table",
        fetch_size=100000,
        num_partitions=4,
        column="ID",  # Replace with the column you want to partition on
        upper_bound=10000,
        lower_bound=0
    )

    udtf_configs = {
        "external_access_integration": "<your external access integration>"
    }

    # Pull data from the target table with UDTF ingestion and parallelism using predicates
    df_udtf_predicates = session.read.dbapi(
        create_oracle_db_connection,
        table="target_table",
        udtf_configs=udtf_configs,
        fetch_size=100000,
        predicates=[
            "ID < 3",
            "ID >= 3"
        ]
    )

    return df
$$;

CALL sp_ora_dbapi();
Use the DB-API to connect to Oracle from a Snowflake notebook¶
In Snowflake Notebook packages, select snowflake-snowpark-python and oracledb.

Configure an external access integration (EAI). This is required so that Snowflake can connect to the source endpoint.
Note

PrivateLink is recommended for transferring data securely, especially when handling sensitive information. Ensure that the required PrivateLink permissions are enabled for your Snowflake account and that the PrivateLink feature is configured and active in your Snowflake Notebook environment.

Configure a secret, a network rule that allows egress to the source endpoint, and an EAI:
-- Configure a secret
CREATE OR REPLACE SECRET ora_secret
  TYPE = PASSWORD
  USERNAME = 'ora_username'
  PASSWORD = 'ora_password';

ALTER NOTEBOOK mynotebook SET SECRETS = ('snowflake-secret-object' = ora_secret);

-- Configure a network rule to allow egress to the source endpoint
CREATE OR REPLACE NETWORK RULE ora_network_rule
  MODE = EGRESS
  TYPE = HOST_PORT
  VALUE_LIST = ('ora_host:ora_port');

-- Configure an external access integration
CREATE OR REPLACE EXTERNAL ACCESS INTEGRATION ora_access_integration
  ALLOWED_NETWORK_RULES = (ora_network_rule)
  ALLOWED_AUTHENTICATION_SECRETS = (ora_secret)
  ENABLED = true;
Set up external access for Snowflake Notebooks and restart the notebook session.

Use the DB-API to pull data from Oracle in a Python cell of your Snowflake notebook:
# Get the user name and password from ora_secret
import _snowflake
username_password_object = _snowflake.get_username_password('snowflake-secret-object')
USER = username_password_object.username
PASSWORD = username_password_object.password

import snowflake.snowpark.context
session = snowflake.snowpark.context.get_active_session()

# Define the factory method for creating a connection to Oracle
def create_oracle_db_connection():
    import oracledb

    host = "ora_host"
    port = "ora_port"
    service_name = "ora_service"
    DSN = f"{host}:{port}/{service_name}"
    connection = oracledb.connect(
        user=USER,
        password=PASSWORD,
        dsn=DSN,
    )
    # Optional: include this parameter for source tracing
    connection.clientinfo = "snowflake-snowpark-python"
    return connection

# Feel free to combine local/udtf ingestion and partition column/predicates as
# stated in the understanding parallelism section

# Call dbapi to pull data from the target table
df = session.read.dbapi(
    create_oracle_db_connection,
    table="target_table"
)

# Call dbapi to pull data from the target query
df_query = session.read.dbapi(
    create_oracle_db_connection,
    query="select * from target_table"
)

# Pull data from the target table with parallelism using a partition column
df_local_par_column = session.read.dbapi(
    create_oracle_db_connection,
    table="target_table",
    fetch_size=100000,
    num_partitions=4,
    column="ID",  # Replace with the column you want to partition on
    upper_bound=10000,
    lower_bound=0
)

udtf_configs = {
    "external_access_integration": "<your external access integration>"
}

# Pull data from the target table with UDTF ingestion and parallelism using predicates
df_udtf_predicates = session.read.dbapi(
    create_oracle_db_connection,
    table="target_table",
    udtf_configs=udtf_configs,
    fetch_size=100000,
    predicates=[
        "ID < 3",
        "ID >= 3"
    ]
)

# Save the data into sf_table
df.write.mode("overwrite").save_as_table('sf_table')
Source tracing when using the DB-API to connect to Oracle¶
Include the Snowpark tag in your connection factory function:
def create_oracle_db_connection():
    import oracledb

    HOST = "myhost"
    PORT = "myport"
    SERVICE_NAME = "myservice"
    USER = "myuser"
    PASSWORD = "mypassword"
    DSN = f"{HOST}:{PORT}/{SERVICE_NAME}"
    connection = oracledb.connect(
        user=USER,
        password=PASSWORD,
        dsn=DSN,
    )
    # Include this parameter for source tracing
    connection.clientinfo = "snowflake-snowpark-python"
    return connection
Run the following SQL in the data source to capture queries from Snowpark that are still active:
SELECT
    s.sid,
    s.serial#,
    s.username,
    s.module,
    q.sql_id,
    q.sql_text,
    q.last_active_time
FROM v$session s
JOIN v$sql q
    ON s.sql_id = q.sql_id
WHERE s.client_info = 'snowflake-snowpark-python';
PostgreSQL¶
Connecting to PostgreSQL from Snowpark requires the following two packages:

- Snowpark: snowflake-snowpark-python[pandas]

- The open-source psycopg2 library: psycopg2

The following code examples show how to connect to PostgreSQL from the Snowpark client, a stored procedure, and a Snowflake notebook.
Use the DB-API to connect to PostgreSQL from a Snowpark client¶
Install snowflake-snowpark-python[pandas] and psycopg2:

pip install snowflake-snowpark-python[pandas]
pip install psycopg2
Define a factory method that creates a connection to PostgreSQL:
def create_pg_connection():
    import psycopg2

    connection = psycopg2.connect(
        host="pg_host",
        port=pg_port,
        dbname="pg_dbname",
        user="pg_user",
        password="pg_password",
        # Optional: include this parameter for source tracing
        application_name="snowflake-snowpark-python"
    )
    return connection

# Feel free to combine local/udtf ingestion and partition column/predicates as
# stated in the understanding parallelism section

# Call dbapi to pull data from the target table
df = session.read.dbapi(
    create_pg_connection,
    table="target_table"
)

# Call dbapi to pull data from the target query
df_query = session.read.dbapi(
    create_pg_connection,
    query="select * from target_table"
)

# Pull data from the target table with parallelism using a partition column
df_local_par_column = session.read.dbapi(
    create_pg_connection,
    table="target_table",
    fetch_size=100000,
    num_partitions=4,
    column="ID",  # Replace with the column you want to partition on
    upper_bound=10000,
    lower_bound=0
)

udtf_configs = {
    "external_access_integration": "<your external access integration>"
}

# Pull data from the target table with UDTF ingestion and parallelism using predicates
df_udtf_predicates = session.read.dbapi(
    create_pg_connection,
    table="target_table",
    udtf_configs=udtf_configs,
    fetch_size=100000,
    predicates=[
        "ID < 3",
        "ID >= 3"
    ]
)
Use the DB-API to connect to PostgreSQL from a stored procedure¶
Configure an external access integration (EAI). This is required so that Snowflake can connect to the source endpoint.

Note

PrivateLink is recommended for transferring data securely, especially when handling sensitive information. Ensure that the required PrivateLink permissions are enabled for your Snowflake account and that the PrivateLink feature is configured and active in your Snowflake Notebook environment.

Configure a secret, a network rule that allows egress to the source endpoint, and an EAI:
-- configure a secret
CREATE OR REPLACE SECRET pg_secret
  TYPE = PASSWORD
  USERNAME = 'pg_username'
  PASSWORD = 'pg_password';

-- configure a network rule
CREATE OR REPLACE NETWORK RULE pg_network_rule
  MODE = EGRESS
  TYPE = HOST_PORT
  VALUE_LIST = ('pg_host:pg_port');

-- configure an external access integration
CREATE OR REPLACE EXTERNAL ACCESS INTEGRATION pg_access_integration
  ALLOWED_NETWORK_RULES = (pg_network_rule)
  ALLOWED_AUTHENTICATION_SECRETS = (pg_secret)
  ENABLED = true;
Use the Snowpark Python DB-API to pull data from PostgreSQL in a Python stored procedure:
CREATE OR REPLACE PROCEDURE sp_pg_dbapi()
  RETURNS TABLE()
  LANGUAGE PYTHON
  RUNTIME_VERSION='3.11'
  HANDLER='run'
  PACKAGES=('snowflake-snowpark-python', 'psycopg2')
  EXTERNAL_ACCESS_INTEGRATIONS = (pg_access_integration)
  SECRETS = ('cred' = pg_secret)
AS $$
# Get the user name and password from pg_secret
import _snowflake
username_password_object = _snowflake.get_username_password('cred')
USER = username_password_object.username
PASSWORD = username_password_object.password

# Define the factory method for creating a connection to PostgreSQL
from snowflake.snowpark import Session

def create_pg_connection():
    import psycopg2

    connection = psycopg2.connect(
        host="pg_host",
        port=pg_port,
        dbname="pg_dbname",
        user=USER,
        password=PASSWORD,
        # Optional: include this parameter for source tracing
        application_name="snowflake-snowpark-python"
    )
    return connection

def run(session: Session):
    # Feel free to combine local/udtf ingestion and partition column/predicates
    # as stated in the understanding parallelism section

    # Call dbapi to pull data from the target table
    df = session.read.dbapi(
        create_pg_connection,
        table="target_table"
    )

    # Call dbapi to pull data from the target query
    df_query = session.read.dbapi(
        create_pg_connection,
        query="select * from target_table"
    )

    # Pull data from the target table with parallelism using a partition column
    df_local_par_column = session.read.dbapi(
        create_pg_connection,
        table="target_table",
        fetch_size=100000,
        num_partitions=4,
        column="ID",  # Replace with the column you want to partition on
        upper_bound=10000,
        lower_bound=0
    )

    udtf_configs = {
        "external_access_integration": "<your external access integration>"
    }

    # Pull data from the target table with UDTF ingestion and parallelism using predicates
    df_udtf_predicates = session.read.dbapi(
        create_pg_connection,
        table="target_table",
        udtf_configs=udtf_configs,
        fetch_size=100000,
        predicates=[
            "ID < 3",
            "ID >= 3"
        ]
    )

    return df
$$;

CALL sp_pg_dbapi();
Use the DB-API to connect to PostgreSQL from a Snowflake notebook¶
In Snowflake Notebook packages, select snowflake-snowpark-python and psycopg2.

Configure an external access integration (EAI). This is required so that Snowflake can connect to the source endpoint.
Note

PrivateLink is recommended for transferring data securely, especially when handling sensitive information. Ensure that the required PrivateLink permissions are enabled for your Snowflake account and that the PrivateLink feature is configured and active in your Snowflake Notebook environment.

Configure a secret, a network rule that allows egress to the source endpoint, and an EAI:
-- Configure the secret
CREATE OR REPLACE SECRET pg_secret
  TYPE = PASSWORD
  USERNAME = 'pg_username'
  PASSWORD = 'pg_password';

ALTER NOTEBOOK pg_notebook SET SECRETS = ('snowflake-secret-object' = pg_secret);

-- Configure the network rule to allow egress to the source endpoint
CREATE OR REPLACE NETWORK RULE pg_network_rule
  MODE = EGRESS
  TYPE = HOST_PORT
  VALUE_LIST = ('pg_host:pg_port');

-- Configure an external access integration
CREATE OR REPLACE EXTERNAL ACCESS INTEGRATION pg_access_integration
  ALLOWED_NETWORK_RULES = (pg_network_rule)
  ALLOWED_AUTHENTICATION_SECRETS = (pg_secret)
  ENABLED = true;
Set up external access for Snowflake Notebooks and restart the notebook session.

Use the DB-API to pull data from PostgreSQL in a Python cell of your Snowflake notebook:
# Get the user name and password from pg_secret
import _snowflake
username_password_object = _snowflake.get_username_password('snowflake-secret-object')
USER = username_password_object.username
PASSWORD = username_password_object.password

import snowflake.snowpark.context
session = snowflake.snowpark.context.get_active_session()

# Define the factory method for creating a connection to PostgreSQL
def create_pg_connection():
    import psycopg2

    connection = psycopg2.connect(
        host="pg_host",
        port=pg_port,
        dbname="pg_dbname",
        user=USER,
        password=PASSWORD,
        # Optional: include this parameter for source tracing
        application_name="snowflake-snowpark-python"
    )
    return connection

# Feel free to combine local/udtf ingestion and partition column/predicates as
# stated in the understanding parallelism section

# Call dbapi to pull data from the target table
df = session.read.dbapi(
    create_pg_connection,
    table="target_table"
)

# Call dbapi to pull data from the target query
df_query = session.read.dbapi(
    create_pg_connection,
    query="select * from target_table"
)

# Pull data from the target table with parallelism using a partition column
df_local_par_column = session.read.dbapi(
    create_pg_connection,
    table="target_table",
    fetch_size=100000,
    num_partitions=4,
    column="ID",  # Replace with the column you want to partition on
    upper_bound=10000,
    lower_bound=0
)

udtf_configs = {
    "external_access_integration": "<your external access integration>"
}

# Pull data from the target table with UDTF ingestion and parallelism using predicates
df_udtf_predicates = session.read.dbapi(
    create_pg_connection,
    table="target_table",
    udtf_configs=udtf_configs,
    fetch_size=100000,
    predicates=[
        "ID < 3",
        "ID >= 3"
    ]
)

# Save the data into sf_table
df.write.mode("overwrite").save_as_table('sf_table')
Source tracing when using the DB-API to connect to PostgreSQL¶
Include the Snowpark tag in your connection factory function:
def create_pg_connection():
    import psycopg2

    connection = psycopg2.connect(
        host="pg_host",
        port=pg_port,
        dbname="pg_dbname",
        user="pg_user",
        password="pg_password",
        # Include this parameter for source tracing
        application_name="snowflake-snowpark-python"
    )
    return connection
Run the following SQL in the data source to capture queries from Snowpark that are still active:
SELECT
    pid,
    usename AS username,
    datname AS database,
    application_name,
    client_addr,
    state,
    query_start,
    query
FROM pg_stat_activity
WHERE application_name = 'snowflake-snowpark-python';
MySQL¶
Connecting to MySQL from Snowpark requires the following two packages:

- Snowpark: snowflake-snowpark-python[pandas]

- The open-source pymysql library: PyMySQL

The following code examples show how to connect to MySQL from the Snowpark client, a stored procedure, and a Snowflake notebook.
Use the DB-API to connect to MySQL from a Snowpark client¶
Install snowflake-snowpark-python[pandas] and pymysql:

pip install snowflake-snowpark-python[pandas]
pip install pymysql
Define a factory method that creates a connection to MySQL:
def create_mysql_connection():
    import pymysql

    connection = pymysql.connect(
        host="mysql_host",
        port=mysql_port,
        database="mysql_db",
        user="mysql_user",
        password="mysql_password",
        # Optional: include this parameter for source tracing
        init_command="SET @program_name='snowflake-snowpark-python';"
    )
    return connection

# Feel free to combine local/udtf ingestion and partition column/predicates as
# stated in the understanding parallelism section

# Call dbapi to pull data from the target table
df = session.read.dbapi(
    create_mysql_connection,
    table="target_table"
)

# Call dbapi to pull data from the target query
df_query = session.read.dbapi(
    create_mysql_connection,
    query="select * from target_table"
)

# Pull data from the target table with parallelism using a partition column
df_local_par_column = session.read.dbapi(
    create_mysql_connection,
    table="target_table",
    fetch_size=100000,
    num_partitions=4,
    column="ID",  # Replace with the column you want to partition on
    upper_bound=10000,
    lower_bound=0
)

udtf_configs = {
    "external_access_integration": "<your external access integration>"
}

# Pull data from the target table with UDTF ingestion and parallelism using predicates
df_udtf_predicates = session.read.dbapi(
    create_mysql_connection,
    table="target_table",
    udtf_configs=udtf_configs,
    fetch_size=100000,
    predicates=[
        "ID < 3",
        "ID >= 3"
    ]
)
Use the DB-API to connect to MySQL from a stored procedure¶
Configure an external access integration (EAI). This is required so that Snowflake can connect to the source endpoint.

Note

PrivateLink is recommended for transferring data securely, especially when handling sensitive information. Ensure that the required PrivateLink permissions are enabled for your Snowflake account and that the PrivateLink feature is configured and active in your Snowflake Notebook environment.

Configure a secret, a network rule that allows egress to the source endpoint, and an EAI:
CREATE OR REPLACE SECRET mysql_secret
  TYPE = PASSWORD
  USERNAME = 'mysql_username'
  PASSWORD = 'mysql_password';

-- configure a network rule
CREATE OR REPLACE NETWORK RULE mysql_network_rule
  MODE = EGRESS
  TYPE = HOST_PORT
  VALUE_LIST = ('mysql_host:mysql_port');

-- configure an external access integration
CREATE OR REPLACE EXTERNAL ACCESS INTEGRATION mysql_access_integration
  ALLOWED_NETWORK_RULES = (mysql_network_rule)
  ALLOWED_AUTHENTICATION_SECRETS = (mysql_secret)
  ENABLED = true;
Use the Snowpark Python DB-API to pull data from MySQL in a Python stored procedure:
CREATE OR REPLACE PROCEDURE sp_mysql_dbapi()
  RETURNS TABLE()
  LANGUAGE PYTHON
  RUNTIME_VERSION='3.11'
  HANDLER='run'
  PACKAGES=('snowflake-snowpark-python', 'pymysql')
  EXTERNAL_ACCESS_INTEGRATIONS = (mysql_access_integration)
  SECRETS = ('cred' = mysql_secret)
AS $$
# Get the user name and password from mysql_secret
import _snowflake
username_password_object = _snowflake.get_username_password('cred')
USER = username_password_object.username
PASSWORD = username_password_object.password

# Define the factory method for creating a connection to MySQL
from snowflake.snowpark import Session

def create_mysql_connection():
    import pymysql

    connection = pymysql.connect(
        host="mysql_host",
        port=mysql_port,
        database="mysql_dbname",
        user=USER,
        password=PASSWORD,
        # Optional: include this parameter for source tracing
        init_command="SET @program_name='snowflake-snowpark-python';"
    )
    return connection

# Use the Snowpark Python DB-API to pull data from MySQL in a Python stored procedure
def run(session: Session):
    # Feel free to combine local/udtf ingestion and partition column/predicates
    # as stated in the understanding parallelism section

    # Call dbapi to pull data from the target table
    df = session.read.dbapi(
        create_mysql_connection,
        table="target_table"
    )

    # Call dbapi to pull data from the target query
    df_query = session.read.dbapi(
        create_mysql_connection,
        query="select * from target_table"
    )

    # Pull data from the target table with parallelism using a partition column
    df_local_par_column = session.read.dbapi(
        create_mysql_connection,
        table="target_table",
        fetch_size=100000,
        num_partitions=4,
        column="ID",  # Replace with the column you want to partition on
        upper_bound=10000,
        lower_bound=0
    )

    udtf_configs = {
        "external_access_integration": "<your external access integration>"
    }

    # Pull data from the target table with UDTF ingestion and parallelism using predicates
    df_udtf_predicates = session.read.dbapi(
        create_mysql_connection,
        table="target_table",
        udtf_configs=udtf_configs,
        fetch_size=100000,
        predicates=[
            "ID < 3",
            "ID >= 3"
        ]
    )

    return df
$$;

CALL sp_mysql_dbapi();
Use the DB-API to connect to MySQL from a Snowflake notebook¶
In Snowflake Notebook packages, select snowflake-snowpark-python and pymysql.

Configure an external access integration (EAI). This is required so that Snowflake can connect to the source endpoint.
Note

PrivateLink is recommended for transferring data securely, especially when handling sensitive information. Ensure that the required PrivateLink permissions are enabled for your Snowflake account and that the PrivateLink feature is configured and active in your Snowflake Notebook environment.

Configure a secret, a network rule that allows egress to the source endpoint, and an EAI:
CREATE OR REPLACE SECRET mysql_secret
  TYPE = PASSWORD
  USERNAME = 'mysql_username'
  PASSWORD = 'mysql_password';

ALTER NOTEBOOK mynotebook SET SECRETS = ('snowflake-secret-object' = mysql_secret);

-- configure a network rule
CREATE OR REPLACE NETWORK RULE mysql_network_rule
  MODE = EGRESS
  TYPE = HOST_PORT
  VALUE_LIST = ('mysql_host:mysql_port');

-- configure an EAI
CREATE OR REPLACE EXTERNAL ACCESS INTEGRATION mysql_access_integration
  ALLOWED_NETWORK_RULES = (mysql_network_rule)
  ALLOWED_AUTHENTICATION_SECRETS = (mysql_secret)
  ENABLED = true;
Set up external access for Snowflake Notebooks and restart the notebook session.

Use the DB-API to pull data from MySQL in a Python cell of your Snowflake notebook:
# Get the user name and password from mysql_secret
import _snowflake
username_password_object = _snowflake.get_username_password('snowflake-secret-object')
USER = username_password_object.username
PASSWORD = username_password_object.password

import snowflake.snowpark.context
session = snowflake.snowpark.context.get_active_session()

# Define the factory method for creating a connection to MySQL
def create_mysql_connection():
    import pymysql

    connection = pymysql.connect(
        host="mysql_host",
        port=mysql_port,
        database="mysql_dbname",
        user=USER,
        password=PASSWORD,
        # Optional: include this parameter for source tracing
        init_command="SET @program_name='snowflake-snowpark-python';"
    )
    return connection

# Feel free to combine local/udtf ingestion and partition column/predicates as
# stated in the understanding parallelism section

# Call dbapi to pull data from the target table
df = session.read.dbapi(
    create_mysql_connection,
    table="target_table"
)

# Call dbapi to pull data from the target query
df_query = session.read.dbapi(
    create_mysql_connection,
    query="select * from target_table"
)

# Pull data from the target table with parallelism using a partition column
df_local_par_column = session.read.dbapi(
    create_mysql_connection,
    table="target_table",
    fetch_size=100000,
    num_partitions=4,
    column="ID",  # Replace with the column you want to partition on
    upper_bound=10000,
    lower_bound=0
)

udtf_configs = {
    "external_access_integration": "<your external access integration>"
}

# Pull data from the target table with UDTF ingestion and parallelism using predicates
df_udtf_predicates = session.read.dbapi(
    create_mysql_connection,
    table="target_table",
    udtf_configs=udtf_configs,
    fetch_size=100000,
    predicates=[
        "ID < 3",
        "ID >= 3"
    ]
)

# Save the data into sf_table
df.write.mode("overwrite").save_as_table('sf_table')
Source tracing when using the DB-API to connect to MySQL¶
Include the Snowpark tag in your connection factory function:
def create_mysql_connection():
    import pymysql

    connection = pymysql.connect(
        host="mysql_host",
        port=mysql_port,
        database="mysql_db",
        user="mysql_user",
        password="mysql_password",
        # Include this parameter for source tracing
        init_command="SET @program_name='snowflake-snowpark-python';"
    )
    return connection
Run the following SQL in the data source to capture queries from Snowpark:
SELECT *
FROM performance_schema.events_statements_history_long
WHERE THREAD_ID = (
    SELECT THREAD_ID
    FROM performance_schema.events_statements_history_long
    WHERE SQL_TEXT = "SET @program_name='snowflake-snowpark-python'"
    ORDER BY EVENT_ID DESC
    LIMIT 1
);
Databricks¶
Connecting to Databricks from Snowpark requires the following two packages:

- Snowpark: snowflake-snowpark-python[pandas]

- The open-source databricks-sql-connector library: databricks-sql-connector

The following code examples show how to connect to Databricks from the Snowpark client, a stored procedure, and a Snowflake notebook.
Use the DB-API to connect to Databricks from a Snowpark client¶
Install snowflake-snowpark-python[pandas] and databricks-sql-connector:

pip install snowflake-snowpark-python[pandas]
pip install databricks-sql-connector
Define a factory method that creates a connection to Databricks:
def create_dbx_connection():
    import databricks.sql

    HOST = "<your server hostname>"
    PATH = "<your http path>"
    ACCESS_TOKEN = "<your access token>"
    connection = databricks.sql.connect(
        server_hostname=HOST,
        http_path=PATH,
        access_token=ACCESS_TOKEN
    )
    return connection

# Feel free to combine local/udtf ingestion and partition column/predicates as
# stated in the understanding parallelism section

# Call dbapi to pull data from the target table
df = session.read.dbapi(
    create_dbx_connection,
    table="target_table"
)

# Call dbapi to pull data from the target query
df_query = session.read.dbapi(
    create_dbx_connection,
    query="select * from target_table"
)

# Pull data from the target table with parallelism using a partition column
df_local_par_column = session.read.dbapi(
    create_dbx_connection,
    table="target_table",
    fetch_size=100000,
    num_partitions=4,
    column="ID",  # Replace with the column you want to partition on
    upper_bound=10000,
    lower_bound=0
)

udtf_configs = {
    "external_access_integration": "<your external access integration>"
}

# Pull data from the target table with UDTF ingestion and parallelism using predicates
df_udtf_predicates = session.read.dbapi(
    create_dbx_connection,
    table="target_table",
    udtf_configs=udtf_configs,
    fetch_size=100000,
    predicates=[
        "ID < 3",
        "ID >= 3"
    ]
)
Use the DB-API to connect to Databricks from a stored procedure¶
Configure an external access integration (EAI). This is required so that Snowflake can connect to the source endpoint.

Note

PrivateLink is recommended for transferring data securely, especially when handling sensitive information. Ensure that the required PrivateLink permissions are enabled for your Snowflake account and that the PrivateLink feature is configured and active in your Snowflake Notebook environment.

Configure a secret, a network rule that allows egress to the source endpoint, and an EAI:
CREATE OR REPLACE SECRET dbx_secret
  TYPE = GENERIC_STRING
  SECRET_STRING = 'dbx_access_token';

CREATE OR REPLACE NETWORK RULE dbx_network_rule
  MODE = EGRESS
  TYPE = HOST_PORT
  VALUE_LIST = ('dbx_host:dbx_port');

CREATE OR REPLACE EXTERNAL ACCESS INTEGRATION dbx_access_integration
  ALLOWED_NETWORK_RULES = (dbx_network_rule)
  ALLOWED_AUTHENTICATION_SECRETS = (dbx_secret)
  ENABLED = true;
Use the Snowpark Python DB-API to pull data from Databricks in a Python stored procedure:
CREATE OR REPLACE PROCEDURE sp_dbx_dbapi()
  RETURNS TABLE()
  LANGUAGE PYTHON
  RUNTIME_VERSION='3.11'
  HANDLER='run'
  PACKAGES=('snowflake-snowpark-python', 'databricks-sql-connector')
  EXTERNAL_ACCESS_INTEGRATIONS = (dbx_access_integration)
  SECRETS = ('cred' = dbx_secret)
AS $$
# Get the access token from dbx_secret
import _snowflake
ACCESS_TOKEN = _snowflake.get_generic_secret_string('cred')

from snowflake.snowpark import Session

# Define the factory method for creating a connection to Databricks
def create_dbx_connection():
    import databricks.sql

    connection = databricks.sql.connect(
        server_hostname="dbx_host",
        http_path="dbx_path",
        access_token=ACCESS_TOKEN,
    )
    return connection

# Use the Snowpark Python DB-API to pull data from Databricks in a Python stored procedure
def run(session: Session):
    # Feel free to combine local/udtf ingestion and partition column/predicates
    # as stated in the understanding parallelism section

    # Call dbapi to pull data from the target table
    df = session.read.dbapi(
        create_dbx_connection,
        table="target_table"
    )

    # Call dbapi to pull data from the target query
    df_query = session.read.dbapi(
        create_dbx_connection,
        query="select * from target_table"
    )

    # Pull data from the target table with parallelism using a partition column
    df_local_par_column = session.read.dbapi(
        create_dbx_connection,
        table="target_table",
        fetch_size=100000,
        num_partitions=4,
        column="ID",  # Replace with the column you want to partition on
        upper_bound=10000,
        lower_bound=0
    )

    udtf_configs = {
        "external_access_integration": "<your external access integration>"
    }

    # Pull data from the target table with UDTF ingestion and parallelism using predicates
    df_udtf_predicates = session.read.dbapi(
        create_dbx_connection,
        table="target_table",
        udtf_configs=udtf_configs,
        fetch_size=100000,
        predicates=[
            "ID < 3",
            "ID >= 3"
        ]
    )

    return df
$$;

CALL sp_dbx_dbapi();
Use the DB-API to connect to Databricks from a Snowflake notebook¶
In Snowflake Notebook packages, select snowflake-snowpark-python and databricks-sql-connector.

Configure an external access integration (EAI). This is required so that Snowflake can connect to the source endpoint.
Note

PrivateLink is recommended for transferring data securely, especially when handling sensitive information. Ensure that the required PrivateLink permissions are enabled for your Snowflake account and that the PrivateLink feature is configured and active in your Snowflake Notebook environment.

Configure a secret, a network rule that allows egress to the source endpoint, and an EAI:
CREATE OR REPLACE SECRET dbx_secret
  TYPE = GENERIC_STRING
  SECRET_STRING = 'dbx_access_token';

ALTER NOTEBOOK mynotebook SET SECRETS = ('snowflake-secret-object' = dbx_secret);

CREATE OR REPLACE NETWORK RULE dbx_network_rule
  MODE = EGRESS
  TYPE = HOST_PORT
  VALUE_LIST = ('dbx_host:dbx_port');

CREATE OR REPLACE EXTERNAL ACCESS INTEGRATION dbx_access_integration
  ALLOWED_NETWORK_RULES = (dbx_network_rule)
  ALLOWED_AUTHENTICATION_SECRETS = (dbx_secret)
  ENABLED = true;
Set up external access for Snowflake Notebooks and restart the notebook session.

Use the DB-API to pull data from Databricks in a Python cell of your Snowflake notebook:
# Get the access token from dbx_secret
import _snowflake
ACCESS_TOKEN = _snowflake.get_generic_secret_string('snowflake-secret-object')

import snowflake.snowpark.context
session = snowflake.snowpark.context.get_active_session()

# Define the factory method for creating a connection to Databricks
def create_dbx_connection():
    import databricks.sql

    connection = databricks.sql.connect(
        server_hostname="dbx_host",
        http_path="dbx_path",
        access_token=ACCESS_TOKEN,
    )
    return connection

# Feel free to combine local/udtf ingestion and partition column/predicates as
# stated in the understanding parallelism section

# Call dbapi to pull data from the target table
df = session.read.dbapi(
    create_dbx_connection,
    table="target_table"
)

# Call dbapi to pull data from the target query
df_query = session.read.dbapi(
    create_dbx_connection,
    query="select * from target_table"
)

# Pull data from the target table with parallelism using a partition column
df_local_par_column = session.read.dbapi(
    create_dbx_connection,
    table="target_table",
    fetch_size=100000,
    num_partitions=4,
    column="ID",  # Replace with the column you want to partition on
    upper_bound=10000,
    lower_bound=0
)

udtf_configs = {
    "external_access_integration": "<your external access integration>"
}

# Pull data from the target table with UDTF ingestion and parallelism using predicates
df_udtf_predicates = session.read.dbapi(
    create_dbx_connection,
    table="target_table",
    udtf_configs=udtf_configs,
    fetch_size=100000,
    predicates=[
        "ID < 3",
        "ID >= 3"
    ]
)

# Save the data into sf_table
df.write.mode("overwrite").save_as_table('sf_table')
Source tracing when using the DB-API to connect to Databricks¶
Include the Snowpark tag in your connection factory function:
def create_dbx_connection():
    import databricks.sql

    connection = databricks.sql.connect(
        server_hostname=HOST,
        http_path=PATH,
        access_token=ACCESS_TOKEN,
        # Include this parameter for source tracing
        user_agent_entry="snowflake-snowpark-python"
    )
    return connection
In the Databricks console, go to Query History and look for queries whose source is snowflake-snowpark-python.
Limitations¶

The Snowpark Python DB-API supports only Python DB-API 2.0 compliant drivers (for example, pyodbc or oracledb). JDBC drivers are not supported in this release.