Snowpark Migration Accelerator:パイプラインの変換¶
SMAがスクリプトを「変換」しましたが、本当にそうでしょうか。実際に行ったのは、Spark APIからSnowpark APIへのすべての参照の変換です。行っていないことは、パイプライン内に存在する可能性のある接続の置き換えです。
SMAの利点は、その評価報告にあります。変換が、Spark APIからSnowpark APIへの参照の変換に結びついているからです。これらの参照の変換だけでは、データパイプラインを実行することはできないことに注意してください。パイプラインの接続が手動で解決されるようにする必要があります。SMAは、接続パラメーターや、それを通して実行できない可能性が高いその他の要素を推測して知ることはできません。
どんな変換でも同じですが、変換されたコードを扱うにはさまざまな方法があります。以下のステップは、変換ツールの出力にどのようにアプローチするかを 推奨 するものです。SnowConvertのように、SMAは出力に注意を払う必要があります。変換が100%自動化されることはありません。これは特にSMAに当てはまります。SMAはSpark APIからSnowpark APIに参照を変換しているので、それらの参照がどのように実行されているかを常にチェックする必要があります。また、スクリプトやノートブックが正常に実行されるように調整を試みません。
そこで、以下の手順に従って、SnowConvertとは若干異なるSMAの出力を処理することになります。
すべての問題の解決 :ここでの「問題」とは、SMAによって発生した問題を意味します。出力コードを見てみましょう。解析エラーや変換エラーを解決し、警告を調査します。
セッションコールの解決 :セッションコールをどのように出力コードに書くかは、ファイルを実行する場所によって異なります。コードファイルを元々実行しようとしていたのと同じ場所で実行し、その後Snowflakeで実行することで、この問題を解決します。
入出力の解決 :異なるソースへの接続は、SMAで完全に解決することはできません。プラットフォームには違いがあり、SMAは通常それらを無視します。これは、ファイルを実行する場所にも影響されます。
クリーンアップとテスト :コードを実行してみます。うまくいくかどうか見てみます。このラボではスモークテストを行いますが、Snowpark Python Checkpointsなど、より広範なテストやデータ検証を行うツールもあります。
では、これがどのようなものか見てみましょう。2つのアプローチを行います。最初のアプローチは、(ソーススクリプトが実行されている)ローカルマシンのPythonでこれを実行することです。もう1つは、すべてをSnowflakeのSnowsightで行うことですが、ローカルソースから読み込むデータパイプラインの場合、Snowsightで完全にできるわけではありません。でも大丈夫です。今回のPOCでは、このスクリプトのオーケストレーションは変換していません。
パイプラインスクリプトファイルから始めて、次のセクションでノートブックに取りかかります。
問題を解決する¶
ソースと出力コードをコードエディターで開きます。お好きなコードエディタを使用できますが、何度も述べたように、Snowflakeでは VS CodeとSnowflake拡張機能 を使用することをお勧めします。Snowflake拡張機能は、SnowConvertから問題をナビゲートしてくれるだけでなく、Pythonの Snowpark Checkpoints を実行することもできます。これは、テストと根本原因分析に役立ちます(このラボの範囲外ですが)。
プロジェクト作成画面(Spark ADW Lab)で作成した元のディレクトリを、VS Codeで開きます。

なお、Output ディレクトリ構造は入力ディレクトリと同じになります。変換が行われていないにもかかわらず、データファイルもコピーされます。また、 checkpoints.json ファイルがSMAでいくつか作成されます。これらは、Snowpark Checkpoints拡張機能の指示を含むjsonファイルです。Snowflake拡張機能は、これらのファイルのデータに基づいて、ソースコードと出力コードの両方にチェックポイントをロードすることができます。今は無視します。
最後に、入力されたpythonスクリプトと出力スクリプトで変換されたものを比較してみます。

これは、基本的な対象比較で、左がオリジナルのSparkコードで、右が出力されたSnowpark互換コードです。いくつかのインポートがセッションコールと同様に変換されたようです。上の画像の下の方にEWIがありますが、それは脇に置きます。他のことをする前に、解析エラーを見つける必要があります。
UIとissues.csvの両方で表示された解析エラーのエラーコードをドキュメントから検索することができます( SPRKPY1101 )。

結果をフィルタリングしていないため、 issues.csv にあるこのエラーコードのリストは、検索と AssessmentReport.docx 要約評価レポートの作成に使用される AssessmentReport.json にも表示されます。これは、ユーザーが大規模なワークロードを理解するためにナビゲートするメインレポートですが、このラボでは見ていません。( このレポートの詳細については、SMAのドキュメントを参照してください )。このEWIが、上のように pipeline_dimcustomer.py ファイルのどこに表示されるかを選択しましょう。
この行がソースコードの一番下にあったことがわかります。
# Conversion Input.
some rogue code that doesn't make any sense!
# Conversion Output.
some
# EWI: SPRKPY1101 => Unrecognized or invalid CODE STATEMENT @(131, 6). Last valid token was 'some' @(131, 1), failed token 'rogue' @(131, 6)
# rogue code that doesn't make any sense!
どうやらこの解析エラーの原因は、「意味のない不正なコード」だったようです。このコード行はパイプラインファイルの一番下にあります。ソースからの抽出の一部として、コードファイルに余分な文字やその他の要素があることは珍しいことではありません。SMAはこれが有効なPythonコードでないことを検出し、解析エラーを生成したことに注意してください。
また、SMAがエラーが発生した箇所にエラーコードと説明文の両方をコメントとして出力コードに挿入しているのもわかります。すべてのエラーメッセージはこのように出力されます。
これは有効なコードではなく、ファイルの末尾にあり、このエラーの結果削除されたものは他に何もないので、元のコードとコメントは出力コードファイルから安全に削除できます。
これで、最初の、そして最も深刻な問題を解決しました。喜びましょう。
このファイルの残りのEWIsに取り組みます。エラーコードが発生するたびに、そのテキストがコメントに表示されることがわかっているので、「EWI」を検索することができます。(別の方法として、issue.csvファイルをソートし、深刻度順に並べることもできますが、ここではその必要はありません)。
次はエラーではなく警告です。SparkとSnowparkでは必ずしも同等ではない関数が使われていたことを教えてくれています。
#EWI: SPRKPY1067 => Snowpark does not support split functions with more than two parameters or containing regex pattern. See documentation for more info.
split_col = split(df_uppercase['NAME'], '.first:')
しかし、この説明を読む限り、おそらく心配する必要はありません。渡されるパラメーターは2つだけです。このEWIはファイル内にコメントとして残しておきましょう。後でファイルを実行するときにチェックすることができます。
このファイルの最後のものは、何かがサポートされていないということを示す変換エラーです。

これは、出力データフレームをSQL Serverに書き込むためのspark jdbcドライバーへの書き込みコールです。これは「すべての入出力の解決」ステップの一部であり、問題を解決した後に処理することになるので、後回しにしておきます。ただし、このエラーは解決する必要があることに注意してください。前のものは単なる警告であり、変更を加えなくても機能する可能性があります。
セッションコールの解決¶
セッションコールはSMAによって変換されますが、機能するかどうかを確認するために、特に注意を払う必要があります。パイプラインスクリプトでは、これが前と後のコードです。

SparkSession参照は、Sessionに変更されています。このファイルのインポートステートメントでも、先頭付近で参照が変更されているのがわかります。

上の画像では、セッションコールの「spark」への変数割り当ては変更されていません。これは変数割り当てだからです。これを変更する必要はありませんが、もし「spark」デコレーターを「session」に変更したいのであれば、その方がSnowparkが推奨しているものに近くなります。(なお、VS Code Extensionの「SMA Assistant」は、これらの変更も提案してくれます)。
これは簡単な演習ですが、やる価値はあります。VS Codeの検索機能を使って、このファイル内の「spark」を検索し、セッションに置き換えることができます。その結果が下の画像です。変換後のコードで「spark」変数への参照が「session」に置き換えられています。

また、このセッションコールから別のものを取り除くこともできます。もう「spark」を実行するつもりはないので、sparkドライバーのドライバーパスを指定する必要はありません。そこで、次のようにセッションコールからconfig関数を完全に取り除くことができます。
# Old Converted output.
# Spark Session
session = Session.builder.config('spark.driver.extraClassPath', driver_path) \
.app_name('SparkSQLServerExample', True) \
.getOrCreate()
# New Converted Output
# Snowpark Session
session = Session.builder.app_name('SparkSQLServerExample', True).getOrCreate()
単一の行に変換した方がいいかもしれません。SMAは、そのドライバーを必要としていないと確信できなかったので(論理的にはそうみえますが)、ドライバーを削除しませんでした。しかし、これでセッションコールは完了しました。
(SMAは、セッションに「クエリタグ」も追加することに注意してください。これは、このセッションやクエリに関する問題のトラブルシューティングに役立てるためですが、残すか削除するかは完全に任意です)。
セッションコールに関するメモ¶
信じられないかもしれませんが、セッションコールのコードで変更する必要があるのはこれだけです。ただし、セッションを作成するためのすべてではありません。最初の質問に戻りますが、大部分はこれらのファイルをどこで実行したいかに依存します。これらの元のSparkセッションコールは、別の場所で設定された構成を使用していました。元のSparkセッションコールを見ると、このスクリプトファイルの最初にあるpandas dataframeの場所に読み込まれる構成ファイルを探しています(これは実際にノートブックファイルにも当てはまります)。

Snowparkも同じように機能します。この変換は、このユーザーがこのコードをどのように実行するかを想定しています。しかし、既存のセッションコールを機能させるためには、ユーザーは自分のSnowflakeアカウントのすべての情報をこのマシンのローカル(または少なくともアクセス可能な)connection.tomlファイルにロードし、接続しようとしているアカウントがデフォルトとして設定されている必要があります。 connections.tomlファイルの更新の詳細についてはSnowflake/Snowparkドキュメント で説明されていますが、その背景にある考え方は、資格情報を持つアクセス可能な場所があるということです。snowparkのセッションが作成されると、接続パラメーターが明示的にセッションコールに渡されない限り、これをチェックします。
これを行う標準的な方法は、接続パラメーターを文字列として直接入力し、セッションで呼び出すことです。
# Parameters in a dictionary.
connection_parameters = {
"account": "<your snowflake account>",
"user": "<your snowflake user>",
"password": "<your snowflake password>",
"role": "<your snowflake role>", # optional
"warehouse": "<your snowflake warehouse>", # optional
"database": "<your snowflake database>", # optional
"schema": "<your snowflake schema>", # optional
}
# The session call
session = Session.builder.configs(connection_parameters).app_name("AdventureWorksSummary", True).getOrCreate()
AdventureWorksは、これらの認証情報を持つファイルを参照し、それを呼び出したようです。「snowflake_credentials.txt」という同様のファイルがあり、アクセス可能だと仮定すると、それにマッチする構文は以下のようになります。
# Load into a dataframe.
snow_creds = pd.read_csv('snowflake_credentials.txt', index_col=None, header=0)
# Build the parameters.
connection_parameters = {
"account": snow_creds.loc[snow_creds['Specific_Element'] == 'Account', 'Value'].item(),
"user": snow_creds.loc[snow_creds['Specific_Element'] == 'Username', 'Value'].item(),
"password": snow_creds.loc[snow_creds['Specific_Element'] == 'Password', 'Value'].item(),
"role": "<your snowflake role>", # optional
"warehouse": snow_creds.loc[snow_creds['Specific_Element'] == 'Warehouse', 'Value'].item(), # optional
"database": snow_creds.loc[snow_creds['Specific_Element'] == 'Database', 'Value'].item(), # optional
"schema": snow_creds.loc[snow_creds['Specific_Element'] == 'Schema', 'Value'].item(), # optional
}
# Then pass the parameters to the configs function of the session builder.
session = Session.builder.configs(connection_parameters).app_name("AdventureWorksSummary", True).getOrCreate()
このラボの制限時間を考えると、最初のオプションの方が理にかなっているかもしれません。 これについてはSnowparkのドキュメントに詳細があります 。
Snowsightを使ってSnowflakeの中でノートブックファイルを実行する場合は、このような作業は必要ないことに注意してください。アクティブなセッションを呼び出して実行するだけです。
さて、いよいよこの移行で最も重要な要素、入出力参照の解決です。
入出力の解決¶
では、入出力を解決しましょう。Pythonスクリプトの場合、Snowsightの内部で直接実行することで何が得られるかまたは失われるかを確認しましょう。 Snowsightで全ての操作を実行することはできません (少なくとも現在は)。ローカルcsvファイルはSnowsightからアクセスできません。.csvファイルを手動でステージにロードする必要があります。これは理想的な解決策ではないでしょうが、こうすることで変換をテストすることができます。
そこで、まずこのファイルをローカルで実行/オーケストレーションできるように準備し、次にSnowflakeで実行できるようにします。
パイプラインスクリプトの入力と出力を解決するには、まずそれらを特定する必要があります。とてもシンプルです。このスクリプトは以下のようです。
ローカルファイルにアクセスする
結果をSQL Server(ただし現在はSnowflake)にロードする
次のファイルのためにファイルを移動する
十分にシンプルです。そのため、そのようなことをするコードの各コンポーネントを置き換える必要があります。ローカルファイルへのアクセスから始めます。
冒頭で述べたように、販売時点情報管理システムと、このPythonスクリプトを実行するために使用されるオーケストレーションツールを再設計し、出力ファイルをクラウドストレージの場所に置くことを強くお勧めします。そして、その場所を外部テーブルに変えれば、Snowflakeに入れることができます。しかし、現在のアーキテクチャでは、このファイルはクラウドストレージにはなく、そのままの場所にあります。そのため、Snowflakeが既存のロジックを保持するこのファイルにアクセスできるようにする必要があります。
そのためのオプションもありますが、ここでは内部ステージを作り、スクリプトのあるステージにファイルを移動させることにします。その場合、ローカルファイルシステム内のファイルを移動させ、ステージ内でも移動させる必要があります。これはすべてSnowparkでできます。プロセスを分解してみます。
ローカルファイルにアクセスする:内部ステージを作成する(まだステージが存在しない場合) -> ファイルをステージに読み込む -> ファイルをデータフレームに読み込む
その結果をSQL Server にロードする:変換されたデータをSnowflakeのテーブルにロードする
次のファイルのためにファイルを移動する:ローカルファイルを移動する -> ステージ内にファイルを移動する
これらのことができるコードを見てみます。
ローカルでアクセス可能なファイルにアクセスする¶
このSparkのソースコードは次のようになります。
# Spark read from a local csv file.
df = spark.read.csv('customer_update.csv', header=True, inferSchema=True)
そして、変形したSnowparkのコード(SMA)は次のようになります。
# Snowpark read from a local csv file.
df = session.read.option("PARSE_HEADER", True).option("INFER_SCHEMA", True).csv('customer_update.csv')
これを、上記の手順を実行するコードに置き換えることができます。
内部ステージを作成します(まだ存在しない場合)。'LOCAL_LOAD_STAGE' というステージを作成し、いくつかのステップを経て、ステージを確認します。
# Create a stage if one does not already exist.
# name the stage we're going to use.
target_stage_name = "LOCAL_LOAD_STAGE"
# Check to see if this stage already exists.
stages = session.sql("SHOW STAGES").collect()
target_stages = [stage for stage in stages if stage['name'] == target_stage_name]
# Create the stage if it does not already exist.
if(len(target_stages) < 1):
from snowflake.core import Root
from snowflake.core.stage import Stage, StageEncryption, StageResource
root = Root(session)
my_stage = Stage(name="LOCAL_LOAD_STAGE",encryption=StageEncryption(type="SNOWFLAKE_SSE"))
root.databases["ADVENTUREWORKS"].schemas["DBO"].stages.create(my_stage)
print('%s created.'%(target_stage_name))
else:
print('%s already exists.'%(target_stage_name))
ステージにファイルをロードします。
# Move the file.
put_results = session.file.put(local_file_name="customer_update.csv",
stage_location="ADVENTUREWORKS.DBO.LOCAL_LOAD_STAGE",
overwrite=False,
auto_compress=False)
# Read the results.
for r in put_results:
str_output = ("File {src}: {stat}").format(src=r.source,stat=r.status)
print(str_output)
ファイルをデータフレームに読み込みます。これは、SMAが実際に変換した部分です。ファイルの場所が内部ステージになったことを指定する必要があります。
# Location of the file in the stage.
csv_file_path = "@LOCAL_LOAD_STAGE/customer_update.csv"
# Spark read from a local csv file.
df = session.read.option("PARSE_HEADER", True).option("INFER_SCHEMA", True).csv(csv_file_path)
その結果はこうなります。

次のステップに進みます。
結果をSnowflakeにロードする¶
オリジナルのスクリプトは、SQL Serverにデータフレームを書き込みました。では、Snowflakeにロードしてみましょう。これはよりシンプルな変換です。データフレームはすでにSnowparkのデータフレームです。これがSnowflakeの長所のひとつです。データにSnowflakeがアクセスできるようになったので、すべてがSnowflakeの内部で行われます。
# Original output from the conversion tool.
# Write the DataFrame to SQL Server.
#EWI: SPRKPY1002 => pyspark.sql.readwriter.DataFrameWriter.jdbc is not supported
df_transformed.write.jdbc(url=sql_server_url,
table='dbo.DimCustomer',
mode="append",
properties={
"user": sql_server_user,
"password": sql_server_password,
"driver": driver_path
})
# Corrected Snowflake/Snowpark code.
df_transformed.write.save_as_table("ADVENTUREWORKS.DBO.DIMCUSTOMER", mode="append")
テストや検証を行うために一時的なテーブルに書き込みたいかもしれませんが、これは元のスクリプトの動作であることに注意してください。
ファイルを移動して、次のファイルの場所を確保します。¶
これはオリジナルのスクリプトでの動作です。Snowflakeでこれを実現する必要はありませんが、ステージでまったく同じ機能を見せることはできます。これは、オリジナルのファイルシステムでosコマンドを使って行われます。それはSparkに依存せず、そのまま変わりません。しかし、snowparkでこの動作をエミュレートするには、ステージ内のこのファイルを新しいディレクトリに移動する必要があります。
これは以下のPythonコードで簡単にできます。
# New filename.
original_filepath = '@LOCAL_LOAD_STAGE/customer_update.csv'
new_filepath = '@LOCAL_LOAD_STAGE/old_versions/customer_update_%s.csv'%(today_time)
copy_sql = f"COPY FILES INTO {new_filepath} FROM {original_filepath}"
session.sql(copy_sql).collect()
print(f"File copied from {original_filepath} to {new_filepath}")
remove_sql = f"REMOVE {original_filepath}"
session.sql(remove_sql).collect()
print(f"Original file {original_filepath} removed.")
なお、これは既存のコードを置き換えるものではありません。sparkコードをsnowparkに移動させるという既存の動きは維持したいので、osの参照は残すことにします。最終的にはこのようになります。

これで同じモーションが完全に完成しました。では、最後のクリーンアップをして、このスクリプトをテストしてみましょう。
クリーンアップとテスト¶
インポートコールをまったく見ておらず、まったく必要のない構成ファイルもあります。構成ファイルへの参照を残して、スクリプトを実行することもできます。実際、これらの構成ファイルがまだアクセス可能であれば、コードはまだ実行できます。しかし、インポートステートメントをよく見るのであれば、削除した方がいいでしょう。これらのファイルは、インポートステートメントとセッションコールの間のすべてのコードで表されます。

他にもいくつかやるべきことがあります。
すべてのインポートがまだ必要であることを確認します。今は置いておきます。エラーがあれば、それに対処できます。
また、チェックするよう警告するために置いておいたEWIが1つあります。そのため、その出力を確実に検査するようにしたいです。
ファイルシステムの動作が、POSシステムで期待されるファイルシステムの動作と同じであることを確認する必要があります。そのためには、customer_update.csvファイルを、VS Codeを最初に起動したときに選択したルートフォルダーに移動する必要があります。
同じディレクトリに「old_versions」というディレクトリを作成します。これで、OSのオペレーションが実行できるようになるはずです。
最後に、実稼働のテーブルに直接コードを実行することに抵抗がある場合は、このテスト用にテーブルのコピーを作成し、ロードをそのコピーに向けることができます。ロードステートメントを以下のものに置き換えます。これはラボなので、「実稼働」テーブルに自由に書き込んでください。
# In case we want to test.
create_sql = """
CREATE OR REPLACE TABLE ADVENTUREWORKS.DBO.DIMCUSTOMER_1
AS select * from ADVENTUREWORKS.DBO.DIMCUSTOMER;
"""
session.sql(create_sql).collect()
# Write the DataFrame to SQL Server.
df_transformed.write.save_as_table("ADVENTUREWORKS.DBO.DIMCUSTOMER_1", mode="append")
これでようやくテストする準備が整いました。Pythonでこのスクリプトをテスト用のテーブルに実行し、失敗するかどうかを確認することができます。では、実行してください。
残念です。スクリプトは以下のエラーで失敗しました。

識別子を参照する方法が、Snowparkが必要とする方法ではないようです。失敗したコードは、残りのEWIがある場所と同じところにあります。

エラーによって提供されたリンクでドキュメントを参照することもできますが、時間の都合上、Snowparkはこの変数を明示的にリテラルにする必要があります。次のような入れ替えが必要です。
# Old
split_col = split(df_uppercase['NAME'], '.first:')
# New
split_col = split(df_uppercase['NAME'], lit('.first:'))
これでこのエラーは解消されるはずです。ソースプラットフォームとターゲットプラットフォームの間には、常に機能的な違いがあることに注意してください。SMAのような変換ツールは、こうした違いをできるだけ明白にしようとします。しかし、100%自動化された変換は存在しません。
もう一度実行します。今回は成功です。

これを検証するためにPythonでクエリを書くこともできますが、Snowflakeで行いましょう(これからやろうとしていることなので)。
これらのスクリプトを実行するために、使用しているSnowflakeアカウントに移動します。このスクリプトは、SQL Serverからデータベースをロードする際に使用したものと同じでなければなりません(ロードしていない場合は、データがまだ移行されていないため、上記のスクリプトは動作しません)。
これは、ステージがファイルで作成されているかどうかを見れば、すぐに確認できます。

ディレクトリテーブルビューを有効にして、old_versionsフォルダーがあるか確認します。

そのようです。

これがスクリプトの最後の要素だったので、問題なさそうです。
また、単純にアップロードしたデータをテーブルに問い合わせることで、データがロードされたことを検証することもできます。新しいワークシートを開き、このクエリを書くだけでいいです。
select * from ADVENTUREWORKS.DBO.DIMCUSTOMER
where FIRSTNAME like '%Brandon%'
AND LASTNAME like '%Carver%'
これは、今読み込まれた名前のひとつです。そして、パイプラインは機能したようです。

Snowsightでパイプラインスクリプトを実行する¶
変換しようとしているフローがSparkで行っていたことを簡単に振り返ってみます。
ローカルファイルにアクセスする
その結果をSQL Serverにロードする
ファイルを移動して、次のファイルの場所を確保する
このフローは、Snowsightから完全に実行することはできません。Snowsightはローカルファイルシステムにアクセスできません。ここでお勧めしたいのは、POSからデータレイクにエクスポートすることです。あるいは、Snowsightからアクセスできる他のオプションもあります。
しかし、SnowflakeでPythonスクリプトを実行することで、Snowparkがどのように変換ロジックを処理するかを詳しく見ることができます。上記の推奨される変更をすでに行っている場合、SnowflakeのPythonワークシートでスクリプト本体を実行することができます。
これを行うには、まずSnowflakeアカウントにログインし、ワークシートセクションに移動します。このワークシートで、新しいPythonワークシートを作成します。

使用するデータベース、スキーマ、ロール、ウェアハウスを指定します。

これでセッションコールに対処する必要はなくなりました。ワークシートウィンドウにテンプレートが生成されるのが見えます。

まずはインポートコールを持ってくることから始めます。先ほどのスクリプトを使えるようにしたら、次のようなインポートができるはずです。
# General Imports
import pandas as pd
import os
import shutil
import datetime
# Snowpark Imports
from snowflake.snowpark import Session
from snowflake.snowpark.functions import col
from snowflake.snowpark.functions import upper
from snowflake.snowpark.functions import lower
from snowflake.snowpark.functions import split
from snowflake.snowpark.functions import trim
from snowflake.snowpark.functions import when
from snowflake.snowpark.functions import lit
from snowflake.snowpark.functions import expr
from snowflake.snowpark.functions import regexp_replace
必要なのはSnowparkのインポートだけです。ファイルシステム上でファイルを移動することはありません。ステージ内でファイルを移動させたい場合は、日時の参照を保持することができます。(行いましょう)。
Snowparkのインポート(とdatetime)をPythonワークシートのすでに存在するその他のインポートの下に貼り付けます。「col」はすでにインポートされているので、どちらかを削除すればよいことに注意してください。

「def main」コールの下に、変換コードをすべて貼り付けます。これには、csvの場所の割り当てから、データフレームのテーブルへの書き込みまでが含まれます。
ここから:

ここに:

また、ステージ内でファイルを移動させるコードを追加することもできます。この部分です。

ただし、コードを実行する前に、手動でステージを作成し、ファイルをステージに移動する必要があります。スクリプトにcreate stageステートメントを追加することはできますが、ファイルを手動でステージにロードする必要があります。
そこで、別のワークシート(今回はSQLワークシート)を開き、ステージを作成する基本的なSQLステートメントを実行できます。
CREATE STAGE my_int_stage
ENCRYPTION = (TYPE = 'SNOWFLAKE_SSE');
正しいデータベース、スキーマ、ロール、ウェアハウスを選択してください。

また、 [Snowsight UIで直接内部ステージを作成 (https://docs.snowflake.com/en/user-guide/data-load-local-file-system-create-stage#create-a-named-stage-using-snowsight) することもできます。ステージが存在するので、目的のファイルを手動でステージにロードすることができます。Snowsight UI のDatabasesセクションに移動し、該当するdatabase.schemaで先ほど作成したステージを見つけます。

ウィンドウの右上にある+Filesオプションを選択してcsvファイルを追加します。アップロードメニューが表示されます。

プロジェクトディレクトリにドラッグアンドドロップするかブラウズして、customer_update.csvファイルをステージにロードします。

画面右下のUploadを選択します。ステージ画面に戻ります。ファイルを表示するには、Enable Directory Tableを選択する必要があります。

そして、ファイルがステージに表示されました。

もちろん、これはもうパイプラインとは言えません。しかし、少なくともSnowflakeでログインを実行することはできます。ワークシートに移動した残りのコードを実行します。このユーザーは1回目は成功しましたが、2回目の成功を保証するものではありません。

Snowflakeでこの関数を定義したら、他の方法で呼び出すことができることに注意してください。AdventureWorksがPOSを100%置き換えるのであれば、変換ロジックをSnowflakeに持たせることは理にかなっているかもしれません。特にオーケストレーションとファイル移動が全く別の場所で処理される場合です。これにより、Snowparkは得意とする変換ロジックに集中することができます。
結論¶
スクリプトファイルについては以上です。パイプラインの最良の例とは言えませんが、SMAの出力への対応については詳細です。
すべての問題の解決
セッションコールの解決
入出力の解決
クリーンアップとテスト
レポートノートブックに移りましょう。