Usando o Snowpark Python JDBC

Com o Snowpark Python JDBC, os usuários do Snowpark Python podem extrair dados programaticamente de bancos de dados externos para o Snowflake. Isso permite que você se conecte a bancos de dados externos usando drivers JDBC.

Com essas APIs, você pode extrair dados perfeitamente para tabelas do Snowflake e transformá-los usando Snowpark DataFrames para análise avançada.

O JDBC pode ser usado de maneira semelhante à API Spark JDBC. A maioria dos parâmetros são projetados para serem idênticos ou semelhantes para melhor paridade. Para obter mais informações que comparam o Snowpark Python JDBC com a API Spark JDBC, consulte a tabela a seguir:

Parâmetros do Snowpark JDBC

Parâmetro

JDBC de Python do Snowpark

url

Uma cadeia de conexão usada para conectar à fonte de dados externa por meio do driver JDBC

udtf_configs

Um dicionário contendo as configurações necessárias para criação da UDTF

properties

Um dicionário contendo o par chave-valor necessário durante o estabelecimento da conexão com o JDBC

table

Tabela no banco de dados de origem

query

Consulta SQL encapsulada como uma subconsulta para leitura de dados

column

Coluna de particionamento para leituras paralelas

lower_bound

Limite inferior para particionamento

upper_bound

Limite superior para particionamento

num_partitions

Número de partições para paralelismo

query_timeout

A duração do tempo limite para execução de SQL, medida em segundos

fetch_size

Número de linhas buscadas por ida e volta

custom_schema

Esquema personalizado para extrair dados de bancos de dados externos

predicates

Lista de condições para partições de cláusula WHERE

session_init_statement

Executa uma instrução SQL ou PL/SQL na inicialização da sessão

Compreensão de paralelismo

Atualmente, o Snowpark Python JDBC tem uma forma de mecanismo de ingestão subjacente:

Ingestão UDTF

Todas as cargas de trabalho são executadas no servidor Snowflake. O Snowpark cria uma UDTF Java e a invoca em paralelo para ingerir dados em uma tabela temporária do Snowflake. Assim, o parâmetro udtf_configs é necessário para esse recurso.

O Snowpark Python JDBC tem duas maneiras de paralelizar e acelerar a ingestão:

Coluna de partição

Este método divide os dados de origem em uma série de partições com base em quatro parâmetros, quando os usuários chamam jdbc():

  • column

  • lower_bound

  • upper_bound

  • num_partitions

Esses quatro parâmetros precisam ser definidos ao mesmo tempo, e column deve ser um tipo de data ou numérico.

Predicados

Esse método divide os dados de origem em partições baseadas em predicados de parâmetros, que são uma lista de expressões adequadas para inclusão em cláusulas WHERE, em que cada expressão define uma partição. Os predicados oferecem um meio mais flexível de dividir as partições; por exemplo, você pode dividi-las em colunas boolianas ou não numéricas.

O Snowpark Python JDBC também permite ajustar o nível de paralelismo em uma partição:

Fetch_size

Dentro de uma partição, a API obtém linhas em partes definidas por fetch_size. Essas linhas são gravadas no Snowflake em paralelo à medida que são buscadas, o que permite que a leitura e a escrita se sobreponham e maximizem o rendimento.

Usando o JDBC para ingerir dados de uma fonte de dados externa

Usando o JDBC para ingerir dados de um cliente Snowpark

  1. Carregue o arquivo jar do driver JDBC em uma área de preparação do Snowflake usando o Snowpark ou o Snowsight.

    • Carregue usando o Snowpark.

      No Snowpark, após criar uma sessão, execute o seguinte código:

      session.file.put("<your directory>/<your file name>", "@<your stage name>/<stage path>")
      
      Copy
    • Carregue usando o Snowsight conforme descrito nas etapas a seguir.

      1. No Snowsight, clique em Catalog -> Database Explorer.

      2. Na barra de pesquisa de bancos de dados à esquerda, clique em [your database name] -> [your schema name] -> stages -> [your stage name].

      3. Clique no botão “+File” no canto superior direito da página da área de preparação.

  2. Configure o segredo, a regra de rede e a integração de acesso externo.

    -- Configure a secret to allow egress to the source endpoint
    CREATE OR REPLACE SECRET <your secret>
        TYPE = PASSWORD
        USERNAME = '<your username>'
        PASSWORD = '<your password>';
    
    -- Configure a network rule to allow egress to the source endpoint
    CREATE OR REPLACE NETWORK RULE <your network rule>
      TYPE = HOST_PORT
      MODE = EGRESS
      VALUE_LIST = ('<your host>:<your port>');
    
    -- Configure an external access integration
    CREATE OR REPLACE EXTERNAL ACCESS INTEGRATION <your integration>
      ALLOWED_NETWORK_RULES = (<your network rule>)
      ALLOWED_AUTHENTICATION_SECRETS = (<your secret>)
      ENABLED = true;
    
    Copy
  3. Extraia os dados do destino usando o Snowpark JDBC de um cliente Snowpark.

    connection_str=f"jdbc:<your dbms>://<your host>:<your port>/<your db>"
    udtf_configs = {
        "external_access_integration": "<your integration>",
        "secret": "<your secret>",
        "imports": ["<your stage path to jdbc jar file>"]
    }
    
    # Call jdbc to pull data from target table
    df_table = session.read.jdbc(
            url=connection_str,
            udtf_configs=udtf_configs,
            table="<your table>",
        )
    
    # Call jdbc to pull data from target query
    df_query = session.read.jdbc(
            url=connection_str,
            udtf_configs=udtf_configs,
            query="select * from <your table>",
        )
    
    # Pull data from target table with parallelism using partition column
    df_table_partition_column = session.read.jdbc(
            url=connection_str,
            udtf_configs=udtf_configs,
            table="<your table>",
            fetch_size=100000,
            num_partitions=4,
            column="ID",
            upper_bound=10000,
            lower_bound=0
        )
    
    # Pull data from target table with parallelism using predicates
    df_table_predicates = session.read.jdbc(
            url=connection_str,
            udtf_configs=udtf_configs,
            table="<your table>",
            fetch_size=100000,
            predicates = [
                "ID < 3",
                "ID >= 3"
            ]
        )
    
    Copy

Usando o JDBC para ingerir dados de um procedimento armazenado

  1. Carregue o arquivo jar do driver JDBC na área de preparação do Snowflake usando o Snowsight.

    • No Snowsight, clique em Catalog -> Database Explorer.

    • Na barra de pesquisa de bancos de dados à esquerda, clique em [your database name] -> [your schema name] -> stages -> [your stage name].

    • Clique no botão “+File” no canto superior direito da página da área de preparação.

  2. Configure o segredo, a regra de rede e a integração de acesso externo.

    -- Configure a secret to allow egress to the source endpoint
    CREATE OR REPLACE SECRET <your secret>
        TYPE = PASSWORD
        USERNAME = '<your username>'
        PASSWORD = '<your password>';
    
    -- Configure a network rule to allow egress to the source endpoint
    CREATE OR REPLACE NETWORK RULE <your network rule>
      TYPE = HOST_PORT
      MODE = EGRESS
      VALUE_LIST = ('<your host>:<your port>');
    
    -- Configure an external access integration
    CREATE OR REPLACE EXTERNAL ACCESS INTEGRATION <your integration>
      ALLOWED_NETWORK_RULES = (<your network rule>)
      ALLOWED_AUTHENTICATION_SECRETS = (<your secret>)
      ENABLED = true;
    
    Copy
  3. Extraia os dados do destino usando o Snowpark JDBC de um procedimento armazenado.

    CREATE OR REPLACE PROCEDURE sp_jdbc()
    RETURNS STRING
    LANGUAGE PYTHON
    RUNTIME_VERSION = '3.10'
    PACKAGES = ('snowflake-snowpark-python')
    HANDLER = 'run'
    AS
    $$
    import time
    def run(session):
        connection_str=f"jdbc:<your dbms>://<your host>:<your port>/<your db>"
        udtf_configs = {
            "external_access_integration": "<your integration>",
            "secret": "<your secret>",
            "imports": ["<your stage path to jdbc jar file>"]
        }
    
        # Call jdbc to pull data from target table
        df_table = session.read.jdbc(
                url=connection_str,
                udtf_configs=udtf_configs,
                table="<your table>",
            )
    
        # Call jdbc to pull data from target query
        df_query = session.read.jdbc(
                url=connection_str,
                udtf_configs=udtf_configs,
                query="select * from <your table>",
            )
    
        # Pull data from target table with parallelism using partition column
        df_table_partition_column = session.read.jdbc(
                url=connection_str,
                udtf_configs=udtf_configs,
                table="<your table>",
                fetch_size=100000,
                num_partitions=4,
                column="ID",
                upper_bound=10000,
                lower_bound=0
            )
    
        # Pull data from target table with parallelism using predicates
        df_table_predicates = session.read.jdbc(
                url=connection_str,
                udtf_configs=udtf_configs,
                table="<your table>",
                fetch_size=100000,
                predicates = [
                    "ID < 3",
                    "ID >= 3"
                ]
            )
        df_table.write.save_as_table("snowflake_table", mode="overwrite")
        return f"success"
    
    $$
    ;
    
    call sp_jdbc();
    select * from snowflake_table ;
    
    Copy

Usando o JDBC para ingerir dados de um notebook Snowflake

  1. Carregue o arquivo jar do driver JDBC na área de preparação do Snowflake usando o Snowsight.

    • No Snowsight, clique em Catalog -> Database Explorer.

    • Na barra de pesquisa de bancos de dados à esquerda, clique em [your database name] -> [your schema name] -> stages -> [your stage name].

    • Clique no botão “+File” no canto superior direito da página da área de preparação.

  2. Configure o segredo, a regra de rede e a integração de acesso externo.

    -- Configure a secret to allow egress to the source endpoint
    CREATE OR REPLACE SECRET <your secret>
        TYPE = PASSWORD
        USERNAME = '<your username>'
        PASSWORD = '<your password>';
    
    -- Configure a network rule to allow egress to the source endpoint
    CREATE OR REPLACE NETWORK RULE <your network rule>
      TYPE = HOST_PORT
      MODE = EGRESS
      VALUE_LIST = ('<your host>:<your port>');
    
    -- Configure an external access integration
    CREATE OR REPLACE EXTERNAL ACCESS INTEGRATION <your integration>
      ALLOWED_NETWORK_RULES = (<your network rule>)
      ALLOWED_AUTHENTICATION_SECRETS = (<your secret>)
      ENABLED = true;
    
    Copy
  3. Extraia os dados do destino usando o Snowpark JDBC de um notebook Snowflake.

    import snowflake.snowpark.context
    session = snowflake.snowpark.context.get_active_session()
    connection_str=f"jdbc:<your dbms>://<your host>:<your port>/<your db>"
    udtf_configs = {
            "external_access_integration": "<your integration>",
            "secret": "<your secret>",
            "imports": ["<your stage path to jdbc jar file>"]
        }
    
    # Call jdbc to pull data from target table
    df_table = session.read.jdbc(
            url=connection_str,
            udtf_configs=udtf_configs,
            table="<your table>",
        )
    
    # Call jdbc to pull data from target query
    df_query = session.read.jdbc(
            url=connection_str,
            udtf_configs=udtf_configs,
            query="select * from <your table>",
        )
    
    # Pull data from target table with parallelism using partition column
    df_table_partition_column = session.read.jdbc(
            url=connection_str,
            udtf_configs=udtf_configs,
            table="<your table>",
            fetch_size=100000,
            num_partitions=4,
            column="ID",
            upper_bound=10000,
            lower_bound=0
        )
    
    # Pull data from target table with parallelism using predicates
    df_table_predicates = session.read.jdbc(
            url=connection_str,
            udtf_configs=udtf_configs,
            table="<your table>",
            fetch_size=100000,
            predicates = [
                "ID < 3",
                "ID >= 3"
            ]
        )
    
    Copy

Rastreamento da fonte

Rastreamento da fonte ao usar o Snowpark JDBC para se conectar ao MySQL

  1. Inclua uma tag do Snowpark na função de criação da conexão:

    connection_str="jdbc:mysql://<your host>:<your port>/<your db>?applicationName=snowflake-snowpark-python"
    udtf_configs = {
        "external_access_integration": "<your integration>",
        "secret": "<your secret>",
        "imports": ["<your stage path to jdbc jar file>"]
    }
    
    # Call dbapi to pull data from target table
    df_table = session.read.jdbc(
            url=connection_str,
            udtf_configs=udtf_configs,
            table="<your table>",
        )
    
    Copy
  2. Execute o seguinte SQL na fonte de dados para capturar consultas do Snowpark que ainda estão ativas:

    SELECT *
    FROM performance_schema.events_statements_history_long
    WHERE THREAD_ID = (
      SELECT THREAD_ID, NAME FROM performance_schema.threads WHERE NAME LIKE '%snowflake-snowpark-python%';
    )
    
    Copy

Rastreamento da fonte ao usar o Snowpark JDBC para se conectar ao SQL Server

  1. Inclua uma tag do Snowpark na função de criação da conexão:

    connection_str="jdbc:mssql://<your host>:<your port>/<your db>?applicationName=snowflake-snowpark-python"
    udtf_configs = {
    "external_access_integration": "<your integration>",
    "secret": "<your secret>",
    "imports": ["<your stage path to jdbc jar file>"]
    }
    # Call dbapi to pull data from target table
    df_table = session.read.jdbc(
          url=connection_str,
          udtf_configs=udtf_configs,
          table="<your table>",
      )
    
    Copy
  2. Execute o seguinte SQL na fonte de dados para capturar consultas do Snowpark que ainda estão ativas:

    SELECT
      s.session_id,
      s.program_name,
      r.status,
      t.text AS sql_text
    FROM sys.dm_exec_sessions s
    JOIN sys.dm_exec_requests r ON s.session_id = r.session_id
    CROSS APPLY sys.dm_exec_sql_text(r.sql_handle) AS t
    WHERE s.program_name = 'snowflake-snowpark-python';
    
    Copy

Rastreamento da fonte ao usar o Snowpark JDBC para se conectar ao PostgresSQL

  1. Inclua uma tag do Snowpark na função de criação da conexão:

    connection_str="jdbc:postgres://<your host>:<your port>/<your db>?applicationName=snowflake-snowpark-python"
    udtf_configs = {
    "external_access_integration": "<your integration>",
    "secret": "<your secret>",
    "imports": ["<your stage path to jdbc jar file>"]
    }
    
    # Call dbapi to pull data from target table
    df_table = session.read.jdbc(
            url=connection_str,
            udtf_configs=udtf_configs,
            table="<your table>",
        )
    
    Copy
  2. Execute o seguinte SQL na fonte de dados para capturar consultas do Snowpark que ainda estão ativas:

    SELECT
      pid,
      usename AS username,
      datname AS database,
      application_name,
      client_addr,
      state,
      query_start,
      query
    FROM
      pg_stat_activity
    WHERE
      application_name = 'snowflake-snowpark-python';
    
    Copy

Rastreamento da fonte ao usar o Snowpark JDBC para se conectar ao Oracle

  1. Inclua uma tag do Snowpark na função de criação da conexão:

    connection_str="jdbc:oracle://<your host>:<your port>/<your db>?applicationName=snowflake-snowpark-python"
    udtf_configs = {
    "external_access_integration": "<your integration>",
    "secret": "<your secret>",
    "imports": ["<your stage path to jdbc jar file>"]
    }
    # Call dbapi to pull data from target table
    df_table = session.read.jdbc(
            url=connection_str,
            udtf_configs=udtf_configs,
            table="<your table>",
        )
    
    Copy
  2. Execute o seguinte SQL na fonte de dados para capturar consultas do Snowpark que ainda estão ativas:

    SELECT
      sid,
      serial#,
      username,
      program,
      module,
      action,
      client_identifier,
      client_info,
      osuser,
      machine
    FROM v$session
    WHERE program = 'snowflake-snowpark-python';
    
    Copy

Suporte a DBMS comum e aos tipos

Veja a seguir uma lista certificada de tipos de dados de diferentes sistemas DBMS. Se os seus dados de origem envolvem outros tipos de dados, o Snowpark Python JDBC tenta mapeá-los para os tipos de dados do Snowflake da melhor maneira possível ou retornar para cadeias de caracteres.

Oracle

  • INTEGER

  • NUMBER

  • BINARY_FLOAT

  • BINARY_DOUBLE

  • VARCHAR2

  • CHAR

  • CLOB

  • NCHAR

  • NVARCHAR2

  • NCLOB

  • DATE

  • TIMESTAMP

  • TIMESTAMP WITH TIME ZONE

  • TIMESTAMP WITH LOCAL TIME ZONE

  • RAW

PostgresSQL

  • BIGINT

  • BIGSERIAL

  • BIT

  • BIT VARYING

  • BOOLEAN

  • BOX

  • BYTEA

  • CHAR

  • VARCHAR

  • CIDR

  • CIRCLE

  • DATE

  • DOUBLE PRECISION

  • INET

  • INTEGER

  • INTERVAL

  • JSON

  • JSONB

  • LINE

  • LSEG

  • MACADDR

  • POINT

  • POLYGON

  • REAL

  • SMALLINT

  • SMALLSERIAL

  • SERIAL

  • TEXT

  • TIME

  • TIMESTAMP

  • TIMESTAMPTZ

  • TSQUERY

  • TSVECTOR

  • TXID_SNAPSHOT

  • UUID

  • XML

MySQL

  • INT

  • DECIMAL

  • INT

  • TINYINT

  • SMALLINT

  • MEDIUMINT

  • BIGINT

  • YEAR

  • FLOAT

  • DOUBLE

  • CHAR

  • VARCHAR

  • TINYTEXT

  • TEXT

  • MEDIUMTEXT

  • LONGTEXT

  • ENUM

  • SET

  • BIT

  • BINARY

  • VARBINARY

  • TINYBLOB

  • BLOB

  • MEDIUMBLOB

  • LONGBLOB

  • DATE

  • DATETIME

  • TIMESTAMP

  • TIME

  • JSON

SQL Server

  • INT

  • BIGINT

  • INT

  • SMALLINT

  • TINYINT

  • BIT

  • DECIMAL

  • NUMERIC

  • MONEY

  • SMALLMONEY

  • FLOAT

  • REAL

  • DATE

  • TIME

  • DATETIME

  • DATETIME2

  • SMALLDATETIME

  • CHAR

  • VARCHAR

  • VARCHAR(MAX)

  • TEXT

  • NCHAR

  • NVARCHAR

  • NVARCHAR(MAX)

  • NTEXT

  • BINARY

  • VARBINARY

  • VARBINARY(MAX)

  • IMAGE

  • UNIQUEIDENTIFIER

  • TIMESTAMP

Databricks

Não há suporte para conexão com o Databricks usando o Snowpark Python JDBC no momento.