Verwenden partitionierter Modelle

Viele Datensets lassen sich leicht in mehrere unabhängige Teilmengen partitionieren.. Ein Datensatz, der die Verkaufsdaten einer Ladenkette enthält, kann z. B. nach Ladennummern partitioniert werden. Für jede Partition kann dann ein eigenes Modell trainiert werden. Trainings- und Inferenzoperationen auf den Partitionen können parallelisiert werden, so dass diese Operationen weniger Zeit in Anspruch nehmen. Da sich die einzelnen Läden wahrscheinlich erheblich darin unterscheiden, wie sich ihre Features auf ihre Umsätze auswirken, kann dieser Ansatz zu genaueren Schlussfolgerungen auf der Ladenebene führen.

Die Snowflake Model Registry unterstützt die verteilte Verarbeitung von Training und Inferenz von partitionierten Daten:

  • Das Dataset enthält eine Spalte, die Partitionen in den Daten zuverlässig identifiziert.

  • Die Daten in jeder einzelnen Partition sind unkorreliert mit den Daten in den anderen Partitionen und enthalten genügend Zeilen, um das Modell zu trainieren.

Modelle können zustandslos (das Training wird bei jedem Aufruf der Inferenz durchgeführt) oder zustandsabhängig (das Training wird einmal vor der Inferenz durchgeführt und für die Verwendung bei mehreren Inferenzoperationen beibehalten) sein.

Mit der Snowflake Model Registry implementieren Sie partitioniertes Training und Inferenz mit benutzerdefinierten Modellen. Während der Inferenz partitioniert die Modellinferenzmethode den Datensatz, generiert für jede Partition parallel Vorhersagen unter Verwendung aller Knoten und Kerne in Ihrem Warehouse und kombiniert die Ergebnisse anschließend in einem einzigen Datensatz.

Bemerkung

Bei partitionierten Modellen ist es wichtig, das registrierte Modell von den einzelnen Modellen zu unterscheiden, die durch das registrierte Modell erstellt wurden oder es zusammensetzen. Wo es möglich ist, werden wir die einzelnen zugrunde liegenden Modelle als Teilmodelle bezeichnen.

Bemerkung

Partitioniertes Training und Inferenz erfordert Snowpark ML (snowflake-ml-python-Paket) Version 1.5.0 oder höher.

Definieren und Protokollieren des Modells

Die partitionierte Modellklasse erbt von snowflake.ml.model.custom_model.CustomModel, und Inferenzmethoden werden mit dem Decorator @custom_model.partitioned_inference_api (Snowpark ML-Version 1.5.4 oder höher) oder @custom_model.inference_api (Snowpark ML-Version 1.5.0 bis 1.5.3) deklariert. Unter Integrieren eigener Modelltypen mittels serialisierter Dateien finden Sie Informationen zur Definition von benutzerdefinierten Standardmodellen.

import pandas as pd

from snowflake.ml.model import custom_model

class ExamplePartitionedModel(custom_model.CustomModel):

  @custom_model.partitioned_inference_api
  def predict(self, input: pd.DataFrame) -> pd.DataFrame:
      # All data in the partition will be loaded in the input dataframe.
      #… implement model logic here …
      return output_df

my_model = ExamplePartitionedModel()
Copy

Geben Sie bei der Protokollierung des Modells eine function_type von TABLE_FUNCTION im options-Dictionary an, zusammen mit allen anderen Optionen, die Ihr Modell benötigt.

from snowflake.ml.registry import Registry

reg = Registry(session=sp_session, database_name="ML", schema_name="REGISTRY")
model_version = reg.log_model(my_model,
  model_name="my_model",
  version_name="v1",
  options={"function_type": "TABLE_FUNCTION"},    ###
  conda_dependencies=["scikit-learn"],
  sample_input_data=train_features
)
Copy

Wenn Ihr partitioniertes Modell auch reguläre (Nicht-Tabellen)-Funktionen als Methoden hat, können Sie stattdessen das method_options-Wörterbuch verwenden, um den Typ jeder Methode anzugeben.

model_version = reg.log_model(my_model,
    model_name="my_model",
    version_name="v1",
    options={
      "method_options": {                                 ###
        "METHOD1": {"function_type": "TABLE_FUNCTION"},   ###
        "METHOD2": {"function_type": "FUNCTION"}          ###
      }
    }
    conda_dependencies=["scikit-learn"],
    sample_input_data=train_features
)
Copy

Partitionierte Modellinferenz

Verwenden Sie die run-Methode eines Python ModelVersion-Objekts, um die Methoden der Tabellenfunktionen partitioniert aufzurufen. Übergeben Sie partition_column, um den Namen der Spalte anzugeben, die einen numerischen oder Zeichenfolge-Wert enthält, der die Partition jedes Datensatzes identifiziert. Wie üblich können Sie einen Snowpark oder pandas-DataFrame übergeben (letzteres ist für lokale Tests nützlich). Sie erhalten die gleiche Art von DataFrame als Ergebnis. In diesen Beispielen erfolgt die Partitionierung der Inferenz anhand einer Ladennummer.

model_version.run(
  input_df,
  function_name="PREDICT",
  partition_column="STORE_NUMBER"
)
Copy

Sie können die Funktionen der Modelltabelle auch direkt mit SQL aufrufen, wie hier gezeigt.

SELECT output1, output2, partition_column
  FROM input_table,
      TABLE(
          my_model!predict(input_table.input1, input_table.input2)
          OVER (PARTITION BY input_table.store_number)
      )
  ORDER BY input_table.store_number;
Copy

Die Eingabedaten werden automatisch auf die Knoten und Kerne in Ihrem Warehouse aufgeteilt, und die Partitionen werden parallel verarbeitet.

Weitere Informationen zur Syntax von Tabellenfunktionen finden Sie unter Aufrufen einer UDF mit SQL.

Zustandslose partitionierte Modelle

In der einfachsten Anwendung von partitionierten Modellen werden sowohl das Training als auch die Inferenz durchgeführt, wenn predict aufgerufen wird. Das Modell wird angepasst, die Inferenz wird durchgeführt und das angepasste Modell wird unmittelbar danach verworfen. Diese Art von Modell wird „zustandslos“ genannt, weil kein Anpassungszustand gespeichert wird. Im folgenden Beispiel trainiert jede Partition ein XGBoost-Modell:

class ExampleStatelessPartitionedModel(custom_model.CustomModel):

  @custom_model.partitioned_inference_api
  def predict(self, input_df: pd.DataFrame) -> pd.DataFrame:
      import xgboost
      # All data in the partition will be loaded in the input dataframe.
      # Construct training data by transforming input_df.
      training_data = # ...

      # Train the model.
      my_model = xgboost.XGBRegressor()
      my_model.fit(training_data)

      # Generate predictions.
      output_df = my_model.predict(...)

      return output_df

my_model = ExampleStatelessPartitionedModel()
Copy

Ein Beispiel für ein zustandsloses partitioniertes Modell, einschließlich Beispieldaten, finden Sie in der Quickstart-Anleitung Partitionierte Modelle.

Zustandsabhängige partitionierte Modelle

Es ist auch möglich, zustandsabhängige partitionierte Modelle zu implementieren, die gespeicherte Teilmodell-Zustände laden. Dazu stellen Sie Modelle im Speicher über snowflake.ml.model.custom_model.ModelContext, oder indem Sie Dateipfade angeben, bereit, die auf angepasste Modellartefakte verweisen und diese während der Inferenz laden.

Das folgende Beispiel zeigt, wie Sie Modelle im Speicher für den Modellkontext bereitstellen.

from snowflake.ml.model import custom_model

# `models` is a dict with model ids as keys, and fitted xgboost models as values.
models = {
  "model1": models[0],
  "model2": models[1],
  ...
}

model_context = custom_model.ModelContext(
  models=models
)
my_stateful_model = MyStatefulCustomModel(model_context=model_context)
Copy

Bei der Protokollierung von my_stateful_model werden die im Kontext bereitgestellten Teilmodelle zusammen mit allen Modelldateien gespeichert. Auf diese kann dann in der Logik der Inferenzmethode zugegriffen werden, indem sie aus dem Kontext abgerufen werden, wie unten gezeigt:

class ExampleStatefulModel(custom_model.CustomModel):

  @custom_model.partitioned_inference_api
  def predict(self, input: pd.DataFrame) -> pd.DataFrame:
    model1 = self.context.model_ref("model1")
    # ... use model1 for inference
Copy

Sie können auch programmgesteuert auf die Modelle zugreifen, indem Sie die Partition ID in der predict-Methode verwenden. Wenn eine Partitionsspalte als Eingabe-Feature bereitgestellt wird, kann sie verwendet werden, um auf ein für die Partition angepasstes Modell zuzugreifen. Wenn die Partitionsspalte zum Beispiel MY_PARTITION_COLUMN ist, kann die folgende Modellklasse definiert werden:

class ExampleStatefulModel(custom_model.CustomModel):

  @custom_model.partitioned_inference_api
  def predict(self, input: pd.DataFrame) -> pd.DataFrame:
    model_id = input["MY_PARTITION_COLUMN"][0]
    model = self.context.model_ref(model_id)
    # ... use model for inference
Copy

In ähnlicher Weise können Teilmodelle als Artefakte gespeichert und zur Laufzeit geladen werden. Diese Vorgehensweise ist nützlich, wenn die Modelle zu groß sind, um in den Speicher zu passen. Zeichenfolgen-Dateipfade für den Modellkontext bereitstellen Die Dateipfade sind während der Inferenz mit self.context.path(artifact_id) zugänglich. Weitere Informationen dazu finden Sie unter Definieren des Modellkontexts durch Schlüsselwortargumente.

Beispiel

Ein Beispiel, einschließlich Beispieldaten, finden Sie in der Quickstart-Anleitung Partitionierte Modelle.

Ein Beispiel für ein zustandsabhängiges partitioniertes benutzerdefiniertes Modell finden Sie in der Quickstart-Anleitung Mehrmodell-Inferenz in Snowflake.