Uso do Snowflake SQLAlchemy Toolkit com o conector Python

O Snowflake SQLAlchemy funciona com base no conector Snowflake para Python como um dialeto para fazer a ponte entre um banco de dados Snowflake e aplicativos SQLAlchemy.

Neste tópico:

Pré-requisitos

Conector Snowflake para Python

O único requisito para o Snowflake SQLAlchemy é o Conector Snowflake para Python; entretanto, o conector não precisa estar instalado porque a instalação do Snowflake SQLAlchemy instala automaticamente o conector.

Análise de dados e estruturas de aplicativos da Web (opcional)

O Snowflake SQLAlchemy pode ser usado com pandas, Jupyter e Pyramid, que fornecem níveis mais altos de estruturas de aplicativo para análise de dados e aplicativos da Web. No entanto, construir um ambiente de trabalho a partir do zero não é uma tarefa trivial, particularmente para os usuários novatos. A instalação das estruturas requer compiladores e ferramentas C, e a escolha das ferramentas e versões corretas é um obstáculo que pode impedir os usuários de usar aplicativos Python.

Uma maneira mais fácil de construir um ambiente é através do Anaconda, que fornece uma pilha de tecnologia completa e pré-compilada para todos os usuários, incluindo especialistas que não usam Python, como analistas de dados e estudantes. Para instruções de instalação do Anaconda, consulte a documentação de instalação do Anaconda. O pacote Snowflake SQLAlchemy pode então ser instalado sobre o Anaconda usando pip.

Instalação do Snowflake SQLAlchemy

O pacote Snowflake SQLAlchemy pode ser instalado a partir do repositório público PyPI usando pip:

pip install --upgrade snowflake-sqlalchemy
Copy

pip instala automaticamente todos os módulos necessários, incluindo o Conector do Snowflake para Python.

Observe que as notas do desenvolvedor são hospedadas com o código-fonte no GitHub.

Verificação de sua instalação

  1. Crie um arquivo (por exemplo, validate.py) que contenha o seguinte código de exemplo do Python, que se conecta ao Snowflake e exibe a versão do Snowflake:

    #!/usr/bin/env python
    from sqlalchemy import create_engine
    
    engine = create_engine(
        'snowflake://{user}:{password}@{account_identifier}/'.format(
            user='<user_login_name>',
            password='<password>',
            account_identifier='<account_identifier>',
        )
    )
    try:
        connection = engine.connect()
        results = connection.execute('select current_version()').fetchone()
        print(results[0])
    finally:
        connection.close()
        engine.dispose()
    
    Copy
  2. Substitua <user_login_name>, <password>, e <account_identifier> pelos valores apropriados para sua conta e usuário Snowflake. Para obter mais detalhes, consulte Parâmetros de conexão (neste tópico).

  3. Execute o código de exemplo. Por exemplo, se você criou um arquivo chamado validate.py:

    python validate.py
    
    Copy

A versão do Snowflake (por exemplo, 1.48.0) deve ser exibida.

Parâmetros e comportamento específicos do Snowflake

Na medida do possível, o Snowflake SQLAlchemy oferece funcionalidade compatível para aplicativos SQLAlchemy. Para obter mais informações sobre o uso do SQLAlchemy, consulte a documentação do SQLAlchemy.

Entretanto, o Snowflake SQLAlchemy também fornece parâmetros e comportamento específicos do Snowflake, que são descritos nas seções seguintes.

Parâmetros de conexão

Parâmetros obrigatórios

O Snowflake SQLAlchemy usa a seguinte sintaxe de cadeia de conexão para se conectar ao Snowflake e iniciar uma sessão:

'snowflake://<user_login_name>:<password>@<account_identifier>'
Copy

Onde:

  • <user_login_name> é o nome de login para seu usuário do Snowflake.

  • <password> é a senha para seu usuário do Snowflake.

  • <account_identifier> é o identificador de sua conta. Consulte Identificadores de conta.

    Nota

    Não inclua o nome de domínio snowflakecomputing.com como parte do identificador de sua conta. O Snowflake acrescenta automaticamente o nome de domínio ao identificador de sua conta para criar a conexão necessária.

Parâmetros adicionais de conexão

Opcionalmente, você pode incluir as seguintes informações adicionais no final da cadeia de conexão (após <account_name>):

'snowflake://<user_login_name>:<password>@<account_identifier>/<database_name>/<schema_name>?warehouse=<warehouse_name>&role=<role_name>'
Copy

Onde:

  • <database_name> e <schema_name> são o banco de dados e o esquema iniciais para a sessão do Snowflake, separados por barras (/).

  • warehouse=<warehouse_name> e role=<role_name>' são o warehouse e função iniciais para a sessão, especificados como cadeias de parâmetros, separados por pontos de interrogação (?).

Nota

Após o login, o banco de dados, esquema, warehouse e função iniciais especificados na sequência de conexão podem sempre ser alterados para a sessão.

Configuração do servidor proxy

Os parâmetros de servidor proxy não têm suporte. Ao invés disso, use as variáveis de ambiente com suporte para configurar um servidor proxy. Para obter mais informações, consulte Uso de um servidor proxy.

Exemplos de cadeias de conexão

O exemplo seguinte chama o método create_engine com o nome do usuário testuser1, senha 0123456, identificador de conta myorganization-myaccount, banco de dados testdb, esquema public, warehouse testwh e função myrole:

from sqlalchemy import create_engine
engine = create_engine(
    'snowflake://testuser1:0123456@myorganization-myaccount/testdb/public?warehouse=testwh&role=myrole'
)
Copy

Por conveniência, você pode usar o método snowflake.sqlalchemy.URL para construir a cadeia de conexão e conectar-se ao banco de dados. O exemplo a seguir constrói a mesma cadeia de conexão do exemplo anterior:

from snowflake.sqlalchemy import URL
from sqlalchemy import create_engine

engine = create_engine(URL(
    account = 'myorganization-myaccount',
    user = 'testuser1',
    password = '0123456',
    database = 'testdb',
    schema = 'public',
    warehouse = 'testwh',
    role='myrole',
))
Copy

Abertura e encerramento de uma conexão

Abra uma conexão executando engine.connect(); evite usar engine.execute().

# Avoid this.
engine = create_engine(...)
engine.execute(<SQL>)
engine.dispose()

# Do this.
engine = create_engine(...)
connection = engine.connect()
try:
    connection.execute(<SQL>)
finally:
    connection.close()
    engine.dispose()
Copy

Nota

Certifique-se de encerrar a conexão executando connection.close() antes de engine.dispose(); caso contrário, o coletor de lixo do Python remove os recursos necessários para se comunicar com o Snowflake, impedindo que o conector Python encerre corretamente a sessão.

Se você pretende utilizar transações explícitas, deve desativar a opção de execução AUTOCOMMIT no SQLAlchemy.

Por padrão, o SQLAlchemy habilita esta opção. Quando esta opção é ativada, as instruções INSERT, UPDATE, e DELETE são confirmadas automaticamente após a execução, mesmo quando estas instruções são executadas dentro de uma transação explícita.

Para desativar AUTOCOMMIT, passe autocommit=False para o método Connection.execution_options(). Por exemplo:

# Disable AUTOCOMMIT if you need to use an explicit transaction.
with engine.connect().execution_options(autocommit=False) as connection:

  try:
    connection.execute("BEGIN")
    connection.execute("INSERT INTO test_table VALUES (88888, 'X', 434354)")
    connection.execute("INSERT INTO test_table VALUES (99999, 'Y', 453654654)")
    connection.execute("COMMIT")
  except Exception as e:
    connection.execute("ROLLBACK")
  finally:
    connection.close()

engine.dispose()
Copy

Comportamento de incremento automático

O incremento automático de um valor requer o objeto Sequence. Inclua o objeto Sequence na coluna da chave primária para incrementar automaticamente o valor à medida que cada novo registro é inserido. Por exemplo:

t = Table('mytable', metadata,
    Column('id', Integer, Sequence('id_seq'), primary_key=True),
    Column(...), ...
)
Copy

Tratamento de maiúsculas e minúsculas no nome de objetos

O Snowflake armazena todos os nomes de objetos que não diferenciam maiúsculas e minúsculas usando maiúsculas. Em contraste, o SQLAlchemy considera que todos os nomes de objetos em minúsculas não diferenciam maiúsculas e minúsculas. O Snowflake SQLAlchemy converte as maiúsculas/minúsculas do nome de objetos durante a comunicação em nível de esquema (ou seja, durante a reflexão de tabela e índice). Se você usar nomes de objetos em maiúsculas, o SQLAlchemy considera que eles diferenciam maiúsculas/minúsculas e envolve os nomes com aspas. Este comportamento comprometerá a correspondência com os dados do dicionário de dados recebidos do Snowflake; portanto, a menos que nomes de identificador tenham sido verdadeiramente criados para diferenciar maiúsculas/minúsculas (usando aspas, como em "TestDb"), nomes em minúsculas devem ser usados no lado do SQLAlchemy.

Suporte para índices

O Snowflake não utiliza índices, nem o Snowflake SQLAlchemy.

Suporte para tipos de dados Numpy

O Snowflake SQLAlchemy suporta a vinculação e busca de tipos de dados NumPy. A vinculação sempre tem suporte. Para permitir a busca de tipos de dados NumPy, adicione numpy=True aos parâmetros de conexão.

Os seguintes tipos de dados NumPy têm suporte:

  • numpy.int64

  • numpy.float64

  • numpy.datetime64

O exemplo a seguir mostra a ida e volta de dados numpy.datetime64:

import numpy as np
import pandas as pd
engine = create_engine(URL(
    account = 'myorganization-myaccount',
    user = 'testuser1',
    password = 'pass',
    database = 'db',
    schema = 'public',
    warehouse = 'testwh',
    role='myrole',
    numpy=True,
))

specific_date = np.datetime64('2016-03-04T12:03:05.123456789Z')

connection = engine.connect()
connection.execute(
    "CREATE OR REPLACE TABLE ts_tbl(c1 TIMESTAMP_NTZ)")
connection.execute(
    "INSERT INTO ts_tbl(c1) values(%s)", (specific_date,)
)
df = pd.read_sql_query("SELECT * FROM ts_tbl", engine)
assert df.c1.values[0] == specific_date
Copy

Metadados de coluna do cache

O SQLAlchemy fornece a API de inspeção do tempo de execução para obter as informações de tempo de execução sobre os vários objetos. Um dos casos de uso comuns é obter todas as tabelas e seus metadados de coluna em um esquema a fim de construir um catálogo de esquemas. Por exemplo, o Alembic sobre o SQLAlchemy gerencia as migrações de esquemas de banco de dados. Um fluxo de pseudocódigo pode ser o seguinte:

inspector = inspect(engine)
schema = inspector.default_schema_name
for table_name in inspector.get_table_names(schema):
    column_metadata = inspector.get_columns(table_name, schema)
    primary_keys = inspector.get_primary_keys(table_name, schema)
    foreign_keys = inspector.get_foreign_keys(table_name, schema)
    ...
Copy

Neste fluxo, um problema potencial é que ele pode levar bastante tempo, já que as consultas são feitas em cada tabela. Os resultados são armazenados em cache, mas a obtenção dos metadados de coluna é cara.

Para mitigar o problema, o Snowflake SQLAlchemy usa um sinalizador cache_column_metadata=True de tal forma que todos os metadados das colunas de todas as tabelas são armazenados em cache quando get_table_names é chamado, e o resto de get_columns, get_primary_keys e get_foreign_keys pode tirar proveito do cache.

engine = create_engine(URL(
    account = 'myorganization-myaccount',
    user = 'testuser1',
    password = 'pass',
    database = 'db',
    schema = 'public',
    warehouse = 'testwh',
    role='myrole',
    cache_column_metadata=True,
))
Copy

Nota

O uso de memória subirá mais, pois todos os metadados das colunas associados ao objeto Inspector são armazenados em cache. Use o sinalizador somente se você precisar obter todos os metadados das colunas.

Suporte para VARIANT, ARRAY e OBJECT

O Snowflake SQLAlchemy suporta a busca dos tipos de dados VARIANT, ARRAY e OBJECT. Todos os tipos são convertidos em str no Python para que você possa convertê-los em tipos de dados nativos usando json.loads.

Este exemplo mostra como criar uma tabela incluindo colunas de tipos de dados VARIANT, ARRAY e OBJECT:

from snowflake.sqlalchemy import (VARIANT, ARRAY, OBJECT)
...
t = Table('my_semi_structured_datatype_table', metadata,
    Column('va', VARIANT),
    Column('ob', OBJECT),
    Column('ar', ARRAY))
metdata.create_all(engine)
Copy

Para recuperar as colunas de tipos de dados VARIANT, ARRAY e OBJECT e convertê-los para os tipos de dados Python nativos, busque dados e chame o método json.loads como segue:

import json
connection = engine.connect()
results = connection.execute(select([t]))
row = results.fetchone()
data_variant = json.loads(row[0])
data_object  = json.loads(row[1])
data_array   = json.loads(row[2])
Copy

Suporte para CLUSTER BY

O Snowflake SQLAlchemy suporta o parâmetro CLUSTER BY para tabelas. Para obter mais informações sobre o parâmetro, consulte CREATE TABLE.

Este exemplo mostra como criar uma tabela com duas colunas, id e name, como a chave de clustering:

t = Table('myuser', metadata,
    Column('id', Integer, primary_key=True),
    Column('name', String),
    snowflake_clusterby=['id', 'name'], ...
)
metadata.create_all(engine)
Copy

Suporte para Alembic

O Alembic é uma ferramenta de migração de banco de dados para uso no SQLAlchemy. O Snowflake SQLAlchemy funciona adicionando o seguinte código ao alembic/env.py para que o Alembic possa reconhecer o Snowflake SQLAlchemy.

from alembic.ddl.impl import DefaultImpl

class SnowflakeImpl(DefaultImpl):
    __dialect__ = 'snowflake'
Copy

Consulte a documentação do Alembic para uso geral.

Suporte à autenticação de par de chaves

Snowflake SQLAlchemy oferece suporte à autenticação de par de chaves alavancando a funcionalidade do conector do Snowflake para Python. Consulte Uso de autenticação de pares de chaves e rotação de pares de chaves para obter os passos para a criação das chaves privadas e públicas.

O parâmetro de chave privada é passado por connect_args da seguinte forma:

...
from snowflake.sqlalchemy import URL
from sqlalchemy import create_engine

from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives.asymmetric import rsa
from cryptography.hazmat.primitives.asymmetric import dsa
from cryptography.hazmat.primitives import serialization

with open("rsa_key.p8", "rb") as key:
    p_key= serialization.load_pem_private_key(
        key.read(),
        password=os.environ['PRIVATE_KEY_PASSPHRASE'].encode(),
        backend=default_backend()
    )

pkb = p_key.private_bytes(
    encoding=serialization.Encoding.DER,
    format=serialization.PrivateFormat.PKCS8,
    encryption_algorithm=serialization.NoEncryption())

engine = create_engine(URL(
    account='abc123',
    user='testuser1',
    ),
    connect_args={
        'private_key': pkb,
        },
    )
Copy

Onde PRIVATE_KEY_PASSPHRASE é uma frase secreta para descriptografar o arquivo de chave privada, rsa_key.p8.

O método snowflake.sqlalchemy.URL não oferece suporte a parâmetros de chave privada.

Suporte ao comando de mescla

Snowflake SQLAlchemy oferece suporte à execução de um upsert com sua expressão personalizada MergeInto. Consulte MERGE para obter a documentação completa.

Use-o da seguinte forma:

from sqlalchemy.orm import sessionmaker
from sqlalchemy import MetaData, create_engine
from snowflake.sqlalchemy import MergeInto

engine = create_engine(db.url, echo=False)
session = sessionmaker(bind=engine)()
connection = engine.connect()

meta = MetaData()
meta.reflect(bind=session.bind)
t1 = meta.tables['t1']
t2 = meta.tables['t2']

merge = MergeInto(target=t1, source=t2, on=t1.c.t1key == t2.c.t2key)
merge.when_matched_then_delete().where(t2.c.marked == 1)
merge.when_matched_then_update().where(t2.c.isnewstatus == 1).values(val = t2.c.newval, status=t2.c.newstatus)
merge.when_matched_then_update().values(val=t2.c.newval)
merge.when_not_matched_then_insert().values(val=t2.c.newval, status=t2.c.newstatus)
connection.execute(merge)
Copy

Suporte para CopyIntoStorage

Snowflake SQLAlchemy oferece suporte a salvar tabelas e consultar resultados em diferentes estágios do Snowflake, Azure Containers e buckets AWS com sua expressão personalizada CopyIntoStorage. Consulte COPY INTO <local> para obter a documentação completa.

Use-o da seguinte forma:

from sqlalchemy.orm import sessionmaker
from sqlalchemy import MetaData, create_engine
from snowflake.sqlalchemy import CopyIntoStorage, AWSBucket, CSVFormatter

engine = create_engine(db.url, echo=False)
session = sessionmaker(bind=engine)()
connection = engine.connect()

meta = MetaData()
meta.reflect(bind=session.bind)
users = meta.tables['users']

copy_into = CopyIntoStorage(from_=users,
                            into=AWSBucket.from_uri('s3://my_private_backup').encryption_aws_sse_kms('1234abcd-12ab-34cd-56ef-1234567890ab'),
                            formatter=CSVFormatter().null_if(['null', 'Null']))
connection.execute(copy_into)
Copy