Snowpark Python JDBC の使用

Snowpark Python JDBC を使用すると、Snowpark Pythonユーザーは、外部データベースからSnowflakeにデータをプログラムでプルできます。これにより、 JDBC ドライバーを使用して外部データベースに接続することができます。

これらの APIsのデータをシームレスに Snowpark DataFrames テーブルに取り込み、Snowflakeを使用して高度な分析のための変換することができます。

`JDBC-<https://docs.snowflake.com/en/developer-guide/snowpark/reference/python/latest/snowpark/api/snowflake.snowpark.DataFrameReader.jdbc>`_ は、Spark JDBC API と同様の方法で使用できます。ほとんどのパラメーターは、より安全性を高めるために、同一または類似するように設計されています。Snowpark Python JDBC とSpark JDBC API の比較については、以下の表を参照してください。

Snowpark JDBC パラメーター

パラメーター

Snowpark Python JDBC

url

JDBC ドライバーを介して外部データソースに接続するために使用される接続文字列。

udtf_configs

UDTF 作成に必要な構成を含むディクショナリ。

properties

JDBC 接続確立中に必要なキーと値のペアを含むディクショナリ

table

ソースデータベースのテーブル

query

SQL クエリは、データを読み取るためのサブクエリとしてラップされました

column

並列読み取りのパーティション列

lower_bound

パーティション分割の下限

upper_bound

パーティション分割の上限

num_partitions

並列処理のパーティションの数

query_timeout

SQL 実行のタイムアウト期間、秒単位で測定。

fetch_size

ラウンドトリップごとに取得された行数

custom_schema

外部データベースからデータをプルするためのカスタムスキーマ

predicates

WHERE 句のパーティションの条件リスト

session_init_statement

セッション初期化時の SQL または PL/SQL ステートメントを実行します

並列処理の理解

Snowpark Python JDBC には現在、基礎となるインジェスチョンメカニズムの1つの形式があります。

UDTF インジェスチョン

すべてのワークロードがSnowflakeサーバー上で実行されます。SnowparkはJava UDTF を作成して並行してそれを呼び出し、Snowflakeの仮テーブルにデータを取り込みます。したがって、この機能には udtf_configs パラメーターが必要です。

Snowpark Python JDBC には、インジェスチョンを並列化して高速化する方法も2つあります。

パーティション列

このメソッドは、ユーザーが jdbc() を呼び出すと、4つのパラメーターに基づいてソースデータを多数のパーティションに分割します。

  • column

  • lower_bound

  • upper_bound

  • num_partitions

これらの4つのパラメーターは同時に設定する必要があり、 column は、数値または日付型である必要があります。

述語

このメソッドは、パラメーター述語に基づいてソースデータをパーティションに分割します。パラメーター述語とは WHERE 句に含まれるのに適した式のリストで、各式はパーティションを定義します。述語は、より柔軟にパーティションを分割できます。たとえば、ブール値または非数値の列でパーティションを分割できます。

Snowpark Python JDBC は、パーティション内の並列レベルを調整することもできます。

Fetch_size

パーティション内では、 API は :code:`fetch_size`で定義されたチャンクの行をフェッチします。これらの行は、フェッチされるときに並列にSnowflakeに書き込まれます。読み取りと書き込みにより重複が発生し、スループットが最大化されます。

JDBC を使用して外部データソースからデータを取り込む

JDBC を使用してSnowparkクライアントからデータを取り込む

  1. SnowparkまたはSnowsightを使用して、 JDBC ドライバーjarファイルをSnowflakeステージにアップロードする

    • Snowparkを使用してアップロードします。

      Snowparkで、セッションを作成した後、次のコードを実行します。

      session.file.put("<your directory>/<your file name>", "@<your stage name>/<stage path>")
      
      Copy
    • 次のステップで説明するように、Snowsightを使用してアップロードします。

      1. Snowsightで、 Catalog -> Database Explorer をクリックします。

      2. データベースの左側の検索バーで、 [your database name] -> [your schema name]-> stages-> [your stage name] をクリックします。

      3. ステージページの右上にある “+File” ボタンをクリックします。

  2. シークレット、ネットワークルール、外部アクセス統合を構成します。

    -- Configure a secret to allow egress to the source endpoint
    CREATE OR REPLACE SECRET <your secret>
        TYPE = PASSWORD
        USERNAME = '<your username>'
        PASSWORD = '<your password>';
    
    -- Configure a network rule to allow egress to the source endpoint
    CREATE OR REPLACE NETWORK RULE <your network rule>
      TYPE = HOST_PORT
      MODE = EGRESS
      VALUE_LIST = ('<your host>:<your port>');
    
    -- Configure an external access integration
    CREATE OR REPLACE EXTERNAL ACCESS INTEGRATION <your integration>
      ALLOWED_NETWORK_RULES = (<your network rule>)
      ALLOWED_AUTHENTICATION_SECRETS = (<your secret>)
      ENABLED = true;
    
    Copy
  3. SnowparkクライアントからSnowpark JDBC を使用してターゲットからデータをプルします。

    connection_str=f"jdbc:<your dbms>://<your host>:<your port>/<your db>"
    udtf_configs = {
        "external_access_integration": "<your integration>",
        "secret": "<your secret>",
        "imports": ["<your stage path to jdbc jar file>"]
    }
    
    # Call jdbc to pull data from target table
    df_table = session.read.jdbc(
            url=connection_str,
            udtf_configs=udtf_configs,
            table="<your table>",
        )
    
    # Call jdbc to pull data from target query
    df_query = session.read.jdbc(
            url=connection_str,
            udtf_configs=udtf_configs,
            query="select * from <your table>",
        )
    
    # Pull data from target table with parallelism using partition column
    df_table_partition_column = session.read.jdbc(
            url=connection_str,
            udtf_configs=udtf_configs,
            table="<your table>",
            fetch_size=100000,
            num_partitions=4,
            column="ID",
            upper_bound=10000,
            lower_bound=0
        )
    
    # Pull data from target table with parallelism using predicates
    df_table_predicates = session.read.jdbc(
            url=connection_str,
            udtf_configs=udtf_configs,
            table="<your table>",
            fetch_size=100000,
            predicates = [
                "ID < 3",
                "ID >= 3"
            ]
        )
    
    Copy

JDBC を使用してストアドプロシージャからデータを取り込む

  1. Snowsightを使用して、 JDBC ドライバーjarファイルをSnowflakeステージにアップロードする

    • Snowsightで、 Catalog -> Database Explorer をクリックします。

    • データベースの左側の検索バーで、 [your database name] -> [your schema name]-> stages-> [your stage name] をクリックします。

    • ステージページの右上にある “+File” ボタンをクリックします。

  2. シークレット、ネットワークルール、外部アクセス統合を構成します。

    -- Configure a secret to allow egress to the source endpoint
    CREATE OR REPLACE SECRET <your secret>
        TYPE = PASSWORD
        USERNAME = '<your username>'
        PASSWORD = '<your password>';
    
    -- Configure a network rule to allow egress to the source endpoint
    CREATE OR REPLACE NETWORK RULE <your network rule>
      TYPE = HOST_PORT
      MODE = EGRESS
      VALUE_LIST = ('<your host>:<your port>');
    
    -- Configure an external access integration
    CREATE OR REPLACE EXTERNAL ACCESS INTEGRATION <your integration>
      ALLOWED_NETWORK_RULES = (<your network rule>)
      ALLOWED_AUTHENTICATION_SECRETS = (<your secret>)
      ENABLED = true;
    
    Copy
  3. ストアドプロシージャからSnowpark JDBC を使用してターゲットからデータをプルします。

    CREATE OR REPLACE PROCEDURE sp_jdbc()
    RETURNS STRING
    LANGUAGE PYTHON
    RUNTIME_VERSION = '3.10'
    PACKAGES = ('snowflake-snowpark-python')
    HANDLER = 'run'
    AS
    $$
    import time
    def run(session):
        connection_str=f"jdbc:<your dbms>://<your host>:<your port>/<your db>"
        udtf_configs = {
            "external_access_integration": "<your integration>",
            "secret": "<your secret>",
            "imports": ["<your stage path to jdbc jar file>"]
        }
    
        # Call jdbc to pull data from target table
        df_table = session.read.jdbc(
                url=connection_str,
                udtf_configs=udtf_configs,
                table="<your table>",
            )
    
        # Call jdbc to pull data from target query
        df_query = session.read.jdbc(
                url=connection_str,
                udtf_configs=udtf_configs,
                query="select * from <your table>",
            )
    
        # Pull data from target table with parallelism using partition column
        df_table_partition_column = session.read.jdbc(
                url=connection_str,
                udtf_configs=udtf_configs,
                table="<your table>",
                fetch_size=100000,
                num_partitions=4,
                column="ID",
                upper_bound=10000,
                lower_bound=0
            )
    
        # Pull data from target table with parallelism using predicates
        df_table_predicates = session.read.jdbc(
                url=connection_str,
                udtf_configs=udtf_configs,
                table="<your table>",
                fetch_size=100000,
                predicates = [
                    "ID < 3",
                    "ID >= 3"
                ]
            )
        df_table.write.save_as_table("snowflake_table", mode="overwrite")
        return f"success"
    
    $$
    ;
    
    call sp_jdbc();
    select * from snowflake_table ;
    
    Copy

JDBC を使用してSnowflakeノートブックからデータを取り込む

  1. Snowsightを使用して、 JDBC ドライバーjarファイルをSnowflakeステージにアップロードする

    • Snowsightで、 Catalog -> Database Explorer をクリックします。

    • データベースの左側の検索バーで、 [your database name] -> [your schema name]-> stages-> [your stage name] をクリックします。

    • ステージページの右上にある “+File” ボタンをクリックします。

  2. シークレット、ネットワークルール、外部アクセス統合を構成します。

    -- Configure a secret to allow egress to the source endpoint
    CREATE OR REPLACE SECRET <your secret>
        TYPE = PASSWORD
        USERNAME = '<your username>'
        PASSWORD = '<your password>';
    
    -- Configure a network rule to allow egress to the source endpoint
    CREATE OR REPLACE NETWORK RULE <your network rule>
      TYPE = HOST_PORT
      MODE = EGRESS
      VALUE_LIST = ('<your host>:<your port>');
    
    -- Configure an external access integration
    CREATE OR REPLACE EXTERNAL ACCESS INTEGRATION <your integration>
      ALLOWED_NETWORK_RULES = (<your network rule>)
      ALLOWED_AUTHENTICATION_SECRETS = (<your secret>)
      ENABLED = true;
    
    Copy
  3. SnowflakeノートブックからSnowpark JDBC を使用してターゲットからデータをプルします。

    import snowflake.snowpark.context
    session = snowflake.snowpark.context.get_active_session()
    connection_str=f"jdbc:<your dbms>://<your host>:<your port>/<your db>"
    udtf_configs = {
            "external_access_integration": "<your integration>",
            "secret": "<your secret>",
            "imports": ["<your stage path to jdbc jar file>"]
        }
    
    # Call jdbc to pull data from target table
    df_table = session.read.jdbc(
            url=connection_str,
            udtf_configs=udtf_configs,
            table="<your table>",
        )
    
    # Call jdbc to pull data from target query
    df_query = session.read.jdbc(
            url=connection_str,
            udtf_configs=udtf_configs,
            query="select * from <your table>",
        )
    
    # Pull data from target table with parallelism using partition column
    df_table_partition_column = session.read.jdbc(
            url=connection_str,
            udtf_configs=udtf_configs,
            table="<your table>",
            fetch_size=100000,
            num_partitions=4,
            column="ID",
            upper_bound=10000,
            lower_bound=0
        )
    
    # Pull data from target table with parallelism using predicates
    df_table_predicates = session.read.jdbc(
            url=connection_str,
            udtf_configs=udtf_configs,
            table="<your table>",
            fetch_size=100000,
            predicates = [
                "ID < 3",
                "ID >= 3"
            ]
        )
    
    Copy

ソーストレース

Snowpark JDBC を使用して MySQL に接続する場合のソーストレース

  1. Create接続関数にSnowparkのタグを含めます。

    connection_str="jdbc:mysql://<your host>:<your port>/<your db>?applicationName=snowflake-snowpark-python"
    udtf_configs = {
        "external_access_integration": "<your integration>",
        "secret": "<your secret>",
        "imports": ["<your stage path to jdbc jar file>"]
    }
    
    # Call dbapi to pull data from target table
    df_table = session.read.jdbc(
            url=connection_str,
            udtf_configs=udtf_configs,
            table="<your table>",
        )
    
    Copy
  2. データソースで以下の SQL を実行し、まだ存続しているSnowparkからのクエリをキャプチャします。

    SELECT *
    FROM performance_schema.events_statements_history_long
    WHERE THREAD_ID = (
      SELECT THREAD_ID, NAME FROM performance_schema.threads WHERE NAME LIKE '%snowflake-snowpark-python%';
    )
    
    Copy

Snowpark JDBC を使用して SQL Serverに接続する場合のソーストレース

  1. Create接続関数にSnowparkのタグを含めます。

    connection_str="jdbc:mssql://<your host>:<your port>/<your db>?applicationName=snowflake-snowpark-python"
    udtf_configs = {
    "external_access_integration": "<your integration>",
    "secret": "<your secret>",
    "imports": ["<your stage path to jdbc jar file>"]
    }
    # Call dbapi to pull data from target table
    df_table = session.read.jdbc(
          url=connection_str,
          udtf_configs=udtf_configs,
          table="<your table>",
      )
    
    Copy
  2. データソースで以下の SQL を実行し、まだ存続しているSnowparkからのクエリをキャプチャします。

    SELECT
      s.session_id,
      s.program_name,
      r.status,
      t.text AS sql_text
    FROM sys.dm_exec_sessions s
    JOIN sys.dm_exec_requests r ON s.session_id = r.session_id
    CROSS APPLY sys.dm_exec_sql_text(r.sql_handle) AS t
    WHERE s.program_name = 'snowflake-snowpark-python';
    
    Copy

Snowpark JDBC を使用して PostgresSQL に接続する場合のソーストレース

  1. Create接続関数にSnowparkのタグを含めます。

    connection_str="jdbc:postgres://<your host>:<your port>/<your db>?applicationName=snowflake-snowpark-python"
    udtf_configs = {
    "external_access_integration": "<your integration>",
    "secret": "<your secret>",
    "imports": ["<your stage path to jdbc jar file>"]
    }
    
    # Call dbapi to pull data from target table
    df_table = session.read.jdbc(
            url=connection_str,
            udtf_configs=udtf_configs,
            table="<your table>",
        )
    
    Copy
  2. データソースで以下の SQL を実行し、まだ存続しているSnowparkからのクエリをキャプチャします。

    SELECT
      pid,
      usename AS username,
      datname AS database,
      application_name,
      client_addr,
      state,
      query_start,
      query
    FROM
      pg_stat_activity
    WHERE
      application_name = 'snowflake-snowpark-python';
    
    Copy

Snowpark JDBC を使用してOracleに接続する場合のソーストレース

  1. Create接続関数にSnowparkのタグを含めます。

    connection_str="jdbc:oracle://<your host>:<your port>/<your db>?applicationName=snowflake-snowpark-python"
    udtf_configs = {
    "external_access_integration": "<your integration>",
    "secret": "<your secret>",
    "imports": ["<your stage path to jdbc jar file>"]
    }
    # Call dbapi to pull data from target table
    df_table = session.read.jdbc(
            url=connection_str,
            udtf_configs=udtf_configs,
            table="<your table>",
        )
    
    Copy
  2. データソースで以下の SQL を実行し、まだ存続しているSnowparkからのクエリをキャプチャします。

    SELECT
      sid,
      serial#,
      username,
      program,
      module,
      action,
      client_identifier,
      client_info,
      osuser,
      machine
    FROM v$session
    WHERE program = 'snowflake-snowpark-python';
    
    Copy

共通 DBMS および型のサポート

以下は、さまざまな DBMS システムのデータ型の認定リストです。ソースデータに他のデータ型が含まれている場合は、Snowpark Python JDBC はベストエフォートのSnowflakeデータ型にマップしようとするか、文字列にフォールバックしようとします。

Oracle

  • INTEGER

  • NUMBER

  • BINARY_FLOAT

  • BINARY_DOUBLE

  • VARCHAR2

  • CHAR

  • CLOB

  • NCHAR

  • NVARCHAR2

  • NCLOB

  • DATE

  • TIMESTAMP

  • TIMESTAMP WITH TIME ZONE

  • TIMESTAMP WITH LOCAL TIME ZONE

  • RAW

PostgresSQL

  • BIGINT

  • BIGSERIAL

  • BIT

  • BIT に VARYING

  • BOOLEAN

  • BOX

  • BYTEA

  • CHAR

  • VARCHAR

  • CIDR

  • CIRCLE

  • DATE

  • DOUBLE PRECISION

  • INET

  • INTEGER

  • INTERVAL

  • JSON

  • JSONB

  • LINE

  • LSEG

  • MACADDR

  • POINT

  • POLYGON

  • REAL

  • SMALLINT

  • SMALLSERIAL

  • SERIAL

  • TEXT

  • TIME

  • TIMESTAMP

  • TIMESTAMPTZ

  • TSQUERY

  • TSVECTOR

  • TXID_SNAPSHOT

  • UUID

  • XML

MySQL

  • INT

  • DECIMAL

  • INT

  • TINYINT

  • SMALLINT

  • MEDIUMINT

  • BIGINT

  • YEAR

  • FLOAT

  • DOUBLE

  • CHAR

  • VARCHAR

  • TINYTEXT

  • TEXT

  • MEDIUMTEXT

  • LONGTEXT

  • ENUM

  • SET

  • BIT

  • BINARY

  • VARBINARY

  • TINYBLOB

  • BLOB

  • MEDIUMBLOB

  • LONGBLOB

  • DATE

  • DATETIME

  • TIMESTAMP

  • TIME

  • JSON

SQL サーバー

  • INT

  • BIGINT

  • INT

  • SMALLINT

  • TINYINT

  • BIT

  • DECIMAL

  • NUMERIC

  • MONEY

  • SMALLMONEY

  • FLOAT

  • REAL

  • DATE

  • TIME

  • DATETIME

  • DATETIME2

  • SMALLDATETIME

  • CHAR

  • VARCHAR

  • VARCHAR(MAX)

  • TEXT

  • NCHAR

  • NVARCHAR

  • NVARCHAR(MAX)

  • NTEXT

  • BINARY

  • VARBINARY

  • VARBINARY(MAX)

  • IMAGE

  • UNIQUEIDENTIFIER

  • TIMESTAMP

Databricks

Snowpark Python JDBC を使用したDatabricksへの接続は現在サポートされていません。