Uso do Snowflake SQLAlchemy Toolkit com o conector Python

O Snowflake SQLAlchemy funciona com base no Snowflake Connector para Python como um dialeto para fazer a ponte entre um banco de dados Snowflake e aplicativos SQLAlchemy.

Para obter mais informações, consulte a documentação do dialeto.

Neste tópico:

Pré-requisitos

Conector Snowflake para Python

O único requisito para o Snowflake SQLAlchemy é o Conector Snowflake para Python; entretanto, o conector não precisa estar instalado porque a instalação do Snowflake SQLAlchemy instala automaticamente o conector.

Análise de dados e estruturas de aplicativos da Web (opcional)

O Snowflake SQLAlchemy pode ser usado com pandas, Jupyter e Pyramid, que fornecem níveis mais altos de estruturas de aplicativo para análise de dados e aplicativos da Web. No entanto, construir um ambiente de trabalho a partir do zero não é uma tarefa trivial, particularmente para os usuários novatos. A instalação das estruturas requer compiladores e ferramentas C, e a escolha das ferramentas e versões corretas é um obstáculo que pode impedir os usuários de usar aplicativos Python.

Uma maneira mais fácil de construir um ambiente é através do Anaconda, que fornece uma pilha de tecnologia completa e pré-compilada para todos os usuários, incluindo especialistas que não usam Python, como analistas de dados e estudantes. Para instruções de instalação do Anaconda, consulte a documentação de instalação do Anaconda. O pacote Snowflake SQLAlchemy pode então ser instalado sobre o Anaconda usando pip.

Para obter mais informações, consulte a seguinte documentação:

Instalação do Snowflake SQLAlchemy

O pacote Snowflake SQLAlchemy pode ser instalado a partir do repositório público PyPI usando pip:

pip install --upgrade snowflake-sqlalchemy
Copy

pip instala automaticamente todos os módulos necessários, incluindo o Conector do Snowflake para Python.

Observe que as notas do desenvolvedor são hospedadas com o código-fonte no GitHub.

Verificação de sua instalação

  1. Crie um arquivo (por exemplo, validate.py) que contenha o seguinte código de exemplo do Python, que se conecta ao Snowflake e exibe a versão do Snowflake:

    #!/usr/bin/env python
    from sqlalchemy import create_engine
    
    engine = create_engine(
        'snowflake://{user}:{password}@{account_identifier}/'.format(
            user='<user_login_name>',
            password='<password>',
            account_identifier='<account_identifier>',
        )
    )
    try:
        connection = engine.connect()
        results = connection.execute('select current_version()').fetchone()
        print(results[0])
    finally:
        connection.close()
        engine.dispose()
    
    Copy
  2. Substitua <user_login_name>, <password>, e <account_identifier> pelos valores apropriados para sua conta e usuário Snowflake. Para obter mais detalhes, consulte Parâmetros de conexão (neste tópico).

  3. Execute o código de exemplo. Por exemplo, se você criou um arquivo chamado validate.py:

    python validate.py
    
    Copy

A versão do Snowflake (por exemplo, 1.6.0) deve ser exibida.

Parâmetros e comportamento específicos do Snowflake

Na medida do possível, o Snowflake SQLAlchemy oferece funcionalidade compatível para aplicativos SQLAlchemy.

Para obter mais informações sobre o uso do SQLAlchemy, consulte a documentação do SQLAlchemy.

Entretanto, o Snowflake SQLAlchemy também fornece parâmetros e comportamento específicos do Snowflake, que são descritos nas seções seguintes.

Parâmetros de conexão

Parâmetros obrigatórios

O Snowflake SQLAlchemy usa a seguinte sintaxe de cadeia de conexão para se conectar ao Snowflake e iniciar uma sessão:

'snowflake://<user_login_name>:<password>@<account_identifier>'
Copy

Onde:

Parâmetros adicionais de conexão

Opcionalmente, você pode incluir as seguintes informações adicionais no final da cadeia de conexão (após <account_name>):

'snowflake://<user_login_name>:<password>@<account_identifier>/<database_name>/<schema_name>?warehouse=<warehouse_name>&role=<role_name>'
Copy

Onde:

  • <database_name> e <schema_name> são o banco de dados e o esquema iniciais para a sessão do Snowflake, separados por barras (/).

  • warehouse=<warehouse_name> e role=<role_name>' são o warehouse e função iniciais para a sessão, especificados como cadeias de parâmetros, separados por pontos de interrogação (?).

Nota

Após o login, o banco de dados, esquema, warehouse e função iniciais especificados na sequência de conexão podem sempre ser alterados para a sessão.

Configuração do servidor proxy

Os parâmetros de servidor proxy não têm suporte. Ao invés disso, use as variáveis de ambiente com suporte para configurar um servidor proxy. Para obter mais informações, consulte Uso de um servidor proxy.

Exemplos de cadeias de conexão

O exemplo seguinte chama o método create_engine com o nome do usuário testuser1, senha 0123456, identificador de conta myorganization-myaccount, banco de dados testdb, esquema public, warehouse testwh e função myrole:

from sqlalchemy import create_engine
engine = create_engine(
    'snowflake://testuser1:0123456@myorganization-myaccount/testdb/public?warehouse=testwh&role=myrole'
)
Copy

Por conveniência, você pode usar o método snowflake.sqlalchemy.URL para construir a cadeia de conexão e conectar-se ao banco de dados. O exemplo a seguir constrói a mesma cadeia de conexão do exemplo anterior:

from snowflake.sqlalchemy import URL
from sqlalchemy import create_engine

engine = create_engine(URL(
    account = 'myorganization-myaccount',
    user = 'testuser1',
    password = '0123456',
    database = 'testdb',
    schema = 'public',
    warehouse = 'testwh',
    role='myrole',
))
Copy

Abertura e encerramento de uma conexão

Abra uma conexão executando engine.connect(); evite usar engine.execute().

# Avoid this.
engine = create_engine(...)
engine.execute(<SQL>)
engine.dispose()

# Do this.
engine = create_engine(...)
connection = engine.connect()
try:
    connection.execute(<SQL>)
finally:
    connection.close()
    engine.dispose()
Copy

Nota

Certifique-se de encerrar a conexão executando connection.close() antes de engine.dispose(); caso contrário, o coletor de lixo do Python remove os recursos necessários para se comunicar com o Snowflake, impedindo que o conector Python encerre corretamente a sessão.

Se você pretende utilizar transações explícitas, deve desativar a opção de execução AUTOCOMMIT no SQLAlchemy.

Para mais informações, consulte a opção de execução AUTOCOMMIT em SQLAlchemy.

Por padrão, o SQLAlchemy habilita esta opção. Quando esta opção é ativada, as instruções INSERT, UPDATE, e DELETE são confirmadas automaticamente após a execução, mesmo quando estas instruções são executadas dentro de uma transação explícita.

Para desativar AUTOCOMMIT, passe autocommit=False para o método Connection.execution_options(). Por exemplo:

# Disable AUTOCOMMIT if you need to use an explicit transaction.
with engine.connect().execution_options(autocommit=False) as connection:

  try:
    connection.execute("BEGIN")
    connection.execute("INSERT INTO test_table VALUES (88888, 'X', 434354)")
    connection.execute("INSERT INTO test_table VALUES (99999, 'Y', 453654654)")
    connection.execute("COMMIT")
  except Exception as e:
    connection.execute("ROLLBACK")
  finally:
    connection.close()

engine.dispose()
Copy

Comportamento de incremento automático

O incremento automático de um valor requer o objeto Sequence. Inclua o objeto Sequence na coluna da chave primária para incrementar automaticamente o valor à medida que cada novo registro é inserido. Por exemplo:

t = Table('mytable', metadata,
    Column('id', Integer, Sequence('id_seq'), primary_key=True),
    Column(...), ...
)
Copy

Tratamento de maiúsculas e minúsculas no nome de objetos

O Snowflake armazena todos os nomes de objetos que não diferenciam maiúsculas e minúsculas usando maiúsculas. Em contraste, o SQLAlchemy considera que todos os nomes de objetos em minúsculas não diferenciam maiúsculas e minúsculas. O Snowflake SQLAlchemy converte as maiúsculas/minúsculas do nome de objetos durante a comunicação em nível de esquema (ou seja, durante a reflexão de tabela e índice). Se você usar nomes de objetos em maiúsculas, o SQLAlchemy considera que eles diferenciam maiúsculas/minúsculas e envolve os nomes com aspas. Este comportamento comprometerá a correspondência com os dados do dicionário de dados recebidos do Snowflake; portanto, a menos que nomes de identificador tenham sido verdadeiramente criados para diferenciar maiúsculas/minúsculas (usando aspas, como em "TestDb"), nomes em minúsculas devem ser usados no lado do SQLAlchemy.

Suporte para índices

Os índices são suportados somente para tabelas híbridas no Snowflake SqlAlchemy. Para obter mais detalhes sobre limitações e casos de uso, consulte a documentação de Criar Índice. Você pode criar um índice usando os seguintes métodos:

  • Índice de coluna única

    Você pode criar um índice de coluna única definindo o parâmetro index=True na coluna ou definindo explicitamente um objeto Index.

    hybrid_test_table_1 = HybridTable(
      "table_name",
      metadata,
      Column("column1", Integer, primary_key=True),
      Column("column2", String, index=True),
      Index("index_1","column1", "column2")
    )
    
    metadata.create_all(engine_testaccount)
    
    Copy
  • Índice de várias colunas

    Para índices de várias colunas, você define o objeto Índice especificando as colunas que devem ser indexadas.

    hybrid_test_table_1 = HybridTable(
      "table_name",
      metadata,
      Column("column1", Integer, primary_key=True),
      Column("column2", String),
      Index("index_1","column1", "column2")
    )
    
    metadata.create_all(engine_testaccount)
    
    Copy

Suporte para tipos de dados Numpy

O Snowflake SQLAlchemy suporta a vinculação e busca de tipos de dados NumPy. A vinculação sempre tem suporte. Para permitir a busca de tipos de dados NumPy, adicione numpy=True aos parâmetros de conexão.

Os seguintes tipos de dados NumPy têm suporte:

  • numpy.int64

  • numpy.float64

  • numpy.datetime64

O exemplo a seguir mostra a ida e volta de dados numpy.datetime64:

import numpy as np
import pandas as pd
engine = create_engine(URL(
    account = 'myorganization-myaccount',
    user = 'testuser1',
    password = 'pass',
    database = 'db',
    schema = 'public',
    warehouse = 'testwh',
    role='myrole',
    numpy=True,
))

specific_date = np.datetime64('2016-03-04T12:03:05.123456789Z')

with engine.connect() as connection:
    conenction.exec_sql_query(
        "CREATE OR REPLACE TABLE ts_tbl(c1 TIMESTAMP_NTZ)")
    connection.exec_sql_query(
        "INSERT INTO ts_tbl(c1) values(%s)", (specific_date,)
    )
    df = pd.read_sql_query("SELECT * FROM ts_tbl", connection)
    assert df.c1.values[0] == specific_date
Copy

Metadados de coluna do cache

O SQLAlchemy fornece a API de inspeção do tempo de execução para obter as informações de tempo de execução sobre os vários objetos. Um dos casos de uso comuns é obter todas as tabelas e seus metadados de coluna em um esquema a fim de construir um catálogo de esquemas.

Para obter mais informações, consulte a API de inspeção do tempo de execução. Para um exemplo de gerenciamento de migrações de esquema de banco de dados com SQLAlchemy, alembic.

Um fluxo de pseudocódigo pode ser o seguinte:

inspector = inspect(engine)
schema = inspector.default_schema_name
for table_name in inspector.get_table_names(schema):
    column_metadata = inspector.get_columns(table_name, schema)
    primary_keys = inspector.get_primary_keys(table_name, schema)
    foreign_keys = inspector.get_foreign_keys(table_name, schema)
    ...
Copy

Neste fluxo, um problema potencial é que ele pode levar bastante tempo, já que as consultas são feitas em cada tabela. Os resultados são armazenados em cache, mas a obtenção dos metadados de coluna é cara.

Para mitigar o problema, o Snowflake SQLAlchemy usa um sinalizador cache_column_metadata=True de tal forma que todos os metadados das colunas de todas as tabelas são armazenados em cache quando get_table_names é chamado, e o resto de get_columns, get_primary_keys e get_foreign_keys pode tirar proveito do cache.

engine = create_engine(URL(
    account = 'myorganization-myaccount',
    user = 'testuser1',
    password = 'pass',
    database = 'db',
    schema = 'public',
    warehouse = 'testwh',
    role='myrole',
    cache_column_metadata=True,
))
Copy

Nota

O uso de memória subirá mais, pois todos os metadados das colunas associados ao objeto Inspector são armazenados em cache. Use o sinalizador somente se você precisar obter todos os metadados das colunas.

Suporte para VARIANT, ARRAY e OBJECT

O Snowflake SQLAlchemy suporta a busca dos tipos de dados VARIANT, ARRAY e OBJECT. Todos os tipos são convertidos em str no Python para que você possa convertê-los em tipos de dados nativos usando json.loads.

Este exemplo mostra como criar uma tabela incluindo colunas de tipos de dados VARIANT, ARRAY e OBJECT:

from snowflake.sqlalchemy import (VARIANT, ARRAY, OBJECT)
...
t = Table('my_semi_structured_datatype_table', metadata,
    Column('va', VARIANT),
    Column('ob', OBJECT),
    Column('ar', ARRAY))
metdata.create_all(engine)
Copy

Para recuperar as colunas de tipos de dados VARIANT, ARRAY e OBJECT e convertê-los para os tipos de dados Python nativos, busque dados e chame o método json.loads como segue:

import json
connection = engine.connect()
results = connection.execute(select([t]))
row = results.fetchone()
data_variant = json.loads(row[0])
data_object  = json.loads(row[1])
data_array   = json.loads(row[2])
Copy

Suporte a tipos de dados estruturados

Este módulo define tipos SQLAlchemy personalizados para dados estruturados do Snowflake, especificamente para tabelas Iceberg. Os tipos MAP, OBJECT e ARRAY permitem que você armazene estruturas de dados complexas em seus modelos SQLAlchemy. Para obter informações detalhadas, consulte a documentação do Snowflake Tipos de dados estruturados.

MAP

O tipo MAP representa uma coleção de pares chave-valor, em que cada chave e valor podem ter tipos diferentes, conforme mostrado:

  • Key Type: o tipo de chave, como TEXT ou NUMBER).

  • Value Type: o tipo de valor, como TEXT ou NUMBER).

  • Not Null: se os valores NULL são permitidos (o padrão é False).

Exemplo de uso:

IcebergTable(
  table_name,
  metadata,
  Column("id", Integer, primary_key=True),
  Column("map_col", MAP(NUMBER(10, 0), TEXT(16777216))),
  external_volume="external_volume",
  base_location="base_location",
)
Copy

OBJECT

O tipo OBJECT representa um objeto semiestruturado com campos nomeados. Cada campo pode ter um tipo específico, e você também pode especificar se cada campo é anulável.

  • Items Types: um dicionário de nomes de campos e seus tipos. O tipo pode, opcionalmente, incluir um sinalizador anulável (True para não anulável ou False para anulável [padrão]).

Exemplo de uso:

IcebergTable(
    table_name,
    metadata,
    Column("id", Integer, primary_key=True),
    Column(
        "object_col",
        OBJECT(key1=(TEXT(16777216), False), key2=(NUMBER(10, 0), False)),
        OBJECT(key1=TEXT(16777216), key2=NUMBER(10, 0)), # Without nullable flag
    ),
    external_volume="external_volume",
    base_location="base_location",
)
Copy

ARRAY

O tipo ARRAY representa uma lista ordenada de valores, em que cada elemento tem o mesmo tipo. O tipo dos elementos é definido quando a matriz é criada.

  • Value Type: o tipo dos elementos na matriz, como TEXT ou NUMBER).

  • Not Null: se os valores de NULL são permitidos (o padrão é False).

Exemplo de uso:

IcebergTable(
    table_name,
    metadata,
    Column("id", Integer, primary_key=True),
    Column("array_col", ARRAY(TEXT(16777216))),
    external_volume="external_volume",
    base_location="base_location",
)
Copy

Suporte para CLUSTER BY

O Snowflake SQLAlchemy suporta o parâmetro CLUSTER BY para tabelas. Para obter mais informações sobre o parâmetro, consulte CREATE TABLE.

Este exemplo mostra como criar uma tabela com duas colunas, id e name, como a chave de clustering:

t = Table('myuser', metadata,
    Column('id', Integer, primary_key=True),
    Column('name', String),
    snowflake_clusterby=['id', 'name'], ...
)
metadata.create_all(engine)
Copy

Suporte para Alembic

O Alembic é uma ferramenta de migração de banco de dados para uso no SQLAlchemy. O Snowflake SQLAlchemy funciona adicionando o seguinte código ao alembic/env.py para que o Alembic possa reconhecer o Snowflake SQLAlchemy.

from alembic.ddl.impl import DefaultImpl

class SnowflakeImpl(DefaultImpl):
    __dialect__ = 'snowflake'
Copy

Consulte a documentação do Alembic para uso geral.

Suporte à autenticação de par de chaves

Snowflake SQLAlchemy oferece suporte à autenticação de par de chaves alavancando a funcionalidade do conector do Snowflake para Python. Consulte Uso de autenticação de pares de chaves e rotação de pares de chaves para obter os passos para a criação das chaves privadas e públicas.

O parâmetro de chave privada é passado por connect_args da seguinte forma:

...
from snowflake.sqlalchemy import URL
from sqlalchemy import create_engine

from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import serialization

with open("rsa_key.p8", "rb") as key:
    p_key= serialization.load_pem_private_key(
        key.read(),
        password=os.environ['PRIVATE_KEY_PASSPHRASE'].encode(),
        backend=default_backend()
    )

pkb = p_key.private_bytes(
    encoding=serialization.Encoding.DER,
    format=serialization.PrivateFormat.PKCS8,
    encryption_algorithm=serialization.NoEncryption())

engine = create_engine(URL(
    account='abc123',
    user='testuser1',
    ),
    connect_args={
        'private_key': pkb,
        },
    )
Copy

Onde PRIVATE_KEY_PASSPHRASE é uma frase secreta para descriptografar o arquivo de chave privada, rsa_key.p8.

O método snowflake.sqlalchemy.URL não oferece suporte a parâmetros de chave privada.

Suporte ao comando de mescla

Snowflake SQLAlchemy oferece suporte à execução de um upsert com sua expressão personalizada MergeInto. Consulte MERGE para obter a documentação completa.

Use-o da seguinte forma:

from sqlalchemy.orm import sessionmaker
from sqlalchemy import MetaData, create_engine
from snowflake.sqlalchemy import MergeInto

engine = create_engine(db.url, echo=False)
session = sessionmaker(bind=engine)()
connection = engine.connect()

meta = MetaData()
meta.reflect(bind=session.bind)
t1 = meta.tables['t1']
t2 = meta.tables['t2']

merge = MergeInto(target=t1, source=t2, on=t1.c.t1key == t2.c.t2key)
merge.when_matched_then_delete().where(t2.c.marked == 1)
merge.when_matched_then_update().where(t2.c.isnewstatus == 1).values(val = t2.c.newval, status=t2.c.newstatus)
merge.when_matched_then_update().values(val=t2.c.newval)
merge.when_not_matched_then_insert().values(val=t2.c.newval, status=t2.c.newstatus)
connection.execute(merge)
Copy

Suporte para CopyIntoStorage

Snowflake SQLAlchemy oferece suporte a salvar tabelas e consultar resultados em diferentes estágios do Snowflake, Azure Containers e buckets AWS com sua expressão personalizada CopyIntoStorage. Consulte COPY INTO <local> para obter a documentação completa.

Use-o da seguinte forma:

from sqlalchemy.orm import sessionmaker
from sqlalchemy import MetaData, create_engine
from snowflake.sqlalchemy import CopyIntoStorage, AWSBucket, CSVFormatter

engine = create_engine(db.url, echo=False)
session = sessionmaker(bind=engine)()
connection = engine.connect()

meta = MetaData()
meta.reflect(bind=session.bind)
users = meta.tables['users']

copy_into = CopyIntoStorage(from_=users,
                            into=AWSBucket.from_uri('s3://my_private_backup').encryption_aws_sse_kms('1234abcd-12ab-34cd-56ef-1234567890ab'),
                            formatter=CSVFormatter().null_if(['null', 'Null']))
connection.execute(copy_into)
Copy

Tabela Iceberg com suporte do catálogo Snowflake

O Snowflake SQLAlchemy oferece suporte a tabelas Iceberg com o catálogo Snowflake, juntamente com vários parâmetros relacionados. Para obter informações detalhadas sobre as tabelas Iceberg, consulte a documentação do Snowflake CREATE ICEBERG.

Para criar uma tabela Iceberg usando o Snowflake SQLAlchemy, você pode definir a tabela usando a sintaxe SQLAlchemy Core da seguinte forma:

table = IcebergTable(
        "myuser",
        metadata,
        Column("id", Integer, primary_key=True),
        Column("name", String),
        external_volume=external_volume_name,
        base_location="my_iceberg_table",
  as_query="SELECT * FROM table"
    )
Copy

Como alternativa, você pode definir a tabela usando uma abordagem declarativa:

class MyUser(Base):
    __tablename__ = "myuser"

    @classmethod
    def __table_cls__(cls, name, metadata, *arg, **kw):
        return IcebergTable(name, metadata, *arg, **kw)

    __table_args__ = {
        "external_volume": "my_external_volume",
        "base_location": "my_iceberg_table",
  "as_query": "SELECT * FROM table",
    }

    id = Column(Integer, primary_key=True)
    name = Column(String)
Copy

Suporte à tabela híbrida

O Snowflake SQLAlchemy oferece suporte a tabelas híbridas com índices. Para obter informações detalhadas, consulte a documentação do Snowflake CREATE HYBRID TABLE.

Para criar uma tabela híbrida e adicionar um índice, você pode usar a sintaxe do SQLAlchemy Core da seguinte forma:

table = HybridTable(
    "myuser",
    metadata,
    Column("id", Integer, primary_key=True),
    Column("name", String),
    Index("idx_name", "name")
Copy

Como alternativa, você pode definir a tabela usando a abordagem declarativa:

class MyUser(Base):
    __tablename__ = "myuser"

    @classmethod
    def __table_cls__(cls, name, metadata, *arg, **kw):
        return HybridTable(name, metadata, *arg, **kw)

    __table_args__ = (
        Index("idx_name", "name"),
    )

    id = Column(Integer, primary_key=True)
    name = Column(String)
Copy

Suporte a tabelas dinâmicas

O Snowflake SQLAlchemy oferece suporte a tabelas dinâmicas. Para obter informações detalhadas, consulte a documentação do Snowflake CREATE DYNAMIC TABLE.

Para criar uma tabela dinâmica, você pode usar a sintaxe do SQLAlchemy Core da seguinte forma:

 dynamic_test_table_1 = DynamicTable(
       "dynamic_MyUser",
       metadata,
       Column("id", Integer),
       Column("name", String),
       target_lag=(1, TimeUnit.HOURS), # Additionally you can use SnowflakeKeyword.DOWNSTREAM
       warehouse='test_wh',
refresh_mode=SnowflakeKeyword.FULL
       as_query="SELECT id, name from MyUser;"
   )
Copy

Além disso, você pode definir uma tabela sem colunas usando a construção SqlAlchemy select():

 dynamic_test_table_1 = DynamicTable(
       "dynamic_MyUser",
       metadata,
       target_lag=(1, TimeUnit.HOURS),
       warehouse='test_wh',
refresh_mode=SnowflakeKeyword.FULL
       as_query=select(MyUser.id, MyUser.name)
   )
Copy

Nota

  • Não há suporte para a definição de uma chave primária em uma tabela dinâmica, o que significa que as tabelas declarativas não são compatíveis com as tabelas dinâmicas.

  • Ao usar o parâmetro as_query com uma cadeia de caracteres, você deve definir explicitamente as colunas. No entanto, se você usar a construção SQLAlchemy: codenowrap:select(), não precisará definir explicitamente as colunas.

  • Não há suporte para a inserção direta de dados em tabelas dinâmicas.