Snowpark Python JDBC の使用¶
Snowpark Python JDBC を使用すると、Snowpark Pythonユーザーは、外部データベースからSnowflakeにデータをプログラムでプルできます。これにより、 JDBC ドライバーを使用して外部データベースに接続することができます。
これらの APIsのデータをシームレスに Snowpark DataFrames テーブルに取り込み、Snowflakeを使用して高度な分析のための変換することができます。
`JDBC-<https://docs.snowflake.com/en/developer-guide/snowpark/reference/python/latest/snowpark/api/snowflake.snowpark.DataFrameReader.jdbc>`_ は、Spark JDBC API と同様の方法で使用できます。ほとんどのパラメーターは、より安全性を高めるために、同一または類似するように設計されています。Snowpark Python JDBC とSpark JDBC API の比較については、以下の表を参照してください。
Snowpark JDBC パラメーター¶
パラメーター |
Snowpark Python JDBC |
|---|---|
|
JDBC ドライバーを介して外部データソースに接続するために使用される接続文字列。 |
|
UDTF 作成に必要な構成を含むディクショナリ。 |
|
JDBC 接続確立中に必要なキーと値のペアを含むディクショナリ |
|
ソースデータベースのテーブル |
|
SQL クエリは、データを読み取るためのサブクエリとしてラップされました |
|
並列読み取りのパーティション列 |
|
パーティション分割の下限 |
|
パーティション分割の上限 |
|
並列処理のパーティションの数 |
|
SQL 実行のタイムアウト期間、秒単位で測定。 |
|
ラウンドトリップごとに取得された行数 |
|
外部データベースからデータをプルするためのカスタムスキーマ |
|
WHERE 句のパーティションの条件リスト |
|
セッション初期化時の SQL または PL/SQL ステートメントを実行します |
並列処理の理解¶
Snowpark Python JDBC には現在、基礎となるインジェスチョンメカニズムの1つの形式があります。
- UDTF インジェスチョン
すべてのワークロードがSnowflakeサーバー上で実行されます。SnowparkはJava UDTF を作成して並行してそれを呼び出し、Snowflakeの仮テーブルにデータを取り込みます。したがって、この機能には
udtf_configsパラメーターが必要です。
Snowpark Python JDBC には、インジェスチョンを並列化して高速化する方法も2つあります。
- パーティション列
このメソッドは、ユーザーが
jdbc()を呼び出すと、4つのパラメーターに基づいてソースデータを多数のパーティションに分割します。columnlower_boundupper_boundnum_partitions
これらの4つのパラメーターは同時に設定する必要があり、
columnは、数値または日付型である必要があります。- 述語
このメソッドは、パラメーター述語に基づいてソースデータをパーティションに分割します。パラメーター述語とは
WHERE句に含まれるのに適した式のリストで、各式はパーティションを定義します。述語は、より柔軟にパーティションを分割できます。たとえば、ブール値または非数値の列でパーティションを分割できます。
Snowpark Python JDBC は、パーティション内の並列レベルを調整することもできます。
- Fetch_size
パーティション内では、 API は :code:`fetch_size`で定義されたチャンクの行をフェッチします。これらの行は、フェッチされるときに並列にSnowflakeに書き込まれます。読み取りと書き込みにより重複が発生し、スループットが最大化されます。
JDBC を使用して外部データソースからデータを取り込む¶
JDBC を使用してSnowparkクライアントからデータを取り込む¶
SnowparkまたはSnowsightを使用して、 JDBC ドライバーjarファイルをSnowflakeステージにアップロードする
Snowparkを使用してアップロードします。
Snowparkで、セッションを作成した後、次のコードを実行します。
session.file.put("<your directory>/<your file name>", "@<your stage name>/<stage path>")
次のステップで説明するように、Snowsightを使用してアップロードします。
Snowsightで、 Catalog -> Database Explorer をクリックします。
データベースの左側の検索バーで、 [your database name] -> [your schema name]-> stages-> [your stage name] をクリックします。
ステージページの右上にある “+File” ボタンをクリックします。
シークレット、ネットワークルール、外部アクセス統合を構成します。
-- Configure a secret to allow egress to the source endpoint CREATE OR REPLACE SECRET <your secret> TYPE = PASSWORD USERNAME = '<your username>' PASSWORD = '<your password>'; -- Configure a network rule to allow egress to the source endpoint CREATE OR REPLACE NETWORK RULE <your network rule> TYPE = HOST_PORT MODE = EGRESS VALUE_LIST = ('<your host>:<your port>'); -- Configure an external access integration CREATE OR REPLACE EXTERNAL ACCESS INTEGRATION <your integration> ALLOWED_NETWORK_RULES = (<your network rule>) ALLOWED_AUTHENTICATION_SECRETS = (<your secret>) ENABLED = true;
SnowparkクライアントからSnowpark JDBC を使用してターゲットからデータをプルします。
connection_str=f"jdbc:<your dbms>://<your host>:<your port>/<your db>" udtf_configs = { "external_access_integration": "<your integration>", "secret": "<your secret>", "imports": ["<your stage path to jdbc jar file>"] } # Call jdbc to pull data from target table df_table = session.read.jdbc( url=connection_str, udtf_configs=udtf_configs, table="<your table>", ) # Call jdbc to pull data from target query df_query = session.read.jdbc( url=connection_str, udtf_configs=udtf_configs, query="select * from <your table>", ) # Pull data from target table with parallelism using partition column df_table_partition_column = session.read.jdbc( url=connection_str, udtf_configs=udtf_configs, table="<your table>", fetch_size=100000, num_partitions=4, column="ID", upper_bound=10000, lower_bound=0 ) # Pull data from target table with parallelism using predicates df_table_predicates = session.read.jdbc( url=connection_str, udtf_configs=udtf_configs, table="<your table>", fetch_size=100000, predicates = [ "ID < 3", "ID >= 3" ] )
JDBC を使用してストアドプロシージャからデータを取り込む¶
Snowsightを使用して、 JDBC ドライバーjarファイルをSnowflakeステージにアップロードする
Snowsightで、 Catalog -> Database Explorer をクリックします。
データベースの左側の検索バーで、 [your database name] -> [your schema name]-> stages-> [your stage name] をクリックします。
ステージページの右上にある “+File” ボタンをクリックします。
シークレット、ネットワークルール、外部アクセス統合を構成します。
-- Configure a secret to allow egress to the source endpoint CREATE OR REPLACE SECRET <your secret> TYPE = PASSWORD USERNAME = '<your username>' PASSWORD = '<your password>'; -- Configure a network rule to allow egress to the source endpoint CREATE OR REPLACE NETWORK RULE <your network rule> TYPE = HOST_PORT MODE = EGRESS VALUE_LIST = ('<your host>:<your port>'); -- Configure an external access integration CREATE OR REPLACE EXTERNAL ACCESS INTEGRATION <your integration> ALLOWED_NETWORK_RULES = (<your network rule>) ALLOWED_AUTHENTICATION_SECRETS = (<your secret>) ENABLED = true;
ストアドプロシージャからSnowpark JDBC を使用してターゲットからデータをプルします。
CREATE OR REPLACE PROCEDURE sp_jdbc() RETURNS STRING LANGUAGE PYTHON RUNTIME_VERSION = '3.10' PACKAGES = ('snowflake-snowpark-python') HANDLER = 'run' AS $$ import time def run(session): connection_str=f"jdbc:<your dbms>://<your host>:<your port>/<your db>" udtf_configs = { "external_access_integration": "<your integration>", "secret": "<your secret>", "imports": ["<your stage path to jdbc jar file>"] } # Call jdbc to pull data from target table df_table = session.read.jdbc( url=connection_str, udtf_configs=udtf_configs, table="<your table>", ) # Call jdbc to pull data from target query df_query = session.read.jdbc( url=connection_str, udtf_configs=udtf_configs, query="select * from <your table>", ) # Pull data from target table with parallelism using partition column df_table_partition_column = session.read.jdbc( url=connection_str, udtf_configs=udtf_configs, table="<your table>", fetch_size=100000, num_partitions=4, column="ID", upper_bound=10000, lower_bound=0 ) # Pull data from target table with parallelism using predicates df_table_predicates = session.read.jdbc( url=connection_str, udtf_configs=udtf_configs, table="<your table>", fetch_size=100000, predicates = [ "ID < 3", "ID >= 3" ] ) df_table.write.save_as_table("snowflake_table", mode="overwrite") return f"success" $$ ; call sp_jdbc(); select * from snowflake_table ;
JDBC を使用してSnowflakeノートブックからデータを取り込む¶
Snowsightを使用して、 JDBC ドライバーjarファイルをSnowflakeステージにアップロードする
Snowsightで、 Catalog -> Database Explorer をクリックします。
データベースの左側の検索バーで、 [your database name] -> [your schema name]-> stages-> [your stage name] をクリックします。
ステージページの右上にある “+File” ボタンをクリックします。
シークレット、ネットワークルール、外部アクセス統合を構成します。
-- Configure a secret to allow egress to the source endpoint CREATE OR REPLACE SECRET <your secret> TYPE = PASSWORD USERNAME = '<your username>' PASSWORD = '<your password>'; -- Configure a network rule to allow egress to the source endpoint CREATE OR REPLACE NETWORK RULE <your network rule> TYPE = HOST_PORT MODE = EGRESS VALUE_LIST = ('<your host>:<your port>'); -- Configure an external access integration CREATE OR REPLACE EXTERNAL ACCESS INTEGRATION <your integration> ALLOWED_NETWORK_RULES = (<your network rule>) ALLOWED_AUTHENTICATION_SECRETS = (<your secret>) ENABLED = true;
SnowflakeノートブックからSnowpark JDBC を使用してターゲットからデータをプルします。
import snowflake.snowpark.context session = snowflake.snowpark.context.get_active_session() connection_str=f"jdbc:<your dbms>://<your host>:<your port>/<your db>" udtf_configs = { "external_access_integration": "<your integration>", "secret": "<your secret>", "imports": ["<your stage path to jdbc jar file>"] } # Call jdbc to pull data from target table df_table = session.read.jdbc( url=connection_str, udtf_configs=udtf_configs, table="<your table>", ) # Call jdbc to pull data from target query df_query = session.read.jdbc( url=connection_str, udtf_configs=udtf_configs, query="select * from <your table>", ) # Pull data from target table with parallelism using partition column df_table_partition_column = session.read.jdbc( url=connection_str, udtf_configs=udtf_configs, table="<your table>", fetch_size=100000, num_partitions=4, column="ID", upper_bound=10000, lower_bound=0 ) # Pull data from target table with parallelism using predicates df_table_predicates = session.read.jdbc( url=connection_str, udtf_configs=udtf_configs, table="<your table>", fetch_size=100000, predicates = [ "ID < 3", "ID >= 3" ] )
ソーストレース¶
Snowpark JDBC を使用して MySQL に接続する場合のソーストレース¶
Create接続関数にSnowparkのタグを含めます。
connection_str="jdbc:mysql://<your host>:<your port>/<your db>?applicationName=snowflake-snowpark-python" udtf_configs = { "external_access_integration": "<your integration>", "secret": "<your secret>", "imports": ["<your stage path to jdbc jar file>"] } # Call dbapi to pull data from target table df_table = session.read.jdbc( url=connection_str, udtf_configs=udtf_configs, table="<your table>", )
データソースで以下の SQL を実行し、まだ存続しているSnowparkからのクエリをキャプチャします。
SELECT * FROM performance_schema.events_statements_history_long WHERE THREAD_ID = ( SELECT THREAD_ID, NAME FROM performance_schema.threads WHERE NAME LIKE '%snowflake-snowpark-python%'; )
Snowpark JDBC を使用して SQL Serverに接続する場合のソーストレース¶
Create接続関数にSnowparkのタグを含めます。
connection_str="jdbc:mssql://<your host>:<your port>/<your db>?applicationName=snowflake-snowpark-python" udtf_configs = { "external_access_integration": "<your integration>", "secret": "<your secret>", "imports": ["<your stage path to jdbc jar file>"] } # Call dbapi to pull data from target table df_table = session.read.jdbc( url=connection_str, udtf_configs=udtf_configs, table="<your table>", )
データソースで以下の SQL を実行し、まだ存続しているSnowparkからのクエリをキャプチャします。
SELECT s.session_id, s.program_name, r.status, t.text AS sql_text FROM sys.dm_exec_sessions s JOIN sys.dm_exec_requests r ON s.session_id = r.session_id CROSS APPLY sys.dm_exec_sql_text(r.sql_handle) AS t WHERE s.program_name = 'snowflake-snowpark-python';
Snowpark JDBC を使用して PostgresSQL に接続する場合のソーストレース¶
Create接続関数にSnowparkのタグを含めます。
connection_str="jdbc:postgres://<your host>:<your port>/<your db>?applicationName=snowflake-snowpark-python" udtf_configs = { "external_access_integration": "<your integration>", "secret": "<your secret>", "imports": ["<your stage path to jdbc jar file>"] } # Call dbapi to pull data from target table df_table = session.read.jdbc( url=connection_str, udtf_configs=udtf_configs, table="<your table>", )
データソースで以下の SQL を実行し、まだ存続しているSnowparkからのクエリをキャプチャします。
SELECT pid, usename AS username, datname AS database, application_name, client_addr, state, query_start, query FROM pg_stat_activity WHERE application_name = 'snowflake-snowpark-python';
Snowpark JDBC を使用してOracleに接続する場合のソーストレース¶
Create接続関数にSnowparkのタグを含めます。
connection_str="jdbc:oracle://<your host>:<your port>/<your db>?applicationName=snowflake-snowpark-python" udtf_configs = { "external_access_integration": "<your integration>", "secret": "<your secret>", "imports": ["<your stage path to jdbc jar file>"] } # Call dbapi to pull data from target table df_table = session.read.jdbc( url=connection_str, udtf_configs=udtf_configs, table="<your table>", )
データソースで以下の SQL を実行し、まだ存続しているSnowparkからのクエリをキャプチャします。
SELECT sid, serial#, username, program, module, action, client_identifier, client_info, osuser, machine FROM v$session WHERE program = 'snowflake-snowpark-python';
共通 DBMS および型のサポート¶
以下は、さまざまな DBMS システムのデータ型の認定リストです。ソースデータに他のデータ型が含まれている場合は、Snowpark Python JDBC はベストエフォートのSnowflakeデータ型にマップしようとするか、文字列にフォールバックしようとします。
Oracle¶
INTEGER
NUMBER
BINARY_FLOAT
BINARY_DOUBLE
VARCHAR2
CHAR
CLOB
NCHAR
NVARCHAR2
NCLOB
DATE
TIMESTAMP
TIMESTAMP WITH TIME ZONE
TIMESTAMP WITH LOCAL TIME ZONE
RAW
PostgresSQL¶
BIGINT
BIGSERIAL
BIT
BIT に VARYING
BOOLEAN
BOX
BYTEA
CHAR
VARCHAR
CIDR
CIRCLE
DATE
DOUBLE PRECISION
INET
INTEGER
INTERVAL
JSON
JSONB
LINE
LSEG
MACADDR
POINT
POLYGON
REAL
SMALLINT
SMALLSERIAL
SERIAL
TEXT
TIME
TIMESTAMP
TIMESTAMPTZ
TSQUERY
TSVECTOR
TXID_SNAPSHOT
UUID
XML
MySQL¶
INT
DECIMAL
INT
TINYINT
SMALLINT
MEDIUMINT
BIGINT
YEAR
FLOAT
DOUBLE
CHAR
VARCHAR
TINYTEXT
TEXT
MEDIUMTEXT
LONGTEXT
ENUM
SET
BIT
BINARY
VARBINARY
TINYBLOB
BLOB
MEDIUMBLOB
LONGBLOB
DATE
DATETIME
TIMESTAMP
TIME
JSON
SQL サーバー¶
INT
BIGINT
INT
SMALLINT
TINYINT
BIT
DECIMAL
NUMERIC
MONEY
SMALLMONEY
FLOAT
REAL
DATE
TIME
DATETIME
DATETIME2
SMALLDATETIME
CHAR
VARCHAR
VARCHAR(MAX)
TEXT
NCHAR
NVARCHAR
NVARCHAR(MAX)
NTEXT
BINARY
VARBINARY
VARBINARY(MAX)
IMAGE
UNIQUEIDENTIFIER
TIMESTAMP
Databricks¶
Snowpark Python JDBC を使用したDatabricksへの接続は現在サポートされていません。