Snowpark Migration Accelerator: 変換の仕組み¶
Snowpark Migration Accelerator(SMA)は、コードの包括的な評価を生成するだけでなく、ソースコードの特定の要素をターゲットコードベースに対応する形式に変換することもできます。この変換プロセスは、最初の評価と同じステップを踏みますが、1つだけ追加のステップがあります。
SMA での変換¶
評価モードと変換モードの両方で、Snowpark Migration Accelerator(SMA)は以下を実行します。
指定したディレクトリ内のすべてのファイルを検索する
コードを含むファイルを検出する
プログラミング言語に応じてコードファイルを分析する
コードの構造化表現(抽象構文ツリーまたは AST)を作成する
記号テーブルを作成し、プログラム情報を入力する
発見されたエラーを識別し、分類する
結果の詳細なレポートを作成する
SMA を変換モードで実行すると、以前に評価モードで実行した場合でも、これらのプロセスがすべて繰り返されます。しかし、変換モードには最後のステップが1つ追加されます。
抽象構文ツリー(AST)から生成されたコードを形式化し、読みやすさを向上する
抽象構文ツリー(AST)は、ソースコードがどのように動作するかを表すモデルです。ソース言語とターゲット言語の両方に同じ関数が存在する場合、 SMA は、ターゲット言語で同等のコードを生成できます。このコード生成は、実際の変換処理中にのみ行われます。
SMA での変換のタイプ¶
Snowpark Migration Accelerator(SMA)は現在、以下のコード変換をサポートしています。
PythonまたはScalaコードをSpark API 呼び出しから同等のSnowpark API 呼び出しに変換する
SQL ステートメントをSpark SQL または HiveQL からSnowflake SQL 構文に翻訳する
ScalaとPythonの両言語で書かれた例を見てみましょう。
Spark API からSnowpark API の参照の変換例¶
Spark ScalarからSnowparkの例¶
ソース言語としてScalaを使用している場合、Snowpark Migration Accelerator(SMA)は、Scalaコード内のSpark API 参照を同等のSnowpark API 参照に自動的に変換します。以下は、基本的なSparkアプリケーションがどのように変換されるかを示す例です。サンプルアプリケーションは、いくつかの一般的なデータ操作を実行します。
データの読み込み
記録のフィルター
データセットの結合
平均値の計算
結果の表示
Scalaで書かれたApache Sparkコード
import org.apache.spark.sql._
import org.apache.spark.sql.functions._
import org.apache.spark.sql.SparkSession
object SimpleApp {
// This function calculates the average salary for jobs in a specific department
def avgJobSalary(session: SparkSession, dept: String) {
// Load employee data from CSV file
val employees = session.read.csv("path/data/employees.csv")
// Load job data from CSV file
val jobs = session.read.csv("path/data/jobs.csv")
val jobsAvgSalary = employees
.filter(column("Department") === dept) // Filter employees by department
.join(jobs) // Join with jobs table
.groupBy("JobName") // Group results by job name
.avg("Salary") // Calculate average salary for each job
// Calculate and display a list of all salaries in the department
jobsAvgSalary.select(collect_list("Salary")).show()
```scala
// Calculate and display the average salary
jobsAvgSalary.show()
}
Snowflakeへの変換後のコード:
import com.snowflake.snowpark._
import com.snowflake.snowpark.functions._
import com.snowflake.snowpark.Session
object SimpleApp {
// This function calculates the average salary for jobs in a specific department
def avgJobSalary(session: Session, dept: String) {
// Load employee data from CSV file
val employees = session.read.csv("path/data/employees.csv")
// Load job data from CSV file
val jobs = session.read.csv("path/data/jobs.csv")
val jobsAvgSalary = employees
.filter(column("Department") === dept) // Filter employees by department
.join(jobs) // Join with jobs table
.groupBy("JobName") // Group results by job name
.avg("Salary") // Calculate average salary per job
```scala
// Calculate and display all salaries in the department
jobsAvgSalary.select(array_agg("Salary")).show()
// Display the average salary
jobsAvgSalary.show()
}
}
この例では、コード構造はほとんど変わっていません。しかし、コードはSpark API 参照の代わりにSnowpark API 参照を使用するように更新されました。
PySpark からSnowparkの例¶
ソース言語としてPythonを選択すると、 SMA は、Pythonコード内の PySpark API 呼び出しを同等のSnowpark API 呼び出しに自動的に変換します。以下は、 PySpark のさまざまな関数を示すスクリプトの例です。
from datetime import date, datetime
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql import Row
Create a Spark session by building and initializing a new SparkSession object, or retrieve an existing one if already available.
df = spark_session.createDataFrame([
Row(a=1, b=2., c='string1', d=date(2000, 1, 1), e=datetime(2000, 1, 1, 12, 0)),
Row(a=2, b=3., c='string2', d=date(2000, 2, 1), e=datetime(2000, 1, 2, 12, 0)),
Row(a=4, b=5., c='string3', d=date(2000, 3, 1), e=datetime(2000, 1, 3, 12, 0))
])
# cube()
df.cube("name", df.age).count().orderBy("name", "age").show()
# take()
df_new1.take(2)
# describe()
df.describe(['age']).show()
# explain()
df.explain()
df.explain("simple") # Physical plan
df.explain(True)
# intersect()
df1 = spark_session.createDataFrame([("a", 1), ("a", 1), ("b", 3), ("c", 4)], ["C1", "C2"])
df2 = spark_session.createDataFrame([("a", 1), ("a", 1), ("b", 3)], ["C1", "C2"])
# where()
df_new1.where(F.col('Id2')>30).show()
Snowflakeへの変換後のコード:
from datetime import date, datetime
from snowflake.snowpark import Session
from snowflake.snowpark import functions as F
from snowflake.snowpark import Row
Create a Spark session using the Session builder:
spark_session = Session.builder.create()
df = spark_session.create_dataframe([
Row(a=1, b=2., c='string1', d=date(2000, 1, 1), e=datetime(2000, 1, 1, 12, 0)),
Row(a=2, b=3., c='string2', d=date(2000, 2, 1), e=datetime(2000, 1, 2, 12, 0)),
Row(a=4, b=5., c='string3', d=date(2000, 3, 1), e=datetime(2000, 1, 3, 12, 0))
])
# cube()
df.cube("name", df.age).count().sort("name", "age").show()
# take()
df_new1.take(2)
# describe()
df.describe(['age']).show()
# explain()
df.explain()
df.explain("simple") # Physical plan
df.explain(True)
# intersect()
df1 = spark_session.create_dataframe([("a", 1), ("a", 1), ("b", 3), ("c", 4)], ["C1", "C2"])
df2 = spark_session.create_dataframe([("a", 1), ("a", 1), ("b", 3)], ["C1", "C2"])
# where()
df_new1.where(F.col('Id2')>30).show()
この例では、コード構造はほとんど変わっていません。しかし、コードはSpark API 呼び出しの代わりにSnowpark API 呼び出しを使用するように更新されました。
Snowpark Migration Accelerator(SMA)を使用した変換プロセスでは、次を期待できます。