Snowflake Model Registry: Partitioned Custom Models¶
Many datasets can be easily partitioned into multiple independent subsets. For example, a dataset containing sales data for a chain of stores can be partitioned by store number. A separate model can then be trained for each partition. Training and inference operations on the partitions can be parallelized, reducing the wall-clock time for these operations. Furthermore, since individual stores likely differ somewhat in how their features affect their sales, this approach can lead to more accurate inference at the store level.
The Snowflake Model Registry supports distributed processing of training and inference of partitioned data when:
The dataset contains a column that reliably identifies partitions in the data.
The data in each individual partition is uncorrelated with the data in the other partitions and contains enough rows to train the model.
Models may be stateless (training is performed each time inference is called) or stateful (training is performed once before inference, and the fitted model is retained for use in multiple inference operations).
With the Snowflake Model Registry, you implement partitioned training and inference using custom models. When using the model, the registry partitions the dataset, fits and predicts the partitions in parallel using all the nodes and cores in your warehouse, and combines the results into a single dataset afterward.
Note
Partitioned training and inference requires Snowpark ML (snowflake-ml-python package) version 1.5.0 or later.
Defining and logging the custom model¶
The partitioned custom model class inherits from snowflake.ml.model.custom_model.CustomModel, and inference methods are declared with the @custom_model.partitioned_inference_api decorator (Snowpark ML version 1.5.4 or later) or the @custom_model.inference_api decorator (Snowpark ML versions 1.5.0 to 1.5.3). See Writing the Custom Model Class for information on defining standard custom models.
import pandas as pd

from snowflake.ml.model import custom_model

class ExampleForecastingModel(custom_model.CustomModel):
    @custom_model.partitioned_inference_api
    def predict(self, input: pd.DataFrame) -> pd.DataFrame:
        # All data in the partition is loaded into the input DataFrame.
        # ... implement model logic here ...
        return output_df

my_model = ExampleForecastingModel()
When logging the model, provide a function_type of TABLE_FUNCTION in the options dictionary along with any other options your model requires.
from snowflake.ml.registry import Registry

reg = Registry(session=sp_session, database_name="ML", schema_name="REGISTRY")
mv = reg.log_model(
    my_model,
    model_name="my_model",
    version_name="v1",
    options={"function_type": "TABLE_FUNCTION"},
    conda_dependencies=["scikit-learn"],
    sample_input_data=train_features,
)
If your custom model also has regular (non-table) functions as methods, you can instead use the method_options dictionary to specify the type of each method.
model_version = reg.log_model(
    my_model,
    model_name="my_model",
    version_name="v1",
    options={
        "method_options": {
            "METHOD1": {"function_type": "TABLE_FUNCTION"},
            "METHOD2": {"function_type": "FUNCTION"},
        }
    },
    conda_dependencies=["scikit-learn"],
    sample_input_data=train_features,
)
Partitioned custom model training¶
In the simplest application of partitioned custom models, training and inference are both done when predict is called. The model is fitted, then inference is run, and the fitted model is discarded immediately afterward. This type of model is called “stateless” because no fit state is stored.
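For instance, a stateless model might fit a scikit-learn regressor on the partition’s rows and predict with it in the same call. The following is a minimal sketch; the FEATURE1 and TARGET column names are placeholders for illustration.

import pandas as pd
from sklearn.linear_model import LinearRegression

from snowflake.ml.model import custom_model

class ExampleStatelessModel(custom_model.CustomModel):
    @custom_model.partitioned_inference_api
    def predict(self, input: pd.DataFrame) -> pd.DataFrame:
        # Fit on this partition's rows, predict, then discard the fitted model.
        # FEATURE1 and TARGET are placeholder column names for this sketch.
        model = LinearRegression()
        model.fit(input[["FEATURE1"]], input["TARGET"])
        return pd.DataFrame({"PREDICTION": model.predict(input[["FEATURE1"]])})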
Partitioned custom models can also retain fit state using snowflake.ml.model.custom_model.ModelContext, either by providing fitted models in memory to the model context or by providing file paths.
The following example shows how to provide models in memory to the model context.
from snowflake.ml.model import custom_model

# `models` is a dict with model IDs as keys and fitted XGBoost models as values.
model_context = custom_model.ModelContext(
    models=models
)

my_stateful_model = MyStatefulCustomModel(model_context=model_context)
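In this example, the models dictionary might have been built before logging by fitting one submodel per partition of the training data. A hypothetical sketch using XGBoost, with train_df and its column names as placeholders:

import xgboost

# Hypothetical sketch: `train_df`, STORE_NUMBER, FEATURE1, and TARGET are
# placeholders. One submodel is fitted per partition of the training data.
models = {}
for store_number, partition_df in train_df.groupby("STORE_NUMBER"):
    submodel = xgboost.XGBRegressor()
    submodel.fit(partition_df[["FEATURE1"]], partition_df["TARGET"])
    models[str(store_number)] = submodel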
When logging my_stateful_model, the models provided in the context are stored. They can then be accessed in the inference method logic by retrieving them from the context, as shown below:
class ExampleStatefulModel(custom_model.CustomModel):
    @custom_model.partitioned_inference_api
    def predict(self, input: pd.DataFrame) -> pd.DataFrame:
        model1 = self.context.model_ref("model1")
        # ... use model1 for inference
It’s also possible to access the models programmatically by partition ID in the predict method. If a partition column is provided as an input feature, it can be used to access a model fitted for the partition. For example, if the partition column is MY_PARTITION_COLUMN, the following custom model class can be defined:
class ExampleStatefulModel(custom_model.CustomModel):
    @custom_model.partitioned_inference_api
    def predict(self, input: pd.DataFrame) -> pd.DataFrame:
        model_id = input["MY_PARTITION_COLUMN"][0]
        model = self.context.model_ref(model_id)
        # ... use model for inference
Similarly, submodels could be stored as artifacts and loaded at runtime. This approach is useful when the models are too large to fit into memory. For more information, see Defining Model Context.
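As a sketch of the file-path approach, a model context can reference pickled submodels as artifacts, which the inference method then loads from disk at runtime. The file name here is a placeholder for illustration:

import pickle

import pandas as pd

from snowflake.ml.model import custom_model

# Sketch: reference a pickled submodel by local file path instead of holding
# it in memory. The file name is a placeholder.
model_context = custom_model.ModelContext(
    artifacts={
        "model1": "model1.pkl",
    }
)

class ExampleArtifactModel(custom_model.CustomModel):
    @custom_model.partitioned_inference_api
    def predict(self, input: pd.DataFrame) -> pd.DataFrame:
        # Load the submodel from its artifact path at inference time.
        with open(self.context.path("model1"), "rb") as f:
            model = pickle.load(f)
        # ... use model for inference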
Partitioned custom model inference¶
Use the run method of a Python ModelVersion object to invoke the table function methods in a partitioned fashion, passing partition_column to specify the name of the column whose numeric or string value identifies the partition of each record. As usual, you may pass a Snowpark or pandas DataFrame (the latter is useful for local testing); you receive the same type of DataFrame as the result. In these examples, inference is partitioned on a store number.
mv.run(
    input_df,
    function_name="PREDICT",
    partition_column="STORE_NUMBER",
)
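For local testing, the same call accepts a pandas DataFrame. A minimal sketch, with hypothetical column names and values:

import pandas as pd

# Hypothetical local test; the column names and values are placeholders.
local_df = pd.DataFrame({
    "STORE_NUMBER": [1, 1, 2, 2],
    "FEATURE1": [10.0, 12.0, 7.0, 9.0],
})

result_df = mv.run(
    local_df,
    function_name="PREDICT",
    partition_column="STORE_NUMBER",
)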
You can also call these methods using partitioned data from SQL, as shown here.
SELECT OUTPUT1, OUTPUT2, PARTITION_COLUMN
FROM input_table,
table(
MY_MODEL!PREDICT(input_table.INPUT1, input_table.INPUT2)
OVER (PARTITION BY input_table.STORE_NUMBER)
)
ORDER BY input_table.STORE_NUMBER;
The input data is automatically split among the nodes and cores in your warehouse and the partitions are processed in parallel.
Tip
Many datasets can be partitioned in more than one way. Since the partition column is specified when you call the model, not when you log it, you can easily try out different partitioning schemes without changing the model. For example, in the hypothetical store sales dataset, you could partition by store number or by state or province to see which predicts more effectively.
This also means you don’t need a separate model for unpartitioned processing. If you don’t specify a partition column, no partitioning is done, and all the data is processed together as usual.
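For example, the same partitioned model shown above can be run unpartitioned simply by omitting partition_column:

# Without partition_column, all rows are processed together as one dataset.
mv.run(input_df, function_name="PREDICT")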
Example¶
See the Partitioned Custom Model Quickstart Guide for an example, including sample data.