Snowpark Migration Accelerator: 변환 작동 방식

Snowpark Migration Accelerator(SMA)는 코드에 대한 종합적인 평가를 생성할 뿐만 아니라 소스 코드의 특정 요소를 대상 코드베이스에 호환되는 형식으로 변환할 수도 있습니다. 이 변환 프로세스는 초기 평가와 동일한 단계를 따르며, 한 단계만 추가됩니다.

SMA 의 변환

평가 및 변환 모드 모두에서 Snowpark Migration Accelerator (SMA):

  • 지정된 디렉터리 내의 모든 파일을 검색합니다

  • 어떤 파일에 코드가 포함되어 있는지 감지합니다

  • 프로그래밍 언어에 따라 코드 파일을 분석합니다

  • 코드의 정형 표현을 생성합니다(추상 구문 트리 또는 AST)

  • 프로그램 정보로 기호 테이블을 생성하고 채웁니다

  • 발견된 오류를 식별하고 분류합니다

  • 결과에 대한 상세 보고서 작성

이전에 평가 모드에서 실행했더라도 변환 모드에서 SMA 를 실행하면 이러한 모든 프로세스가 반복됩니다. 하지만 변환 모드에는 최종 단계가 하나 더 있습니다.

  • 가독성을 높이기 위해 추상 구문 트리(AST)에서 생성된 코드의 형식을 지정합니다

추상 구문 트리(AST)는 소스 코드의 작동 방식을 나타내는 모델입니다. 소스 언어와 대상 언어 모두에 동일한 기능이 있는 경우 SMA 에서 대상 언어로 동등한 코드를 생성할 수 있습니다. 이 코드 생성은 실제 변환 프로세스 중에만 발생합니다.

SMA 의 변환 유형

Snowpark Migration Accelerator(SMA)는 현재 다음과 같은 코드 변환을 지원합니다.

  • Python 또는 Scala 코드를 Spark API 호출에서 동등한 Snowpark API 호출로 변환합니다

  • SQL 문을 Spark SQL 또는 HiveQL 에서 Snowflake SQL 구문으로 변환합니다

Scala와 Python 프로그래밍 언어로 작성된 예제를 살펴 보겠습니다.

Spark API 를 Snowpark API 로 변환하는 예

Spark Scala에서 Snowpark로의 예시

Scala를 소스 언어로 사용하는 경우, Snowpark Migration Accelerator(SMA)는 자동으로 Scala 코드의 Spark API 참조를 해당되는 Snowpark API 참조로 변환합니다. 다음은 기본 Spark 애플리케이션이 어떻게 변환되는지 보여주는 예시입니다. 예제 애플리케이션은 몇 가지 일반적인 데이터 작업을 수행합니다.

  • 데이터 읽기

  • 레코드 필터링하기

  • 데이터 세트 조인

  • 평균 계산하기

  • 결과 표시

Scala로 작성된 Apache Spark 코드

import org.apache.spark.sql._ 
import org.apache.spark.sql.functions._ 
import org.apache.spark.sql.SparkSession 

object SimpleApp {
  // This function calculates the average salary for jobs in a specific department
  def avgJobSalary(session: SparkSession, dept: String) {
    // Load employee data from CSV file
    val employees = session.read.csv("path/data/employees.csv")
    // Load job data from CSV file
    val jobs = session.read.csv("path/data/jobs.csv")

val jobsAvgSalary = employees
    .filter(column("Department") === dept)    // Filter employees by department
    .join(jobs)                              // Join with jobs table
    .groupBy("JobName")                      // Group results by job name
    .avg("Salary")                          // Calculate average salary for each job

// Calculate and display a list of all salaries in the department
jobsAvgSalary.select(collect_list("Salary")).show()

```scala
// Calculate and display the average salary
jobsAvgSalary.show()
}
Copy

Snowflake로 변환한 후의 코드:

import com.snowflake.snowpark._ 
import com.snowflake.snowpark.functions._ 
import com.snowflake.snowpark.Session 

object SimpleApp {
  // This function calculates the average salary for jobs in a specific department
  def avgJobSalary(session: Session, dept: String) {
    // Load employee data from CSV file
    val employees = session.read.csv("path/data/employees.csv")
    // Load job data from CSV file
    val jobs = session.read.csv("path/data/jobs.csv")

val jobsAvgSalary = employees
    .filter(column("Department") === dept)    // Filter employees by department
    .join(jobs)                              // Join with jobs table
    .groupBy("JobName")                      // Group results by job name
    .avg("Salary")                           // Calculate average salary per job

```scala
// Calculate and display all salaries in the department 
jobsAvgSalary.select(array_agg("Salary")).show()

// Display the average salary
jobsAvgSalary.show()
}
}
Copy

이 예제에서는 코드 구조가 크게 변경되지 않았습니다. 그러나 이 코드는 Spark API 참조 대신 Snowpark API 참조를 사용하도록 업데이트되었습니다.

PySpark 에서 Snowpark로 예시

소스 언어로 Python을 선택하면 SMA 는 자동으로 Python 코드의 PySpark API 호출을 해당하는 Snowpark API 호출로 변환합니다. 다음은 다양한 PySpark 함수를 보여주는 예제 스크립트입니다.

from datetime import date, datetime
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql import Row

Create a Spark session by building and initializing a new SparkSession object, or retrieve an existing one if already available.

df = spark_session.createDataFrame([
    Row(a=1, b=2., c='string1', d=date(2000, 1, 1), e=datetime(2000, 1, 1, 12, 0)),
    Row(a=2, b=3., c='string2', d=date(2000, 2, 1), e=datetime(2000, 1, 2, 12, 0)),
    Row(a=4, b=5., c='string3', d=date(2000, 3, 1), e=datetime(2000, 1, 3, 12, 0))
])

# cube()
df.cube("name", df.age).count().orderBy("name", "age").show()

# take()
df_new1.take(2)

# describe()
df.describe(['age']).show()

# explain()
df.explain() 
df.explain("simple") # Physical plan
df.explain(True) 

# intersect()
df1 = spark_session.createDataFrame([("a", 1), ("a", 1), ("b", 3), ("c", 4)], ["C1", "C2"])
df2 = spark_session.createDataFrame([("a", 1), ("a", 1), ("b", 3)], ["C1", "C2"])

# where()
df_new1.where(F.col('Id2')>30).show()
Copy

Snowflake로 변환한 후의 코드:

from datetime import date, datetime
from snowflake.snowpark import Session
from snowflake.snowpark import functions as F
from snowflake.snowpark import Row

Create a Spark session using the Session builder:

spark_session = Session.builder.create()

df = spark_session.create_dataframe([
    Row(a=1, b=2., c='string1', d=date(2000, 1, 1), e=datetime(2000, 1, 1, 12, 0)),
    Row(a=2, b=3., c='string2', d=date(2000, 2, 1), e=datetime(2000, 1, 2, 12, 0)),
    Row(a=4, b=5., c='string3', d=date(2000, 3, 1), e=datetime(2000, 1, 3, 12, 0))
])

# cube()
df.cube("name", df.age).count().sort("name", "age").show()

# take()
df_new1.take(2)

# describe()
df.describe(['age']).show()

# explain()
df.explain()
df.explain("simple") # Physical plan
df.explain(True)

# intersect()
df1 = spark_session.create_dataframe([("a", 1), ("a", 1), ("b", 3), ("c", 4)], ["C1", "C2"])
df2 = spark_session.create_dataframe([("a", 1), ("a", 1), ("b", 3)], ["C1", "C2"])

# where()
df_new1.where(F.col('Id2')>30).show()
Copy

이 예제에서는 코드 구조가 크게 변경되지 않았습니다. 그러나 Spark API 호출 대신 Snowpark API 호출을 사용하도록 코드가 업데이트되었습니다.

Snowpark Migration Accelerator(SMA)로 변환하는 과정에서 다음을 기대할 수 있습니다.