Uso do conector Python

Este tópico fornece uma série de exemplos que ilustram como usar o conector Snowflake para realizar operações padrão do Snowflake, tais como login de usuário, criação de bancos de dados e tabelas, criação de warehouse, inserção/carregamento de dados e consulta.

O código de exemplo no final deste tópico combina os exemplos em um único programa Python operacional.

Nota

Snowflake agora fornece APIs Python de primeira classe para gerenciar os principais recursos do Snowflake, incluindo bancos de dados, esquemas, tabelas, tarefas e warehouses, sem usar SQL. Para obter mais informações, consulte Snowflake Python API: gerenciamento de objetos Snowflake com Python.

Neste tópico:

Criação de um banco de dados, esquema e warehouse

Depois de fazer o login, crie um banco de dados, esquema e warehouse, se ainda não existirem, usando os comandos CREATE DATABASE, CREATE SCHEMA e CREATE WAREHOUSE.

O exemplo abaixo mostra como criar um warehouse chamado tiny_warehouse, um banco de dados chamado testdb e um esquema chamado testschema. Note que, quando você cria o esquema, deve especificar o nome do banco de dados no qual criar o esquema, ou você já deve estar conectado ao banco de dados no qual deve criar o esquema. O exemplo abaixo executa um comando USE DATABASE antes do comando CREATE SCHEMA para garantir que o esquema seja criado no banco de dados correto.

conn.cursor().execute("CREATE WAREHOUSE IF NOT EXISTS tiny_warehouse_mg")
conn.cursor().execute("CREATE DATABASE IF NOT EXISTS testdb_mg")
conn.cursor().execute("USE DATABASE testdb_mg")
conn.cursor().execute("CREATE SCHEMA IF NOT EXISTS testschema_mg")
Copy

Uso do banco de dados, esquema e warehouse

Especifique o banco de dados e o esquema no qual você deseja criar tabelas. Especifique também o warehouse que fornecerá recursos para a execução de consultas e instruções DML.

Por exemplo, para utilizar o banco de dados testdb, esquema testschema e warehouse tiny_warehouse (criados anteriormente):

conn.cursor().execute("USE WAREHOUSE tiny_warehouse_mg")
conn.cursor().execute("USE DATABASE testdb_mg")
conn.cursor().execute("USE SCHEMA testdb_mg.testschema_mg")
Copy

Criação de tabelas e inserção de dados

Use o comando CREATE TABLE para criar tabelas e o comando INSERT para preencher as tabelas com dados.

Por exemplo, crie uma tabela chamada testtable e insira duas linhas na tabela:

conn.cursor().execute(
    "CREATE OR REPLACE TABLE "
    "test_table(col1 integer, col2 string)")

conn.cursor().execute(
    "INSERT INTO test_table(col1, col2) VALUES " + 
    "    (123, 'test string1'), " + 
    "    (456, 'test string2')")
Copy

Carregamento de dados

Em vez de inserir dados em tabelas usando comandos INSERT individuais, você pode carregar dados em massa de arquivos preparados em um local interno ou externo.

Cópia de dados a partir de um local interno

Para carregar dados de arquivos de sua máquina host em uma tabela, primeiro use o comando PUT para preparar o arquivo em um local interno e depois use o comando COPY INTO <tabela> para copiar os dados dos arquivos para a tabela.

Por exemplo:

# Putting Data
con.cursor().execute("PUT file:///tmp/data/file* @%testtable")
con.cursor().execute("COPY INTO testtable")
Copy

Onde seus dados CSV estão armazenados em um diretório local chamado /tmp/data em um ambiente Linux ou macOS e o diretório contém arquivos chamados file0, file1, … file100.

Cópia de dados de um local externo

Para carregar dados de arquivos já preparados em um local externo (ou seja, seu bucket S3) em uma tabela, use o comando COPY INTO <tabela>.

Por exemplo:

# Copying Data
con.cursor().execute("""
COPY INTO testtable FROM s3://<s3_bucket>/data/
    STORAGE_INTEGRATION = myint
    FILE_FORMAT=(field_delimiter=',')
""".format(
    aws_access_key_id=AWS_ACCESS_KEY_ID,
    aws_secret_access_key=AWS_SECRET_ACCESS_KEY))
Copy

Onde:

  • s3://<bucket_s3>/data/ especifica o nome do seu bucket S3

  • Os arquivos no bucket são prefixados com data.

  • O bucket é acessado usando-se uma integração de armazenamento criada com CREATE STORAGE INTEGRATION por um administrador de conta (ou seja, um usuário com a função ACCOUNTADMIN) ou uma função com o privilégio global CREATE INTEGRATION. Uma integração de armazenamento permite que os usuários evitem fornecer credenciais para acessar um local de armazenamento privado.

Nota

Este exemplo usa a função format() para compor a instrução. Se seu ambiente tem um risco de ataques de injeção SQL, você pode preferir vincular valores em vez de usar format().

Consulta de dados

Com o conector Snowflake para Python, você pode enviar:

  • uma consulta síncrona, que retorna o controle ao seu aplicativo após a conclusão da consulta.

  • uma consulta assíncrona, que retorna o controle ao seu aplicativo antes que a consulta seja concluída.

Após a conclusão da consulta, você usa o objeto Cursor para buscar os valores nos resultados. Por padrão, o conector Snowflake para Python converte os valores dos tipos de dados do Snowflake em tipos de dados Python nativos. (Note que você pode escolher retornar os valores como cadeias de caracteres cordas e realizar as conversões de tipo em seu aplicativo. Consulte Aprimoramento do desempenho da consulta sem conversão de dados.)

Nota

Por padrão, os valores das colunas NUMBER são retornados como valores de ponto flutuante de precisão dupla (float64). Para retorná-los como valores decimais (decimal.Decimal) nos métodos fetch_pandas_all() e fetch_pandas_batches(), defina o parâmetro arrow_number_to_decimal no método connect() como True.

Realização de uma consulta síncrona

Para realizar uma consulta síncrona, chame o método execute() no objeto Cursor. Por exemplo:

conn = snowflake.connector.connect( ... )
cur = conn.cursor()
cur.execute('select * from products')
Copy

Use o objeto Cursor para buscar os valores nos resultados, como explicado em Uso de cursor para buscar valores.

Realização de uma consulta assíncrona

O conector Snowflake para Python aceita consultas assíncronas (ou seja, consultas que retornam o controle ao usuário antes que sejam concluídas). Você pode enviar uma consulta assíncrona e usar sondagens para determinar quando a consulta foi concluída. Após a conclusão da consulta, você pode obter os resultados.

Nota

Para realizar consultas assíncronas, é necessário garantir que o parâmetro de configuração ABORT_DETACHED_QUERY seja FALSE (valor padrão).

O Snowflake fecha automaticamente as conexões após um período de tempo (padrão: 5 minutos), o que ///orphans//isola/// qualquer consulta ativa. Se o valor for TRUE, o Snowflake encerra essas consultas órfãs, o que pode impactar as consultas assíncronas.

Com esse recurso, você pode enviar várias consultas em paralelo sem esperar que cada consulta seja concluída. Você também pode executar uma combinação de consultas síncronas e assíncronas durante a mesma sessão.

Finalmente, você pode enviar uma consulta assíncrona a partir de uma conexão e verificar os resultados de uma conexão diferente. Por exemplo, um usuário pode iniciar uma consulta de longa duração a partir de seu aplicativo, sair do aplicativo e reiniciá-lo posteriormente para verificar os resultados.

Envio de uma consulta assíncrona

Para enviar uma consulta assíncrona, chame o método execute_async() no objeto Cursor. Por exemplo:

conn = snowflake.connector.connect( ... )
cur = conn.cursor()
# Submit an asynchronous query for execution.
cur.execute_async('select count(*) from table(generator(timeLimit => 25))')
Copy

Após enviar a consulta:

Para exemplos de realização de consultas assíncronas, consulte Exemplos de consultas assíncronas.

Práticas recomendadas para consultas assíncronas

Ao enviar uma consulta assíncrona, siga estas práticas recomendadas:

  • Certifique-se de que você saiba quais consultas dependem de outras consultas antes de executar qualquer consulta em paralelo. Algumas consultas são interdependentes e sensíveis à ordem e, portanto, não são adequadas para paralelização. Por exemplo, obviamente uma instrução INSERT não deve começar até que a instrução CREATE TABLE correspondente tenha terminado.

  • Assegure-se de não fazer muitas consultas para a memória que você tem disponível. Executar várias consultas em paralelo normalmente consome mais memória, especialmente se mais de um conjunto de resultados for armazenado na memória ao mesmo tempo.

  • Ao realizar uma sondagem, trate dos raros casos em que uma consulta não tenha êxito.

  • Certifique-se de que as instruções de controle de transações (BEGIN, COMMIT e ROLLBACK) não sejam executadas em paralelo com outras instruções.

Recuperação da ID de consulta do Snowflake

Uma ID de consulta identifica cada consulta executada pelo Snowflake. Quando você usa o conector Snowflake para Python para executar uma consulta, pode acessar a ID da consulta através do atributo sfqid no objeto Cursor:

# Retrieving a Snowflake Query ID
cur = con.cursor()
cur.execute("SELECT * FROM testtable")
print(cur.sfqid)
Copy

Você pode usar a ID da consulta para:

Verificação do status de uma consulta

Para verificar o status de uma consulta:

  1. Obtenha a ID da consulta no campo sfqid do objeto Cursor.

  2. Passe a ID da consulta ao método get_query_status() do objeto Connection para retornar a constante QueryStatus enum que representa o status da consulta.

    Por padrão, get_query_status() não indica um erro se a consulta resultar em um erro. Se quiser indicar um erro, chame get_query_status_throw_if_error() em vez disso.

  3. Use a constante da enumeração QueryStatus para verificar o status da consulta.

    • Para determinar se a consulta ainda está em execução (por exemplo, se for uma consulta assíncrona), passe a constante para o método is_still_running() do objeto Connection.

    • Para determinar se ocorreu um erro, passe a constante para o método is_an_error().

    Para a lista completa de constantes de enumeração, consulte QueryStatus.

O exemplo a seguir executa uma consulta assíncrona e verifica o status da consulta:

import time
...
# Execute a long-running query asynchronously.
cur.execute_async('select count(*) from table(generator(timeLimit => 25))')
...
# Wait for the query to finish running.
query_id = cur.sfqid
while conn.is_still_running(conn.get_query_status(query_id)):
  time.sleep(1)
Copy

O exemplo a seguir indica um erro se a consulta tiver resultado em um erro:

from snowflake.connector import ProgrammingError
import time
...
# Wait for the query to finish running and raise an error
# if a problem occurred with the execution of the query.
try:
  query_id = cur.sfqid
  while conn.is_still_running(conn.get_query_status_throw_if_error(query_id)):
    time.sleep(1)
except ProgrammingError as err:
  print('Programming Error: {0}'.format(err))
Copy

Uso do ID de consulta para recuperar os resultados de uma consulta

Nota

Se você executou uma consulta síncrona chamando o método execute() para um objeto Cursor, não precisa usar a ID da consulta para recuperar os resultados. Você pode simplesmente buscar os valores nos resultados, como explicado em Uso de cursor para buscar valores.

Se quiser recuperar os resultados de uma consulta assíncrona ou de uma consulta síncrona previamente enviada, siga estes passos:

  1. Obtenha a ID da consulta. Consulte Recuperação da ID de consulta do Snowflake.

  2. Chame o método get_results_from_sfqid() no objeto Cursor para recuperar os resultados.

  3. Use o objeto Cursor para buscar os valores nos resultados, como explicado em Uso de cursor para buscar valores.

Observe que se a consulta ainda estiver em execução, os métodos de busca (fetchone(), fetchmany(), fetchall(), etc.) aguardarão que a consulta seja concluída.

Por exemplo:

# Get the results from a query.
cur.get_results_from_sfqid(query_id)
results = cur.fetchall()
print(f'{results[0]}')
Copy

Uso de cursor para buscar valores

Busque valores de uma tabela usando o método do iterador de objetos do cursor.

Por exemplo, para buscar colunas chamadas “col1” e “col2” da tabela chamada testtable, que foi criada anteriormente (em Criação de tabelas e inserção de dados), use um código semelhante ao seguinte:

cur = conn.cursor()
try:
    cur.execute("SELECT col1, col2 FROM test_table ORDER BY col1")
    for (col1, col2) in cur:
        print('{0}, {1}'.format(col1, col2))
finally:
    cur.close()
Copy

O conector Snowflake para Python também fornece um atalho conveniente:

for (col1, col2) in con.cursor().execute("SELECT col1, col2 FROM testtable"):
    print('{0}, {1}'.format(col1, col2))
Copy

Se precisar obter um único resultado (ou seja, uma única linha), use o método fetchone:

col1, col2 = con.cursor().execute("SELECT col1, col2 FROM testtable").fetchone()
print('{0}, {1}'.format(col1, col2))
Copy

Se precisar obter o número de linhas especificado de cada vez, use o método fetchmany com o número de linhas:

cur = con.cursor().execute("SELECT col1, col2 FROM testtable")
ret = cur.fetchmany(3)
print(ret)
while len(ret) > 0:
    ret = cur.fetchmany(3)
    print(ret)
Copy

Nota

Use fetchone ou fetchmany se o conjunto de resultados for grande demais para caber na memória.

Se precisar obter todos os resultados de uma só vez:

results = con.cursor().execute("SELECT col1, col2 FROM testtable").fetchall()
for rec in results:
    print('%s, %s' % (rec[0], rec[1]))
Copy

Para definir um tempo limite para uma consulta, execute um comando “begin” e inclua um parâmetro de tempo limite na consulta. Se a consulta exceder a duração do valor do parâmetro, um erro é produzido e ocorre uma reversão.

No código a seguir, o erro 604 significa que a consulta foi cancelada. O parâmetro de tempo limite inicia Timer() e o cancela se a consulta não for concluída dentro do tempo especificado.

conn.cursor().execute("create or replace table testtbl(a int, b string)")

conn.cursor().execute("begin")
try:
   conn.cursor().execute("insert into testtbl(a,b) values(3, 'test3'), (4,'test4')", timeout=10) # long query

except ProgrammingError as e:
   if e.errno == 604:
      print("timeout")
      conn.cursor().execute("rollback")
   else:
      raise e
else:
   conn.cursor().execute("commit")
Copy

Uso de DictCursor para buscar valores por nome de coluna

Se quiser buscar um valor por nome de coluna, crie um objeto cursor do tipo DictCursor.

Por exemplo:

# Querying data by DictCursor
from snowflake.connector import DictCursor
cur = con.cursor(DictCursor)
try:
    cur.execute("SELECT col1, col2 FROM testtable")
    for rec in cur:
        print('{0}, {1}'.format(rec['COL1'], rec['COL2']))
finally:
    cur.close()
Copy

Exemplos de consultas assíncronas

A seguir está um exemplo simples de uma consulta assíncrona:

from snowflake.connector import ProgrammingError
import time

conn = snowflake.connector.connect( ... )
cur = conn.cursor()

# Submit an asynchronous query for execution.
cur.execute_async('select count(*) from table(generator(timeLimit => 25))')

# Retrieve the results.
cur.get_results_from_sfqid(query_id)
results = cur.fetchall()
print(f'{results[0]}')
Copy

O próximo exemplo envia uma consulta assíncrona a partir de uma conexão e recupera os resultados de uma conexão diferente:

from snowflake.connector import ProgrammingError
import time

conn = snowflake.connector.connect( ... )
cur = conn.cursor()

# Submit an asynchronous query for execution.
cur.execute_async('select count(*) from table(generator(timeLimit => 25))')

# Get the query ID for the asynchronous query.
query_id = cur.sfqid

# Close the cursor and the connection.
cur.close()
conn.close()

# Open a new connection.
new_conn = snowflake.connector.connect( ... )

# Create a new cursor.
new_cur = new_conn.cursor()

# Retrieve the results.
new_cur.get_results_from_sfqid(query_id)
results = new_cur.fetchall()
print(f'{results[0]}')
Copy

Cancelamento usando a ID da consulta

Cancele uma consulta usando a ID da consulta:

cur = cn.cursor()

try:
  cur.execute(r"SELECT SYSTEM$CANCEL_QUERY('queryID')")
  result = cur.fetchall()
  print(len(result))
  print(result[0])
finally:
  cur.close()
Copy

Substitua a cadeia “queryID“ pela ID real da consulta. Para obter a ID de uma consulta, consulte Recuperação da ID de consulta do Snowflake.

Aprimoramento do desempenho da consulta sem conversão de dados

Para melhorar o desempenho da consulta, use a classe SnowflakeNoConverterToPython no módulo snowflake.connector.converter_null para ignorar as conversões de dados do tipo de dados interno do Snowflake para o tipo de dados nativo do Python, por exemplo:

from snowflake.connector.converter_null import SnowflakeNoConverterToPython

con = snowflake.connector.connect(
    ...
    converter_class=SnowflakeNoConverterToPython
)
for rec in con.cursor().execute("SELECT * FROM large_table"):
    # rec includes raw Snowflake data
Copy

Como resultado, todos os dados são representados em forma de cadeia de caracteres, de modo que o aplicativo é responsável pela sua conversão nos tipos de dados Python nativos. Por exemplo, os dados de TIMESTAMP_NTZ e TIMESTAMP_LTZ são a hora de época representada em forma de cadeia de caracteres, e os dados de TIMESTAMP_TZ são a hora de época seguida por um espaço seguido pela diferença para UTC em minutos representada em forma de cadeia de caracteres.

Nenhum impacto ocorre nos dados vinculados; os dados nativos Python ainda podem ser vinculados para atualizações.

Vinculação de dados

Para especificar valores a serem usados em uma instrução SQL, você pode incluir literais na instrução ou vincular variáveis. Quando você vincula variáveis, insere um ou mais espaços reservados no texto da instrução SQL e então especifica a variável (o valor a ser usado) para cada espaço reservado.

O exemplo a seguir contrasta o uso de literais e a vinculação:

Literais:

con.cursor().execute("INSERT INTO testtable(col1, col2) VALUES(789, 'test string3')")
Copy

Vinculação:

con.cursor().execute(
    "INSERT INTO testtable(col1, col2) "
    "VALUES(%s, %s)", (
        789,
        'test string3'
    ))
Copy

Nota

Há um limite máximo para o tamanho dos dados que você pode vincular ou que pode combinar em um lote. Para obter mais detalhes, consulte Limites no tamanho do texto de consulta.

O Snowflake aceita os seguintes tipos de vinculação:

Cada uma é explicada abaixo.

Vinculação de pyformat ou format

Tanto a vinculação pyformat como format vincula dados no lado do cliente, e não no lado do servidor.

Por padrão, o conector Snowflake para Python aceita tanto pyformat como format, de modo que você pode usar %(name)s ou %s como o espaço reservado. Por exemplo:

  • Uso de %(name)s como o espaço reservado:

    conn.cursor().execute(
        "INSERT INTO test_table(col1, col2) "
        "VALUES(%(col1)s, %(col2)s)", {
            'col1': 789,
            'col2': 'test string3',
            })
    
    Copy
  • Uso de %s como o espaço reservado:

    con.cursor().execute(
        "INSERT INTO testtable(col1, col2) "
        "VALUES(%s, %s)", (
            789,
            'test string3'
        ))
    
    Copy

Com pyformat e format, você também pode usar um objeto de lista para vincular dados para o operador IN:

# Binding data for IN operator
con.cursor().execute(
    "SELECT col1, col2 FROM testtable"
    " WHERE col2 IN (%s)", (
        ['test string1', 'test string3'],
    ))
Copy

O caractere de porcentagem (“%”) é tanto um caractere curinga para SQL LIKE como um caractere de vinculação de formato para Python. Se você usar a vinculação de formato e se seu comando SQL contiver o caractere de porcentagem, poderá ser necessário usar um caractere de escape com o caractere de porcentagem. Por exemplo, se sua instrução SQL for:

SELECT col1, col2
    FROM test_table
    WHERE col2 ILIKE '%York' LIMIT 1;  -- Find York, New York, etc.
Copy

então seu código Python será parecido com o seguinte (note o sinal de porcentagem extra para o escape do sinal de porcentagem original):

sql_command = "select col1, col2 from test_table "
sql_command += " where col2 like '%%York' limit %(lim)s"
parameter_dictionary = {'lim': 1 }
cur.execute(sql_command, parameter_dictionary)
Copy

Vinculação de qmark ou numeric

Tanto a vinculação qmark como numeric vincula dados no lado do servidor, e não no lado do cliente.

  • Para a vinculação qmark, use um caractere de interrogação (?) para indicar em que parte da cadeia de caracteres você quer que o valor de uma variável seja inserido.

  • Para a vinculação numeric, use dois pontos (:) seguido por um número para indicar a posição da variável que você deseja substituir naquela posição. Por exemplo, :2 especifica a segunda variável.

    Use a vinculação numérica para vincular o mesmo valor mais de uma vez na mesma consulta. Por exemplo, se tiver um valor longo VARCHAR ou BINARY ou semiestruturado que você quer usar mais de uma vez, então a vinculação numeric permite que você envie o valor para o servidor uma vez e o use várias vezes.

As seções a seguir explicam como usar a vinculação qmark e numeric:

Uso da vinculação qmark ou numeric

Para usar a vinculação de estilos qmark ou numeric, execute uma das seguintes opções:

  • snowflake.connector.paramstyle='qmark'

  • snowflake.connector.paramstyle='numeric'

Importante

Você precisa definir o atributo paramstyle antes de chamar o método connect().

Se você definir paramstyle como qmark ou numeric, precisará usar ? ou :N (onde N é substituído por um número) como os espaços reservados, respectivamente.

Por exemplo:

  • Uso de ? como o espaço reservado:

    import snowflake.connector
    
    snowflake.connector.paramstyle='qmark'
    
    con = snowflake.connector.connect(...)
    
    con.cursor().execute(
        "INSERT INTO testtable(col1, col2) "
        "VALUES(?, ?)", (
            789,
            'test string3'
        ))
    
    Copy
  • Uso de :N como o espaço reservado:

    import snowflake.connector
    
    snowflake.connector.paramstyle='numeric'
    
    con = snowflake.connector.connect(...)
    
    con.cursor().execute(
        "INSERT INTO testtable(col1, col2) "
        "VALUES(:1, :2)", (
            789,
            'test string3'
        ))
    
    Copy

    A consulta a seguir mostra como usar a vinculação numeric para reutilizar uma variável:

    con.cursor().execute(
        "INSERT INTO testtable(complete_video, short_sample_of_video) "
        "VALUES(:1, SUBSTRING(:1, :2, :3))", (
            binary_value_that_stores_video,          # variable :1
            starting_offset_in_bytes_of_video_clip,  # variable :2
            length_in_bytes_of_video_clip            # variable :3
        ))
    
    Copy

Uso da vinculação qmark ou numeric com objetos datetime

Ao usar a vinculação qmark ou numeric para vincular dados a um tipo de dados TIMESTAMP do Snowflake, defina a variável de vinculação como uma tupla que especifica o tipo de dados de carimbo de data/hora do Snowflake (TIMESTAMP_LTZ ou TIMESTAMP_TZ) e o valor. Por exemplo:

import snowflake.connector

snowflake.connector.paramstyle='qmark'

con = snowflake.connector.connect(...)

con.cursor().execute(
    "CREATE OR REPLACE TABLE testtable2 ("
    "   col1 int, "
    "   col2 string, "
    "   col3 timestamp_ltz"
    ")"
)

con.cursor().execute(
    "INSERT INTO testtable2(col1,col2,col3) "
    "VALUES(?,?,?)", (
        987,
        'test string4',
        ("TIMESTAMP_LTZ", datetime.now())
    )
 )
Copy

Ao contrário da vinculação do lado do cliente, a vinculação do lado do servidor requer o tipo de dados Snowflake para a coluna. A maioria dos tipos de dados Python mais comuns já tem mapeamentos implícitos para os tipos de dados Snowflake (por exemplo, int é mapeado para FIXED). Entretanto, como os dados Python datetime podem ser vinculados a um de vários tipos de dados Snowflake (TIMESTAMP_NTZ, TIMESTAMP_LTZ ou TIMESTAMP_TZ), e o mapeamento padrão é TIMESTAMP_NTZ, deve-se especificar o tipo de dados Snowflake a ser utilizado.

Uso de variáveis de vinculação com o operador IN

qmark e numeric (vinculação do lado do servidor) não aceitam o uso de variáveis de vinculação com o operador IN.

Se precisar usar variáveis de vinculação com o operador IN, use a vinculação do lado do cliente (pyformat ou format).

Vinculação de parâmetros com variáveis para inserções em lote

No código de seu aplicativo, você pode inserir várias linhas como um único lote. Para isso, utilizar parâmetros para valores em uma instrução INSERT. Por exemplo, a seguinte instrução usa os espaços reservados de lugar para qmark vinculação em uma instrução INSERT:

insert into grocery (item, quantity) values (?, ?)
Copy

Então, para especificar os dados que devem ser inseridos, defina uma variável que seja uma sequência de sequências (por exemplo, uma lista de tuplas):

rows_to_insert = [('milk', 2), ('apple', 3), ('egg', 2)]
Copy

Como mostrado no exemplo acima, cada item da lista é uma tupla que contém os valores da coluna para uma linha a ser inserida.

Para realizar a vinculação, chame o método executemany(), passando a variável como o segundo argumento. Por exemplo:

conn = snowflake.connector.connect( ... )
rows_to_insert = [('milk', 2), ('apple', 3), ('egg', 2)]
conn.cursor().executemany(
    "insert into grocery (item, quantity) values (?, ?)",
    rows_to_insert)
Copy

Se você estiver vinculando dados no servidor (ou seja, usando a vinculação qmark ou numeric), o conector pode otimizar o desempenho das inserções em lote através da vinculação.

Quando você usa esta técnica para inserir um grande número de valores, o driver pode melhorar o desempenho ao transmitir os dados (sem criar arquivos na máquina local) para um estágio temporário de ingestão. O driver faz isso automaticamente quando o número de valores excede um limite.

Além disso, o banco de dados e o esquema atual da sessão devem ser definidos. Se não forem definidos, o comando CREATE TEMPORARY STAGE executado pelo driver pode falhar com o seguinte erro:

CREATE TEMPORARY STAGE SYSTEM$BIND file_format=(type=csv field_optionally_enclosed_by='"')
Cannot perform CREATE STAGE. This session does not have a current schema. Call 'USE SCHEMA', or use a qualified name.
Copy

Nota

Para formas alternativas de carregar dados no banco de dados Snowflake (incluindo carregamento em massa usando o comando COPY), consulte Carregamento de dados para o Snowflake.

Evite ataques de injeção SQL

Evite vincular dados usando a função de formatação Python porque você corre o risco de sofrer injeção SQL. Por exemplo:

# Binding data (UNSAFE EXAMPLE)
con.cursor().execute(
    "INSERT INTO testtable(col1, col2) "
    "VALUES(%(col1)d, '%(col2)s')" % {
        'col1': 789,
        'col2': 'test string3'
    })
Copy
# Binding data (UNSAFE EXAMPLE)
con.cursor().execute(
    "INSERT INTO testtable(col1, col2) "
    "VALUES(%d, '%s')" % (
        789,
        'test string3'
    ))
Copy
# Binding data (UNSAFE EXAMPLE)
con.cursor().execute(
    "INSERT INTO testtable(col1, col2) "
    "VALUES({col1}, '{col2}')".format(
        col1=789,
        col2='test string3')
    )
Copy

Em vez disso, armazene os valores em variáveis e depois vincule essas variáveis usando o estilo de vinculação qmark ou numeric.

Recuperação de metadados de coluna

Para recuperar metadados sobre cada coluna do conjunto de resultados (por exemplo, o nome, tipo, precisão, escala, etc. de cada coluna), use uma das seguintes abordagens:

  • Para acessar os metadados após chamar o método execute() para executar a consulta, use o atributo description do objeto Cursor.

  • Para acessar os metadados sem ter que executar a consulta, chame o método describe().

    O método describe está disponível no conector do Snowflake para Python 2.4.6 e em versões mais recentes.

O atributo description é definido com um dos seguintes valores:

  • Versão 2.4.5 e anteriores: Uma lista de tuplas.

  • Versão 2.4.6 e posteriores: Uma lista de objetos ResultMetadata. (O método describe também retorna esta lista).

Cada tupla e objeto ResultMetadata contém os metadados de uma coluna (o nome da coluna, tipo de dados, etc.). Você pode acessar os metadados por índice ou, nas versões 2.4.6 e posteriores, por atributo ResultMetadata.

Os exemplos a seguir descrevem como acessar os metadados de tuplas e objetos ResultMetadata retornados.

Exemplo: Obter os metadados de nome da coluna por índice (versões 2.4.5 e anteriores):

O exemplo a seguir utiliza o atributo description para recuperar a lista de nomes de colunas após a execução de uma consulta. O atributo é uma lista de tuplas, e o exemplo acessa o nome da coluna a partir do primeiro valor em cada tupla.

cur = conn.cursor()
cur.execute("SELECT * FROM test_table")
print(','.join([col[0] for col in cur.description]))
Copy

Exemplo: Obter os metadados de nome da coluna por atributo (versões 2.4.6 e posteriores):

O exemplo a seguir utiliza o atributo description para recuperar a lista de nomes de colunas após a execução de uma consulta. O atributo é uma lista de objetos ResultMetaData, e o exemplo acessa o nome da coluna a partir do atributo name de cada objeto ResultMetadata.

cur = conn.cursor()
cur.execute("SELECT * FROM test_table")
print(','.join([col.name for col in cur.description]))
Copy

Exemplo: Obter os metadados de nome da coluna sem executar a consulta (versões 2.4.6 e posteriores):

O exemplo a seguir usa o método describe para recuperar a lista de nomes de colunas sem executar uma consulta. O método describe() retorna uma lista de objetos ResultMetaData, e o exemplo acessa o nome da coluna a partir do atributo name de cada objeto ResultMetadata.

cur = conn.cursor()
result_metadata_list = cur.describe("SELECT * FROM test_table")
print(','.join([col.name for col in result_metadata_list]))
Copy

Tratamento de erros

O aplicativo precisa tratar as exceções levantadas pelo conector do Snowflake adequadamente e decidir continuar ou parar de executar o código.

# Catching the syntax error
cur = con.cursor()
try:
    cur.execute("SELECT * FROM testtable")
except snowflake.connector.errors.ProgrammingError as e:
    # default error message
    print(e)
    # customer error message
    print('Error {0} ({1}): {2} ({3})'.format(e.errno, e.sqlstate, e.msg, e.sfqid))
finally:
    cur.close()
Copy

Uso de execute_stream para executar scripts SQL

A função execute_stream permite a execução de um ou mais scripts SQL em um fluxo:

from codecs import open
with open(sqlfile, 'r', encoding='utf-8') as f:
    for cur in con.execute_stream(f):
        for ret in cur:
            print(ret)
Copy

Encerramento da conexão

Como uma melhor prática, encerre a conexão chamando o método close:

connection.close()
Copy

Isso assegura que as métricas coletadas do cliente sejam enviadas ao servidor e a sessão seja excluída. Além disso, blocos try-finally ajudam a garantir que a conexão seja encerrada mesmo que uma exceção seja levantada:

# Connecting to Snowflake
con = snowflake.connector.connect(...)
try:
    # Running queries
    con.cursor().execute(...)
    ...
finally:
    # Closing the connection
    con.close()
Copy

Uso do gerenciador de contexto para conectar e controlar transações

O conector do Snowflake para Python é compatível com um gerenciador de contexto que aloca e libera recursos conforme necessário. O gerenciador de contexto é útil para confirmar ou reverter transações com base no status da instrução quando autocommit está desabilitado.

# Connecting to Snowflake using the context manager
with snowflake.connector.connect(
  user=USER,
  password=PASSWORD,
  account=ACCOUNT,
  autocommit=False,
) as con:
    con.cursor().execute("INSERT INTO a VALUES(1, 'test1')")
    con.cursor().execute("INSERT INTO a VALUES(2, 'test2')")
    con.cursor().execute("INSERT INTO a VALUES(not numeric value, 'test3')") # fail
Copy

No exemplo acima, quando a terceira instrução falha, o gerenciador de contexto reverte as mudanças na transação e encerra a conexão. Se todas as instruções forem bem-sucedidas, o gerenciador de contexto confirmará as mudanças e encerrará a conexão.

Um código equivalente com blocos try e except é o seguinte:

# Connecting to Snowflake using try and except blocks
con = snowflake.connector.connect(
  user=USER,
  password=PASSWORD,
  account=ACCOUNT,
  autocommit=False)
try:
    con.cursor().execute("INSERT INTO a VALUES(1, 'test1')")
    con.cursor().execute("INSERT INTO a VALUES(2, 'test2')")
    con.cursor().execute("INSERT INTO a VALUES(not numeric value, 'test3')") # fail
    con.commit()
except Exception as e:
    con.rollback()
    raise e
finally:
    con.close()
Copy

Registro

O conector do Snowflake para Python utiliza o módulo Python padrão logging para registrar o status em intervalos regulares para que o aplicativo possa rastrear sua atividade em segundo plano. A maneira mais simples de habilitar o registro é chamar logging.basicConfig() no início do aplicativo.

Por exemplo, para definir o nível de registro como INFO e armazenar os registros em um arquivo chamado /tmp/snowflake_python_connector.log:

logging.basicConfig(
    filename=file_name,
    level=logging.INFO)
Copy

Um registro mais abrangente pode ser habilitado definindo o nível de registro como DEBUG, como segue:

# Logging including the timestamp, thread and the source code location
import logging
for logger_name in ['snowflake.connector', 'botocore', 'boto3']:
    logger = logging.getLogger(logger_name)
    logger.setLevel(logging.DEBUG)
    ch = logging.FileHandler('/tmp/python_connector.log')
    ch.setLevel(logging.DEBUG)
    ch.setFormatter(logging.Formatter('%(asctime)s - %(threadName)s %(filename)s:%(lineno)d - %(funcName)s() - %(levelname)s - %(message)s'))
    logger.addHandler(ch)
Copy

A classe formatadora opcional mas recomendada SecretDetector garante que um certo conjunto de informações confidenciais conhecidas seja mascarado antes de ser gravado nos arquivos de log do conector do Snowflake para Python. Para usar SecretDetector, use um código semelhante ao seguinte:

# Logging including the timestamp, thread and the source code location
import logging
from snowflake.connector.secret_detector import SecretDetector
for logger_name in ['snowflake.connector', 'botocore', 'boto3']:
    logger = logging.getLogger(logger_name)
    logger.setLevel(logging.DEBUG)
    ch = logging.FileHandler('/tmp/python_connector.log')
    ch.setLevel(logging.DEBUG)
    ch.setFormatter(SecretDetector('%(asctime)s - %(threadName)s %(filename)s:%(lineno)d - %(funcName)s() - %(levelname)s - %(message)s'))
    logger.addHandler(ch)
Copy

Nota

botocore e boto3 estão disponíveis através do AWS (Amazon Web Services) SDK para Python.

Exemplo de programa

O seguinte código de exemplo combina muitos dos exemplos descritos nas seções anteriores em um programa Python operacional. Este exemplo contém duas partes:

  • Uma classe pai (“python_veritas_base”) contém o código para muitas operações comuns, tais como a conexão ao servidor.

  • Uma classe filha (“python_connector_example”) representa as porções personalizadas de um determinado cliente; por exemplo, consultar uma tabela.

Este código de exemplo é importado diretamente de um de nossos testes para ajudar a garantir que tenha sido executado em um build recente do produto.

Como é retirado de um teste, o exemplo inclui uma pequena quantidade de código para definir uma porta e um protocolo alternativos utilizados em alguns testes. Os usuários não podem definir o protocolo ou número da porta; em vez disso, devem omiti-los e usar os padrões.

Ele também contém alguns marcadores de seção (às vezes chamados de “snippet tags”) para identificar o código que pode ser importado independentemente para a documentação. Os marcadores de seção normalmente se assemelham a:

# -- (> ---------------------- SECTION=import_connector ---------------------
...
# -- <) ---------------------------- END_SECTION ----------------------------
Copy

Estes marcadores de seção não são necessários em um código de usuário.

A primeira parte do código de exemplo contém as sub-rotinas comuns para:

  • Ler argumentos de linha de comando (por exemplo, “–warehouse MyWarehouse“) que contêm informações de conexão.

  • Conectar-se ao servidor.

  • Criar e utilizar um warehouse, um banco de dados e um esquema.

  • Descartar o esquema, banco de dados e warehouse ao terminar de usá-los.


import logging
import os
import sys


# -- (> ---------------------- SECTION=import_connector ---------------------
import snowflake.connector
# -- <) ---------------------------- END_SECTION ----------------------------


class python_veritas_base:

    """
    PURPOSE:
        This is the Base/Parent class for programs that use the Snowflake
        Connector for Python.
        This class is intended primarily for:
            * Sample programs, e.g. in the documentation.
            * Tests.
    """


    def __init__(self, p_log_file_name = None):

        """
        PURPOSE:
            This does any required initialization steps, which in this class is
            basically just turning on logging.
        """

        file_name = p_log_file_name
        if file_name is None:
            file_name = '/tmp/snowflake_python_connector.log'

        # -- (> ---------- SECTION=begin_logging -----------------------------
        logging.basicConfig(
            filename=file_name,
            level=logging.INFO)
        # -- <) ---------- END_SECTION ---------------------------------------


    # -- (> ---------------------------- SECTION=main ------------------------
    def main(self, argv):

        """
        PURPOSE:
            Most tests follow the same basic pattern in this main() method:
               * Create a connection.
               * Set up, e.g. use (or create and use) the warehouse, database,
                 and schema.
               * Run the queries (or do the other tasks, e.g. load data).
               * Clean up. In this test/demo, we drop the warehouse, database,
                 and schema. In a customer scenario, you'd typically clean up
                 temporary tables, etc., but wouldn't drop your database.
               * Close the connection.
        """

        # Read the connection parameters (e.g. user ID) from the command line
        # and environment variables, then connect to Snowflake.
        connection = self.create_connection(argv)

        # Set up anything we need (e.g. a separate schema for the test/demo).
        self.set_up(connection)

        # Do the "real work", for example, create a table, insert rows, SELECT
        # from the table, etc.
        self.do_the_real_work(connection)

        # Clean up. In this case, we drop the temporary warehouse, database, and
        # schema.
        self.clean_up(connection)

        print("\nClosing connection...")
        # -- (> ------------------- SECTION=close_connection -----------------
        connection.close()
        # -- <) ---------------------------- END_SECTION ---------------------

    # -- <) ---------------------------- END_SECTION=main --------------------


    def args_to_properties(self, args):

        """
        PURPOSE:
            Read the command-line arguments and store them in a dictionary.
            Command-line arguments should come in pairs, e.g.:
                "--user MyUser"
        INPUTS:
            The command line arguments (sys.argv).
        RETURNS:
            Returns the dictionary.
        DESIRABLE ENHANCEMENTS:
            Improve error detection and handling.
        """

        connection_parameters = {}

        i = 1
        while i < len(args) - 1:
            property_name = args[i]
            # Strip off the leading "--" from the tag, e.g. from "--user".
            property_name = property_name[2:]
            property_value = args[i + 1]
            connection_parameters[property_name] = property_value
            i += 2

        return connection_parameters


    def create_connection(self, argv):

        """
        PURPOSE:
            This gets account identifier and login information from the
            environment variables and command-line parameters, connects to the
            server, and returns the connection object.
        INPUTS:
            argv: This is usually sys.argv, which contains the command-line
                  parameters. It could be an equivalent substitute if you get
                  the parameter information from another source.
        RETURNS:
            A connection.
        """

        # Get account identifier and login information from environment variables and command-line parameters.
        # For information about account identifiers, see
        # https://docs.snowflake.com/en/user-guide/admin-account-identifier.html .
        # -- (> ----------------------- SECTION=set_login_info ---------------

        # Get the password from an appropriate environment variable, if
        # available.
        PASSWORD = os.getenv('SNOWSQL_PWD')

        # Get the other login info etc. from the command line.
        if len(argv) < 11:
            msg = "ERROR: Please pass the following command-line parameters:\n"
            msg += "--warehouse <warehouse> --database <db> --schema <schema> "
            msg += "--user <user> --account <account_identifier> "
            print(msg)
            sys.exit(-1)
        else:
            connection_parameters = self.args_to_properties(argv)
            USER = connection_parameters["user"]
            ACCOUNT = connection_parameters["account"]
            WAREHOUSE = connection_parameters["warehouse"]
            DATABASE = connection_parameters["database"]
            SCHEMA = connection_parameters["schema"]
            # Optional: for internal testing only.
            try:
                PORT = connection_parameters["port"]
            except:
                PORT = ""
            try:
                PROTOCOL = connection_parameters["protocol"]
            except:
                PROTOCOL = ""

        # If the password is set by both command line and env var, the
        # command-line value takes precedence over (is written over) the
        # env var value.

        # If the password wasn't set either in the environment var or on
        # the command line...
        if PASSWORD is None or PASSWORD == '':
            print("ERROR: Set password, e.g. with SNOWSQL_PWD environment variable")
            sys.exit(-2)
        # -- <) ---------------------------- END_SECTION ---------------------

        # Optional diagnostic:
        #print("USER:", USER)
        #print("ACCOUNT:", ACCOUNT)
        #print("WAREHOUSE:", WAREHOUSE)
        #print("DATABASE:", DATABASE)
        #print("SCHEMA:", SCHEMA)
        #print("PASSWORD:", PASSWORD)
        #print("PROTOCOL:" "'" + PROTOCOL + "'")
        #print("PORT:" + "'" + PORT + "'")

        print("Connecting...")
        # If the PORT is set but the protocol is not, we ignore the PORT (bug!!).
        if PROTOCOL is None or PROTOCOL == "" or PORT is None or PORT == "":
            # -- (> ------------------- SECTION=connect_to_snowflake ---------
            conn = snowflake.connector.connect(
                user=USER,
                password=PASSWORD,
                account=ACCOUNT,
                warehouse=WAREHOUSE,
                database=DATABASE,
                schema=SCHEMA
                )
            # -- <) ---------------------------- END_SECTION -----------------
        else:

            conn = snowflake.connector.connect(
                user=USER,
                password=PASSWORD,
                account=ACCOUNT,
                warehouse=WAREHOUSE,
                database=DATABASE,
                schema=SCHEMA,
                # Optional: for internal testing only.
                protocol=PROTOCOL,
                port=PORT
                )

        return conn


    def set_up(self, connection):

        """
        PURPOSE:
            Set up to run a test. You can override this method with one
            appropriate to your test/demo.
        """

        # Create a temporary warehouse, database, and schema.
        self.create_warehouse_database_and_schema(connection)


    def do_the_real_work(self, conn):

        """
        PURPOSE:
            Your sub-class should override this to include the code required for
            your documentation sample or your test case.
            This default method does a very simple self-test that shows that the
            connection was successful.
        """

        # Create a cursor for this connection.
        cursor1 = conn.cursor()
        # This is an example of an SQL statement we might want to run.
        command = "SELECT PI()"
        # Run the statement.
        cursor1.execute(command)
        # Get the results (should be only one):
        for row in cursor1:
            print(row[0])
        # Close this cursor.
        cursor1.close()


    def clean_up(self, connection):

        """
        PURPOSE:
            Clean up after a test. You can override this method with one
            appropriate to your test/demo.
        """

        # Create a temporary warehouse, database, and schema.
        self.drop_warehouse_database_and_schema(connection)


    def create_warehouse_database_and_schema(self, conn):

        """
        PURPOSE:
            Create the temporary schema, database, and warehouse that we use
            for most tests/demos.
        """

        # Create a database, schema, and warehouse if they don't already exist.
        print("\nCreating warehouse, database, schema...")
        # -- (> ------------- SECTION=create_warehouse_database_schema -------
        conn.cursor().execute("CREATE WAREHOUSE IF NOT EXISTS tiny_warehouse_mg")
        conn.cursor().execute("CREATE DATABASE IF NOT EXISTS testdb_mg")
        conn.cursor().execute("USE DATABASE testdb_mg")
        conn.cursor().execute("CREATE SCHEMA IF NOT EXISTS testschema_mg")
        # -- <) ---------------------------- END_SECTION ---------------------

        # -- (> --------------- SECTION=use_warehouse_database_schema --------
        conn.cursor().execute("USE WAREHOUSE tiny_warehouse_mg")
        conn.cursor().execute("USE DATABASE testdb_mg")
        conn.cursor().execute("USE SCHEMA testdb_mg.testschema_mg")
        # -- <) ---------------------------- END_SECTION ---------------------


    def drop_warehouse_database_and_schema(self, conn):

        """
        PURPOSE:
            Drop the temporary schema, database, and warehouse that we create
            for most tests/demos.
        """

        # -- (> ------------- SECTION=drop_warehouse_database_schema ---------
        conn.cursor().execute("DROP SCHEMA IF EXISTS testschema_mg")
        conn.cursor().execute("DROP DATABASE IF EXISTS testdb_mg")
        conn.cursor().execute("DROP WAREHOUSE IF EXISTS tiny_warehouse_mg")
        # -- <) ---------------------------- END_SECTION ---------------------


# ----------------------------------------------------------------------------

if __name__ == '__main__':
    pvb = python_veritas_base()
    pvb.main(sys.argv)


Copy

A segunda parte do código de exemplo cria uma tabela, insere linhas nela, etc:


import sys

# -- (> ---------------------- SECTION=import_connector ---------------------
import snowflake.connector
# -- <) ---------------------------- END_SECTION ----------------------------


# Import the base class that contains methods used in many tests and code 
# examples.
from python_veritas_base import python_veritas_base


class python_connector_example (python_veritas_base):

  """
  PURPOSE:
      This is a simple example program that shows how to use the Snowflake 
      Python Connector to create and query a table.
  """

  def __init__(self):
    pass


  def do_the_real_work(self, conn):

    """
    INPUTS:
        conn is a Connection object returned from snowflake.connector.connect().
    """

    print("\nCreating table test_table...")
    # -- (> ----------------------- SECTION=create_table ---------------------
    conn.cursor().execute(
        "CREATE OR REPLACE TABLE "
        "test_table(col1 integer, col2 string)")

    conn.cursor().execute(
        "INSERT INTO test_table(col1, col2) VALUES " + 
        "    (123, 'test string1'), " + 
        "    (456, 'test string2')")
    # -- <) ---------------------------- END_SECTION -------------------------


    print("\nSelecting from test_table...")
    # -- (> ----------------------- SECTION=querying_data --------------------
    cur = conn.cursor()
    try:
        cur.execute("SELECT col1, col2 FROM test_table ORDER BY col1")
        for (col1, col2) in cur:
            print('{0}, {1}'.format(col1, col2))
    finally:
        cur.close()
    # -- <) ---------------------------- END_SECTION -------------------------




# ============================================================================

if __name__ == '__main__':

    test_case = python_connector_example()
    test_case.main(sys.argv)

Copy

Para executar esse exemplo, faça o seguinte:

  1. Copie o primeiro trecho de código para um arquivo chamado “python_veritas_base.py”.

  2. Copie o segundo trecho de código para um arquivo chamado “python_connector_example.py”.

  3. Defina a variável de ambiente SNOWSQL_PWD com sua senha; por exemplo:

    export SNOWSQL_PWD='MyPassword'
    
    Copy
  4. Execute o programa usando uma linha de comando semelhante à seguinte (substitua as informações de usuário e conta por suas próprias informações de usuário e conta, é claro).

    Aviso

    Isso exclui o warehouse, o banco de dados e o esquema no final do programa! Não use o nome de um banco de dados existente porque você vai perdê-lo!

    python3 python_connector_example.py --warehouse <unique_warehouse_name> --database <new_warehouse_zzz_test> --schema <new_schema_zzz_test> --account myorganization-myaccount --user MyUserName
    
    Copy

Aqui está a saída:

Connecting...

Creating warehouse, database, schema...

Creating table test_table...

Selecting from test_table...
123, test string1
456, test string2

Closing connection...
Copy

Aqui está um exemplo mais longo:

Nota

Na seção onde você define suas informações de conta e login, certifique-se de substituir as variáveis conforme necessário para corresponder às suas informações de login do Snowflake (nome, senha, etc.).

Este exemplo usa a função format() para compor a instrução. Se seu ambiente tem um risco de ataques de injeção SQL, você pode preferir vincular valores em vez de usar format().

#!/usr/bin/env python
#
# Snowflake Connector for Python Sample Program
#

# Logging
import logging
logging.basicConfig(
    filename='/tmp/snowflake_python_connector.log',
    level=logging.INFO)

import snowflake.connector

# Set your account and login information (replace the variables with
# the necessary values).
ACCOUNT = '<account_identifier>'
USER = '<login_name>'
PASSWORD = '<password>'

import os

# Only required if you copy data from your S3 bucket
AWS_ACCESS_KEY_ID = os.getenv('AWS_ACCESS_KEY_ID')
AWS_SECRET_ACCESS_KEY = os.getenv('AWS_SECRET_ACCESS_KEY')

# Connecting to Snowflake
con = snowflake.connector.connect(
  user=USER,
  password=PASSWORD,
  account=ACCOUNT,
)

# Creating a database, schema, and warehouse if none exists
con.cursor().execute("CREATE WAREHOUSE IF NOT EXISTS tiny_warehouse")
con.cursor().execute("CREATE DATABASE IF NOT EXISTS testdb")
con.cursor().execute("USE DATABASE testdb")
con.cursor().execute("CREATE SCHEMA IF NOT EXISTS testschema")

# Using the database, schema and warehouse
con.cursor().execute("USE WAREHOUSE tiny_warehouse")
con.cursor().execute("USE SCHEMA testdb.testschema")

# Creating a table and inserting data
con.cursor().execute(
    "CREATE OR REPLACE TABLE "
    "testtable(col1 integer, col2 string)")
con.cursor().execute(
    "INSERT INTO testtable(col1, col2) "
    "VALUES(123, 'test string1'),(456, 'test string2')")

# Copying data from internal stage (for testtable table)
con.cursor().execute("PUT file:///tmp/data0/file* @%testtable")
con.cursor().execute("COPY INTO testtable")

# Copying data from external stage (S3 bucket -
# replace <s3_bucket> with the name of your bucket)
con.cursor().execute("""
COPY INTO testtable FROM s3://<s3_bucket>/data/
     STORAGE_INTEGRATION = myint
     FILE_FORMAT=(field_delimiter=',')
""".format(
    aws_access_key_id=AWS_ACCESS_KEY_ID,
    aws_secret_access_key=AWS_SECRET_ACCESS_KEY))

# Querying data
cur = con.cursor()
try:
    cur.execute("SELECT col1, col2 FROM testtable")
    for (col1, col2) in cur:
        print('{0}, {1}'.format(col1, col2))
finally:
    cur.close()

# Binding data
con.cursor().execute(
    "INSERT INTO testtable(col1, col2) "
    "VALUES(%(col1)s, %(col2)s)", {
        'col1': 789,
        'col2': 'test string3',
        })

# Retrieving column names
cur = con.cursor()
cur.execute("SELECT * FROM testtable")
print(','.join([col[0] for col in cur.description]))

# Catching syntax errors
cur = con.cursor()
try:
    cur.execute("SELECT * FROM testtable")
except snowflake.connector.errors.ProgrammingError as e:
    # default error message
    print(e)
    # user error message
    print('Error {0} ({1}): {2} ({3})'.format(e.errno, e.sqlstate, e.msg, e.sfqid))
finally:
    cur.close()

# Retrieving the Snowflake query ID
cur = con.cursor()
cur.execute("SELECT * FROM testtable")
print(cur.sfqid)

# Closing the connection
con.close()
Copy