Snowpark Migration Accelerator: Como funciona a conversão

O Snowpark Migration Accelerator (SMA) não apenas gera uma avaliação abrangente do seu código, mas também pode converter elementos específicos do seu código-fonte em formatos compatíveis com a base de código de destino. Esse processo de conversão segue as mesmas etapas da avaliação inicial, com apenas uma etapa adicional.

Conversão no SMA

Nos modos de avaliação e conversão, o Snowpark Migration Accelerator (SMA):

  • Pesquisa todos os arquivos em um diretório especificado

  • Detecta quais arquivos contêm código

  • Analisa os arquivos de código de acordo com sua linguagem de programação

  • Cria uma representação estruturada do código (Abstract Syntax Tree ou AST)

  • Cria e preenche uma Symbol Table com informações do programa

  • Identifica e classifica os erros encontrados

  • Cria relatórios detalhados dos resultados

Todos esses processos são repetidos quando você executa o SMA no modo de conversão, mesmo que o tenha executado anteriormente no modo de avaliação. No entanto, o modo de conversão inclui uma etapa final adicional.

  • Formatar o código gerado a partir da árvore de sintaxe abstrata (AST) para melhorar a legibilidade

A Árvore de sintaxe abstrata (AST) é um modelo que representa como o código-fonte funciona. Quando a mesma funcionalidade existe nas linguagens de origem e de destino, o SMA pode gerar um código equivalente no idioma de destino. Essa geração de código só acontece durante o processo de conversão real.

Tipos de conversão no SMA

O Snowpark Migration Accelerator (SMA) atualmente oferece suporte às seguintes conversões de código:

  • Converte o código Python ou Scala das chamadas do Spark API em chamadas equivalentes do Snowpark API

  • Converte as instruções SQL do Spark SQL ou HiveQL para a sintaxe do Snowflake SQL

Vamos examinar um exemplo escrito nas linguagens de programação Scala e Python.

Exemplos de conversão de referências ao Spark API para o Snowpark API

Exemplo de Spark Scala para Snowpark

Ao usar Scala como linguagem de origem, o Snowpark Migration Accelerator (SMA) converte automaticamente as referências Spark API em seu código Scala para as referências equivalentes do Snowpark API. Abaixo está um exemplo que demonstra como um aplicativo Spark básico é convertido. O aplicativo de exemplo executa várias operações de dados comuns:

  • Leitura de dados

  • Filtragem de registros

  • Unir conjuntos de dados

  • Cálculo de médias

  • Exibição de resultados

Código do Apache Spark escrito em Scala

import org.apache.spark.sql._ 
import org.apache.spark.sql.functions._ 
import org.apache.spark.sql.SparkSession 

object SimpleApp {
  // This function calculates the average salary for jobs in a specific department
  def avgJobSalary(session: SparkSession, dept: String) {
    // Load employee data from CSV file
    val employees = session.read.csv("path/data/employees.csv")
    // Load job data from CSV file
    val jobs = session.read.csv("path/data/jobs.csv")

val jobsAvgSalary = employees
    .filter(column("Department") === dept)    // Filter employees by department
    .join(jobs)                              // Join with jobs table
    .groupBy("JobName")                      // Group results by job name
    .avg("Salary")                          // Calculate average salary for each job

// Calculate and display a list of all salaries in the department
jobsAvgSalary.select(collect_list("Salary")).show()

```scala
// Calculate and display the average salary
jobsAvgSalary.show()
}
Copy

O código após a conversão para Snowflake:

import com.snowflake.snowpark._ 
import com.snowflake.snowpark.functions._ 
import com.snowflake.snowpark.Session 

object SimpleApp {
  // This function calculates the average salary for jobs in a specific department
  def avgJobSalary(session: Session, dept: String) {
    // Load employee data from CSV file
    val employees = session.read.csv("path/data/employees.csv")
    // Load job data from CSV file
    val jobs = session.read.csv("path/data/jobs.csv")

val jobsAvgSalary = employees
    .filter(column("Department") === dept)    // Filter employees by department
    .join(jobs)                              // Join with jobs table
    .groupBy("JobName")                      // Group results by job name
    .avg("Salary")                           // Calculate average salary per job

```scala
// Calculate and display all salaries in the department 
jobsAvgSalary.select(array_agg("Salary")).show()

// Display the average salary
jobsAvgSalary.show()
}
}
Copy

Neste exemplo, a estrutura do código permanece praticamente inalterada. No entanto, o código foi atualizado para usar as referências do Snowpark API em vez das referências do Spark API.

Exemplo de PySpark para Snowpark

Quando você escolhe o Python como linguagem de origem, o SMA converte automaticamente as chamadas PySpark API no código Python para as chamadas API equivalentes do Snowpark. Abaixo está um script de exemplo que demonstra várias funções do PySpark:

from datetime import date, datetime
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql import Row

Create a Spark session by building and initializing a new SparkSession object, or retrieve an existing one if already available.

df = spark_session.createDataFrame([
    Row(a=1, b=2., c='string1', d=date(2000, 1, 1), e=datetime(2000, 1, 1, 12, 0)),
    Row(a=2, b=3., c='string2', d=date(2000, 2, 1), e=datetime(2000, 1, 2, 12, 0)),
    Row(a=4, b=5., c='string3', d=date(2000, 3, 1), e=datetime(2000, 1, 3, 12, 0))
])

# cube()
df.cube("name", df.age).count().orderBy("name", "age").show()

# take()
df_new1.take(2)

# describe()
df.describe(['age']).show()

# explain()
df.explain() 
df.explain("simple") # Physical plan
df.explain(True) 

# intersect()
df1 = spark_session.createDataFrame([("a", 1), ("a", 1), ("b", 3), ("c", 4)], ["C1", "C2"])
df2 = spark_session.createDataFrame([("a", 1), ("a", 1), ("b", 3)], ["C1", "C2"])

# where()
df_new1.where(F.col('Id2')>30).show()
Copy

O código após a conversão para Snowflake:

from datetime import date, datetime
from snowflake.snowpark import Session
from snowflake.snowpark import functions as F
from snowflake.snowpark import Row

Create a Spark session using the Session builder:

spark_session = Session.builder.create()

df = spark_session.create_dataframe([
    Row(a=1, b=2., c='string1', d=date(2000, 1, 1), e=datetime(2000, 1, 1, 12, 0)),
    Row(a=2, b=3., c='string2', d=date(2000, 2, 1), e=datetime(2000, 1, 2, 12, 0)),
    Row(a=4, b=5., c='string3', d=date(2000, 3, 1), e=datetime(2000, 1, 3, 12, 0))
])

# cube()
df.cube("name", df.age).count().sort("name", "age").show()

# take()
df_new1.take(2)

# describe()
df.describe(['age']).show()

# explain()
df.explain()
df.explain("simple") # Physical plan
df.explain(True)

# intersect()
df1 = spark_session.create_dataframe([("a", 1), ("a", 1), ("b", 3), ("c", 4)], ["C1", "C2"])
df2 = spark_session.create_dataframe([("a", 1), ("a", 1), ("b", 3)], ["C1", "C2"])

# where()
df_new1.where(F.col('Id2')>30).show()
Copy

Neste exemplo, a estrutura do código permanece praticamente inalterada. No entanto, o código foi atualizado para usar as chamadas do Snowpark API em vez das chamadas do Spark API.

Durante o processo de conversão com o Snowpark Migration Accelerator (SMA), você pode esperar o seguinte: