Snowpark Migration Accelerator: conversão de pipelines

O SMA “converteu” nossos scripts, mas será mesmo? O que ele fez, na verdade, foi converter todas as referências à API do Spark para a API do Snowpark, mas ele não substituiu as conexões que possam existir em seus pipelines.

O poder do SMA reside nos relatórios de avaliação que ele gera, já que a conversão está atrelada à conversão de referências à API do Spark para a API do Snowpark. Observe que a conversão dessas referências não será suficiente para executar nenhum pipeline de dados. Você precisará garantir que as conexões do pipeline sejam resolvidas manualmente. O SMA não pode presumir conhecer os parâmetros de conexão ou outros elementos que provavelmente não estão disponíveis para serem executados por meio dele.

Como em qualquer conversão, é possível lidar com o código convertido de várias maneiras. As etapas a seguir são como recomendamos que você aborde a saída da ferramenta de conversão. Assim como o SnowConvert, o SMA requer atenção à saída. Nenhuma conversão será 100% automatizada. Isso é particularmente verdadeiro para o SMA. Como o SMA converte referências à API do Spark para a API do Snowpark, você sempre precisará verificar como essas referências estão sendo executadas. Ele não tenta orquestrar a execução bem-sucedida de nenhum script ou notebook executado por meio dele.

Portanto, seguiremos estas etapas para analisar a saída do SMA, que será ligeiramente diferente da saída do SnowConvert:

  • Resolver todos os problemas: «problemas» aqui significa os problemas gerados pelo SMA. Analise o código de saída. Resolva os erros de análise e conversão e investigue os avisos.

  • Resolver as chamadas de sessão: a forma como a chamada de sessão é escrita no código de saída depende de onde vamos executar o arquivo. Vamos resolver isso para executar os arquivos de código na mesma localização em que seriam executados originalmente e, em seguida, para executá-los no Snowflake.

  • Resolver as entradas/saídas: não é possível resolver as conexões com diferentes fontes completamente pelo SMA. Existem diferenças entre as plataformas, e o SMA geralmente ignora isso. Isso também é afetado por onde o arquivo será executado.

  • Limpar e testar: vamos executar o código. Veja se funciona. Neste laboratório, faremos testes básicos, mas existem ferramentas para realizar testes e validações de dados mais extensivos, incluindo o Snowpark Python Checkpoints.

Então, vamos dar uma olhada em como isso funciona. Faremos isso com duas abordagens: a primeira é executar isso em Python na máquina local (enquanto o script de origem está sendo executado). A segunda seria fazer tudo no Snowflake… no Snowsight, mas para um pipeline de dados que lê de uma fonte local, isso não será 100% possível no Snowsight. Mas tudo bem. Não estamos convertendo a orquestração deste script neste POC.

Vamos começar com o arquivo de script do pipeline e, na próxima seção, abordaremos o notebook.

Resolver problemas

Vamos abrir nosso código-fonte e nosso código de saída em um editor de código. Você pode usar qualquer editor de código de sua preferência, mas, como já foi mencionado diversas vezes, a Snowflake recomenda o uso do VS Code com a extensão Snowflake. A extensão Snowflake não só ajuda a navegar pelos problemas do SnowConvert, como também pode executar Snowpark Checkpoints para Python, o que auxilia nos testes e na análise da causa raiz (embora esteja um pouco fora do escopo deste laboratório).

Vamos abrir o diretório que criamos originalmente na tela de criação do projeto (Spark ADW Lab) no código VS:

diretório do laboratório

Observe que a estrutura do diretório de saída será a mesma do diretório de entrada. Até mesmo o arquivo de dados será copiado, apesar de nenhuma conversão ocorrer. Também haverá alguns arquivos checkpoints.json criados pelo SMA. Esses são arquivos JSON que contêm instruções para a extensão Snowpark Checkpoints. A extensão Snowflake pode carregar pontos de verificação tanto no código-fonte quanto no código de saída com base nos dados desses arquivos. Vamos ignorá-los por enquanto.

Por fim, vamos comparar o script Python de entrada com o script convertido na saída.

Comparação de scripts

Esta é uma comparação lado a lado bem básica, com o código Spark original à esquerda e o código compatível com Snowpark à direita. Parece que algumas importações foram convertidas, assim como as chamadas de sessão. Podemos ver um EWI na parte inferior da imagem acima, mas não vamos começar por aí. Precisamos encontrar o erro de análise antes de fazer qualquer outra coisa.

Podemos pesquisar no documento o código de erro para esse erro de análise que foi mostrado tanto no UI quanto no issues.csv: SPRKPY1101.

Código de erro

Como não filtrei os resultados, a listagem deste código de erro no arquivo issues.csv também aparece na pesquisa e no arquivo AssessmentReport.json utilizado para gerar o relatório de avaliação resumido AssessmentReport.docx. Este é o relatório principal que os usuários consultarão para entender uma grande carga de trabalho, mas não o analisamos neste laboratório. (Mais informações sobre este relatório podem ser encontradas na documentação do SMA.) Vamos escolher onde este EWI aparece no arquivo pipeline_dimcustomer.py, como mostrado acima.

Você pode ver que esta linha de código estava presente na parte inferior do código-fonte.

# Conversion Input.
some rogue code that doesn't make any sense!

# Conversion Output.
some
# EWI: SPRKPY1101 => Unrecognized or invalid CODE STATEMENT @(131, 6). Last valid token was 'some' @(131, 1), failed token 'rogue' @(131, 6)
#     rogue code that doesn't make any sense!
Copy

Parece que esse erro de análise foi causado por… “algum código inválido que não faz sentido!”. Esta linha de código está no fim do arquivo do pipeline. Não é incomum haver caracteres extras ou outros elementos em um arquivo de código como parte de uma extração de uma fonte. Observe que o SMA detectou que este não era um código Python válido e gerou o erro de análise.

Você também pode ver como o SMA insere o código de erro e a descrição no código de saída como um comentário onde o erro ocorreu. É assim que todas as mensagens de erro aparecerão na saída.

Como este não é um código válido, ele está no final do arquivo e nada mais foi removido como resultado deste erro; o código original e o comentário podem ser removidos com segurança do arquivo de código de saída.

E agora resolvemos nosso primeiro e mais sério problema. Anime-se.

Vamos analisar o restante dos nossos EWIs neste arquivo. Podemos procurar por “EWI” porque agora sabemos que esse texto aparecerá no comentário sempre que houver um código de erro. Como alternativa, poderíamos classificar o arquivo issues.csv e ordenar os problemas por gravidade, mas isso não é realmente necessário aqui.

O próximo é, na verdade, apenas um aviso, não um erro. Ele nos informa que foi utilizada uma função que nem sempre é equivalente no Spark e no Snowpark:

#EWI: SPRKPY1067 => Snowpark does not support split functions with more than two parameters or containing regex pattern. See documentation for more info.
split_col = split(df_uppercase['NAME'], '.first:')
Copy

A descrição aqui, no entanto, indica que provavelmente não precisamos nos preocupar com isso. Há apenas dois parâmetros sendo passados. Vamos deixar este EWI como um comentário no arquivo, para que saibamos que devemos verificá-lo quando executarmos o arquivo posteriormente.

O último para este arquivo é um erro de conversão dizendo que algo não é compatível:

Erro não compatível

Esta é a chamada de gravação para o driver JDBC do Spark para gravar o dataframe de saída no SQL Server. Como isso faz parte da etapa de “resolver todas as entradas/saídas” que abordaremos após solucionarmos nossos problemas, vamos deixar isso para depois. Observe, no entanto, que esse erro deve ser resolvido. O anterior era apenas um aviso e pode funcionar mesmo sem nenhuma alteração.

Resolução das chamadas de sessão

As chamadas de sessão são convertidas pelo SMA, mas você deve prestar atenção especial a elas para garantir que estejam funcionando corretamente. Em nosso script de pipeline, este é o código antes e depois:

Script antes e depois

A referência do SparkSession foi alterada para Session. Você também pode ver essa alteração de referência perto do início deste arquivo na instrução de importação:

Alteração de referência

Observe na imagem acima que a atribuição da variável da chamada de sessão para “spark” não foi alterada. Isso ocorre porque se trata de uma atribuição de variável. Não é necessário alterá-la, mas se você quiser alterar o decorador “spark” para “session”, isso estará mais de acordo com o que o Snowpark recomenda. Observe que a extensão “Assistente do SMA” do VS Code também sugerirá essas alterações.

Este é um exercício simples, mas vale a pena fazer. Você pode usar a função de busca do VS Code para localizar e substituir as referências a “spark” neste arquivo e substituí-las por “session”. Você pode ver o resultado disso na imagem abaixo. As referências à variável “spark” no código convertido foram substituídas por “session”:

Variáveis do Spark convertidas para session

Também podemos remover outro objeto desta chamada de sessão. Como não vamos mais executar “spark”, não precisamos especificar o caminho para o driver spark. Portanto, podemos remover completamente a função de configuração da chamada de sessão assim:

# Old Converted output.
# Spark Session
session = Session.builder.config('spark.driver.extraClassPath', driver_path) \
                    .app_name('SparkSQLServerExample', True) \
                    .getOrCreate()

# New Converted Output
# Snowpark Session
session = Session.builder.app_name('SparkSQLServerExample', True).getOrCreate()
Copy

Podemos convertê-la em uma única linha. O SMA não tinha certeza de que não precisávamos desse driver (embora isso pareça lógico), então não o removeu. Mas agora temos nossa chamada de sessão completa.

Observe que o SMA também adiciona uma “tag de consulta” à sessão. Isso serve para ajudar na solução de problemas com essa sessão ou consulta posteriormente, mas é completamente opcional manter ou remover essa tag.

Notas sobre as chamadas de sessão

Acredite ou não, isso é tudo o que precisamos alterar no código para a chamada de sessão, mas não é tudo o que precisamos fazer para criar a sessão. Isso se refere à pergunta original de que muito disso depende de onde você deseja executar esses arquivos. Essas chamadas de sessão Spark originais usavam uma configuração que foi definida em outro lugar. Se você observar a chamada de sessão Spark original, verá que ela procura um arquivo de configuração que está sendo lido em um dataframe do pandas no início deste arquivo de script (isso também se aplica ao nosso arquivo de notebook).

Referência do arquivo de configuração

O Snowpark pode funcionar da mesma maneira, e esta conversão pressupõe que é assim que esse usuário executará esse código. No entanto, para que a chamada de sessão existente funcione, o usuário precisaria carregar todas as informações de sua conta Snowflake no arquivo connections.toml local (ou pelo menos acessível) nesta máquina, e a conta à qual ele está tentando se conectar precisa estar definida como padrão. Você pode aprender mais sobre como atualizar o arquivo connections.toml na documentação do Snowflake/Snowpark, mas a ideia por trás disso é que exista um local acessível que contenha as credenciais. Quando uma sessão do Snowpark é criada, ela verifica isso… a menos que os parâmetros de conexão sejam passados explicitamente para a chamada de sessão.

A maneira padrão de fazer isso é inserir os parâmetros de conexão diretamente como cadeias de caracteres e chamá-los com a sessão:

# Parameters in a dictionary.
connection_parameters = {
  "account": "<your snowflake account>",
  "user": "<your snowflake user>",
  "password": "<your snowflake password>",
  "role": "<your snowflake role>",  # optional
  "warehouse": "<your snowflake warehouse>",  # optional
  "database": "<your snowflake database>",  # optional
  "schema": "<your snowflake schema>",  # optional
}

# The session call
session = Session.builder.configs(connection_parameters).app_name("AdventureWorksSummary", True).getOrCreate()
Copy

O AdventureWorks parece ter referenciado um arquivo com essas credenciais e o chamado. Supondo que exista um arquivo semelhante chamado “snowflake_credentials.txt” acessível, a sintaxe correspondente seria algo como:

# Load into a dataframe.
snow_creds = pd.read_csv('snowflake_credentials.txt', index_col=None, header=0)

# Build the parameters.
connection_parameters = {
  "account": snow_creds.loc[snow_creds['Specific_Element'] == 'Account', 'Value'].item(),
  "user": snow_creds.loc[snow_creds['Specific_Element'] == 'Username', 'Value'].item(),
  "password": snow_creds.loc[snow_creds['Specific_Element'] == 'Password', 'Value'].item(),
  "role": "<your snowflake role>",  # optional
  "warehouse": snow_creds.loc[snow_creds['Specific_Element'] == 'Warehouse', 'Value'].item(),  # optional
  "database": snow_creds.loc[snow_creds['Specific_Element'] == 'Database', 'Value'].item(),  # optional
  "schema": snow_creds.loc[snow_creds['Specific_Element'] == 'Schema', 'Value'].item(),  # optional
}

# Then pass the parameters to the configs function of the session builder.
session = Session.builder.configs(connection_parameters).app_name("AdventureWorksSummary", True).getOrCreate()
Copy

Para fins de limite de tempo neste laboratório, a primeira opção pode fazer mais sentido. Há mais informações sobre isso na documentação do Snowpark.

Observe que, para que nosso arquivo de notebook seja executado dentro do Snowflake usando o Snowsight, você não precisaria fazer nada disso. Você simplesmente chamaria a sessão ativa e a executaria.

Agora é hora do componente mais crítico desta migração: resolver quaisquer referências de entrada/saída.

Resolução de entradas e saídas

Vamos resolver nossas entradas e saídas agora. Observe que isso pode variar dependendo se você está executando os arquivos localmente ou no Snowflake. Para o script Python, vamos verificar o que ganhamos/perdemos ao executá-lo diretamente no Snowsight: você não pode executar toda a operação no Snowsight (pelo menos não no momento). O arquivo .csv local não é acessível pelo Snowsight. Você terá que carregar o arquivo .csv em uma área de preparação manualmente. Essa provavelmente não é a solução ideal, mas podemos testar a conversão dessa forma.

Primeiro vamos preparar este arquivo para ser executado/orquestrado localmente e, em seguida, para ser executado no Snowflake.

Para resolver as entradas e saídas do script do pipeline, precisamos primeiro identificá-las. Elas são bem simples. Este script parece:

  • acessar um arquivo local

  • carregar o resultado no SQL Server (mas agora, Snowflake)

  • mover o arquivo para liberar espaço para o próximo

Simples o suficiente. Portanto, precisamos substituir cada componente do código que realiza essas ações. Vamos começar acessando o arquivo local.

Como mencionado no início, seria recomendável reestruturar o sistema de ponto de venda e as ferramentas de orquestração utilizadas para executar este script Python, de modo a colocar o arquivo de saída em um local de armazenamento em nuvem. Em seguida, você poderia transformar esse local em uma tabela externa e pronto… você estaria no Snowflake. No entanto, a arquitetura atual indica que este arquivo não está em um local de armazenamento em nuvem e permanecerá onde está; portanto, precisamos criar uma maneira para o Snowflake acessar o arquivo preservando a lógica existente.

Temos opções para fazer isso, mas criaremos uma área de preparação interna e moveremos o arquivo para a área de preparação com o script. Em seguida, precisaríamos mover o arquivo no sistema de arquivos local e também movê-lo para a área de preparação. Tudo isso pode ser feito com o Snowpark. Vamos analisar passo a passo:

  • acesso a um arquivo local: Crie uma área de preparação interna (se ainda não existir) -> Carregue o arquivo na área de preparação -> Leia o arquivo em um dataframe

  • carregamento do resultado no SQL Server: Carregar os dados transformados em uma tabela no Snowflake

  • move o arquivo para liberar espaço para o próximo: Mova o arquivo local -> Mova o arquivo na área de preparação.

Vamos analisar o código que pode fazer cada uma dessas ações.

Acessar um arquivo acessível localmente

Esse código-fonte no Spark é semelhante a isto:

# Spark read from a local csv file.
df = spark.read.csv('customer_update.csv', header=True, inferSchema=True)
Copy

E o código do Snowflake transformado (pelo SMA) se parece com isto:

# Snowpark read from a local csv file.
df = session.read.option("PARSE_HEADER", True).option("INFER_SCHEMA", True).csv('customer_update.csv')
Copy

Podemos substituir isso por este código que executa as etapas acima:

  1. Crie uma área de preparação interna (se ainda não existir). Vamos criar uma área de preparação chamada “LOCAL_LOAD_STAGE” e executar algumas etapas para garantir que a área de preparação esteja correta.

# Create a stage if one does not already exist.
# name the stage we're going to use.
target_stage_name = "LOCAL_LOAD_STAGE"

# Check to see if this stage already exists.
stages = session.sql("SHOW STAGES").collect()
target_stages = [stage for stage in stages if stage['name'] == target_stage_name]

# Create the stage if it does not already exist.
if(len(target_stages) < 1):
    from snowflake.core import Root
    from snowflake.core.stage import Stage, StageEncryption, StageResource
    root = Root(session)
    my_stage = Stage(name="LOCAL_LOAD_STAGE",encryption=StageEncryption(type="SNOWFLAKE_SSE"))
    root.databases["ADVENTUREWORKS"].schemas["DBO"].stages.create(my_stage)
    print('%s created.'%(target_stage_name))
else:
    print('%s already exists.'%(target_stage_name))

Copy
  1. Carregue o arquivo na área de preparação.

# Move the file.
put_results = session.file.put(local_file_name="customer_update.csv",
                    stage_location="ADVENTUREWORKS.DBO.LOCAL_LOAD_STAGE",
                    overwrite=False,
                    auto_compress=False)

# Read the results.
for r in put_results:
    str_output = ("File {src}: {stat}").format(src=r.source,stat=r.status)
    print(str_output)    
Copy
  1. Leia o arquivo em um dataframe. Esta é a parte que o SMA realmente converteu. Precisamos especificar que a localização do arquivo agora é a área de preparação interna.

# Location of the file in the stage.
csv_file_path = "@LOCAL_LOAD_STAGE/customer_update.csv"

# Spark read from a local csv file.
df = session.read.option("PARSE_HEADER", True).option("INFER_SCHEMA", True).csv(csv_file_path)
Copy

O resultado disso seria algo como:

Código reescrito

Vamos ao próximo passo.

Carregamento do resultado no Snowflake

O script original gravou o dataframe no SQL Server. Agora vamos carregá-lo no Snowflake. Essa é uma conversão muito mais simples. O dataframe já é um dataframe do Snowpark. Essa é uma das vantagens do Snowflake. Agora que os dados estão acessíveis ao Snowflake, tudo acontece dentro dele.

# Original output from the conversion tool.
# Write the DataFrame to SQL Server.
#EWI: SPRKPY1002 => pyspark.sql.readwriter.DataFrameWriter.jdbc is not supported
df_transformed.write.jdbc(url=sql_server_url,
              table='dbo.DimCustomer',
              mode="append",
              properties={
                  "user": sql_server_user,
                  "password": sql_server_password,
                  "driver": driver_path
              })

# Corrected Snowflake/Snowpark code.
df_transformed.write.save_as_table("ADVENTUREWORKS.DBO.DIMCUSTOMER", mode="append")
Copy

Observe que podemos querer gravar em uma tabela temporária para fazer alguns testes/validações, mas este é o comportamento no script original.

Mover o arquivo para liberar espaço para o próximo

Este é o comportamento no script original. Não precisamos realmente fazer isso no Snowflake, mas podemos demonstrar a mesma funcionalidade na área de preparação. Isso é feito com um comando os no sistema de arquivos original. Isso não depende do Spark e permanecerá o mesmo. Mas para emular esse comportamento no Snowpark, precisaríamos mover esse arquivo da área de preparação para um novo diretório.

Isso pode ser feito de forma simples com o seguinte código Python:

# New filename.
original_filepath = '@LOCAL_LOAD_STAGE/customer_update.csv'
new_filepath = '@LOCAL_LOAD_STAGE/old_versions/customer_update_%s.csv'%(today_time)

copy_sql = f"COPY FILES INTO {new_filepath} FROM {original_filepath}"
session.sql(copy_sql).collect()
print(f"File copied from {original_filepath} to {new_filepath}")

remove_sql = f"REMOVE {original_filepath}"
session.sql(remove_sql).collect()
print(f"Original file {original_filepath} removed.")

Copy

Observe que isso não substituirá nenhum código existente. Como já queremos manter a movimentação existente do código Spark para o Snowpark, manteremos a referência ao os. A versão final ficará assim:

Código final

Agora temos a mesma movimentação completamente concluída. Agora vamos fazer nossa limpeza final e testar esse script.

Limpar e testar

Nunca analisamos nossas chamadas de importação e temos arquivos de configuração que não são necessários. Poderíamos manter as referências aos arquivos de configuração e executar o script. Na verdade, supondo que esses arquivos de configuração ainda estejam acessíveis, o código ainda será executado. Mas, se analisarmos atentamente nossas instruções de importação, podemos removê-las. Esses arquivos são representados por todo o código entre as instruções de importação e a chamada da sessão:

Instruções removidas

Há algumas outras ações que devemos realizar:

  • Verifique se todas as nossas importações ainda são necessárias. Podemos deixá-las por enquanto. Se houver um erro, podemos corrigi-lo.

  • Também temos um EWI que deixamos lá como um aviso para verificar. Portanto, precisamos garantir que vamos inspecionar essa saída.

  • Precisamos ter certeza de que o comportamento do nosso sistema de arquivos é semelhante ao do sistema de arquivos esperado para o sistema POS. Para isso, devemos mover o arquivo customer_update.csv para a pasta raiz escolhida na primeira execução do VS Code.

  • Crie um diretório chamado “old_versions” nesse mesmo diretório. Isso deve permitir a execução das operações do sistema operacional.

Por fim, se você não se sentir confortável em executar o código diretamente na tabela de produção, crie uma cópia da tabela para esse teste e direcione o carregamento para essa cópia. Substitua a instrução de carregamento pela seguinte. Como se trata de um laboratório, fique à vontade para gravar na tabela “production”:

# In case we want to test.
create_sql = """
                CREATE OR REPLACE TABLE ADVENTUREWORKS.DBO.DIMCUSTOMER_1
                AS select * from ADVENTUREWORKS.DBO.DIMCUSTOMER;
                """
session.sql(create_sql).collect()

# Write the DataFrame to SQL Server.
df_transformed.write.save_as_table("ADVENTUREWORKS.DBO.DIMCUSTOMER_1", mode="append")
Copy

Agora estamos finalmente prontos para testar. Podemos executar este script em Python em uma tabela de teste e ver se ele falha. Então execute!

Trágico! O script falhou com o seguinte erro:

Erro de falha do script

Parece que a forma como estamos referenciando um identificador não é a que a Snowpark esperava. O código que falhou está exatamente na mesma localização em que o EWI restante está:

Linha de código responsável pelo erro

Você pode consultar a documentação no link fornecido pelo erro, mas, para economizar tempo, o Snowpark precisa que essa variável seja explicitamente um literal. Precisamos fazer a seguinte substituição:

# Old
split_col = split(df_uppercase['NAME'], '.first:')

# New
split_col = split(df_uppercase['NAME'], lit('.first:'))
Copy

Isso deve resolver esse erro. Observe que sempre haverá algumas diferenças funcionais entre as plataformas de origem e destino. Ferramentas de conversão como o SMA tentam tornar essas diferenças o mais óbvias possível. Mas lembre-se de que nenhuma conversão é 100% automatizada.

Vamos executar novamente. Desta vez… sucesso!

Mensagem de sucesso

Podemos escrever algumas consultas em Python para validar isso, mas por que não acessamos o Snowflake (já que é o que faremos de qualquer forma)?

Navegue até a sua conta Snowflake que você usou para executar esses scripts. Deve ser a mesma que você usou para carregar o banco de dados do SQL Server (e se você não fez isso, os scripts acima não funcionarão, pois os dados ainda não foram migrados).

É possível verificar isso rapidamente conferindo se a área de preparação foi criada com o arquivo:

Área de preparação criada localizada

Habilite a exibição da tabela de diretórios para verificar se a pasta old_versions está lá:

Botão Enable Directory Table

E é:

Pasta old_versions localizada

Como esse foi o último elemento do nosso script, parece que está tudo certo!

Também podemos simplesmente validar se os dados foram carregados consultando a tabela para obter os dados que enviamos. Você pode abrir uma nova planilha e simplesmente escrever esta consulta:

select * from ADVENTUREWORKS.DBO.DIMCUSTOMER
where FIRSTNAME like '%Brandon%'
AND LASTNAME like '%Carver%'
Copy

Este é um dos nomes que acabou de ser carregado. E parece que nosso pipeline funcionou:

Consulta bem-sucedida

Execução do script do pipeline no Snowsight

Vamos dar uma olhada rápida no fluxo que estamos tentando converter, que era executado no Spark:

  • acesso a um arquivo local

  • carregamento do resultado no SQL Server

  • movimentação do arquivo para abrir caminho para o próximo

Não é possível executar este fluxo inteiramente de dentro do Snowsight. O Snowsight não tem acesso a um sistema de arquivos local. A recomendação aqui seria mover a exportação do POS para um data lake… ou qualquer outra opção que seja acessível via Snowsight.

Podemos, no entanto, analisar mais detalhadamente como o Snowpark lida com a lógica de transformação executando o script Python no Snowflake. Se você já fez as alterações recomendadas acima, pode executar o corpo do script em uma planilha Python no Snowflake.

Para fazer isso, primeiro faça login na sua conta Snowflake e navegue até a seção de planilhas. Nesta planilha, crie uma nova planilha Python:

Itens do menu de planilhas

Especifique o banco de dados, o esquema, a função e o warehouse que você gostaria de usar:

Menu para banco de dados e esquema

Agora não precisamos lidar com nossa chamada de sessão. Você verá um modelo gerado na janela da planilha:

Modelo Python gerado

Vamos começar trazendo nossas chamadas de importação. Depois de preparar o script anterior para uso, devemos ter o seguinte conjunto de importações:

# General Imports
import pandas as pd
import os
import shutil
import datetime

# Snowpark Imports
from snowflake.snowpark import Session
from snowflake.snowpark.functions import col
from snowflake.snowpark.functions import upper
from snowflake.snowpark.functions import lower
from snowflake.snowpark.functions import split
from snowflake.snowpark.functions import trim
from snowflake.snowpark.functions import when
from snowflake.snowpark.functions import lit
from snowflake.snowpark.functions import expr
from snowflake.snowpark.functions import regexp_replace
Copy

Precisamos apenas das importações do Snowpark. Não vamos mover arquivos no sistema de arquivos. Poderemos manter a referência de data e hora se quisermos mover o arquivo na área de preparação. Vamos fazer isso.

Cole as importações do Snowpark (mais a data e hora) na planilha do Python, abaixo das outras importações que já estão presentes. Observe que «col» já está importado, então você pode remover uma delas:

Novo código com importações coladas

Abaixo da chamada «def main», vamos colar todo o nosso código de transformação. Isso incluirá tudo, desde a atribuição da localização do CSV até a gravação do dataframe em uma tabela.

Daqui:

Código copiado

Até aqui:

Código colado

Também podemos adicionar novamente o código que move os arquivos dentro da área de preparação. Esta parte:

Código adicionado

Antes de executar o código, você precisará criar a área de preparação manualmente e mover o arquivo para dentro dela. Podemos adicionar a instrução de criação de área de preparação ao script, mas ainda precisaríamos carregar manualmente o arquivo na área de preparação.

Portanto, se você abrir outra planilha (desta vez… uma planilha SQL), poderá executar uma instrução SQL básica que criará a área de preparação:

CREATE STAGE my_int_stage
  ENCRYPTION = (TYPE = 'SNOWFLAKE_SSE');
Copy

Certifique-se de selecionar o banco de dados, o esquema, a função e o warehouse corretos:

Banco de dados e esquema selecionados

Você também pode criar uma área de preparação interna diretamente na UI do Snowsight. Agora que a área de preparação existe, podemos carregar manualmente o arquivo de interesse nela. Navegue até a seção Databases da UI do Snowsight e encontre a área de preparação que acabamos de criar no database.schema apropriado:

Área de preparação localizada no esquema

Vamos adicionar nosso arquivo CSV selecionando a opção +Files no canto superior direito da janela. Isso iniciará o menu Upload Your Files:

Menu Upload Your Files

Arraste e solte ou navegue até o diretório do projeto e carregue o arquivo customer_update.csv na área de preparação:

Arquivo customer_update carregado

Selecione Upload no canto inferior direito da tela. Você será redirecionado para a tela da área de preparação. Para visualizar os arquivos, você precisará selecionar Enable Directory Table:

Botão Enable Directory Table

E agora… nosso arquivo aparece na área de preparação:

Arquivo carregado na área de preparação

Isso não é mais exatamente um pipeline, é claro. Mas pelo menos podemos executar o login no Snowflake. Execute o restante do código que você moveu para a planilha. Este usuário teve sucesso na primeira vez, mas isso não garante o sucesso na segunda:

Resultados da execução da consulta

Observe que, depois de definir essa função no Snowflake, você pode chamá-la de outras maneiras. Se o AdventureWorks está substituindo 100% o POS, pode fazer sentido ter a lógica de transformação no Snowflake, especialmente se a orquestração e a movimentação de arquivos forem tratadas em outro lugar. Isso permite que o Snowpark se concentre onde ele se destaca na lógica de transformação.

Conclusão

E isso é tudo para o arquivo de script. Não é o melhor exemplo de um pipeline, mas aborda de forma clara como lidar com a saída do SMA:

  • Resolver todos os problemas

  • Resolver as chamadas de sessão

  • Resolver as entradas/saídas

  • Limpar e testar!

Vamos passar para o notebook de relatórios.