Snowpark Migration Accelerator: Pipeline-Konvertierung

Der SMA hat unsere Skripte „konvertiert“, aber ist das wirklich der Fall? Tatsächlich wurden alle Referenzen aus der Spark API zur Snowpark API konvertiert, aber was nicht getan wurde, ist, die Verbindungen zu ersetzen, die möglicherweise in Ihren Pipelines vorhanden sind.

Die Stärke des SMA liegt in dem erstellten Bewertungsbericht, da die Konvertierung an die Konvertierung von Referenzen aus der Spark API zur Snowpark API gebunden ist. Beachten Sie, dass die Konvertierung dieser Referenzen nicht ausreicht, um eine Datenpipeline zu betreiben. Sie müssen sicherstellen, dass die Verbindungen der Pipeline manuell aufgelöst werden. Der SMA kann nicht davon ausgehen, dass Verbindungsparameter oder andere Elemente bekannt sind, die wahrscheinlich nicht für die Ausführung verfügbar sind.

Wie bei jeder Konvertierung kann auch der konvertierte Code auf verschiedene Weisen verarbeitet werden. Die folgenden Schritte sind die Empfehlung, die Sie für die Ausgabe des Konvertierungstools verwenden sollten. Genau wie SnowConvert erfordert der SMA, dass der Ausgabe Aufmerksamkeit geschenkt wird. Keine Konvertierung wird jemals zu 100 % automatisiert sein. Dies gilt insbesondere für den SMA. Da der SMA Referenzen aus der Spark API zur Snowpark APIkonvertiert, müssen Sie immer überprüfen, wie diese Referenzen ausgeführt werden. Er versucht nicht, die erfolgreiche Ausführung eines Skripts oder Notebooks zu orchestrieren, das darüber ausgeführt wird.

Wir werden also die folgenden Schritte ausführen, um die Ausgabe des SMA durchzugehen, was sich etwas von SnowConvert unterscheiden wird:

  • Alle Probleme lösen: „Probleme“ bezeichnet hier die Probleme, die durch den SMA erzeugt werden. Werfen Sie einen Blick auf den Ausgabecode. Beheben Sie Parsing-Fehler und Konvertierungsfehler, und untersuchen Sie die Warnungen.

  • Sitzungsaufrufe auflösen: Wie der Sitzungsaufruf im Ausgabecode geschrieben wird, hängt davon ab, wo die Datei ausgeführt werden soll. Wir werden das Problem lösen, um die Codedateien an dem gleichen Speicherort auszuführen, an dem sie ursprünglich ausgeführt werden sollten, und um sie anschließend in Snowflake auszuführen.

  • Ein-/Ausgaben auflösen: Verbindungen zu verschiedenen Quellen können nicht vollständig durch den SMA aufgelöst werden. Es gibt Unterschiede zwischen den Plattformen, und der SMA wird dies in der Regel ignorieren. Dies wird auch davon beeinflusst, wo die Datei ausgeführt werden soll.

  • ** Bereinigen und Testen **! Lassen Sie uns den Code ausführen. Überprüfen Sie, ob es funktioniert. Wir werden in diesen praktischen Übungen einfache Tests durchführen, aber es gibt Tools, mit denen Sie umfangreichere Tests und Datenvalidierungen durchführen können, einschließlich Snowpark Python Checkpoints.

Schauen wir uns also an, wie das aussieht. Wir werden dies mit zwei Ansätzen tun: Der erste Ansatz besteht darin, dies in Python auf dem lokalen Rechner auszuführen (wenn das Quellskript ausgeführt wird). Der zweite Ansatz wäre, alles in Snowflake zu tun … in Snowsight, aber für eine Datenpipeline, die aus einer lokalen Quelle liest, wird dies in Snowsight nicht zu 100 % möglich sein. Das ist aber in Ordnung. Wir konvertieren die Orchestrierung dieses Skripts nicht in dieses POC.

Beginnen wir mit der Pipeline-Skriptdatei und kommen im nächsten Abschnitt zum Notebook.

Probleme beheben

Lassen Sie uns unseren Quellcode und unseren Ausgabecode in einem Code-Editor öffnen. Sie können einen Code-Editor Ihrer Wahl verwenden, aber wie bereits erwähnt, würde Snowflake die Verwendung von VS Code mit der Snowflake-Erweiterung empfehlen. Die Snowflake-Erweiterung hilft nicht nur bei der Navigation durch die Probleme von SnowConvert, sondern kann auch Snowpark-Checkpoints für Python ausführen, was bei Tests und Ursachenanalysen hilfreich wäre (wenn auch der Bereich dieser praktischen Übungen nicht spezifiziert ist).

Lassen Sie uns das Verzeichnis öffnen, das wir ursprünglich im Bildschirm für die Projekterstellung (Spark ADW Lab) in VS Code erstellt haben:

Verzeichnis für die praktischen Übungen

Beachten Sie, dass die Verzeichnisstruktur Ausgabe dieselbe ist wie die des Eingabeverzeichnisses. Sogar die Datendatei wird mit kopiert, obwohl keine Konvertierung stattfindet. Es wird auch ein paar checkpoints.json-Dateien geben, die vom SMA erstellt werden. Dies sind JSON-Dateien, die Anweisungen für die Snowpark Checkpoints-Erweiterung enthalten. Die Snowflake-Erweiterung kann auf der Grundlage der Daten in diesen Dateien Checkpoints sowohl in den Quell- als auch in den Ausgabecode laden. Wir werden sie vorerst ignorieren.

Vergleichen wir zum Schluss das Eingabe-Python-Skript mit dem konvertierten Skript im Ausgabeskript.

Skriptvergleich

Dies ist ein sehr einfacher Vergleich mit dem ursprünglichen Spark-Code auf der linken Seite und dem ausgegebenen Snowpark-kompatiblen Code auf der rechten Seite. Es sieht so aus, als ob einige Importe sowie die Sitzungsaufrufe konvertiert wurden. Wir können eine EWI am unteren Rand des obigen Bildes sehen, aber wir beginnen dort nicht. Wir müssen den Parsing-Fehler finden, bevor wir etwas anderes tun können.

Wir können das Dokument nach dem Fehlercode für diesen Parsing-Fehler durchsuchen, der sowohl in der UI und in der Datei „issues.csv“: SPRKPY1101 angezeigt wurde.

Fehlercode

Da ich die Ergebnisse nicht gefiltert habe, wird das Freigabeangebot dieses Fehlercodes in der Datei issues.csv auch in der Suche und in der Datei AssessmentReport.json angezeigt, die zum Erstellen des zusammenfassenden Bewertungsberichts AssessmentReportdocx verwendet wird. Dies ist der Hauptbericht, durch den die Benutzenden navigieren werden, um einen großen Workload zu verstehen, aber wir haben ihn in diesen praktischen Übungen nicht näher betrachtet. (Weitere Informationen zu diesem Bericht finden Sie in der SMA-Dokumentation. Wählen wir aus, wo diese EWI wie oben gezeigt in der Datei pipeline_dimcustomer.py angezeigt werden soll.

Sie können sehen, dass diese Codezeile am Ende des Quellcodes vorhanden war.

# Conversion Input.
some rogue code that doesn't make any sense!

# Conversion Output.
some
# EWI: SPRKPY1101 => Unrecognized or invalid CODE STATEMENT @(131, 6). Last valid token was 'some' @(131, 1), failed token 'rogue' @(131, 6)
#     rogue code that doesn't make any sense!
Copy

Es sieht so aus, als ob dieser Parsing-Fehler auf … „einen Fehlercode, der keinen Sinn ergibt!“ zurückzuführen war. Diese Codezeile befindet sich am Ende der Pipeline-Datei. Es ist nicht ungewöhnlich, dass eine Codedatei zusätzliche Zeichen oder andere Elemente im Rahmen einer Extraktion aus einer Quelle enthält. Beachten Sie, dass der SMA erkannte, dass dies kein gültiger Python-Code war und dieser den Parsing-Fehler erzeugte.

Sie können auch sehen, wie der SMA sowohl den Fehlercode als auch die Beschreibung als Kommentar in den Ausgabecode einfügt, wo der Fehler aufgetreten ist. So werden alle Fehlermeldungen in der Ausgabe angezeigt.

Da dies kein gültiger Code ist, befindet er sich am Ende der Datei und nichts weiter wurde als Ergebnis dieses Fehlers entfernt. Der ursprüngliche Code und der Kommentar können sicher aus der Ausgabecodedatei entfernt werden.

Jetzt haben wir unser erstes und schwerwiegendstes Problem gelöst. Seien Sie gespannt.

Lassen Sie uns den Rest der EWIs in dieser Datei durcharbeiten. Wir können nach „EWI“ suchen, weil wir jetzt wissen, dass bei jedem Fehlercode ein Text im Kommentar erscheint. (Alternativ dazu könnten wir die Datei „issues.csv“ sortieren und die Probleme nach Schweregrad sortieren, aber das ist hier nicht wirklich notwendig.)

Der nächste ist eigentlich nur eine Warnung, kein Fehler. Das zeigt uns, dass eine Funktion verwendet wurde, die in Spark und Snowpark nicht immer gleichwertig ist:

#EWI: SPRKPY1067 => Snowpark does not support split functions with more than two parameters or containing regex pattern. See documentation for more info.
split_col = split(df_uppercase['NAME'], '.first:')
Copy

Die Beschreibung hier verdeutlicht jedoch, dass wir uns darüber wahrscheinlich keine Gedanken machen müssen. Es werden nur zwei Parameter übergeben. Belassen wir diese EWI als Kommentar in der Datei, damit wir wissen, dass wir darauf achten müssen, wenn wir die Datei später ausführen.

Der letzte Fehler für diese Datei ist ein Konvertierungsfehler, der besagt, dass etwas nicht unterstützt wird:

Fehler „Nicht unterstützt“

Dies ist der Schreibaufruf für den Spark-Jdbc-Treiber, um den Ausgabe-Datenframe in SQL Server zu schreiben. Da dies Bestandteil des Schritts „Alle Ein-/Ausgaben auflösen“ ist, mit dem wir uns befassen werden, nachdem wir unsere Probleme behoben haben, behalten wir dies für später bei. Beachten Sie jedoch, dass dieser Fehler behoben werden muss. Der vorherige Fehler war nur eine Warnung und funktioniert möglicherweise auch ohne Änderungen.

Auflösen der Sitzungsaufrufe

Die Sitzungsaufrufe werden durch den SMA konvertiert, aber Sie sollten ihnen besondere Aufmerksamkeit schenken, um sicherzustellen, dass sie funktionsfähig sind. In unserem Pipeline-Skript ist dies der Vorher-Nachher-Code:

Skript vorher und nachher

Die SparkSession-Referenz wurde in „Sitzung“ geändert. Sie können diese Referenzänderung am Anfang dieser Datei auch in der Importanweisung sehen:

Referenzänderung

Beachten Sie, dass in der obigen Abbildung die Variablenzuweisung des Sitzungsaufrufs für „spark“ nicht geändert wird. Das liegt daran, dass es sich um eine Variablenzuweisung handelt. Es ist nicht notwendig, dies zu ändern, aber wenn Sie den „spark“-Decorator in „Sitzung“ ändern möchten, würde dies besser den Empfehlungen von Snowpark entsprechen. (Beachten Sie, dass die VS Code-Erweiterung „SMA Assistant“ ebenfalls diese Änderungen vorschlagen wird.)

Dies ist eine einfache Übung, die sich aber lohnt. Sie können den Vorgang „Suchen und Ersetzen“ unter Verwendung der eigenen Suchfunktion von VS Code durchführen, um die Verweise auf „spark“ in dieser Datei zu finden und sie durch „session“ zu ersetzen. Sie können das Ergebnis in der Abbildung unten sehen. Die Verweise auf die Variable „spark“ im konvertierten Code wurden durch „session“ ersetzt:

Spark-Variablen in „session“ konvertiert

Wir können auch etwas anderes aus diesem Sitzungsaufruf entfernen. Da wir „spark“ nicht mehr ausführen werden, müssen wir den Treiberpfad für den spark-Treiber nicht mehr angeben. So können wir die Konfigurationsfunktion wie folgt vollständig aus dem Sitzungsaufruf entfernen:

# Old Converted output.
# Spark Session
session = Session.builder.config('spark.driver.extraClassPath', driver_path) \
                    .app_name('SparkSQLServerExample', True) \
                    .getOrCreate()

# New Converted Output
# Snowpark Session
session = Session.builder.app_name('SparkSQLServerExample', True).getOrCreate()
Copy

Sie können diese auch in eine einzelne Zeile umwandeln. Der SMA war sich nicht sicher, dass wir diesen Treiber nicht benötigten (obwohl das logisch erscheint). Folglich wurde er nicht entfernt. Jetzt ist unser Sitzungsaufruf allerdings abgeschlossen.

(Beachten Sie, dass der SMA der Sitzung außerdem ein „Abfrage-Tag“ hinzufügt. Dies dient dazu, später Probleme mit dieser Sitzung oder Abfrage zu beheben. Dies ist jedoch völlig optional, diese Möglichkeit zu nutzen oder zu entfernen.)

Hinweise zu den Sitzungsaufrufen

Ob Sie es glauben oder nicht: Das ist alles, was wir im Code für den Sitzungsaufruf ändern müssen, aber das ist nicht alles, was wir tun müssen, um die Sitzung zu erstellen. Dies bezieht sich auf die ursprüngliche Frage, dass ein Großteil davon abhängig ist, wo Sie diese Dateien ausführen möchten. Diese ursprünglichen spark-Sitzungsaufrufe verwendeten eine Konfiguration, die an anderer Stelle eingerichtet wurde. Wenn Sie sich den ursprünglichen Spark-Sitzungsaufruf ansehen, sucht dieser nach einer Konfigurationsdatei, die zu Beginn dieser Skriptdatei in einen pandas-Datenframe-Speicherort eingelesen wird (dies gilt tatsächlich auch für unsere Notebook-Datei).

Verweis auf Konfigurationsdatei

Snowpark kann auf die gleiche Weise funktionieren, und diese Konvertierung geht davon aus, dass dieser Benutzende diesen Code auf diese Weise ausführen wird. Damit der bestehende Sitzungsaufruf funktioniert, müsste der Benutzende jedoch alle Informationen für sein Snowflake-Konto in die lokale (oder zumindest zugängliche) Datei „connections.toml“ auf diesem Computer laden. Das Konto, mit dem versucht wird, eine Verbindung herzustellen, sollte als Standard eingestellt sein. Weitere Informationen zum Aktualisieren der Datei „connections.toml“ finden Sie in der Snowflake/Snowpark-Dokumentation, aber die Idee dahinter ist, dass es einen zugänglichen Speicherort gibt, der über die Anmeldeinformationen verfügt. Wenn eine Snowpark-Sitzung erstellt wird, wird dies überprüft – es sei denn, die Verbindungsparameter werden explizit an den Sitzungsaufruf übergeben.

Standardmäßig werden die Verbindungsparameter direkt als Zeichenfolgen eingegeben und mit der Sitzung aufgerufen:

# Parameters in a dictionary.
connection_parameters = {
  "account": "<your snowflake account>",
  "user": "<your snowflake user>",
  "password": "<your snowflake password>",
  "role": "<your snowflake role>",  # optional
  "warehouse": "<your snowflake warehouse>",  # optional
  "database": "<your snowflake database>",  # optional
  "schema": "<your snowflake schema>",  # optional
}

# The session call
session = Session.builder.configs(connection_parameters).app_name("AdventureWorksSummary", True).getOrCreate()
Copy

AdventureWorks scheint auf eine Datei mit diesen Anmeldeinformationen verwiesen und diese aufgerufen zu haben. Angenommen, es gibt eine ähnliche Datei namens „snowflake_credentials.txt“, auf die zugegriffen werden kann, dann könnte die entsprechende Syntax in etwa wie folgt aussehen:

# Load into a dataframe.
snow_creds = pd.read_csv('snowflake_credentials.txt', index_col=None, header=0)

# Build the parameters.
connection_parameters = {
  "account": snow_creds.loc[snow_creds['Specific_Element'] == 'Account', 'Value'].item(),
  "user": snow_creds.loc[snow_creds['Specific_Element'] == 'Username', 'Value'].item(),
  "password": snow_creds.loc[snow_creds['Specific_Element'] == 'Password', 'Value'].item(),
  "role": "<your snowflake role>",  # optional
  "warehouse": snow_creds.loc[snow_creds['Specific_Element'] == 'Warehouse', 'Value'].item(),  # optional
  "database": snow_creds.loc[snow_creds['Specific_Element'] == 'Database', 'Value'].item(),  # optional
  "schema": snow_creds.loc[snow_creds['Specific_Element'] == 'Schema', 'Value'].item(),  # optional
}

# Then pass the parameters to the configs function of the session builder.
session = Session.builder.configs(connection_parameters).app_name("AdventureWorksSummary", True).getOrCreate()
Copy

Aufgrund des Zeitlimits für die praktischen Übungen ist die erste Option möglicherweise sinnvoller. Mehr dazu finden Sie in der Snowpark-Dokumentation.

Beachten Sie, dass Sie nichts davon tun müssen, damit unsere Notebook-Datei innerhalb von Snowflake mit Snowsight ausgeführt wird. Sie würden einfach die aktive Sitzung aufrufen und sie ausführen.

Jetzt ist es Zeit für die kritischste Komponente dieser Migration – nämlich alle Eingabe-/Ausgabereferenzen aufzulösen.

Auflösen der Eingaben und Ausgaben

Lassen Sie uns nun unsere Ein- und Ausgaben auflösen. Beachten Sie, dass dies davon abweicht, ob Sie die Dateien lokal oder in Snowflake ausführen. Für das Python-Skript stellen wir sicher, was wir gewinnen/verlieren, wenn wir direkt innerhalb von Snowsight ausführen: Sie können nicht den gesamten Vorgang in Snowsight ausführen (zumindest nicht derzeit). Die lokale CSV-Datei ist nicht über Snowsight zugänglich. Sie müssen die CSV-Datei manuell in einen Stagingbereich laden. Dies wird wahrscheinlich keine ideale Lösung sein, aber wir können die Konvertierung damit testen.

Wir bereiten diese Datei also zunächst so vor, dass sie lokal ausgeführt/orchestriert und dann in Snowflake ausgeführt werden kann.

Um die Eingaben und Ausgaben des Pipeline-Skripts auflösen zu können, müssen wir sie zunächst identifizieren. Sie sind recht einfach. Dieses Skript scheint Folgendes tun zu können:

  • Zugreifen auf eine lokale Datei

  • Laden des Ergebnisses in SQL Server (aber jetzt Snowflake)

  • Verschieben der Datei, um Platz für die nächste zu machen

Ganz einfach. Wir müssen also jede Komponente des Codes ersetzen, die diese Aufgaben erledigt. Beginnen wir mit dem Zugriff auf die lokale Datei.

Wie bereits zu Beginn erwähnt, wäre es dringend empfohlen, das Point-of-Sale-System und die Orchestrierungstools, die zur Ausführung dieses Python-Skripts verwendet werden, neu zu gestalten, um die Ausgabedatei in einen Cloudspeicherort zu speichern. Dann könnten Sie diesen Speicherort in eine externe Tabelle umwandeln, und siehe da, Sie befinden sich in Snowflake. Die aktuelle Architektur besagt jedoch, dass sich diese Datei nicht in einem Cloudspeicherort befindet und dort bleibt, wo sie ist. Daher müssen wir eine Möglichkeit finden, wie Snowflake auf diese Datei zugreifen kann, und dabei die bestehende Logik beibehalten.

Wir haben Optionen dafür, aber wir werden einen internen Stagingbereich erstellen und die Datei mit dem Skript in den Stagingbereich verschieben. Wir müssten dann die Datei im lokalen Dateisystem und auch im Stagingbereich verschieben. Dies ist alles mit Snowpark möglich. Schauen wir uns das genauer an:

  • Zugreifen auf eine lokale Datei: Erstellen Sie einen internen Stagingbereich (falls noch nicht vorhanden) -> Laden Sie die Datei in den Stagingbereich -> Lesen Sie die Datei in einen Datenframe

  • Laden des Ergebnisses in SQL Server: Laden Sie die transformierten Daten in eine Tabelle in Snowflake

  • Verschieben der Datei, um Platz für die nächste zu machen: Verschieben Sie die lokale Datei -> Verschieben Sie die Datei in den Stagingbereich.

Schauen wir uns den Code an, der jede dieser Aufgaben erledigen kann.

Zugreifen auf eine lokal zugängliche Datei

Dieser Quellcode in Spark sieht wie folgt aus:

# Spark read from a local csv file.
df = spark.read.csv('customer_update.csv', header=True, inferSchema=True)
Copy

Der transformierte Snowpark-Code (durch SMA) sieht wie folgt aus:

# Snowpark read from a local csv file.
df = session.read.option("PARSE_HEADER", True).option("INFER_SCHEMA", True).csv('customer_update.csv')
Copy

Wir können das durch diesen Code ersetzen, der die oben genannten Schritte ausführt:

  1. Erstellen Sie einen internen Stagingbereich (falls noch nicht vorhanden). Wir werden einen Stagingbereich namens „LOCAL_LOAD_STAGE“ erstellen und führen einige Schritte durch, um sicherzustellen, dass der Stagingbereich „r“ ist.

# Create a stage if one does not already exist.
# name the stage we're going to use.
target_stage_name = "LOCAL_LOAD_STAGE"

# Check to see if this stage already exists.
stages = session.sql("SHOW STAGES").collect()
target_stages = [stage for stage in stages if stage['name'] == target_stage_name]

# Create the stage if it does not already exist.
if(len(target_stages) < 1):
    from snowflake.core import Root
    from snowflake.core.stage import Stage, StageEncryption, StageResource
    root = Root(session)
    my_stage = Stage(name="LOCAL_LOAD_STAGE",encryption=StageEncryption(type="SNOWFLAKE_SSE"))
    root.databases["ADVENTUREWORKS"].schemas["DBO"].stages.create(my_stage)
    print('%s created.'%(target_stage_name))
else:
    print('%s already exists.'%(target_stage_name))

Copy
  1. Laden Sie die Datei in den Stagingbereich.

# Move the file.
put_results = session.file.put(local_file_name="customer_update.csv",
                    stage_location="ADVENTUREWORKS.DBO.LOCAL_LOAD_STAGE",
                    overwrite=False,
                    auto_compress=False)

# Read the results.
for r in put_results:
    str_output = ("File {src}: {stat}").format(src=r.source,stat=r.status)
    print(str_output)    
Copy
  1. Lesen Sie die Datei in einen Datenframe. Dies ist der Teil, den der SMA tatsächlich konvertiert hat. Wir müssen angeben, dass der Speicherort der Datei jetzt der interne Stagingbereich ist.

# Location of the file in the stage.
csv_file_path = "@LOCAL_LOAD_STAGE/customer_update.csv"

# Spark read from a local csv file.
df = session.read.option("PARSE_HEADER", True).option("INFER_SCHEMA", True).csv(csv_file_path)
Copy

Das Ergebnis würde wie folgt aussehen:

Umgeschriebener Code

Lassen Sie uns zum nächsten Schritt übergehen.

Laden des Ergebnisses in Snowflake

Das ursprüngliche Skript hat den Datenframe in SQL Server geschrieben. Jetzt werden wir diesen in Snowflake laden. Dies ist eine viel einfachere Konvertierung. Der Datenframe ist bereits ein Snowpark-Datenframe. Dies ist einer der Vorteile von Snowflake. Da die Daten nun für Snowflake zugänglich sind, geschieht alles innerhalb von Snowflake.

# Original output from the conversion tool.
# Write the DataFrame to SQL Server.
#EWI: SPRKPY1002 => pyspark.sql.readwriter.DataFrameWriter.jdbc is not supported
df_transformed.write.jdbc(url=sql_server_url,
              table='dbo.DimCustomer',
              mode="append",
              properties={
                  "user": sql_server_user,
                  "password": sql_server_password,
                  "driver": driver_path
              })

# Corrected Snowflake/Snowpark code.
df_transformed.write.save_as_table("ADVENTUREWORKS.DBO.DIMCUSTOMER", mode="append")
Copy

Beachten Sie, dass wir möglicherweise in eine temporäre Tabelle schreiben möchten, um einige Tests/Validierungen durchzuführen, aber dies das Verhalten im ursprünglichen Skript ist.

Verschieben der Datei, um Platz für die nächste zu machen

Dies ist das Verhalten im ursprünglichen Skript. Wir müssen dies nicht wirklich in Snowflake erreichen, aber wir können es tun, um die genau gleiche Funktionalität im Stagingbereich zu demonstrieren. Dies geschieht mit einem BS-Befehl im ursprünglichen Dateisystem. Das hängt nicht von Spark ab und wird gleich bleiben. Um dieses Verhalten in Snowpark zu emulieren, müssten wir diese Datei im Stagingbereich in ein neues Verzeichnis verschieben.

Dies kann mit dem folgenden Python-Code einfach erledigt werden:

# New filename.
original_filepath = '@LOCAL_LOAD_STAGE/customer_update.csv'
new_filepath = '@LOCAL_LOAD_STAGE/old_versions/customer_update_%s.csv'%(today_time)

copy_sql = f"COPY FILES INTO {new_filepath} FROM {original_filepath}"
session.sql(copy_sql).collect()
print(f"File copied from {original_filepath} to {new_filepath}")

remove_sql = f"REMOVE {original_filepath}"
session.sql(remove_sql).collect()
print(f"Original file {original_filepath} removed.")

Copy

Beachten Sie, dass dies keinen der vorhandenen Codes ersetzen würde. Da wir bereits die bestehende Aktion zum Verschieben des Spark-Codes in Snowpark beibehalten möchten, werden wir die BS-Referenz so belassen. Die endgültige Version wird wie folgt aussehen:

Endgültiger Code

Jetzt haben wir die gleiche Aktion vollständig ausgeführt. Lassen Sie uns nun unsere letzte Bereinigung durchführen und dieses Skript testen.

Bereinigen und Testen

Wir haben uns unsere Importaufrufe nie angesehen und wir haben Konfigurationsdateien, die überhaupt nicht notwendig sind. Wir könnten die Verweise auf die Konfigurationsdateien so belassen und das Skript ausführen. Wenn diese Konfigurationsdateien noch zugänglich sind, wird der Code trotzdem ausgeführt. Aber wenn wir uns unsere Importanweisungen genau ansehen, können wir sie auch entfernen. Diese Dateien werden durch den gesamten Code zwischen den Importanweisungen und dem Sitzungsaufruf dargestellt:

Entfernte Anweisungen

Es gibt noch ein paar andere Dinge, die wir tun sollten:

  • Prüfen Sie, ob alle unsere Importe immer noch erforderlich sind. Wir können sie vorerst so belassen. Wenn es einen Fehler gibt, können wir ihn beheben.

  • Wir haben auch eine EWI, die wir dort als Warnung gelassen haben, um sie zu überprüfen. Wir möchten also sicherstellen, dass wir diese Ausgabe prüfen.

  • Wir müssen sicherstellen, dass unser Dateisystem das Verhalten des erwarteten Dateisystems für das POS-System widerspiegelt. Dazu sollten wir die Datei „customer_update.csv“ in den Stammordner verschieben, den Sie beim ersten Start von VS Code gewählt haben.

  • Erstellen Sie in demselben Verzeichnis ein Verzeichnis namens „old_versions“. Dies sollte die Ausführung der BS-Vorgänge ermöglichen.

Wenn Sie nicht damit vertraut sind, den Code direkt in die Produktionstabelle auszuführen, können Sie schließlich eine Kopie dieser Tabelle für diesen Test erstellen und das Laden auf diese Kopie verweisen. Ersetzen Sie die Ladeanweisung durch die unten stehende. Da es sich um praktische Übungen handelt, können Sie einfach in die Tabelle „production“ schreiben:

# In case we want to test.
create_sql = """
                CREATE OR REPLACE TABLE ADVENTUREWORKS.DBO.DIMCUSTOMER_1
                AS select * from ADVENTUREWORKS.DBO.DIMCUSTOMER;
                """
session.sql(create_sql).collect()

# Write the DataFrame to SQL Server.
df_transformed.write.save_as_table("ADVENTUREWORKS.DBO.DIMCUSTOMER_1", mode="append")
Copy

Jetzt sind wir schließlich bereit, dies zu testen. Wir können dieses Skript in Python für eine Testtabelle ausführen und sehen, ob dieses fehlschlägt. Beginnen wir mit der Ausführung!

Tragisch! Das Skript ist mit dem folgenden Fehler fehlgeschlagen:

Skriptfehler

Es sieht so aus, als ob die Art und Weise, wie wir einen Bezeichner referenzieren, nicht so ist, wie Snowpark es gewünscht hat. Der Code, der fehlgeschlagen ist, befindet sich genau an der Stelle, an der die verbleibende EWI ist:

Für Fehler verantwortliche Codezeile

Sie könnten auf die Dokumentation über den Link verweisen, der vom Fehler bereitgestellt wird, aber aus Zeitgründen benötigt Snowpark diese Variable ausdrücklich als Literal. Wir müssen die folgende Ersetzung vornehmen:

# Old
split_col = split(df_uppercase['NAME'], '.first:')

# New
split_col = split(df_uppercase['NAME'], lit('.first:'))
Copy

Dies sollte diesen Fehler beheben. Beachten Sie, dass es immer einige funktionale Unterschiede zwischen einer Quell- und einer Zielplattform geben wird. Konvertierungstools wie SMA möchten diese Unterschiede so offensichtlich wie möglich machen. Beachten Sie jedoch, dass keine Konvertierung zu 100 % automatisiert ist.

Lassen Sie es uns noch einmal ausführen. Dieses Mal mit Erfolg!

Erfolgsmeldung

Wir können einige Abfragen in Python schreiben, um dies zu überprüfen, aber warum tun wir das nicht einfach in Snowflake (weil dies das ist, was wir sowieso tun werden)?

Navigieren Sie zu Ihrem Snowflake-Konto, das Sie für die Ausführung dieser Skripte verwendet haben. Dies sollte dasselbe sein, das Sie zum Laden der Datenbank von SQL Server verwendet haben (und wenn Sie das nicht getan haben, funktionieren die obigen Skripte sowieso nicht, da die Daten noch nicht migriert wurden).

Sie können dies schnell überprüfen, indem Sie sehen, ob der Stagingbereich mit der Datei erstellt wurde:

Lokalisierter erstellter Stagingbereich

Aktivieren Sie die Ansicht der Verzeichnistabelle, um zu sehen, ob sich der Ordner „old_versions“ dort befindet:

Schaltfläche „Verzeichnistabelle aktivieren“

Und es ist:

Lokalisierter old_versions-Ordner

Da dies das letzte Element unseres Skripts war, sieht es so aus, als wären wir gut vorangekommen.

Wir können auch einfach überprüfen, ob die Daten geladen wurden, indem wir einfach die Tabelle nach den von uns hochgeladenen Daten abfragen. Sie können ein neues Arbeitsblatt öffnen und einfach diese Abfrage schreiben:

select * from ADVENTUREWORKS.DBO.DIMCUSTOMER
where FIRSTNAME like '%Brandon%'
AND LASTNAME like '%Carver%'
Copy

Dies ist einer der Namen, die gerade geladen wurden. Und es sieht so aus, als ob unsere Pipeline funktioniert hätte:

Erfolgreiche Abfrage

Ausführen des Pipeline-Skripts in Snowsight

Werfen wir einen kurzen Blick zurück auf den Ablauf, den wir in Spark ausführen möchten:

  • Zugreifen auf eine lokale Datei

  • Laden des Ergebnisses in SQL Server

  • Verschieben der Datei, um Platz für die nächste zu machen

Dieser Workflow kann nicht vollständig aus Snowsight heraus ausgeführt werden. Snowsight hat keinen Zugriff auf ein lokales Dateisystem. Die Empfehlung wäre hier, den Export vom POS zu einem Data Lake zu verschieben oder eine Reihe anderer Optionen, die über Snowsight zugänglich wären.

Wir können uns jedoch genauer ansehen, wie Snowpark die Transformationslogik handhabt, indem wir das Python-Skript in Snowflake ausführen. Wenn Sie die oben empfohlenen Änderungen bereits vorgenommen haben, können Sie den Textteil des Skripts in einem Python-Arbeitsblatt in Snowflake ausführen.

Melden Sie sich dazu zunächst bei Ihrem Snowflake-Konto an, und navigieren Sie zum Abschnitt „Arbeitsblätter“. Erstellen Sie in diesem Arbeitsblatt ein neues Python-Arbeitsblatt:

Arbeitsblätter-Menüpunkte

Geben Sie die Datenbank, das Schema, die Rolle und das Warehouse an, das bzw. die Sie verwenden möchten:

Menü für Datenbank und Schema

Jetzt müssen wir uns nicht mehr mit unserem Sitzungsaufruf befassen. Im Arbeitsblattfenster wird eine erstellte Vorlage angezeigt:

Generierte Python-Vorlage

Beginnen wir damit, unsere Importaufrufe zu übertragen. Nachdem das vorherige Skript einsatzbereit ist, sollten wir den folgenden Satz von Importen haben:

# General Imports
import pandas as pd
import os
import shutil
import datetime

# Snowpark Imports
from snowflake.snowpark import Session
from snowflake.snowpark.functions import col
from snowflake.snowpark.functions import upper
from snowflake.snowpark.functions import lower
from snowflake.snowpark.functions import split
from snowflake.snowpark.functions import trim
from snowflake.snowpark.functions import when
from snowflake.snowpark.functions import lit
from snowflake.snowpark.functions import expr
from snowflake.snowpark.functions import regexp_replace
Copy

Wir benötigen nur die Snowpark-Importe. Wir werden keine Dateien in einem Dateisystem verschieben. Wir könnten die datetime-Referenz beibehalten, wenn wir die Datei im Stagingbereich verschieben möchten. (Lassen Sie uns dies tun.)

Fügen Sie die Snowpark-Importe (+ datetime) in das Python-Arbeitsblatt unter den anderen Importen ein, die bereits vorhanden sind. Beachten Sie, dass „col“ bereits importiert wurde, sodass Sie eines davon entfernen können:

Neuer Code mit eingefügten Importen

Fügen Sie unter dem Aufruf „def main“ den gesamten Transformationscode ein. Dies umfasst alles von der Zuweisung des CSV-Speicherorts bis zum Schreiben des Datenframes in eine Tabelle.

Von hier:

Kopierter Code

Hierher:

Eingefügter Code

Wir können in dem Code, der die Dateien im Stagingbereich verschiebt, auch zurück hinzufügen. Dieser Teil:

Hinzugefügter Code

Bevor Sie den Code jedoch ausführen können, müssen Sie den Stagingbereich manuell erstellen und die Datei in den Stagingbereich verschieben. Wir können die Anweisung „create stage“ in das Skript einfügen, aber wir müssten die Datei weiterhin manuell in den Stagingbereich laden.

Wenn Sie also ein anderes Arbeitsblatt öffnen (dieses Mal ein SQL-Arbeitsblatt), können Sie eine grundlegende SQL-Anweisung ausführen, die den Stagingbereich erstellt:

CREATE STAGE my_int_stage
  ENCRYPTION = (TYPE = 'SNOWFLAKE_SSE');
Copy

Stellen Sie sicher, dass Sie die richtige Datenbank, das richtige Schema, die richtige Rolle und das richtige Warehouse auswählen:

Datenbank und Schema ausgewählt

Sie können auch einen internen Stagingbereich direkt in der Snowsight-UI erstellen. Da der Stagingbereich nun existiert, können wir die gewünschte Datei manuell in den Stagingbereich laden. Navigieren Sie in der Snowsight-UI zum Abschnitt „Datenbanken“, und stellen Sie sicher, dass Sie den soeben erstellten Stagingbereich im entsprechenden „database.schema“ finden:

Lokalisierter Stagingbereich im Schema

Fügen wir unsere CSV-Datei hinzu, indem wir die Option „+Files“ in der oberen rechten Ecke des Fensters auswählen. Dadurch wird das Menü zum Hochladen Ihrer Dateien gestartet:

Menü zum Hochladen Ihrer Dateien

Ziehen Sie die Datei in unser Projektverzeichnis, und laden Sie die Datei „customer_update.csv“ in den Stagingbereich:

Datei „customer_update“ hochgeladen

Wählen Sie in der unteren rechten Ecke des Bildschirms die Option „Upload“ aus. Sie kehren zum Stagingbereichsbildschirm zurück. Um die Dateien anzuzeigen, müssen Sie die Option „Enable Directory Table“ auswählen:

Schaltfläche „Enable Directory Table“

Und jetzt erscheint unsere Datei im Stagingbereich:

Hochgeladene Datei im Stagingbereich

Dies ist natürlich nicht mehr wirklich eine Pipeline. Aber zumindest können wir die Anmeldung in Snowflake ausführen. Führen Sie den Rest des Codes aus, den Sie in das Arbeitsblatt verschoben haben. Dieser Benutzende hatte beim ersten Mal Erfolg, aber das ist keine Garantie für den Erfolg beim zweiten Mal:

Ergebnisse der Abfrageausführung

Beachten Sie, dass Sie diese Funktion, sobald Sie sie in Snowflake definiert haben, auf andere Weise aufrufen können. Wenn AdventureWorks zu 100 % deren POS ersetzt, dann kann es sinnvoll sein, die Transformationslogik in Snowflake zu verwenden, insbesondere wenn die Orchestrierung und Dateiverschiebung komplett an einem anderen Ort durchgeführt werden. So kann sich Snowpark auf die Features der Transformationslogik konzentrieren.

Fazit

Das war’s für die Skriptdatei. Es ist nicht das beste Beispiel für eine Pipeline, aber es ist stark davon abhängig, wie mit der Ausgabe des SMA umgegangen werden soll:

  • Alle Probleme lösen

  • Sitzungsaufrufe auflösen

  • Ein-/Ausgaben auflösen

  • Bereinigen und Testen!

Im Weiteren werden wir mit dem Notebook für die Berichterstellung fortfahren.