Snowpark Migration Accelerator : Comment fonctionne la conversion¶
L’outil Snowpark Migration Accelerator (SMA) génère non seulement une évaluation complète de votre code mais peut également convertir des éléments spécifiques de votre code source dans des formats compatibles avec votre base de code cible. Ce processus de conversion suit les mêmes étapes que l’évaluation initiale, avec une seule étape supplémentaire.
Conversion dans l’outil SMA¶
En mode évaluation et en mode conversion, l’outil Snowpark Migration Accelerator (SMA) :
Recherche dans tous les fichiers d’un répertoire spécifié
Détecte les fichiers qui contiennent du code
Analyse les fichiers de code en fonction de leur langage de programmation
Crée une représentation structurée du code (arbre de la syntaxe abstraite ou AST)
Crée et remplit une table de symboles avec des informations sur le programme
Identifie et classe les erreurs détectées
Crée des rapports détaillés sur les résultats
Tous ces processus sont répétés lorsque vous exécutez SMA en mode conversion, même si vous l’avez précédemment exécuté en mode évaluation. Cependant, le mode de conversion comprend une étape finale supplémentaire.
Formater le code généré à partir de l’arbre de la syntaxe abstraite (AST) afin d’en améliorer la lisibilité
L’arbre de la syntaxe abstraite (AST) est un modèle qui représente le fonctionnement de votre code source. Lorsque la même fonctionnalité existe à la fois dans la langue source et dans la langue cible, SMA peut générer un code équivalent dans la langue cible. Cette génération de code n’a lieu que pendant le processus de conversion proprement dit.
Types de conversion dans l’outil SMA¶
L’outil Snowpark Migration Accelerator (SMA) prend actuellement en charge les conversions de code suivantes :
Convertit le code Python ou Scala des appels Spark API en appels Snowpark API équivalents
Traduit les instructions SQL de Spark SQL ou HiveQL en syntaxe SQL Snowflake
Examinons un exemple écrit dans les langages de programmation Scala et Python.
Exemples de conversion de références Spark API en références Snowpark API¶
Exemple de conversion de références Spark Scala en références Snowpark¶
Lorsque vous utilisez Scala comme langue source, l’outil Snowpark Migration Accelerator (SMA) convertit automatiquement les références Spark API dans votre code Scala en références Snowpark API équivalentes. Vous trouverez ci-dessous un exemple qui illustre comment une application Spark de base est convertie. L’exemple d’application effectue plusieurs opérations courantes sur les données :
Lecture des données
Filtrage des enregistrements
Jointure des ensembles de données
Calcul des moyennes
Affichage des résultats
Code Apache Spark écrit en Scala
import org.apache.spark.sql._
import org.apache.spark.sql.functions._
import org.apache.spark.sql.SparkSession
object SimpleApp {
// This function calculates the average salary for jobs in a specific department
def avgJobSalary(session: SparkSession, dept: String) {
// Load employee data from CSV file
val employees = session.read.csv("path/data/employees.csv")
// Load job data from CSV file
val jobs = session.read.csv("path/data/jobs.csv")
val jobsAvgSalary = employees
.filter(column("Department") === dept) // Filter employees by department
.join(jobs) // Join with jobs table
.groupBy("JobName") // Group results by job name
.avg("Salary") // Calculate average salary for each job
// Calculate and display a list of all salaries in the department
jobsAvgSalary.select(collect_list("Salary")).show()
```scala
// Calculate and display the average salary
jobsAvgSalary.show()
}
Le code après la conversion en Snowflake :
import com.snowflake.snowpark._
import com.snowflake.snowpark.functions._
import com.snowflake.snowpark.Session
object SimpleApp {
// This function calculates the average salary for jobs in a specific department
def avgJobSalary(session: Session, dept: String) {
// Load employee data from CSV file
val employees = session.read.csv("path/data/employees.csv")
// Load job data from CSV file
val jobs = session.read.csv("path/data/jobs.csv")
val jobsAvgSalary = employees
.filter(column("Department") === dept) // Filter employees by department
.join(jobs) // Join with jobs table
.groupBy("JobName") // Group results by job name
.avg("Salary") // Calculate average salary per job
```scala
// Calculate and display all salaries in the department
jobsAvgSalary.select(array_agg("Salary")).show()
// Display the average salary
jobsAvgSalary.show()
}
}
Dans cet exemple, la structure du code reste largement inchangée. Toutefois, le code a été mis à jour pour utiliser les références Snowpark API au lieu des références Spark API.
Exemple de conversion d’appels PySpark en Snowpark¶
Lorsque vous choisissez Python comme langue source, SMA convertit automatiquement les appels API PySpark de votre code Python en leurs appels Snowpark API équivalents. Vous trouverez ci-dessous un exemple de script démontrant diverses fonctions PySpark :
from datetime import date, datetime
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql import Row
Create a Spark session by building and initializing a new SparkSession object, or retrieve an existing one if already available.
df = spark_session.createDataFrame([
Row(a=1, b=2., c='string1', d=date(2000, 1, 1), e=datetime(2000, 1, 1, 12, 0)),
Row(a=2, b=3., c='string2', d=date(2000, 2, 1), e=datetime(2000, 1, 2, 12, 0)),
Row(a=4, b=5., c='string3', d=date(2000, 3, 1), e=datetime(2000, 1, 3, 12, 0))
])
# cube()
df.cube("name", df.age).count().orderBy("name", "age").show()
# take()
df_new1.take(2)
# describe()
df.describe(['age']).show()
# explain()
df.explain()
df.explain("simple") # Physical plan
df.explain(True)
# intersect()
df1 = spark_session.createDataFrame([("a", 1), ("a", 1), ("b", 3), ("c", 4)], ["C1", "C2"])
df2 = spark_session.createDataFrame([("a", 1), ("a", 1), ("b", 3)], ["C1", "C2"])
# where()
df_new1.where(F.col('Id2')>30).show()
Le code après la conversion en Snowflake :
from datetime import date, datetime
from snowflake.snowpark import Session
from snowflake.snowpark import functions as F
from snowflake.snowpark import Row
Create a Spark session using the Session builder:
spark_session = Session.builder.create()
df = spark_session.create_dataframe([
Row(a=1, b=2., c='string1', d=date(2000, 1, 1), e=datetime(2000, 1, 1, 12, 0)),
Row(a=2, b=3., c='string2', d=date(2000, 2, 1), e=datetime(2000, 1, 2, 12, 0)),
Row(a=4, b=5., c='string3', d=date(2000, 3, 1), e=datetime(2000, 1, 3, 12, 0))
])
# cube()
df.cube("name", df.age).count().sort("name", "age").show()
# take()
df_new1.take(2)
# describe()
df.describe(['age']).show()
# explain()
df.explain()
df.explain("simple") # Physical plan
df.explain(True)
# intersect()
df1 = spark_session.create_dataframe([("a", 1), ("a", 1), ("b", 3), ("c", 4)], ["C1", "C2"])
df2 = spark_session.create_dataframe([("a", 1), ("a", 1), ("b", 3)], ["C1", "C2"])
# where()
df_new1.where(F.col('Id2')>30).show()
Dans cet exemple, la structure du code reste largement inchangée. Cependant, le code a été mis à jour pour utiliser les appels Snowpark API au lieu des appels Spark API.
Pendant le processus de conversion avec l’outil Snowpark Migration Accelerator (SMA), vous pouvez vous attendre à ce qui suit :