Snowpark Migration Accelerator: SQL 埋め込みコード

注釈

現在、 SMA は _ pyspark.sql _ 関数のみをサポートしています。

SMA は、PythonやScalaファイルに埋め込まれた SQL コードを変換できます。以下のファイル拡張子に埋め込まれた SQL コードを処理します。

  • Pythonソースコードファイル(.py拡張子あり)

  • Scalaソースコードファイル(.scala拡張子あり)

  • Jupyterノートブックファイル(.ipynb拡張子あり)

  • Databricksソースファイル(.pythonまたは.scala拡張子あり)

  • Databricksノートブックアーカイブファイル(.dbc拡張子あり)

埋め込み SQL コード変換サンプル

サポートされている場合

  • Pythonで spark.sql 関数を使用して SQL クエリを実行します。

# Original in Spark
spark.sql("""MERGE INTO people_target pt
USING people_source ps
ON (pt.person_id1 = ps.person_id2)
WHEN NOT MATCHED BY SOURCE THEN DELETE""")
Copy
# SMA transformation
spark.sql("""MERGE INTO people_target pt
USING (
   SELECT
      pt.person_id1
   FROM
      people_target pt
      LEFT JOIN
         people_source ps
         ON pt.person_id1 = ps.person_id2
   WHERE
      ps.person_id2 IS NULL
) s_src
ON pt.person_id1 = s_src.person_id1
WHEN MATCHED THEN
   DELETE;""")
Copy

サポートされていない場合

When SMA encounters code that it cannot convert, it generates an Error, Warning, and Issue (EWI) message in the output code. For more details about these messages, see EWI.

以下のシナリオは現在サポートされていません。

  • SQL のコードを扱う場合は、文字列変数を次のように組み込むことができます。

query = "SELECT COUNT(COUNTRIES) FROM SALES"
dfSales = spark.sql(query)
Copy
query = "SELECT COUNT(COUNTRIES) FROM SALES" 
#EWI: SPRKPY1077 => SQL embedded code cannot be processed. 
dfSales = spark.sql(query)
Copy
  • 文字列を結合し、単純な連結を使用して SQL コードを構築します。

base = "SELECT "
criteria_1 = " COUNT(*) "
criteria_2 = " * "
fromClause = " FROM COUNTRIES"

df1 = spark.sql(bas + criteria_1 + fromClause)
df2 = spark.sql(bas + criteria_2 + fromClause)
Copy
base = "SELECT "
criteria_1 = " COUNT(*) "
criteria_2 = " * "
fromClause = " FROM COUNTRIES"
#EWI: SPRKPY1077 => SQL embedded code cannot be processed.

df1 = spark.sql(bas + criteria_1 + fromClause)
#EWI: SPRKPY1077 => SQL embedded code cannot be processed.
df2 = spark.sql(bas + criteria_2 + fromClause)
Copy
  • 文字列補間を使用して、 SQL ステートメントを動的に生成します。

# Old Style interpolation
UStbl = "SALES_US"
salesUS = spark.sql("SELECT * FROM %s" % (UStbl))

# Using format function
COLtbl = "COL_SALES WHERE YEAR(saleDate) > 2023"
salesCol = spark.sql("SELECT * FROM {}".format(COLtbl))

# New Style
UKTbl = " UK_SALES_JUN_18" 
salesUk = spark.sql(f"SELECT * FROM {UKTbl}")
Copy
# Old Style interpolation
UStbl = "SALES_US"
#EWI: SPRKPY1077 => SQL embedded code cannot be processed.
salesUS = spark.sql("SELECT * FROM %s" % (UStbl))

# Using format function
COLtbl = "COL_SALES WHERE YEAR(saleDate) > 2023"
#EWI: SPRKPY1077 => SQL embedded code cannot be processed.
salesCol = spark.sql("SELECT * FROM {}".format(COLtbl))

# New Style
UKTbl = " UK_SALES_JUN_18"
#EWI: SPRKPY1077 => SQL embedded code cannot be processed.
salesUk = spark.sql(f"SELECT * FROM {UKTbl}")
Copy
  • SQL クエリを動的に生成する関数を使用します。

def ByMonth(month):
    query = f"SELECT * LOGS WHERE MONTH(access_date) = {month}"
    return spark.sql(query)
Copy
def ByMonth(month):
query = f"SELECT * LOGS WHERE MONTH(access_date) = {month}"
    #EWI: SPRKPY1077 => SQL embedded code cannot be processed.
    return spark.sql(query)
Copy

サポートされていない場合と EWI メッセージ

  • Scalaコードを分析する際のエラーコード SPRKSCL1173 は、サポートされていない埋め込み SQL ステートメントを示します。

/*Scala*/
 class SparkSqlExample {
    def main(spark: SparkSession) : Unit = {
    /*EWI: SPRKSCL1173 => SQL embedded code cannot be processed.*/
    spark.sql("CREATE VIEW IF EXISTS My View AS Select * From my Table WHERE date < current_date() ")
    }
Copy
  • Pythonコードに、サポートされていない埋め込み SQL ステートメントが含まれる場合は、エラーコード SPRKPY1077 が表示されます。

# Python Output 
#EWI: SPRKPY1077 => SQL embedded code cannot be processed.
b = spark.sql("CREATE VIEW IF EXISTS My View AS Select * From my Table WHERE date < current_date() ")
Copy