Snowpark Migration Accelerator: Como funciona a conversão¶
O Snowpark Migration Accelerator (SMA) não apenas gera uma avaliação abrangente do seu código, mas também pode converter elementos específicos do seu código-fonte em formatos compatíveis com a base de código de destino. Esse processo de conversão segue as mesmas etapas da avaliação inicial, com apenas uma etapa adicional.
Conversão no SMA¶
Nos modos de avaliação e conversão, o Snowpark Migration Accelerator (SMA):
Pesquisa todos os arquivos em um diretório especificado
Detecta quais arquivos contêm código
Analisa os arquivos de código de acordo com sua linguagem de programação
Cria uma representação estruturada do código (Abstract Syntax Tree ou AST)
Cria e preenche uma Symbol Table com informações do programa
Identifica e classifica os erros encontrados
Cria relatórios detalhados dos resultados
Todos esses processos são repetidos quando você executa o SMA no modo de conversão, mesmo que o tenha executado anteriormente no modo de avaliação. No entanto, o modo de conversão inclui uma etapa final adicional.
Formatar o código gerado a partir da árvore de sintaxe abstrata (AST) para melhorar a legibilidade
A Árvore de sintaxe abstrata (AST) é um modelo que representa como o código-fonte funciona. Quando a mesma funcionalidade existe nas linguagens de origem e de destino, o SMA pode gerar um código equivalente no idioma de destino. Essa geração de código só acontece durante o processo de conversão real.
Tipos de conversão no SMA¶
O Snowpark Migration Accelerator (SMA) atualmente oferece suporte às seguintes conversões de código:
Converte o código Python ou Scala das chamadas do Spark API em chamadas equivalentes do Snowpark API
Converte as instruções SQL do Spark SQL ou HiveQL para a sintaxe do Snowflake SQL
Vamos examinar um exemplo escrito nas linguagens de programação Scala e Python.
Exemplos de conversão de referências ao Spark API para o Snowpark API¶
Exemplo de Spark Scala para Snowpark¶
Ao usar Scala como linguagem de origem, o Snowpark Migration Accelerator (SMA) converte automaticamente as referências Spark API em seu código Scala para as referências equivalentes do Snowpark API. Abaixo está um exemplo que demonstra como um aplicativo Spark básico é convertido. O aplicativo de exemplo executa várias operações de dados comuns:
Leitura de dados
Filtragem de registros
Unir conjuntos de dados
Cálculo de médias
Exibição de resultados
Código do Apache Spark escrito em Scala
import org.apache.spark.sql._
import org.apache.spark.sql.functions._
import org.apache.spark.sql.SparkSession
object SimpleApp {
// This function calculates the average salary for jobs in a specific department
def avgJobSalary(session: SparkSession, dept: String) {
// Load employee data from CSV file
val employees = session.read.csv("path/data/employees.csv")
// Load job data from CSV file
val jobs = session.read.csv("path/data/jobs.csv")
val jobsAvgSalary = employees
.filter(column("Department") === dept) // Filter employees by department
.join(jobs) // Join with jobs table
.groupBy("JobName") // Group results by job name
.avg("Salary") // Calculate average salary for each job
// Calculate and display a list of all salaries in the department
jobsAvgSalary.select(collect_list("Salary")).show()
```scala
// Calculate and display the average salary
jobsAvgSalary.show()
}
O código após a conversão para Snowflake:
import com.snowflake.snowpark._
import com.snowflake.snowpark.functions._
import com.snowflake.snowpark.Session
object SimpleApp {
// This function calculates the average salary for jobs in a specific department
def avgJobSalary(session: Session, dept: String) {
// Load employee data from CSV file
val employees = session.read.csv("path/data/employees.csv")
// Load job data from CSV file
val jobs = session.read.csv("path/data/jobs.csv")
val jobsAvgSalary = employees
.filter(column("Department") === dept) // Filter employees by department
.join(jobs) // Join with jobs table
.groupBy("JobName") // Group results by job name
.avg("Salary") // Calculate average salary per job
```scala
// Calculate and display all salaries in the department
jobsAvgSalary.select(array_agg("Salary")).show()
// Display the average salary
jobsAvgSalary.show()
}
}
Neste exemplo, a estrutura do código permanece praticamente inalterada. No entanto, o código foi atualizado para usar as referências do Snowpark API em vez das referências do Spark API.
Exemplo de PySpark para Snowpark¶
Quando você escolhe o Python como linguagem de origem, o SMA converte automaticamente as chamadas PySpark API no código Python para as chamadas API equivalentes do Snowpark. Abaixo está um script de exemplo que demonstra várias funções do PySpark:
from datetime import date, datetime
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql import Row
Create a Spark session by building and initializing a new SparkSession object, or retrieve an existing one if already available.
df = spark_session.createDataFrame([
Row(a=1, b=2., c='string1', d=date(2000, 1, 1), e=datetime(2000, 1, 1, 12, 0)),
Row(a=2, b=3., c='string2', d=date(2000, 2, 1), e=datetime(2000, 1, 2, 12, 0)),
Row(a=4, b=5., c='string3', d=date(2000, 3, 1), e=datetime(2000, 1, 3, 12, 0))
])
# cube()
df.cube("name", df.age).count().orderBy("name", "age").show()
# take()
df_new1.take(2)
# describe()
df.describe(['age']).show()
# explain()
df.explain()
df.explain("simple") # Physical plan
df.explain(True)
# intersect()
df1 = spark_session.createDataFrame([("a", 1), ("a", 1), ("b", 3), ("c", 4)], ["C1", "C2"])
df2 = spark_session.createDataFrame([("a", 1), ("a", 1), ("b", 3)], ["C1", "C2"])
# where()
df_new1.where(F.col('Id2')>30).show()
O código após a conversão para Snowflake:
from datetime import date, datetime
from snowflake.snowpark import Session
from snowflake.snowpark import functions as F
from snowflake.snowpark import Row
Create a Spark session using the Session builder:
spark_session = Session.builder.create()
df = spark_session.create_dataframe([
Row(a=1, b=2., c='string1', d=date(2000, 1, 1), e=datetime(2000, 1, 1, 12, 0)),
Row(a=2, b=3., c='string2', d=date(2000, 2, 1), e=datetime(2000, 1, 2, 12, 0)),
Row(a=4, b=5., c='string3', d=date(2000, 3, 1), e=datetime(2000, 1, 3, 12, 0))
])
# cube()
df.cube("name", df.age).count().sort("name", "age").show()
# take()
df_new1.take(2)
# describe()
df.describe(['age']).show()
# explain()
df.explain()
df.explain("simple") # Physical plan
df.explain(True)
# intersect()
df1 = spark_session.create_dataframe([("a", 1), ("a", 1), ("b", 3), ("c", 4)], ["C1", "C2"])
df2 = spark_session.create_dataframe([("a", 1), ("a", 1), ("b", 3)], ["C1", "C2"])
# where()
df_new1.where(F.col('Id2')>30).show()
Neste exemplo, a estrutura do código permanece praticamente inalterada. No entanto, o código foi atualizado para usar as chamadas do Snowpark API em vez das chamadas do Spark API.
Durante o processo de conversão com o Snowpark Migration Accelerator (SMA), você pode esperar o seguinte: