Snowpark Migration Accelerator: How the Conversion Works
The Snowpark Migration Accelerator (SMA) not only generates a comprehensive assessment of your code but can also convert specific elements from your source code into compatible formats for your target codebase. This conversion process follows the same steps as the initial assessment, with just one additional step.
Conversion in the SMA
In both assessment and conversion modes, the Snowpark Migration Accelerator (SMA):
Searches through all files within a specified directory
Detects which files contain code
Analyzes the code files according to their programming language
Creates a structured representation of the code (Abstract Syntax Tree or AST)
Creates and fills a Symbol Table with program information
Identifies and classifies any errors found
Creates detailed reports of the results
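The steps above can be pictured with a small Python sketch. It is not SMA's implementation: the `scan_directory` function, the extension table, and the shape of the returned report are all hypothetical, and only Python files are parsed, using the standard-library `ast` module.

```python
import ast
from pathlib import Path

# Hypothetical extension-to-language table (an assumption for this sketch;
# SMA's real detection rules are not described in this document).
CODE_EXTENSIONS = {".py": "python", ".scala": "scala", ".sql": "sql"}


def scan_directory(root):
    """Walk `root`, classify files, parse Python files, and report symbols and errors."""
    report = {"files": {}, "symbols": {}, "errors": []}
    for path in sorted(Path(root).rglob("*")):
        if not path.is_file():
            continue
        language = CODE_EXTENSIONS.get(path.suffix.lower())
        if language is None:
            continue  # not recognized as a code file
        report["files"][str(path)] = language
        if language != "python":
            continue  # this sketch only knows how to parse Python
        try:
            tree = ast.parse(path.read_text(encoding="utf-8"), filename=str(path))
        except SyntaxError as exc:
            # Record the error and keep going with the remaining files
            report["errors"].append((str(path), f"parse error: {exc.msg}"))
            continue
        # A toy "symbol table": functions, classes, and imported modules
        # found anywhere in the file
        names = []
        for node in ast.walk(tree):
            if isinstance(node, (ast.FunctionDef, ast.ClassDef)):
                names.append(node.name)
            elif isinstance(node, ast.ImportFrom) and node.module:
                names.append(node.module)
            elif isinstance(node, ast.Import):
                names.extend(alias.name for alias in node.names)
        report["symbols"][str(path)] = sorted(set(names))
    return report
```

Calling `scan_directory("path/to/project")` returns a dictionary listing which files were recognized as code, the names found in each parsed file, and any files that failed to parse.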
All of these processes are repeated when you run SMA in conversion mode, even if you previously ran it in assessment mode. However, conversion mode adds one final step:
Format the generated code from the Abstract Syntax Tree (AST) to improve readability
The Abstract Syntax Tree (AST) is a model that represents how your source code works. When the same functionality exists in both the source and target languages, SMA can generate equivalent code in the target language. This code generation only happens during the actual conversion process.
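As a rough illustration of generating code from an AST, the sketch below uses Python's standard `ast` module to rename two PySpark method calls to the Snowpark names used in the PySpark example later on this page (`createDataFrame` to `create_dataframe`, `orderBy` to `sort`). The `METHOD_MAP` table, the `MethodRenamer` class, and the `convert` helper are hypothetical. The sketch renames by method name alone, without checking what kind of object the method is called on, so treat it as a toy and not as a description of how SMA works internally.

```python
import ast

# Hypothetical mapping, limited to two renames visible in this page's PySpark example
METHOD_MAP = {"createDataFrame": "create_dataframe", "orderBy": "sort"}


class MethodRenamer(ast.NodeTransformer):
    """Rename attribute-style method calls such as obj.orderBy(...)."""

    def visit_Call(self, node):
        self.generic_visit(node)  # handle nested calls first
        func = node.func
        if isinstance(func, ast.Attribute) and func.attr in METHOD_MAP:
            func.attr = METHOD_MAP[func.attr]
        return node


def convert(source):
    tree = ast.parse(source)            # source text -> AST
    tree = MethodRenamer().visit(tree)  # transform the AST
    ast.fix_missing_locations(tree)
    return ast.unparse(tree)            # AST -> regenerated source (Python 3.9+)


print(convert('df.cube("name", df.age).count().orderBy("name", "age").show()'))
```

Because `ast.unparse` regenerates the text from the tree, the output follows the unparser's own layout: comments are dropped and the original quote style is not preserved. This is why a dedicated formatting step matters when code is generated from an AST.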
Types of Conversion in the SMA
The Snowpark Migration Accelerator (SMA) currently supports the following code conversions:
Converts Python or Scala code from Spark API calls to equivalent Snowpark API calls
Translates SQL statements from Spark SQL or HiveQL to Snowflake SQL syntax
The following sections show one example written in Scala and one written in Python.
Examples of Conversion of References to the Spark API to the Snowpark API
Example of Spark Scala to Snowpark
When using Scala as your source language, the Snowpark Migration Accelerator (SMA) automatically converts Spark API references in your Scala code to their equivalent Snowpark API references. Below is an example that demonstrates how a basic Spark application is converted. The example application performs several common data operations:
Reading data
Filtering records
Joining datasets
Calculating averages
Displaying results
Apache Spark Code Written in Scala:
```scala
import org.apache.spark.sql._
import org.apache.spark.sql.functions._
import org.apache.spark.sql.SparkSession

object SimpleApp {
  // This function calculates the average salary for jobs in a specific department
  def avgJobSalary(session: SparkSession, dept: String) {
    // Load employee data from CSV file
    val employees = session.read.csv("path/data/employees.csv")
    // Load job data from CSV file
    val jobs = session.read.csv("path/data/jobs.csv")

    val jobsAvgSalary = employees
      .filter(column("Department") === dept) // Filter employees by department
      .join(jobs)                            // Join with jobs table
      .groupBy("JobName")                    // Group results by job name
      .avg("Salary")                         // Calculate average salary for each job

    // Calculate and display a list of all salaries in the department
    jobsAvgSalary.select(collect_list("Salary")).show()

    // Calculate and display the average salary
    jobsAvgSalary.show()
  }
}
```
The Code After Conversion to Snowflake:
```scala
import com.snowflake.snowpark._
import com.snowflake.snowpark.functions._
import com.snowflake.snowpark.Session

object SimpleApp {
  // This function calculates the average salary for jobs in a specific department
  def avgJobSalary(session: Session, dept: String) {
    // Load employee data from CSV file
    val employees = session.read.csv("path/data/employees.csv")
    // Load job data from CSV file
    val jobs = session.read.csv("path/data/jobs.csv")

    val jobsAvgSalary = employees
      .filter(column("Department") === dept) // Filter employees by department
      .join(jobs)                            // Join with jobs table
      .groupBy("JobName")                    // Group results by job name
      .avg("Salary")                         // Calculate average salary per job

    // Calculate and display all salaries in the department
    jobsAvgSalary.select(array_agg("Salary")).show()

    // Display the average salary
    jobsAvgSalary.show()
  }
}
```
In this example, the code structure remains largely unchanged. The conversion replaces the Spark imports with their Snowpark counterparts, changes the session type from SparkSession to Session, and replaces collect_list with array_agg.
Example of PySpark to Snowpark
When you choose Python as your source language, SMA automatically converts PySpark API calls in your Python code to their equivalent Snowpark API calls. Below is an example script that demonstrates various PySpark functions:
from datetime import date, datetime
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql import Row
# Create a Spark session, or retrieve the existing one if it is already available
spark_session = SparkSession.builder.getOrCreate()
df = spark_session.createDataFrame([
Row(a=1, b=2., c='string1', d=date(2000, 1, 1), e=datetime(2000, 1, 1, 12, 0)),
Row(a=2, b=3., c='string2', d=date(2000, 2, 1), e=datetime(2000, 1, 2, 12, 0)),
Row(a=4, b=5., c='string3', d=date(2000, 3, 1), e=datetime(2000, 1, 3, 12, 0))
])
# cube()
df.cube("name", df.age).count().orderBy("name", "age").show()
# take()
df_new1.take(2)
# describe()
df.describe(['age']).show()
# explain()
df.explain()
df.explain("simple") # Physical plan
df.explain(True)
# intersect()
df1 = spark_session.createDataFrame([("a", 1), ("a", 1), ("b", 3), ("c", 4)], ["C1", "C2"])
df2 = spark_session.createDataFrame([("a", 1), ("a", 1), ("b", 3)], ["C1", "C2"])
# where()
df_new1.where(F.col('Id2')>30).show()
The Code After Conversion to Snowflake:
from datetime import date, datetime
from snowflake.snowpark import Session
from snowflake.snowpark import functions as F
from snowflake.snowpark import Row
# Create a Snowpark session using the Session builder
spark_session = Session.builder.create()
df = spark_session.create_dataframe([
Row(a=1, b=2., c='string1', d=date(2000, 1, 1), e=datetime(2000, 1, 1, 12, 0)),
Row(a=2, b=3., c='string2', d=date(2000, 2, 1), e=datetime(2000, 1, 2, 12, 0)),
Row(a=4, b=5., c='string3', d=date(2000, 3, 1), e=datetime(2000, 1, 3, 12, 0))
])
# cube()
df.cube("name", df.age).count().sort("name", "age").show()
# take()
df_new1.take(2)
# describe()
df.describe(['age']).show()
# explain()
df.explain()
df.explain("simple") # Physical plan
df.explain(True)
# intersect()
df1 = spark_session.create_dataframe([("a", 1), ("a", 1), ("b", 3), ("c", 4)], ["C1", "C2"])
df2 = spark_session.create_dataframe([("a", 1), ("a", 1), ("b", 3)], ["C1", "C2"])
# where()
df_new1.where(F.col('Id2')>30).show()
In this example, the code structure remains largely unchanged. The conversion replaces the pyspark.sql imports with snowflake.snowpark imports, SparkSession with Session, createDataFrame with create_dataframe, and orderBy with sort.
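One way to review what a conversion changed is to diff the original and converted files. The sketch below does this with Python's standard `difflib` module on a few lines copied from the example above. The variable names and the two file labels are illustrative, and this is a review aid, not part of SMA.

```python
import difflib

# A few lines from the PySpark example above
original = """\
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
df.cube("name", df.age).count().orderBy("name", "age").show()
df_new1.take(2)
""".splitlines()

# The same lines after conversion
converted = """\
from snowflake.snowpark import Session
from snowflake.snowpark import functions as F
df.cube("name", df.age).count().sort("name", "age").show()
df_new1.take(2)
""".splitlines()

diff = list(
    difflib.unified_diff(
        original, converted, "pyspark_version", "snowpark_version", lineterm=""
    )
)

# Keep only added/removed lines, skipping the "---" / "+++" file headers
changed = [
    line
    for line in diff
    if line.startswith(("+", "-")) and not line.startswith(("+++", "---"))
]

print("\n".join(diff))
```

Lines prefixed with `-` come from the PySpark version and lines prefixed with `+` from the Snowpark version. Unchanged lines, such as the `take(2)` call, appear as context with a leading space.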
During the conversion process with the Snowpark Migration Accelerator (SMA), you can expect the following: