Snowpark Migration Accelerator: Wie die Konvertierung funktioniert

Der Snowpark Migration Accelerator (SMA) erstellt nicht nur eine umfassende Bewertung Ihres Codes, sondern kann auch bestimmte Elemente aus Ihrem Quellcode in kompatible Formate für Ihre Zielcodebasis konvertieren. Dieser Konvertierungsprozess folgt denselben Schritten wie die Erstbewertung, mit nur einem zusätzlichen Schritt.

Konvertierung im SMA

Sowohl im Bewertungs- als auch im Konvertierungsmodus tut der Snowpark Migration Accelerator (SMA) Folgendes:

  • Durchsucht alle Dateien innerhalb eines bestimmten Verzeichnisses

  • Erkennt, welche Dateien Code enthalten

  • Analysiert die Codedateien nach ihrer Programmiersprache

  • Erstellt eine strukturierte Darstellung des Codes (abstrakten Syntaxbaum oder AST)

  • Erstellt und befüllt eine Symboltabelle mit Programminformationen

  • Bezeichnet und klassifiziert alle gefundenen Fehler

  • Erstellt detaillierte Berichte über die Ergebnisse

Alle diese Prozesse werden wiederholt, wenn Sie SMA im Konvertierungsmodus ausführen, auch wenn Sie es zuvor im Bewertungsmodus ausgeführt haben. Der Konvertierungsmodus beinhaltet jedoch einen zusätzlichen letzten Schritt.

  • Formatiert den generierten Code aus dem abstrakten Syntaxbaum (AST ), um die Lesbarkeit zu verbessern

Der abstrakte Syntaxbaum (AST) ist ein Modell, das darstellt, wie Ihr Quellcode funktioniert. Wenn die gleiche Funktionalität sowohl in der Ausgangs- als auch in der Zielsprache vorhanden ist, kann SMA äquivalenten Code in der Zielsprache erzeugen. Diese Codegenerierung findet nur während des eigentlichen Konvertierungsprozesses statt.

Konvertierungstypen im SMA

Der Snowpark Migration Accelerator (SMA) unterstützt derzeit die folgenden Codekonvertierungen:

  • Konvertiert Python- oder Scala-Code von Spark API-Aufrufen in entsprechende Snowpark API-Aufrufe

  • Übersetzt SQL-Anweisungen von Spark SQL oder HiveQL in Snowflake SQL-Syntax

Schauen wir uns ein Beispiel an, das sowohl in Scala als auch in Python geschrieben wurde.

Beispiele für die Konvertierung von Verweisen auf der Spark API in die Snowpark API

Beispiel für Spark Scala zu Snowpark

Wenn Sie Scala als Quellsprache verwenden, konvertiert der Snowpark Migration Accelerator (SMA) automatisch die Spark API-Referenzen in Ihrem Scala Code in die entsprechenden Snowpark API-Referenzen. Nachfolgend finden Sie ein Beispiel, das zeigt, wie eine einfache Spark-Anwendung konvertiert wird. Die Beispielanwendung führt mehrere gängige Datenoperationen durch:

  • Lesen von Daten

  • Filtern von Datensätzen

  • Verknüpfen von Datensätzen

  • Berechnen von Durchschnittswerten

  • Anzeigen der Ergebnisse

Apache Spark-Code geschrieben in Scala

import org.apache.spark.sql._ 
import org.apache.spark.sql.functions._ 
import org.apache.spark.sql.SparkSession 

object SimpleApp {
  // This function calculates the average salary for jobs in a specific department
  def avgJobSalary(session: SparkSession, dept: String) {
    // Load employee data from CSV file
    val employees = session.read.csv("path/data/employees.csv")
    // Load job data from CSV file
    val jobs = session.read.csv("path/data/jobs.csv")

val jobsAvgSalary = employees
    .filter(column("Department") === dept)    // Filter employees by department
    .join(jobs)                              // Join with jobs table
    .groupBy("JobName")                      // Group results by job name
    .avg("Salary")                          // Calculate average salary for each job

// Calculate and display a list of all salaries in the department
jobsAvgSalary.select(collect_list("Salary")).show()

```scala
// Calculate and display the average salary
jobsAvgSalary.show()
}
Copy

Der Code nach der Konvertierung in Snowflake:

import com.snowflake.snowpark._ 
import com.snowflake.snowpark.functions._ 
import com.snowflake.snowpark.Session 

object SimpleApp {
  // This function calculates the average salary for jobs in a specific department
  def avgJobSalary(session: Session, dept: String) {
    // Load employee data from CSV file
    val employees = session.read.csv("path/data/employees.csv")
    // Load job data from CSV file
    val jobs = session.read.csv("path/data/jobs.csv")

val jobsAvgSalary = employees
    .filter(column("Department") === dept)    // Filter employees by department
    .join(jobs)                              // Join with jobs table
    .groupBy("JobName")                      // Group results by job name
    .avg("Salary")                           // Calculate average salary per job

```scala
// Calculate and display all salaries in the department 
jobsAvgSalary.select(array_agg("Salary")).show()

// Display the average salary
jobsAvgSalary.show()
}
}
Copy

In diesem Beispiel bleibt die Codestruktur weitgehend unverändert. Der Code wurde jedoch aktualisiert, um Snowpark API-Referenzen anstelle von Spark API-Referenzen zu verwenden.

Beispiel von PySpark zu Snowpark

Wenn Sie Python als Quellsprache wählen, konvertiert SMA automatisch PySpark API-Aufrufe in Ihrem Python-Code in die entsprechenden Snowpark API-Aufrufe. Im Folgenden finden Sie ein Beispielskript, das verschiedene Funktionen von PySpark demonstriert:

from datetime import date, datetime
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql import Row

Create a Spark session by building and initializing a new SparkSession object, or retrieve an existing one if already available.

df = spark_session.createDataFrame([
    Row(a=1, b=2., c='string1', d=date(2000, 1, 1), e=datetime(2000, 1, 1, 12, 0)),
    Row(a=2, b=3., c='string2', d=date(2000, 2, 1), e=datetime(2000, 1, 2, 12, 0)),
    Row(a=4, b=5., c='string3', d=date(2000, 3, 1), e=datetime(2000, 1, 3, 12, 0))
])

# cube()
df.cube("name", df.age).count().orderBy("name", "age").show()

# take()
df_new1.take(2)

# describe()
df.describe(['age']).show()

# explain()
df.explain() 
df.explain("simple") # Physical plan
df.explain(True) 

# intersect()
df1 = spark_session.createDataFrame([("a", 1), ("a", 1), ("b", 3), ("c", 4)], ["C1", "C2"])
df2 = spark_session.createDataFrame([("a", 1), ("a", 1), ("b", 3)], ["C1", "C2"])

# where()
df_new1.where(F.col('Id2')>30).show()
Copy

Der Code nach der Konvertierung in Snowflake:

from datetime import date, datetime
from snowflake.snowpark import Session
from snowflake.snowpark import functions as F
from snowflake.snowpark import Row

Create a Spark session using the Session builder:

spark_session = Session.builder.create()

df = spark_session.create_dataframe([
    Row(a=1, b=2., c='string1', d=date(2000, 1, 1), e=datetime(2000, 1, 1, 12, 0)),
    Row(a=2, b=3., c='string2', d=date(2000, 2, 1), e=datetime(2000, 1, 2, 12, 0)),
    Row(a=4, b=5., c='string3', d=date(2000, 3, 1), e=datetime(2000, 1, 3, 12, 0))
])

# cube()
df.cube("name", df.age).count().sort("name", "age").show()

# take()
df_new1.take(2)

# describe()
df.describe(['age']).show()

# explain()
df.explain()
df.explain("simple") # Physical plan
df.explain(True)

# intersect()
df1 = spark_session.create_dataframe([("a", 1), ("a", 1), ("b", 3), ("c", 4)], ["C1", "C2"])
df2 = spark_session.create_dataframe([("a", 1), ("a", 1), ("b", 3)], ["C1", "C2"])

# where()
df_new1.where(F.col('Id2')>30).show()
Copy

In diesem Beispiel bleibt die Codestruktur weitgehend unverändert. Der Code wurde jedoch aktualisiert, um Snowpark API-Aufrufe anstelle von Spark API-Aufrufen zu verwenden.

Während des Konvertierungsprozesses mit dem Snowpark Migration Accelerator (SMA) können Sie Folgendes erwarten: