Usando o Snowpark Python JDBC¶
Com o Snowpark Python JDBC, os usuários do Snowpark Python podem extrair dados programaticamente de bancos de dados externos para o Snowflake. Isso permite que você se conecte a bancos de dados externos usando drivers JDBC.
Com essas APIs, você pode extrair dados perfeitamente para tabelas do Snowflake e transformá-los usando Snowpark DataFrames para análise avançada.
O JDBC pode ser usado de maneira semelhante à API Spark JDBC. A maioria dos parâmetros são projetados para serem idênticos ou semelhantes para melhor paridade. Para obter mais informações que comparam o Snowpark Python JDBC com a API Spark JDBC, consulte a tabela a seguir:
Parâmetros do Snowpark JDBC¶
Parâmetro |
JDBC de Python do Snowpark |
|---|---|
|
Uma cadeia de conexão usada para conectar à fonte de dados externa por meio do driver JDBC |
|
Um dicionário contendo as configurações necessárias para criação da UDTF |
|
Um dicionário contendo o par chave-valor necessário durante o estabelecimento da conexão com o JDBC |
|
Tabela no banco de dados de origem |
|
Consulta SQL encapsulada como uma subconsulta para leitura de dados |
|
Coluna de particionamento para leituras paralelas |
|
Limite inferior para particionamento |
|
Limite superior para particionamento |
|
Número de partições para paralelismo |
|
A duração do tempo limite para execução de SQL, medida em segundos |
|
Número de linhas buscadas por ida e volta |
|
Esquema personalizado para extrair dados de bancos de dados externos |
|
Lista de condições para partições de cláusula WHERE |
|
Executa uma instrução SQL ou PL/SQL na inicialização da sessão |
Compreensão de paralelismo¶
Atualmente, o Snowpark Python JDBC tem uma forma de mecanismo de ingestão subjacente:
- Ingestão UDTF
Todas as cargas de trabalho são executadas no servidor Snowflake. O Snowpark cria uma UDTF Java e a invoca em paralelo para ingerir dados em uma tabela temporária do Snowflake. Assim, o parâmetro
udtf_configsé necessário para esse recurso.
O Snowpark Python JDBC tem duas maneiras de paralelizar e acelerar a ingestão:
- Coluna de partição
Este método divide os dados de origem em uma série de partições com base em quatro parâmetros, quando os usuários chamam
jdbc():columnlower_boundupper_boundnum_partitions
Esses quatro parâmetros precisam ser definidos ao mesmo tempo, e
columndeve ser um tipo de data ou numérico.- Predicados
Esse método divide os dados de origem em partições baseadas em predicados de parâmetros, que são uma lista de expressões adequadas para inclusão em cláusulas
WHERE, em que cada expressão define uma partição. Os predicados oferecem um meio mais flexível de dividir as partições; por exemplo, você pode dividi-las em colunas boolianas ou não numéricas.
O Snowpark Python JDBC também permite ajustar o nível de paralelismo em uma partição:
- Fetch_size
Dentro de uma partição, a API obtém linhas em partes definidas por
fetch_size. Essas linhas são gravadas no Snowflake em paralelo à medida que são buscadas, o que permite que a leitura e a escrita se sobreponham e maximizem o rendimento.
Usando o JDBC para ingerir dados de uma fonte de dados externa¶
Usando o JDBC para ingerir dados de um cliente Snowpark¶
Carregue o arquivo jar do driver JDBC em uma área de preparação do Snowflake usando o Snowpark ou o Snowsight.
Carregue usando o Snowpark.
No Snowpark, após criar uma sessão, execute o seguinte código:
session.file.put("<your directory>/<your file name>", "@<your stage name>/<stage path>")
Carregue usando o Snowsight conforme descrito nas etapas a seguir.
No Snowsight, clique em Catalog -> Database Explorer.
Na barra de pesquisa de bancos de dados à esquerda, clique em [your database name] -> [your schema name] -> stages -> [your stage name].
Clique no botão “+File” no canto superior direito da página da área de preparação.
Configure o segredo, a regra de rede e a integração de acesso externo.
-- Configure a secret to allow egress to the source endpoint CREATE OR REPLACE SECRET <your secret> TYPE = PASSWORD USERNAME = '<your username>' PASSWORD = '<your password>'; -- Configure a network rule to allow egress to the source endpoint CREATE OR REPLACE NETWORK RULE <your network rule> TYPE = HOST_PORT MODE = EGRESS VALUE_LIST = ('<your host>:<your port>'); -- Configure an external access integration CREATE OR REPLACE EXTERNAL ACCESS INTEGRATION <your integration> ALLOWED_NETWORK_RULES = (<your network rule>) ALLOWED_AUTHENTICATION_SECRETS = (<your secret>) ENABLED = true;
Extraia os dados do destino usando o Snowpark JDBC de um cliente Snowpark.
connection_str=f"jdbc:<your dbms>://<your host>:<your port>/<your db>" udtf_configs = { "external_access_integration": "<your integration>", "secret": "<your secret>", "imports": ["<your stage path to jdbc jar file>"] } # Call jdbc to pull data from target table df_table = session.read.jdbc( url=connection_str, udtf_configs=udtf_configs, table="<your table>", ) # Call jdbc to pull data from target query df_query = session.read.jdbc( url=connection_str, udtf_configs=udtf_configs, query="select * from <your table>", ) # Pull data from target table with parallelism using partition column df_table_partition_column = session.read.jdbc( url=connection_str, udtf_configs=udtf_configs, table="<your table>", fetch_size=100000, num_partitions=4, column="ID", upper_bound=10000, lower_bound=0 ) # Pull data from target table with parallelism using predicates df_table_predicates = session.read.jdbc( url=connection_str, udtf_configs=udtf_configs, table="<your table>", fetch_size=100000, predicates = [ "ID < 3", "ID >= 3" ] )
Usando o JDBC para ingerir dados de um procedimento armazenado¶
Carregue o arquivo jar do driver JDBC na área de preparação do Snowflake usando o Snowsight.
No Snowsight, clique em Catalog -> Database Explorer.
Na barra de pesquisa de bancos de dados à esquerda, clique em [your database name] -> [your schema name] -> stages -> [your stage name].
Clique no botão “+File” no canto superior direito da página da área de preparação.
Configure o segredo, a regra de rede e a integração de acesso externo.
-- Configure a secret to allow egress to the source endpoint CREATE OR REPLACE SECRET <your secret> TYPE = PASSWORD USERNAME = '<your username>' PASSWORD = '<your password>'; -- Configure a network rule to allow egress to the source endpoint CREATE OR REPLACE NETWORK RULE <your network rule> TYPE = HOST_PORT MODE = EGRESS VALUE_LIST = ('<your host>:<your port>'); -- Configure an external access integration CREATE OR REPLACE EXTERNAL ACCESS INTEGRATION <your integration> ALLOWED_NETWORK_RULES = (<your network rule>) ALLOWED_AUTHENTICATION_SECRETS = (<your secret>) ENABLED = true;
Extraia os dados do destino usando o Snowpark JDBC de um procedimento armazenado.
CREATE OR REPLACE PROCEDURE sp_jdbc() RETURNS STRING LANGUAGE PYTHON RUNTIME_VERSION = '3.10' PACKAGES = ('snowflake-snowpark-python') HANDLER = 'run' AS $$ import time def run(session): connection_str=f"jdbc:<your dbms>://<your host>:<your port>/<your db>" udtf_configs = { "external_access_integration": "<your integration>", "secret": "<your secret>", "imports": ["<your stage path to jdbc jar file>"] } # Call jdbc to pull data from target table df_table = session.read.jdbc( url=connection_str, udtf_configs=udtf_configs, table="<your table>", ) # Call jdbc to pull data from target query df_query = session.read.jdbc( url=connection_str, udtf_configs=udtf_configs, query="select * from <your table>", ) # Pull data from target table with parallelism using partition column df_table_partition_column = session.read.jdbc( url=connection_str, udtf_configs=udtf_configs, table="<your table>", fetch_size=100000, num_partitions=4, column="ID", upper_bound=10000, lower_bound=0 ) # Pull data from target table with parallelism using predicates df_table_predicates = session.read.jdbc( url=connection_str, udtf_configs=udtf_configs, table="<your table>", fetch_size=100000, predicates = [ "ID < 3", "ID >= 3" ] ) df_table.write.save_as_table("snowflake_table", mode="overwrite") return f"success" $$ ; call sp_jdbc(); select * from snowflake_table ;
Usando o JDBC para ingerir dados de um notebook Snowflake¶
Carregue o arquivo jar do driver JDBC na área de preparação do Snowflake usando o Snowsight.
No Snowsight, clique em Catalog -> Database Explorer.
Na barra de pesquisa de bancos de dados à esquerda, clique em [your database name] -> [your schema name] -> stages -> [your stage name].
Clique no botão “+File” no canto superior direito da página da área de preparação.
Configure o segredo, a regra de rede e a integração de acesso externo.
-- Configure a secret to allow egress to the source endpoint CREATE OR REPLACE SECRET <your secret> TYPE = PASSWORD USERNAME = '<your username>' PASSWORD = '<your password>'; -- Configure a network rule to allow egress to the source endpoint CREATE OR REPLACE NETWORK RULE <your network rule> TYPE = HOST_PORT MODE = EGRESS VALUE_LIST = ('<your host>:<your port>'); -- Configure an external access integration CREATE OR REPLACE EXTERNAL ACCESS INTEGRATION <your integration> ALLOWED_NETWORK_RULES = (<your network rule>) ALLOWED_AUTHENTICATION_SECRETS = (<your secret>) ENABLED = true;
Extraia os dados do destino usando o Snowpark JDBC de um notebook Snowflake.
import snowflake.snowpark.context session = snowflake.snowpark.context.get_active_session() connection_str=f"jdbc:<your dbms>://<your host>:<your port>/<your db>" udtf_configs = { "external_access_integration": "<your integration>", "secret": "<your secret>", "imports": ["<your stage path to jdbc jar file>"] } # Call jdbc to pull data from target table df_table = session.read.jdbc( url=connection_str, udtf_configs=udtf_configs, table="<your table>", ) # Call jdbc to pull data from target query df_query = session.read.jdbc( url=connection_str, udtf_configs=udtf_configs, query="select * from <your table>", ) # Pull data from target table with parallelism using partition column df_table_partition_column = session.read.jdbc( url=connection_str, udtf_configs=udtf_configs, table="<your table>", fetch_size=100000, num_partitions=4, column="ID", upper_bound=10000, lower_bound=0 ) # Pull data from target table with parallelism using predicates df_table_predicates = session.read.jdbc( url=connection_str, udtf_configs=udtf_configs, table="<your table>", fetch_size=100000, predicates = [ "ID < 3", "ID >= 3" ] )
Rastreamento da fonte¶
Rastreamento da fonte ao usar o Snowpark JDBC para se conectar ao MySQL¶
Inclua uma tag do Snowpark na função de criação da conexão:
connection_str="jdbc:mysql://<your host>:<your port>/<your db>?applicationName=snowflake-snowpark-python" udtf_configs = { "external_access_integration": "<your integration>", "secret": "<your secret>", "imports": ["<your stage path to jdbc jar file>"] } # Call dbapi to pull data from target table df_table = session.read.jdbc( url=connection_str, udtf_configs=udtf_configs, table="<your table>", )
Execute o seguinte SQL na fonte de dados para capturar consultas do Snowpark que ainda estão ativas:
SELECT * FROM performance_schema.events_statements_history_long WHERE THREAD_ID = ( SELECT THREAD_ID, NAME FROM performance_schema.threads WHERE NAME LIKE '%snowflake-snowpark-python%'; )
Rastreamento da fonte ao usar o Snowpark JDBC para se conectar ao SQL Server¶
Inclua uma tag do Snowpark na função de criação da conexão:
connection_str="jdbc:mssql://<your host>:<your port>/<your db>?applicationName=snowflake-snowpark-python" udtf_configs = { "external_access_integration": "<your integration>", "secret": "<your secret>", "imports": ["<your stage path to jdbc jar file>"] } # Call dbapi to pull data from target table df_table = session.read.jdbc( url=connection_str, udtf_configs=udtf_configs, table="<your table>", )
Execute o seguinte SQL na fonte de dados para capturar consultas do Snowpark que ainda estão ativas:
SELECT s.session_id, s.program_name, r.status, t.text AS sql_text FROM sys.dm_exec_sessions s JOIN sys.dm_exec_requests r ON s.session_id = r.session_id CROSS APPLY sys.dm_exec_sql_text(r.sql_handle) AS t WHERE s.program_name = 'snowflake-snowpark-python';
Rastreamento da fonte ao usar o Snowpark JDBC para se conectar ao PostgresSQL¶
Inclua uma tag do Snowpark na função de criação da conexão:
connection_str="jdbc:postgres://<your host>:<your port>/<your db>?applicationName=snowflake-snowpark-python" udtf_configs = { "external_access_integration": "<your integration>", "secret": "<your secret>", "imports": ["<your stage path to jdbc jar file>"] } # Call dbapi to pull data from target table df_table = session.read.jdbc( url=connection_str, udtf_configs=udtf_configs, table="<your table>", )
Execute o seguinte SQL na fonte de dados para capturar consultas do Snowpark que ainda estão ativas:
SELECT pid, usename AS username, datname AS database, application_name, client_addr, state, query_start, query FROM pg_stat_activity WHERE application_name = 'snowflake-snowpark-python';
Rastreamento da fonte ao usar o Snowpark JDBC para se conectar ao Oracle¶
Inclua uma tag do Snowpark na função de criação da conexão:
connection_str="jdbc:oracle://<your host>:<your port>/<your db>?applicationName=snowflake-snowpark-python" udtf_configs = { "external_access_integration": "<your integration>", "secret": "<your secret>", "imports": ["<your stage path to jdbc jar file>"] } # Call dbapi to pull data from target table df_table = session.read.jdbc( url=connection_str, udtf_configs=udtf_configs, table="<your table>", )
Execute o seguinte SQL na fonte de dados para capturar consultas do Snowpark que ainda estão ativas:
SELECT sid, serial#, username, program, module, action, client_identifier, client_info, osuser, machine FROM v$session WHERE program = 'snowflake-snowpark-python';
Suporte a DBMS comum e aos tipos¶
Veja a seguir uma lista certificada de tipos de dados de diferentes sistemas DBMS. Se os seus dados de origem envolvem outros tipos de dados, o Snowpark Python JDBC tenta mapeá-los para os tipos de dados do Snowflake da melhor maneira possível ou retornar para cadeias de caracteres.
Oracle¶
INTEGER
NUMBER
BINARY_FLOAT
BINARY_DOUBLE
VARCHAR2
CHAR
CLOB
NCHAR
NVARCHAR2
NCLOB
DATE
TIMESTAMP
TIMESTAMP WITH TIME ZONE
TIMESTAMP WITH LOCAL TIME ZONE
RAW
PostgresSQL¶
BIGINT
BIGSERIAL
BIT
BIT VARYING
BOOLEAN
BOX
BYTEA
CHAR
VARCHAR
CIDR
CIRCLE
DATE
DOUBLE PRECISION
INET
INTEGER
INTERVAL
JSON
JSONB
LINE
LSEG
MACADDR
POINT
POLYGON
REAL
SMALLINT
SMALLSERIAL
SERIAL
TEXT
TIME
TIMESTAMP
TIMESTAMPTZ
TSQUERY
TSVECTOR
TXID_SNAPSHOT
UUID
XML
MySQL¶
INT
DECIMAL
INT
TINYINT
SMALLINT
MEDIUMINT
BIGINT
YEAR
FLOAT
DOUBLE
CHAR
VARCHAR
TINYTEXT
TEXT
MEDIUMTEXT
LONGTEXT
ENUM
SET
BIT
BINARY
VARBINARY
TINYBLOB
BLOB
MEDIUMBLOB
LONGBLOB
DATE
DATETIME
TIMESTAMP
TIME
JSON
SQL Server¶
INT
BIGINT
INT
SMALLINT
TINYINT
BIT
DECIMAL
NUMERIC
MONEY
SMALLMONEY
FLOAT
REAL
DATE
TIME
DATETIME
DATETIME2
SMALLDATETIME
CHAR
VARCHAR
VARCHAR(MAX)
TEXT
NCHAR
NVARCHAR
NVARCHAR(MAX)
NTEXT
BINARY
VARBINARY
VARBINARY(MAX)
IMAGE
UNIQUEIDENTIFIER
TIMESTAMP
Databricks¶
Não há suporte para conexão com o Databricks usando o Snowpark Python JDBC no momento.