Snowpark Migration Accelerator: Wie die Konvertierung funktioniert¶
Der Snowpark Migration Accelerator (SMA) erstellt nicht nur eine umfassende Bewertung Ihres Codes, sondern kann auch bestimmte Elemente aus Ihrem Quellcode in kompatible Formate für Ihre Zielcodebasis konvertieren. Dieser Konvertierungsprozess folgt denselben Schritten wie die Erstbewertung, mit nur einem zusätzlichen Schritt.
Konvertierung im SMA¶
Sowohl im Bewertungs- als auch im Konvertierungsmodus tut der Snowpark Migration Accelerator (SMA) Folgendes:
Durchsucht alle Dateien innerhalb eines bestimmten Verzeichnisses
Erkennt, welche Dateien Code enthalten
Analysiert die Codedateien nach ihrer Programmiersprache
Erstellt eine strukturierte Darstellung des Codes (abstrakten Syntaxbaum oder AST)
Erstellt und befüllt eine Symboltabelle mit Programminformationen
Bezeichnet und klassifiziert alle gefundenen Fehler
Erstellt detaillierte Berichte über die Ergebnisse
Alle diese Prozesse werden wiederholt, wenn Sie SMA im Konvertierungsmodus ausführen, auch wenn Sie es zuvor im Bewertungsmodus ausgeführt haben. Der Konvertierungsmodus beinhaltet jedoch einen zusätzlichen letzten Schritt.
Formatiert den generierten Code aus dem abstrakten Syntaxbaum (AST ), um die Lesbarkeit zu verbessern
Der abstrakte Syntaxbaum (AST) ist ein Modell, das darstellt, wie Ihr Quellcode funktioniert. Wenn die gleiche Funktionalität sowohl in der Ausgangs- als auch in der Zielsprache vorhanden ist, kann SMA äquivalenten Code in der Zielsprache erzeugen. Diese Codegenerierung findet nur während des eigentlichen Konvertierungsprozesses statt.
Konvertierungstypen im SMA¶
Der Snowpark Migration Accelerator (SMA) unterstützt derzeit die folgenden Codekonvertierungen:
Konvertiert Python- oder Scala-Code von Spark API-Aufrufen in entsprechende Snowpark API-Aufrufe
Übersetzt SQL-Anweisungen von Spark SQL oder HiveQL in Snowflake SQL-Syntax
Schauen wir uns ein Beispiel an, das sowohl in Scala als auch in Python geschrieben wurde.
Beispiele für die Konvertierung von Verweisen auf der Spark API in die Snowpark API¶
Beispiel für Spark Scala zu Snowpark¶
Wenn Sie Scala als Quellsprache verwenden, konvertiert der Snowpark Migration Accelerator (SMA) automatisch die Spark API-Referenzen in Ihrem Scala Code in die entsprechenden Snowpark API-Referenzen. Nachfolgend finden Sie ein Beispiel, das zeigt, wie eine einfache Spark-Anwendung konvertiert wird. Die Beispielanwendung führt mehrere gängige Datenoperationen durch:
Lesen von Daten
Filtern von Datensätzen
Verknüpfen von Datensätzen
Berechnen von Durchschnittswerten
Anzeigen der Ergebnisse
Apache Spark-Code geschrieben in Scala
import org.apache.spark.sql._
import org.apache.spark.sql.functions._
import org.apache.spark.sql.SparkSession
object SimpleApp {
// This function calculates the average salary for jobs in a specific department
def avgJobSalary(session: SparkSession, dept: String) {
// Load employee data from CSV file
val employees = session.read.csv("path/data/employees.csv")
// Load job data from CSV file
val jobs = session.read.csv("path/data/jobs.csv")
val jobsAvgSalary = employees
.filter(column("Department") === dept) // Filter employees by department
.join(jobs) // Join with jobs table
.groupBy("JobName") // Group results by job name
.avg("Salary") // Calculate average salary for each job
// Calculate and display a list of all salaries in the department
jobsAvgSalary.select(collect_list("Salary")).show()
```scala
// Calculate and display the average salary
jobsAvgSalary.show()
}
Der Code nach der Konvertierung in Snowflake:
import com.snowflake.snowpark._
import com.snowflake.snowpark.functions._
import com.snowflake.snowpark.Session
object SimpleApp {
// This function calculates the average salary for jobs in a specific department
def avgJobSalary(session: Session, dept: String) {
// Load employee data from CSV file
val employees = session.read.csv("path/data/employees.csv")
// Load job data from CSV file
val jobs = session.read.csv("path/data/jobs.csv")
val jobsAvgSalary = employees
.filter(column("Department") === dept) // Filter employees by department
.join(jobs) // Join with jobs table
.groupBy("JobName") // Group results by job name
.avg("Salary") // Calculate average salary per job
```scala
// Calculate and display all salaries in the department
jobsAvgSalary.select(array_agg("Salary")).show()
// Display the average salary
jobsAvgSalary.show()
}
}
In diesem Beispiel bleibt die Codestruktur weitgehend unverändert. Der Code wurde jedoch aktualisiert, um Snowpark API-Referenzen anstelle von Spark API-Referenzen zu verwenden.
Beispiel von PySpark zu Snowpark¶
Wenn Sie Python als Quellsprache wählen, konvertiert SMA automatisch PySpark API-Aufrufe in Ihrem Python-Code in die entsprechenden Snowpark API-Aufrufe. Im Folgenden finden Sie ein Beispielskript, das verschiedene Funktionen von PySpark demonstriert:
from datetime import date, datetime
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql import Row
Create a Spark session by building and initializing a new SparkSession object, or retrieve an existing one if already available.
df = spark_session.createDataFrame([
Row(a=1, b=2., c='string1', d=date(2000, 1, 1), e=datetime(2000, 1, 1, 12, 0)),
Row(a=2, b=3., c='string2', d=date(2000, 2, 1), e=datetime(2000, 1, 2, 12, 0)),
Row(a=4, b=5., c='string3', d=date(2000, 3, 1), e=datetime(2000, 1, 3, 12, 0))
])
# cube()
df.cube("name", df.age).count().orderBy("name", "age").show()
# take()
df_new1.take(2)
# describe()
df.describe(['age']).show()
# explain()
df.explain()
df.explain("simple") # Physical plan
df.explain(True)
# intersect()
df1 = spark_session.createDataFrame([("a", 1), ("a", 1), ("b", 3), ("c", 4)], ["C1", "C2"])
df2 = spark_session.createDataFrame([("a", 1), ("a", 1), ("b", 3)], ["C1", "C2"])
# where()
df_new1.where(F.col('Id2')>30).show()
Der Code nach der Konvertierung in Snowflake:
from datetime import date, datetime
from snowflake.snowpark import Session
from snowflake.snowpark import functions as F
from snowflake.snowpark import Row
Create a Spark session using the Session builder:
spark_session = Session.builder.create()
df = spark_session.create_dataframe([
Row(a=1, b=2., c='string1', d=date(2000, 1, 1), e=datetime(2000, 1, 1, 12, 0)),
Row(a=2, b=3., c='string2', d=date(2000, 2, 1), e=datetime(2000, 1, 2, 12, 0)),
Row(a=4, b=5., c='string3', d=date(2000, 3, 1), e=datetime(2000, 1, 3, 12, 0))
])
# cube()
df.cube("name", df.age).count().sort("name", "age").show()
# take()
df_new1.take(2)
# describe()
df.describe(['age']).show()
# explain()
df.explain()
df.explain("simple") # Physical plan
df.explain(True)
# intersect()
df1 = spark_session.create_dataframe([("a", 1), ("a", 1), ("b", 3), ("c", 4)], ["C1", "C2"])
df2 = spark_session.create_dataframe([("a", 1), ("a", 1), ("b", 3)], ["C1", "C2"])
# where()
df_new1.where(F.col('Id2')>30).show()
In diesem Beispiel bleibt die Codestruktur weitgehend unverändert. Der Code wurde jedoch aktualisiert, um Snowpark API-Aufrufe anstelle von Spark API-Aufrufen zu verwenden.
Während des Konvertierungsprozesses mit dem Snowpark Migration Accelerator (SMA) können Sie Folgendes erwarten: