Uso do Snowflake SQLAlchemy Toolkit com o conector Python¶
O Snowflake SQLAlchemy funciona com base no conector Snowflake para Python como um dialeto para fazer a ponte entre um banco de dados Snowflake e aplicativos SQLAlchemy.
Neste tópico:
Pré-requisitos¶
Conector Snowflake para Python¶
O único requisito para o Snowflake SQLAlchemy é o Conector Snowflake para Python; entretanto, o conector não precisa estar instalado porque a instalação do Snowflake SQLAlchemy instala automaticamente o conector.
Análise de dados e estruturas de aplicativos da Web (opcional)¶
O Snowflake SQLAlchemy pode ser usado com pandas, Jupyter e Pyramid, que fornecem níveis mais altos de estruturas de aplicativo para análise de dados e aplicativos da Web. No entanto, construir um ambiente de trabalho a partir do zero não é uma tarefa trivial, particularmente para os usuários novatos. A instalação das estruturas requer compiladores e ferramentas C, e a escolha das ferramentas e versões corretas é um obstáculo que pode impedir os usuários de usar aplicativos Python.
Uma maneira mais fácil de construir um ambiente é através do Anaconda, que fornece uma pilha de tecnologia completa e pré-compilada para todos os usuários, incluindo especialistas que não usam Python, como analistas de dados e estudantes. Para instruções de instalação do Anaconda, consulte a documentação de instalação do Anaconda. O pacote Snowflake SQLAlchemy pode então ser instalado sobre o Anaconda usando pip.
Instalação do Snowflake SQLAlchemy¶
O pacote Snowflake SQLAlchemy pode ser instalado a partir do repositório público PyPI usando pip
:
pip install --upgrade snowflake-sqlalchemy
pip
instala automaticamente todos os módulos necessários, incluindo o Conector do Snowflake para Python.
Observe que as notas do desenvolvedor são hospedadas com o código-fonte no GitHub.
Verificação de sua instalação¶
Crie um arquivo (por exemplo,
validate.py
) que contenha o seguinte código de exemplo do Python, que se conecta ao Snowflake e exibe a versão do Snowflake:#!/usr/bin/env python from sqlalchemy import create_engine engine = create_engine( 'snowflake://{user}:{password}@{account_identifier}/'.format( user='<user_login_name>', password='<password>', account_identifier='<account_identifier>', ) ) try: connection = engine.connect() results = connection.execute('select current_version()').fetchone() print(results[0]) finally: connection.close() engine.dispose()
Substitua
<user_login_name>
,<password>
, e<account_identifier>
pelos valores apropriados para sua conta e usuário Snowflake. Para obter mais detalhes, consulte Parâmetros de conexão (neste tópico).Execute o código de exemplo. Por exemplo, se você criou um arquivo chamado
validate.py
:python validate.py
A versão do Snowflake (por exemplo, 1.48.0
) deve ser exibida.
Parâmetros e comportamento específicos do Snowflake¶
Na medida do possível, o Snowflake SQLAlchemy oferece funcionalidade compatível para aplicativos SQLAlchemy. Para obter mais informações sobre o uso do SQLAlchemy, consulte a documentação do SQLAlchemy.
Entretanto, o Snowflake SQLAlchemy também fornece parâmetros e comportamento específicos do Snowflake, que são descritos nas seções seguintes.
Parâmetros de conexão¶
Parâmetros obrigatórios¶
O Snowflake SQLAlchemy usa a seguinte sintaxe de cadeia de conexão para se conectar ao Snowflake e iniciar uma sessão:
'snowflake://<user_login_name>:<password>@<account_identifier>'
Onde:
<user_login_name>
é o nome de login para seu usuário do Snowflake.<password>
é a senha para seu usuário do Snowflake.<account_identifier>
é o identificador de sua conta. Consulte Identificadores de conta.Nota
Não inclua o nome de domínio
snowflakecomputing.com
como parte do identificador de sua conta. O Snowflake acrescenta automaticamente o nome de domínio ao identificador de sua conta para criar a conexão necessária.
Parâmetros adicionais de conexão¶
Opcionalmente, você pode incluir as seguintes informações adicionais no final da cadeia de conexão (após <account_name>
):
'snowflake://<user_login_name>:<password>@<account_identifier>/<database_name>/<schema_name>?warehouse=<warehouse_name>&role=<role_name>'
Onde:
<database_name>
e<schema_name>
são o banco de dados e o esquema iniciais para a sessão do Snowflake, separados por barras (/
).warehouse=<warehouse_name>
erole=<role_name>'
são o warehouse e função iniciais para a sessão, especificados como cadeias de parâmetros, separados por pontos de interrogação (?
).
Nota
Após o login, o banco de dados, esquema, warehouse e função iniciais especificados na sequência de conexão podem sempre ser alterados para a sessão.
Configuração do servidor proxy¶
Os parâmetros de servidor proxy não têm suporte. Ao invés disso, use as variáveis de ambiente com suporte para configurar um servidor proxy. Para obter mais informações, consulte Uso de um servidor proxy.
Exemplos de cadeias de conexão¶
O exemplo seguinte chama o método create_engine
com o nome do usuário testuser1
, senha 0123456
, identificador de conta myorganization-myaccount
, banco de dados testdb
, esquema public
, warehouse testwh
e função myrole
:
from sqlalchemy import create_engine engine = create_engine( 'snowflake://testuser1:0123456@myorganization-myaccount/testdb/public?warehouse=testwh&role=myrole' )
Por conveniência, você pode usar o método snowflake.sqlalchemy.URL
para construir a cadeia de conexão e conectar-se ao banco de dados. O exemplo a seguir constrói a mesma cadeia de conexão do exemplo anterior:
from snowflake.sqlalchemy import URL from sqlalchemy import create_engine engine = create_engine(URL( account = 'myorganization-myaccount', user = 'testuser1', password = '0123456', database = 'testdb', schema = 'public', warehouse = 'testwh', role='myrole', ))
Abertura e encerramento de uma conexão¶
Abra uma conexão executando engine.connect()
; evite usar engine.execute()
.
# Avoid this. engine = create_engine(...) engine.execute(<SQL>) engine.dispose() # Do this. engine = create_engine(...) connection = engine.connect() try: connection.execute(<SQL>) finally: connection.close() engine.dispose()
Nota
Certifique-se de encerrar a conexão executando connection.close()
antes de engine.dispose()
; caso contrário, o coletor de lixo do Python remove os recursos necessários para se comunicar com o Snowflake, impedindo que o conector Python encerre corretamente a sessão.
Se você pretende utilizar transações explícitas, deve desativar a opção de execução AUTOCOMMIT no SQLAlchemy.
Por padrão, o SQLAlchemy habilita esta opção. Quando esta opção é ativada, as instruções INSERT, UPDATE, e DELETE são confirmadas automaticamente após a execução, mesmo quando estas instruções são executadas dentro de uma transação explícita.
Para desativar AUTOCOMMIT, passe autocommit=False
para o método Connection.execution_options()
. Por exemplo:
# Disable AUTOCOMMIT if you need to use an explicit transaction.
with engine.connect().execution_options(autocommit=False) as connection:
try:
connection.execute("BEGIN")
connection.execute("INSERT INTO test_table VALUES (88888, 'X', 434354)")
connection.execute("INSERT INTO test_table VALUES (99999, 'Y', 453654654)")
connection.execute("COMMIT")
except Exception as e:
connection.execute("ROLLBACK")
finally:
connection.close()
engine.dispose()
Comportamento de incremento automático¶
O incremento automático de um valor requer o objeto Sequence
. Inclua o objeto Sequence
na coluna da chave primária para incrementar automaticamente o valor à medida que cada novo registro é inserido. Por exemplo:
t = Table('mytable', metadata, Column('id', Integer, Sequence('id_seq'), primary_key=True), Column(...), ... )
Tratamento de maiúsculas e minúsculas no nome de objetos¶
O Snowflake armazena todos os nomes de objetos que não diferenciam maiúsculas e minúsculas usando maiúsculas. Em contraste, o SQLAlchemy considera que todos os nomes de objetos em minúsculas não diferenciam maiúsculas e minúsculas. O Snowflake SQLAlchemy converte as maiúsculas/minúsculas do nome de objetos durante a comunicação em nível de esquema (ou seja, durante a reflexão de tabela e índice). Se você usar nomes de objetos em maiúsculas, o SQLAlchemy considera que eles diferenciam maiúsculas/minúsculas e envolve os nomes com aspas. Este comportamento comprometerá a correspondência com os dados do dicionário de dados recebidos do Snowflake; portanto, a menos que nomes de identificador tenham sido verdadeiramente criados para diferenciar maiúsculas/minúsculas (usando aspas, como em "TestDb"
), nomes em minúsculas devem ser usados no lado do SQLAlchemy.
Suporte para índices¶
O Snowflake não utiliza índices, nem o Snowflake SQLAlchemy.
Suporte para tipos de dados Numpy¶
O Snowflake SQLAlchemy suporta a vinculação e busca de tipos de dados NumPy
. A vinculação sempre tem suporte. Para permitir a busca de tipos de dados NumPy
, adicione numpy=True
aos parâmetros de conexão.
Os seguintes tipos de dados NumPy
têm suporte:
numpy.int64
numpy.float64
numpy.datetime64
O exemplo a seguir mostra a ida e volta de dados numpy.datetime64
:
import numpy as np import pandas as pd engine = create_engine(URL( account = 'myorganization-myaccount', user = 'testuser1', password = 'pass', database = 'db', schema = 'public', warehouse = 'testwh', role='myrole', numpy=True, )) specific_date = np.datetime64('2016-03-04T12:03:05.123456789Z') connection = engine.connect() connection.execute( "CREATE OR REPLACE TABLE ts_tbl(c1 TIMESTAMP_NTZ)") connection.execute( "INSERT INTO ts_tbl(c1) values(%s)", (specific_date,) ) df = pd.read_sql_query("SELECT * FROM ts_tbl", engine) assert df.c1.values[0] == specific_date
Metadados de coluna do cache¶
O SQLAlchemy fornece a API de inspeção do tempo de execução para obter as informações de tempo de execução sobre os vários objetos. Um dos casos de uso comuns é obter todas as tabelas e seus metadados de coluna em um esquema a fim de construir um catálogo de esquemas. Por exemplo, o Alembic sobre o SQLAlchemy gerencia as migrações de esquemas de banco de dados. Um fluxo de pseudocódigo pode ser o seguinte:
inspector = inspect(engine) schema = inspector.default_schema_name for table_name in inspector.get_table_names(schema): column_metadata = inspector.get_columns(table_name, schema) primary_keys = inspector.get_primary_keys(table_name, schema) foreign_keys = inspector.get_foreign_keys(table_name, schema) ...
Neste fluxo, um problema potencial é que ele pode levar bastante tempo, já que as consultas são feitas em cada tabela. Os resultados são armazenados em cache, mas a obtenção dos metadados de coluna é cara.
Para mitigar o problema, o Snowflake SQLAlchemy usa um sinalizador cache_column_metadata=True
de tal forma que todos os metadados das colunas de todas as tabelas são armazenados em cache quando get_table_names
é chamado, e o resto de get_columns
, get_primary_keys
e get_foreign_keys
pode tirar proveito do cache.
engine = create_engine(URL( account = 'myorganization-myaccount', user = 'testuser1', password = 'pass', database = 'db', schema = 'public', warehouse = 'testwh', role='myrole', cache_column_metadata=True, ))
Nota
O uso de memória subirá mais, pois todos os metadados das colunas associados ao objeto Inspector
são armazenados em cache. Use o sinalizador somente se você precisar obter todos os metadados das colunas.
Suporte para VARIANT, ARRAY e OBJECT¶
O Snowflake SQLAlchemy suporta a busca dos tipos de dados VARIANT
, ARRAY
e OBJECT
. Todos os tipos são convertidos em str
no Python para que você possa convertê-los em tipos de dados nativos usando json.loads
.
Este exemplo mostra como criar uma tabela incluindo colunas de tipos de dados VARIANT
, ARRAY
e OBJECT
:
from snowflake.sqlalchemy import (VARIANT, ARRAY, OBJECT) ... t = Table('my_semi_structured_datatype_table', metadata, Column('va', VARIANT), Column('ob', OBJECT), Column('ar', ARRAY)) metdata.create_all(engine)
Para recuperar as colunas de tipos de dados VARIANT
, ARRAY
e OBJECT
e convertê-los para os tipos de dados Python nativos, busque dados e chame o método json.loads
como segue:
import json connection = engine.connect() results = connection.execute(select([t])) row = results.fetchone() data_variant = json.loads(row[0]) data_object = json.loads(row[1]) data_array = json.loads(row[2])
Suporte para CLUSTER BY¶
O Snowflake SQLAlchemy suporta o parâmetro CLUSTER BY
para tabelas. Para obter mais informações sobre o parâmetro, consulte CREATE TABLE.
Este exemplo mostra como criar uma tabela com duas colunas, id
e name
, como a chave de clustering:
t = Table('myuser', metadata, Column('id', Integer, primary_key=True), Column('name', String), snowflake_clusterby=['id', 'name'], ... ) metadata.create_all(engine)
Suporte para Alembic¶
O Alembic é uma ferramenta de migração de banco de dados para uso no SQLAlchemy
. O Snowflake SQLAlchemy funciona adicionando o seguinte código ao alembic/env.py
para que o Alembic possa reconhecer o Snowflake SQLAlchemy.
from alembic.ddl.impl import DefaultImpl class SnowflakeImpl(DefaultImpl): __dialect__ = 'snowflake'
Consulte a documentação do Alembic para uso geral.
Suporte à autenticação de par de chaves¶
Snowflake SQLAlchemy oferece suporte à autenticação de par de chaves alavancando a funcionalidade do conector do Snowflake para Python. Consulte Uso de autenticação de pares de chaves e rotação de pares de chaves para obter os passos para a criação das chaves privadas e públicas.
O parâmetro de chave privada é passado por connect_args
da seguinte forma:
... from snowflake.sqlalchemy import URL from sqlalchemy import create_engine from cryptography.hazmat.backends import default_backend from cryptography.hazmat.primitives.asymmetric import rsa from cryptography.hazmat.primitives.asymmetric import dsa from cryptography.hazmat.primitives import serialization with open("rsa_key.p8", "rb") as key: p_key= serialization.load_pem_private_key( key.read(), password=os.environ['PRIVATE_KEY_PASSPHRASE'].encode(), backend=default_backend() ) pkb = p_key.private_bytes( encoding=serialization.Encoding.DER, format=serialization.PrivateFormat.PKCS8, encryption_algorithm=serialization.NoEncryption()) engine = create_engine(URL( account='abc123', user='testuser1', ), connect_args={ 'private_key': pkb, }, )
Onde PRIVATE_KEY_PASSPHRASE
é uma frase secreta para descriptografar o arquivo de chave privada, rsa_key.p8
.
O método snowflake.sqlalchemy.URL
não oferece suporte a parâmetros de chave privada.
Suporte ao comando de mescla¶
Snowflake SQLAlchemy oferece suporte à execução de um upsert com sua expressão personalizada MergeInto
. Consulte MERGE para obter a documentação completa.
Use-o da seguinte forma:
from sqlalchemy.orm import sessionmaker from sqlalchemy import MetaData, create_engine from snowflake.sqlalchemy import MergeInto engine = create_engine(db.url, echo=False) session = sessionmaker(bind=engine)() connection = engine.connect() meta = MetaData() meta.reflect(bind=session.bind) t1 = meta.tables['t1'] t2 = meta.tables['t2'] merge = MergeInto(target=t1, source=t2, on=t1.c.t1key == t2.c.t2key) merge.when_matched_then_delete().where(t2.c.marked == 1) merge.when_matched_then_update().where(t2.c.isnewstatus == 1).values(val = t2.c.newval, status=t2.c.newstatus) merge.when_matched_then_update().values(val=t2.c.newval) merge.when_not_matched_then_insert().values(val=t2.c.newval, status=t2.c.newstatus) connection.execute(merge)
Suporte para CopyIntoStorage¶
Snowflake SQLAlchemy oferece suporte a salvar tabelas e consultar resultados em diferentes estágios do Snowflake, Azure Containers e buckets AWS com sua expressão personalizada CopyIntoStorage
. Consulte COPY INTO <local> para obter a documentação completa.
Use-o da seguinte forma:
from sqlalchemy.orm import sessionmaker from sqlalchemy import MetaData, create_engine from snowflake.sqlalchemy import CopyIntoStorage, AWSBucket, CSVFormatter engine = create_engine(db.url, echo=False) session = sessionmaker(bind=engine)() connection = engine.connect() meta = MetaData() meta.reflect(bind=session.bind) users = meta.tables['users'] copy_into = CopyIntoStorage(from_=users, into=AWSBucket.from_uri('s3://my_private_backup').encryption_aws_sse_kms('1234abcd-12ab-34cd-56ef-1234567890ab'), formatter=CSVFormatter().null_if(['null', 'Null'])) connection.execute(copy_into)