Leitura de dados de fontes de dados externas usando Snowpark Python DB-API

Com o Snowpark Python DB-API, os usuários do Snowpark Python podem extrair dados programaticamente de bancos de dados externos para o Snowflake. Isso inclui:

  • Suporte do Python DB-API: conexão a bancos de dados externos usando drivers padrão do Python DB-API 2.0.

  • Configuração simplificada: use pip para instalar os drivers necessários, sem necessidade de gerenciar dependências adicionais.

Com essas APIs, você pode extrair dados perfeitamente para tabelas do Snowflake e transformá-los usando Snowpark DataFrames para análise avançada.

Uso do Snowpark Python DB-API

O DB-API pode ser usado de maneira semelhante ao Spark JDBC API. A maioria dos parâmetros são projetados para serem idênticos ou semelhantes para melhor paridade. Ao mesmo tempo, o Snowpark enfatiza um design de Python primeiro com convenções de nomenclatura intuitivas, evitando configurações específicas de JDBC. Isso fornece aos desenvolvedores Python uma experiência familiar. Para obter mais informações que comparam Snowpark Python DB-API com o Spark JDBC API, consulte Parâmetros de DB-API.

Parâmetros de DB-API

Parâmetro

Snowpark DB-API

create_connection

Função para criar uma conexão do Python DB-API.

table

Especifica a tabela no banco de dados de origem.

query

Consulta SQL agrupada como uma subconsulta para leitura de dados.

column

Coluna de particionamento para leituras paralelas.

lower_bound

Limite inferior para particionamento.

upper_bound

Limite superior para particionamento.

num_partitions

Número de partições para paralelismo.

query_timeout

Tempo limite para execução de SQL (em segundos).

fetch_size

Número de linhas buscadas por ida e volta.

custom_schema

Esquema personalizado para extrair dados de bancos de dados externos.

max_workers

Número de trabalhadores para busca paralela e extração de dados de bancos de dados externos.

predicates

Lista de condições para partições de cláusula WHERE.

session_init_statement

Executa uma instrução SQL ou PL/SQL na inicialização da sessão.

udtf_configs

Executar a carga de trabalho usando um Snowflake UDTF para melhor desempenho.

fetch_merge_count

Número de lotes buscados a serem mesclados em um único arquivo Parquet antes de carregá-lo.

Compreensão de paralelismo

O Python do Snowpark DB-API usa duas formas independentes de paralelismo com base na entrada do usuário:

  • Paralelismo baseado em partições

    Quando os usuários especificam informações de particionamento (por exemplo, column, lower_bound, upper_bound, num_partitions) ou predicados, o Snowflake divide a consulta em várias partições. Elas são processadas em paralelo usando multiprocessamento, com cada trabalhador buscando e escrevendo sua própria partição.

  • Paralelismo baseado em tamanho de busca dentro de cada partição

    Dentro de uma partição, a API obtém linhas em partes definidas por fetch_size. Essas linhas são gravadas no Snowflake em paralelo à medida que são buscadas, permitindo que a leitura e a escrita se sobreponham e maximizem o rendimento.

Essas duas formas de paralelismo operam de forma independente. Se nem a partição nem fetch_size for especificado, a função carregará toda a tabela de origem na memória antes de gravar no Snowflake. Isso pode aumentar o uso de memória e reduzir o desempenho para grandes conjuntos de dados.

SQL Server

Uso de DB-API para conectar-se ao SQL Server de um cliente Snowpark

Para conectar-se ao SQL Server do Snowpark, você precisará dos três pacotes a seguir:

Abaixo estão os exemplos de código necessários para conectar ao SQL Server do cliente Snowpark e um procedimento armazenado.

  • Instalação do driver SQL do Python

/bin/bash -c "$(curl -fsSL https://raw.githubusercontent.com/Homebrew/install/master/install.sh)"
  brew tap microsoft/mssql-release https://github.com/Microsoft/homebrew-mssql-release
  brew update
  HOMEBREW_ACCEPT_EULA=Y brew install msodbcsql18 mssql-tools18
Copy
  • Instalação de snowflake-snowpark-python[pandas] e pyodbc

pip install snowflake-snowpark-python[pandas]
pip install pyodbc
Copy
  • Definição do método de fábrica para criar conexão com o SQL Server

def create_sql_server_connection():
    import pyodbc
    HOST = "mssql_host"
    PORT = "mssql_port"
    USERNAME = "mssql_username"
    PASSWORD = "mssql_password"
    DATABASE = "mssql_db"
    connection_str = (
        "DRIVER={{ODBC Driver 18 for SQL Server}};"
        "SERVER={HOST},{PORT};"
        "DATABASE={DATABASE};"
        "UID={USERNAME};"
        "PWD={PASSWORD};"
    )
    connection = pyodbc.connect(connection_str)
    return connection


# Call dbapi to pull data from mssql_table

df = session.read.dbapi(
  create_sql_server_connection,
    table="mssql_table")
Copy

Uso de DB-API para conectar-se ao SQL Server de um procedimento armazenado

  • Configure a integração de acesso externo, necessária para permitir que o Snowflake se conecte ao ponto de extremidade de origem.

    Nota

    PrivateLink é recomendado para transferência segura de dados, especialmente ao lidar com informações sensíveis. Certifique-se de que sua conta do Snowflake tenha os privilégios PrivateLink necessários habilitados e o recurso PrivateLink esteja configurado e ativo em seu ambiente de notebook Snowflake.

  • Configure o segredo, uma regra de rede para permitir a saída para o ponto de extremidade de origem e a integração de acesso externo.

    CREATE OR REPLACE SECRET mssql_secret
      TYPE = PASSWORD
      USERNAME = 'mssql_username'
      PASSWORD = 'mssql_password';
    
    -- Configure a network rule to allow egress to the source endpoint
    
    CREATE OR REPLACE NETWORK RULE mssql_network_rule
      MODE = EGRESS
      TYPE = HOST_PORT
      VALUE_LIST = ('mssql_host:mssql_port');
    
    -- Configure an external access integration
    
    CREATE OR REPLACE EXTERNAL ACCESS INTEGRATION mssql_access_integration
      ALLOWED_NETWORK_RULES = (mssql_network_rule)
      ALLOWED_AUTHENTICATION_SECRETS = (mssql_secret)
      ENABLED = true;
    
    -- Create or replace a Python stored procedure
    
    CREATE OR REPLACE PROCEDURE sp_mssql_dbapi()
      RETURNS TABLE()
      LANGUAGE PYTHON
      RUNTIME_VERSION='3.11'
      HANDLER='run'
      PACKAGES=('snowflake-snowpark-python', 'pyodbc', 'msodbcsql')
            EXTERNAL_ACCESS_INTEGRATIONS = (mssql_access_integration)
            SECRETS = ('cred' = mssql_secret )
    
    AS $$
    
    # Get user name and password from mssql_secret
    
    import _snowflake
    username_password_object = _snowflake.get_username_password('cred')
    USER = username_password_object.username
    PASSWORD = username_password_object.password
    
    # Define a method to connect to SQL server_hostname
    from snowflake.snowpark import Session
    def create_sql_server_connection():
        import pyodbc
    
        host = "mssql_host"
        port = mssql_port
        username = USER
        password = PASSWORD
        database = "mssql_database"
        connection_str = (
          "DRIVER={{ODBC Driver 18 for SQL Server}};"
          "SERVER={host},{port};"
          "DATABASE={database};"
          "UID={username};"
          "PWD={password};"
        )
    
        connection = pyodbc.connect(connection_str)
        return connection
    
    def run(session: Session):
        df = session.read.dbapi(
            create_sql_server_connection,
            table="mssql_table"
        )
        return df
    $$;
    
    CALL sp_mssql_dbapi();
    
    Copy

Oracle

Para conectar-se ao Oracle a partir do Snowpark, você precisará dos dois pacotes a seguir:

Abaixo estão os exemplos de código necessários para conectar-se à Oracle a partir de um cliente Snowpark, procedimentos armazenados e Snowflake Notebooks.

Uso de DB-API para conectar-se ao Oracle a partir de um cliente Snowpark

  • Instalação de snowflake-snowpark-python[pandas] e oracledb

    pip install snowflake-snowpark-python[pandas]
    pip install oradb
    
    Copy
  • Uso de DB-API para extrair dados do Oracle e definir o método de fábrica para criar uma conexão com o Oracle

    def create_oracle_db_connection():
        import oracledb
        HOST = "myhost"
        PORT = "myport"
        SERVICE_NAME = "myservice"
        USER = "myuser"
        PASSWORD = "mypassword"
        DSN = "{HOST}:{PORT}/{SERVICE_NAME}"
        connection = oracledb.connect(
            user=USER,
            password=PASSWORD,
            dsn=DSN
        )
        return connection
    
    
    # Call dbapi to pull data from mytable
    
    df = session.read.dbapi(
      create_oracle_db_connection,
      table="mytable")
    
    Copy

Uso de DB-API para conectar-se ao Oracle a partir de um procedimento armazenado

A integração de acesso externo é necessária para permitir que o Snowflake se conecte ao ponto de extremidade de origem.

Nota

PrivateLink é recomendado para transferência segura de dados, especialmente ao lidar com informações sensíveis. Certifique-se de que sua conta do Snowflake tenha os privilégios PrivateLink necessários habilitados e o recurso PrivateLink esteja configurado e ativo em seu ambiente de notebook Snowflake.

  • Configure o segredo, uma regra de rede para permitir a saída para o ponto de extremidade de origem e a integração de acesso externo.

    -- Configure the secret, a network rule to allow egress to the source endpoint and external access integration.
    
    CREATE OR REPLACE SECRET ora_secret
      TYPE = PASSWORD
      USERNAME = 'ora_username'
      PASSWORD = 'ora_password';
    
    -- Configure a network rule to allow egress to the source endpoint
    
    CREATE OR REPLACE NETWORK RULE ora_network_rule
      MODE = EGRESS
      TYPE = HOST_PORT
      VALUE_LIST = ('ora_host:ora_port');
    
    -- Configure an external access integration
    
    CREATE OR REPLACE EXTERNAL ACCESS INTEGRATION ora_access_integration
      ALLOWED_NETWORK_RULES = (ora_network_rule)
      ALLOWED_AUTHENTICATION_SECRETS = (ora_secret)
      ENABLED = true;
    
    Copy

Uso de Snowpark Python DB-API para extrair dados do Oracle em um procedimento armazenado Python

CREATE OR REPLACE PROCEDURE sp_ora_dbapi()
  RETURNS TABLE()
  LANGUAGE PYTHON
  RUNTIME_VERSION='3.11'
  HANDLER='run'
  PACKAGES=('snowflake-snowpark-python', 'oracledb')
  EXTERNAL_ACCESS_INTEGRATIONS = (ora_access_integration)
  SECRETS = ('cred' = ora_secret )
AS $$

# Get user name and password from ora_secret
import _snowflake
username_password_object = _snowflake.get_username_password('cred')
USER = username_password_object.username
PASSWORD = username_password_object.password

# Define the factory method for creating a connection to Oracle

from snowflake.snowpark import Session

def create_oracle_db_connection():
    import oracledb
    host = "ora_host"
    port = "ora_port"
    service_name = "ora_service"
    user = USER
    password = PASSWORD
    DSN = "{host}:{port}/{service_name}"
    connection = oracledb.connect(
        user=USER,
        password=PASSWORD,
        dsn=DSN
    )
    return connection

def run(session: Session):
    df = session.read.dbapi(
        create_ora_connection,
        table="ora_table"
    )
    return df
$$;

CALL sp_ora_dbapi();
Copy

Uso de DB-API para conectar-se ao Oracle a partir de um notebook Snowflake

  • Seleção de snowflake-snowpark-python e oracledb de pacotes do notebook.

  • Configure o segredo, uma regra de rede para permitir a saída para o ponto de extremidade de origem e a integração de acesso externo.

    CREATE OR REPLACE SECRET ora_secret
      TYPE = PASSWORD
      USERNAME = 'ora_username'
      PASSWORD = 'ora_password';
      ALTER NOTEBOOK mynotebook SET SECRETS = ('snowflake-secret-object' = ora_secret);
    
    -- Configure a network rule to allow egress to the source endpoint
    
    CREATE OR REPLACE NETWORK RULE ora_network_rule
      MODE = EGRESS
      TYPE = HOST_PORT
      VALUE_LIST = ('ora_host:ora_port');
    
    -- Configure an external access integration.
    
    CREATE OR REPLACE EXTERNAL ACCESS INTEGRATION ora_access_integration
      ALLOWED_NETWORK_RULES = (ora_network_rule)
      ALLOWED_AUTHENTICATION_SECRETS = (ora_secret)
      ENABLED = true;
    
    Copy
  • Configurar acesso externo para o Snowflake Notebooks e depois reiniciar a sessão do notebook.

Uso de Snowpark Python DB-API para extrair dados do Oracle em uma célula Python do notebook Snowflake

# Get user name and password from ora_secret

import _snowflake
username_password_object = _snowflake.get_username_password('snowflake-secret-object')
USER = username_password_object.username
PASSWORD = username_password_object.password

import snowflake.snowpark.context
session = snowflake.snowpark.context.get_active_session()

# Define the factory method for creating a connection to Oracle

def create_oracle_db_connection():
    import oracledb
    host = "ora_host"
    port = "ora_port"
    service_name = "ora_service"
    user = USER
    password = PASSWORD
    DSN = "{host}:{port}/{service_name}"
    connection = oracledb.connect(
        user=USER,
        password=PASSWORD,
        dsn=DSN
    )
    return connection

# Use dbapi to read data from ora_table

df_ora = session.read.dbapi(
  create_oracle_db_connection,
  table='ora_table'
)

# Save data into sf_table

df_ora.write.mode("overwrite").save_as_table('sf_table')
Copy

PostgreSQL

Para conectar-se ao PostgreSQL do Snowpark, você precisará dos dois pacotes a seguir:

Abaixo estão os exemplos de código necessários para conectar ao PostgreSQL do cliente Snowpark, procedimentos armazenados e notebooks Snowflake.

Uso de DB-API para conectar-se a PostgreSQL de um cliente Snowpark

  • Instalação do psycopg2

    pip install psycopg2
    
    Copy
  • Definição do método de fábrica para criar uma conexão com o PostgreSQL

    def create_pg_connection():
        import psycopg2
        connection = psycopg2.connect(
            host="pg_host",
            port=pg_port,
            dbname="pg_dbname",
            user="pg_user",
            password="pg_password",
        )
        return connection
    
    
    # Call dbapi to pull data from pg_table
    
    df = session.read.dbapi(
      create_pg_connection,
      table="pg_table")
    
    Copy

Uso de DB-API para conectar-se a PostgreSQL de um procedimento armazenado

  • Configure a integração de acesso externo, necessária para permitir que o Snowflake se conecte ao ponto de extremidade de origem.

    Nota

    PrivateLink é recomendado para transferência segura de dados, especialmente ao lidar com informações sensíveis. Certifique-se de que sua conta do Snowflake tenha os privilégios PrivateLink necessários habilitados e o recurso PrivateLink esteja configurado e ativo em seu ambiente de notebook Snowflake.

  • Configure o segredo, uma regra de rede para permitir a saída para o ponto de extremidade de origem e a integração de acesso externo.

CREATE OR REPLACE SECRET pg_secret
  TYPE = PASSWORD
  USERNAME = 'pg_username'
  PASSWORD = 'pg_password';

-- Configure a network rule.

CREATE OR REPLACE NETWORK RULE pg_network_rule
  MODE = EGRESS
  TYPE = HOST_PORT
  VALUE_LIST = ('pg_host:pg_port');

-- Configure an external access integration.

CREATE OR REPLACE EXTERNAL ACCESS INTEGRATION pg_access_integration
  ALLOWED_NETWORK_RULES = (pg_network_rule)
  ALLOWED_AUTHENTICATION_SECRETS = (pg_secret)
  ENABLED = true;
Copy
  • Uso de Snowpark Python DB-API para extrair dados de PostgreSQL em um procedimento armazenado Python

CREATE OR REPLACE PROCEDURE sp_pg_dbapi()
RETURNS TABLE()
LANGUAGE PYTHON
RUNTIME_VERSION='3.11'
HANDLER='run'
PACKAGES=('snowflake-snowpark-python', 'psycopg2')
EXTERNAL_ACCESS_INTEGRATIONS = (pg_access_integration)
SECRETS = ('cred' = pg_secret )
AS $$

# Get user name and password from pg_secret

import _snowflake
username_password_object = _snowflake.get_username_password('cred')
USER = username_password_object.username
PASSWORD = username_password_object.password

# Define the factory method for creating a connection to PostgreSQL

from snowflake.snowpark import Session

def create_pg_connection():
    import psycopg2
    connection = psycopg2.connect(
        host="pg_host",
        port=pg_port,
        dbname="pg_dbname",
        user=USER,
        password=PASSWORD,
    )
    return connection

def run(session: Session):
    df = session.read.dbapi(
        create_pg_connection,
        table="pg_table"
    )
    return df

$$;
CALL sp_pg_dbapi();
Copy

Uso de DB-API para conectar-se a PostgreSQL de um notebook Snowflake

  • Selecione snowflake-snowpark-python e psycopg2 de pacotes de notebook Snowflake.

  • Configure a integração de acesso externo, necessária para permitir que o Snowflake se conecte ao ponto de extremidade de origem.

    Nota

    PrivateLink é recomendado para transferência segura de dados, especialmente ao lidar com informações sensíveis. Certifique-se de que sua conta do Snowflake tenha os privilégios PrivateLink necessários habilitados e o recurso PrivateLink esteja configurado e ativo em seu ambiente de notebook Snowflake.

  • Configure o segredo, uma regra de rede para permitir a saída para o ponto de extremidade de origem e a integração de acesso externo.

-- Configure the secret

CREATE OR REPLACE SECRET pg_secret
  TYPE = PASSWORD
  USERNAME = 'pg_username'
  PASSWORD = 'pg_password';
  ALTER NOTEBOOK pg_notebook SET SECRETS = ('snowflake-secret-object' = pg_secret);

 -- Configure the network rule to allow egress to the source endpoint

 CREATE OR REPLACE NETWORK RULE pg_network_rule
    MODE = EGRESS
    TYPE = HOST_PORT
    VALUE_LIST = ('pg_host:pg_port');

 -- Configure external access integration

 CREATE OR REPLACE EXTERNAL ACCESS INTEGRATION pg_access_integration
    ALLOWED_NETWORK_RULES = (pg_network_rule)
    ALLOWED_AUTHENTICATION_SECRETS = (pg_secret)
    ENABLED = true;
Copy
  • Configurar acesso externo para o Snowflake Notebooks e depois reiniciar a sessão do notebook.

    # Get the user name and password from :code:`pg_secret`
    
    import _snowflake
    username_password_object = _snowflake.get_username_password('snowflake-secret-object')
    USER = username_password_object.username
    PASSWORD = username_password_object.password
    
    import snowflake.snowpark.context
    session = snowflake.snowpark.context.get_active_session()
    
    # Define the factory method for creating a connection to PostgreSQL
    
    def create_pg_connection():
        import psycopg2
        connection = psycopg2.connect(
            host="pg_host",
            port=pg_port,
            dbname="pg_dbname",
            user=USER,
            password=PASSWORD,
        )
        return connection
    
    # Use dbapi to read and save data from pg_table
    
    df = session.read.dbapi(
        create_pg_connection,
        table="pg_table"
    )
    
    # Save data into sf_table
    
    df.write.mode("overwrite").save_as_table('sf_table')
    
    Copy

MySQL

Para conectar-se ao MySQL do Snowpark, você precisará dos dois pacotes a seguir:

Abaixo estão os exemplos de código necessários para conectar ao MySQL do cliente Snowpark, procedimentos armazenados e notebook Snowflake.

Uso de DB-API para conectar-se a MySQL de um cliente Snowpark

  • Instalação de pymysql

pip install snowflake-snowpark-python[pandas]
pip install pymysql
Copy
  • Definição do método de fábrica para criar uma conexão com o MySQL

def create_mysql_connection():
    import pymysql
    connection = pymysql.connect(
        host="mysql_host",
        port=mysql_port,
        database="mysql_db",
        user="mysql_user",
        password="mysql_password"
    )
    return connection


# Call dbapi to pull data from mysql_table

df = session.read.dbapi(
    create_mysql_connection,
    table="mysql_table"
)
Copy

Uso de DB-API para conectar-se a MySQL de um procedimento armazenado

  • Configure a integração de acesso externo, necessária para permitir que o Snowflake se conecte ao ponto de extremidade de origem.

    Nota

    PrivateLink é recomendado para transferência segura de dados, especialmente ao lidar com informações sensíveis. Certifique-se de que sua conta do Snowflake tenha os privilégios PrivateLink necessários habilitados e o recurso PrivateLink esteja configurado e ativo em seu ambiente de notebook Snowflake.

  • Configure o segredo, uma regra de rede para permitir a saída para o ponto de extremidade de origem e a integração de acesso externo.

CREATE OR REPLACE SECRET mysql_secret
  TYPE = PASSWORD
  USERNAME = 'mysql_username'
  PASSWORD = 'mysql_password';

-- Configure a network rule.

CREATE OR REPLACE NETWORK RULE mysql_network_rule
  MODE = EGRESS
  TYPE = HOST_PORT
  VALUE_LIST = ('mysql_host:mysql_port');

-- Configure an external access integration

CREATE OR REPLACE EXTERNAL ACCESS INTEGRATION mysql_access_integration
  ALLOWED_NETWORK_RULES = (mysql_network_rule)
  ALLOWED_AUTHENTICATION_SECRETS = (mysql_secret)
  ENABLED = true;
Copy
  • Uso de Snowpark Python DB-API para extrair dados de MySQL em um procedimento armazenado Python.

CREATE OR REPLACE PROCEDURE sp_mysql_dbapi()
RETURNS TABLE()
LANGUAGE PYTHON
RUNTIME_VERSION='3.11'
HANDLER='run'
PACKAGES=('snowflake-snowpark-python', 'pymysql')
EXTERNAL_ACCESS_INTEGRATIONS = (mysql_access_integration)
SECRETS = ('cred' = mysql_secret )
AS $$

# Get user name and password from mysql_secret

import _snowflake
    username_password_object = _snowflake.get_username_password('cred')
    USER = username_password_object.username
    PASSWORD = username_password_object.password

# Define the factory method for creating a connection to MySQL

from snowflake.snowpark import session

def create_mysql_connection():
    import pymysql
    connection = pymysql.connect(
        host="mysql_host",
        port=mysql_port,
        dbname="mysql_dbname",
        user=USER,
        password=PASSWORD,
    )
    return connection

def run(session: Session):
    df = session.read.dbapi(
        create_mysql_connection,
        table="mysql_table"
    )
    return df
$$;

CALL sp_mysql_dbapi();
Copy

Uso de DB-API para conectar-se a MySQL de um notebook Snowflake

  • Selecione snowflake-snowpark-python e pymysql de pacotes de notebook Snowflake.

  • Configure a integração de acesso externo, necessária para permitir que o Snowflake se conecte ao ponto de extremidade de origem.

    Nota

    PrivateLink é recomendado para transferência segura de dados, especialmente ao lidar com informações sensíveis. Certifique-se de que sua conta do Snowflake tenha os privilégios PrivateLink necessários habilitados e o recurso PrivateLink esteja configurado e ativo em seu ambiente de notebook Snowflake.

  • Configure o segredo e adicione-o ao notebook Snowflake.

CREATE OR REPLACE SECRET mysql_secret
  TYPE = PASSWORD
  USERNAME = 'mysql_username'
  PASSWORD = 'mysql_password';

  ALTER NOTEBOOK mynotebook SET SECRETS = ('snowflake-secret-object' = mysql_secret);
Copy
  • Configure uma regra de rede e uma integração de acesso externo.

CREATE OR REPLACE NETWORK RULE mysql_network_rule
  MODE = EGRESS
  TYPE = HOST_PORT
  VALUE_LIST = ('mysql_host:mysql_port');

CREATE OR REPLACE EXTERNAL ACCESS INTEGRATION mysql_access_integration
  ALLOWED_NETWORK_RULES = (mysql_network_rule)
  ALLOWED_AUTHENTICATION_SECRETS = (mysql_secret)
  ENABLED = true;
Copy
# Get user name and password from mysql_secret
import _snowflake
username_password_object = _snowflake.get_username_password('snowflake-secret-object')
USER = username_password_object.username
PASSWORD = username_password_object.password

import snowflake.snowpark.context
session = snowflake.snowpark.context.get_active_session()

# Define the factory method for creating a connection to MySQL

def create_mysql_connection():
    import pymysql
    connection = pymysql.connect(
        host="mysql_host",
        port=mysql_port,
        dbname="mysql_dbname",
        user=USER,
        password=PASSWORD,
    )
    return connection

# Call dbapi to pull data from mysql_table

df = session.read.dbapi(
    create_mysql_connection,
    table="mysql_table")

# Save data into sf_table

df.write.mode("overwrite").save_as_table('sf_table')
Copy

Databricks

Para conectar-se ao Databricks a partir do Snowpark, você precisará dos dois pacotes a seguir:

Abaixo estão os exemplos de código necessários para conectar-se ao Databricks a partir do cliente Snowpark, procedimentos armazenados e notebook Snowflake.

Uso de DB-API para conectar-se ao Databricks a partir de um cliente Snowpark

  • Instalar databricks-sql-connector:

pip install snowflake-snowpark-python[pandas]
pip install databricks-sql-connector
Copy
  • Definição do método de fábrica para criar uma conexão com o Databricks

def create_dbx_connection():
    import databricks.sql
    connection = databricks.sql.connect(
        server_hostname=HOST,
        http_path=PATH,
        access_token=ACCESS_TOKEN
    )
    return connection


#Call dbapi to pull data from mytable

df = session.read.dbapi(
    create_dbx_connection,
    table="dbx_table")
Copy

Uso de DB-API para conectar-se ao Databricks a partir de um procedimento armazenado

  • Selecione snowflake-snowpark-python e pymysql de pacotes de notebook Snowflake.

  • A integração de acesso externo é necessária para permitir que o Snowflake se conecte ao ponto de extremidade de origem.

    Nota

    PrivateLink é recomendado para transferência segura de dados, especialmente ao lidar com informações sensíveis. Certifique-se de que sua conta do Snowflake tenha os privilégios PrivateLink necessários habilitados e o recurso PrivateLink esteja configurado e ativo em seu ambiente de notebook Snowflake.

  • Configure o segredo, uma regra de rede para permitir a saída para o ponto de extremidade de origem e a integração de acesso externo.

CREATE OR REPLACE SECRET dbx_secret
  TYPE = GENERIC_STRING
  SECRET_STRING = 'dbx_access_token';

-- Configure a network rule

CREATE OR REPLACE NETWORK RULE dbx_network_rule
  MODE = EGRESS
  TYPE = HOST_PORT
  VALUE_LIST = ('dbx_host:dbx_port');

--  Configure an external access integration

CREATE OR REPLACE EXTERNAL ACCESS INTEGRATION dbx_access_integration
  ALLOWED_NETWORK_RULES = (dbx_network_rule)
  ALLOWED_AUTHENTICATION_SECRETS = (dbx_secret)
  ENABLED = true;
Copy
  • Uso de Snowpark Python DB-API para extrair dados do Databricks em um procedimento armazenado Python

CREATE OR REPLACE PROCEDURE sp_dbx_dbapi()
  RETURNS TABLE()
  LANGUAGE PYTHON
  RUNTIME_VERSION='3.11'
  HANDLER='run'
  PACKAGES=('snowflake-snowpark-python', 'databricks-sql-connector')
  EXTERNAL_ACCESS_INTEGRATIONS = (dbx_access_integration)
  SECRETS = ('cred' = dbx_secret )
AS $$

# Get user name and password from dbx_secret

import _snowflake
ACCESS_TOKEN = _snowflake.get_generic_secret_string('cred')

from snowflake.snowpark import Session

# define the method for creating a connection to Databricks
def create_dbx_connection():
    import databricks.sql
    connection = databricks.sql.connect(
        server_hostname="dbx_host",
        http_path="dbx_path",
        access_token=ACCESS_TOKEN,
    )
    return connection

def run(session: Session):
    df = session.read.dbapi(
        create_dbx_connection,
        table="dbx_table"
    )
    return df

$$;

CALL sp_dbx_dbapi();
Copy

Uso de DB-API para conectar-se ao Databricks a partir de um notebook Snowflake

  • Selecione snowflake-snowpark-python e pymysql de pacotes de notebook Snowflake.

  • Configure a integração de acesso externo, necessária para permitir que o Snowflake se conecte ao ponto de extremidade de origem.

    Nota

    PrivateLink é recomendado para transferência segura de dados, especialmente ao lidar com informações sensíveis. Certifique-se de que sua conta do Snowflake tenha os privilégios PrivateLink necessários habilitados e o recurso PrivateLink esteja configurado e ativo em seu ambiente de notebook Snowflake.

  • Configure o segredo e adicione-o ao notebook Snowflake.

    CREATE OR REPLACE SECRET dbx_secret
      TYPE = GENERIC_STRING
      SECRET_STRING = 'dbx_access_token';
      ALTER NOTEBOOK mynotebook SET SECRETS = ('snowflake-secret-object' = dbx_secret);
    
    Copy
  • Configurar

    CREATE OR REPLACE NETWORK RULE dbx_network_rule
      MODE = EGRESS
      TYPE = HOST_PORT
      VALUE_LIST = ('dbx_host:dbx_port');
    
    CREATE OR REPLACE EXTERNAL ACCESS INTEGRATION dbx_access_integration
      ALLOWED_NETWORK_RULES = (dbx_network_rule)
      ALLOWED_AUTHENTICATION_SECRETS = (dbx_secret)
      ENABLED = true;
    
    Copy
  • Configurar acesso externo para o Snowflake Notebooks e depois reiniciar a sessão do notebook.

    # Get user name and password from dbx_secret
    
    import _snowflake
    ACCESS_TOKEN = _snowflake.get_generic_secret_string('cred')
    
    import snowflake.snowpark.context
    session = snowflake.snowpark.context.get_active_session()
    
    # Define the factory method for creating a connection to Databricks
    
    def create_dbx_connection():
        import databricks.sql
        connection = databricks.sql.connect(
            server_hostname="dbx_host",
            http_path="dbx_path",
            access_token=ACCESS_TOKEN,
        )
        return connection
    
    # use dbapi to read data from dbx_table
    
    df = session.read.dbapi(
        create_dbx_connection,
        table="dbx_table"
    )
    
    # save data into sf_table
    
    df.write.mode("overwrite").save_as_table('sf_table')
    
    Copy

Limitações

Drivers

Snowpark Python DB-API suporta apenas drivers compatíveis com Python DB-API 2.0 (por exemplo, pyodbc, oracledb). Os drivers JDBC não são suportados nesta versão.