パイプラインを作成してデプロイする¶
概要¶
機械学習( ML )のワークフローには通常、いくつかの重要な段階があります。
データ探索と準備:この初期フェーズでは、生のデータを理解し、クリーニングし、欠けている値を処理し、使用可能な形式に変換します。
データエンジニアリング:ここで、生のデータは、予測モデルにとって根本的な問題をより適切に表す機能に変換されます。これには、多くの場合、スケーリング、エンコード、既存の機能から新しい機能を作成するなどの手法が含まれます。
モデル開発:この段階では、様々な ML モデルが選択され、準備されたデータでトレーニングされ、そのパフォーマンスを最適化するためにチューニングされます。開発されたモデルは、その精度、公平性、汎化能力を評価するために、適切なメトリックを用いて厳密に評価されます。
モデルのデプロイ:実稼働への準備が整ったモデルはモデルレジストリに保存され、その後、新しいデータに対するバッチまたはリアルタイムの予測のためにデプロイされます。
ML モデルの初期開発では、多くの場合、データサイエンティストがさまざまなアルゴリズムや機能を迅速に試すことができる、アジャイルで反復的なアプローチが有効です。しかし、モデルが成熟し、価値を実証するにつれて、焦点は運用化へと移り、そこではパイプラインが CI/CD (継続的統合/継続的デリバリー)により強化され、自動化されます。この自動化により、コード、データパイプライン、またはモデルへの変更が一貫してビルド、テスト、およびデプロイされ、より信頼性が高く、効率的で、保守性の高い ML システムにつながります。
開発¶
ローカルの IDE (例: VS Code)または対話型ノートブック(Snowflake NotebookまたはJupyter)での対話型開発から始めましょう。入力(テーブル、ステージ、ハイパーパラメーター)をパラメーター化し、移植性のためにステップをモジュール化しておきます。たとえば、データ準備用に1つのセル/関数、機能エンジニアリング用に別のセル/関数、モデルトレーニング用に別のセル/関数などを用意しておくと便利です。
Snowflakeは、機械学習のライフサイクルの各段階に対して次のツールを提供しています。
ステージ |
ツール |
使用状況 |
|---|---|---|
データ探索 |
Snowflake Notebooks |
管理されたブラウザベースのノートブック環境で開発します。Pythonと SQL を1つの場所で使用し、データセットのプロファイリング、分布の視覚化、迅速な反復を行います。 |
Snowpark DataFrames |
Snowflakeに計算をプッシュダウンする使い慣れた DataFrame APIs を使用します。 |
|
データエンジニアリング |
Snowpark DataFrames |
SQL/Python/Scalaを使用し、プッシュダウン最適化により、ウェアハウス規模で再現可能な変換を構築します。 |
UDFs/UDTFs |
カスタムPythonロジックを関数やテーブル関数としてカプセル化し、チームやパイプライン間で複雑な変換を再利用できます。 |
|
Feature Store |
特定の時点における正確性を維持して機能を定義、登録、提供し、モデル間で再利用します。一貫性のあるオフライントレーニングセットと低レイテンシのオンライン検索をサポートし、リークと重複を削減します。 |
|
モデルトレーニング |
Snowflake Notebooks |
scikit-learn、 XGBoost 、 PyTorch などのおなじみのオープンソースライブラリを使用して、Snowflake Notebooksで ML モデルをトレーニングします。弾力的なスケールを活用し、データの移動を回避して、モデルと前処理を1つの場所で永続化します。 |
MLジョブ |
高メモリインスタンス、 GPU アクセラレーションや、ローカル IDEs 、ノートブック、外部ホストのオーケストレーターなどあらゆる環境からの分散処理など、リソース集約的なステップを特化したコンピュートオプションにオフロードします。 |
|
モデルのデプロイ |
モデルレジストリ |
系統とガバナンス制御を使用してモデルを登録およびバージョン管理します。ディスカバリーを一元化し、安全なプロモーションワークフロー、監査、ロールバックを促進します。 |
バッチ推論 |
Pythonまたは SQL から登録されたモデルを提供し、管理されたデータに近い推論を維持し、一貫したレジストリに基づく実行で操作を簡素化します。 |
|
リアルタイム推論 |
登録されたモデルを、自動スケーリング機能を備えたマネージド HTTPS エンドポイントにデプロイします。サービス提供インフラストラクチャが不要になり、Snowflakeの認証およびガバナンスと統合されたシンプルで安全な低レイテンシの推論を実現します。 |
|
モデルモニタリング |
モデルのバージョンごとにモニターを作成して推論ログを実体化し、毎日のメトリックを自動的に更新して、Snowsightのドリフト、パフォーマンス、統計的シグナルを表示します。アラートとカスタムダッシュボードを構成してバージョンを比較し、データやパイプラインの問題を迅速に診断します。 |
|
ワークフローのオーケストレーション |
スケジュールされたノートブック |
Snowflake Notebooksがスケジュール上で非インタラクティブに実行されるようにパラメーター化し、構成します。 |
タスクグラフ |
ML パイプラインを有向非巡回グラフ( DAG )に運用化し、スケジュールまたはイベントベースのトリガーによって実行するように構成します。 |
|
セキュリティおよびガバナンス |
RBAC 、タグ、マスキング、ポリシー |
ロールベースのアクセス、データ分類、マスキング/行ポリシーをトレーニングデータ、機能、モデルに適用します。ML ライフサイクル全体を通じて、最小権限のアクセスおよびコンプライアンスを保証します。 |
実稼働環境への準備¶
コードの準備¶
パイプラインを運用開始する前に、コードを実稼働環境向けに準備します。ノートブックから始める場合は、まずコードをモジュール式の再利用可能な関数に再構築します。これにより、各主要ステップ(データ準備、機能エンジニアリング、モデルトレーニング、評価)が明確な入力と出力を持つ個別の関数になります。すでにモジュール化されたスクリプトを使用している場合は、各関数のインターフェースと責務が明確に定義されていることを確認します。テーブル名やハイパーパラメーターなど、すべての設定値をパラメーター化し、環境間でのデプロイを可能にします。また、デバッグや将来の開発のために、エンドツーエンドのパイプラインをローカルで実行するエントリポイントスクリプトを作成することをお勧めします。
ディレクトリ構造の例:
ml_pipeline_project/
├── README.md
├── requirements.txt
├── config/
├── src/ml_pipeline/
│ ├── utils/ # Common utilities
│ ├── data/ # Data preparation
│ ├── features/ # Feature engineering
│ ├── models/ # Model training
│ └── inference/ # Model inference
├── scripts/
│ ├── run_pipeline.py # Main entry point
│ └── dag.py
├── tests/
└── notebooks/
run_pipeline.pyスクリプトの例:
import argparse
from ml_pipeline.utils.config_loader import load_config
from ml_pipeline.data.ingestion import load_raw_data
from ml_pipeline.data.validation import validate_data_quality
from ml_pipeline.features.transformers import create_features
from ml_pipeline.models.training import train_model
from ml_pipeline.models.evaluation import evaluate_model
from ml_pipeline.models.registry import register_model
def main():
parser = argparse.ArgumentParser()
parser.add_argument("--config", required=True, help="Config file path")
parser.add_argument("--env", default="dev", help="Environment (dev/prod)")
args = parser.parse_args()
# Load configuration
config = load_config(args.config, args.env)
# Execute pipeline stages
raw_data = load_raw_data(config.data.source_table)
validate_data_quality(raw_data, config.data.quality_checks)
features = create_features(raw_data, config.features.transformations)
model = train_model(features, config.model.hyperparameters)
metrics = evaluate_model(model, features, config.model.eval_metrics)
register_model(model, metrics, config.model.registry_name)
if __name__ == "__main__":
main()
Notebooksから ML Jobsへの移行¶
Snowflake Notebooksで書かれたほとんどのコードは、 ML Jobsでコードを変更することなく動作します。注意すべき点は次のとおりです。
**ランタイム APIs **
特定の分散 ML APIs は、Container Runtimeの内部でのみ利用可能であり、Container Runtime環境の外部でそれらをインポートしようとすると失敗します。これらの APIs は ML Jobs内で利用できますが、 ML ジョブペイロードの中でインポートする必要があります。
# Attempting to import distributed runtime APIs in local/external
# environments will fail!
from snowflake.ml.modeling.distributors.xgboost import XGBEstimator
from snowflake.ml.jobs import remote
@remote(...)
def my_remote_function(...):
# Move imports *inside* your ML Job payloads
from snowflake.ml.modeling.distributors.xgboost import XGBEstimator # This works!
...
job = my_remote_function() # Start ML Job
job.wait() # Wait for job to complete
クラスターのスケーリング
scale_cluster() API はNotebooks内でのみ動作し、 ML Jobs内では動作しません。代わりに、ジョブ投入時に目的のクラスターサイズを指定します。詳細については、 Snowflakeマルチノード ML ジョブ をご参照ください。
from snowflake.ml.jobs import remote
@remote(..., target_instances=4)
def my_remote_function(...):
# 4-node cluster will be provisioned for distributed processing
# inside this job. The cluster will be automatically cleaned up on
# job termination.
パイプラインのオーケストレーション¶
エンドツーエンドのパイプラインを準備したら、Snowflake Task Graphs、Scheduled Notebooksなどのオーケストレーター、またはAirflowのような外部のオーケストレーターを使用してパイプラインを運用化します。オーケストレーションフレームワークを使うことで、いくつかの重要な利点が得られます。
自動再試行と障害分離によるフォールトトレランスと信頼性
実行履歴、リアルタイムステータス、アラートによる可観測性
複雑な依存関係グラフと様々なトリガーに対するスケジューリングと調整
バージョン管理統合と構成管理による衛生管理
Snowflake ML は、Airflow、Dagster、Prefectを含むほとんどのオーケストレーションフレームワークと互換性があります。既存のワークフローや DAG のセットアップがすでにある場合は、既存のワークフローをSnowflake ML の機能と統合し、計算またはデータ集約的なステップを ML Jobsまたは UDFs にオフロードすることをお勧めします。既存の DAG セットアップがない場合は、SnowflakeネイティブソリューションのSnowflake Task Graphsを使用できます。
Snowflake上で DAG を使用してオーケストレーションを設定するには、次の高レベルのステップに従います。
コードの準備 に従ってローカルパイプラインコードを準備する
DAG の定義を保存するために、新しい
dag.pyファイル(または他の名前)を作成するこのガイドに従って、パイプラインの DAG フォームを実装する
dag.pyスクリプトを実行して、Task GraphをSnowflakeアカウントにデプロイする
Tip
Task Graphスクリプトを実行しても、必ずしもグラフが実行されるわけではありません。基本的なTask Graphスクリプトは、単にTask Graphを定義してデプロイします。Task Graphの実行は、手動またはスケジュールによって、別途トリガーされる必要があります。
開発環境と実稼働環境の分離¶
開発( DEV )環境と実稼働( PROD )環境を分離できるよう、 DAG スクリプトをパラメーター化することをお勧めします。これを実現するには、Snowflakeの接続管理、アプリケーション固有の構成、またはその2つの組み合わせを使用できます。必要な分離のレベルはガバナンス要件によって異なりますが、一般的には、 DEV と PROD に別々のデータベースを使用することをお勧めします。 PROD データベースは、管理者と特別なサービスアカウントへのアクセスを制限する RBAC ポリシーによって保護されます。
CI/CD¶
Azure Pipelinesや GitHub Actionsなどの CI/CD パイプラインを使用して、パイプラインの検証とデプロイを自動化できます。一般的には、 PROD にデプロイする前に、 DEV または STAGING 環境でテストすることをお勧めします。ベストプラクティスは、実稼働ブランチにマージする前に、 DEV でコードの変更を検証するマージゲートをソースコントロールリポジトリに設定することです。実稼働ブランチへの変更は、 PROD に継続的に(つまり、変更ごとに)デプロイすることも、定期的に(毎日や毎週)デプロイすることもできます。ベストプラクティスは、 PROD に変更をデプロイする前に、 DEV または STAGING 環境で実稼働ブランチの状態の最終検証を実行することです。GitHub ActionsのDeployments and Environmentsなどのプラットフォーム機能を使用して、各デプロイ環境への接続を定義および構成します。CI/CD パイプラインを構成して、次のような変更をデプロイ環境にプッシュします。
(オプション)ライブラリとモジュールをPythonパッケージとしてビルドし、独自のパッケージフィードにプッシュする
(オプション)Snowflakeステージにファイルをアップロードする
これは、
snowflake.ml.jobs.submit_from_stage()をパイプラインで使用する場合によく必要となりますまたは、Snowflakeの GitHub 統合を使って、 GitHub リポジトリをSnowflakeステージとして直接追跡することもできます
dag.pyを実行し、構成された環境にTask Graphをデプロイする(オプション)新しくデプロイされたTask Graphの実行をトリガーして監視し、有効性を確認する