Snowflake ML Data Connector

Snowflake ML Data Connector를 사용해 Snowflake 테이블 또는 스테이지의 데이터를 Container Runtime 인스턴스(예: 노트북 세션 또는 ML 작업)로 수집하십시오. 데이터 커넥터는 Container Runtime의 분산 처리를 사용하여 데이터 로딩 속도를 높이고 Snowflake Notebooks 또는 ML Jobs에서 ML 파이프라인을 실행하는 효율성을 개선합니다. 로딩한 데이터를 사용하여 Python 기반 ML 워크플로를 Snowflake에서 실행할 수 있습니다. 예를 들어 오픈 소스 패키지를 사용하여 ML 파이프라인을 확장할 수 있습니다. Container Runtime에 대한 자세한 내용은 Container Runtime for ML 섹션을 참조하십시오.

데이터 커넥터를 사용하여 테이블이나 스테이지와 같은 모든 Snowflake 데이터 원본에서 pandas 데이터 프레임으로 데이터를 로드할 수 있습니다. 그런 다음 해당 pandas 데이터프레임을 Snowflake의 오픈 소스 ML 워크플로와 함께 사용할 수 있습니다. 데이터 커넥터는 토치 및 텐서플로 데이터 세트를 생성하는 기능도 제공합니다.

오픈 소스 워크플로를 사용하는 것 외에도 Snowflake에서 배포한 API를 사용하여 대규모로 모델을 학습시키고 튜닝할 수도 있습니다.

데이터 커넥터는 컨테이너 환경 내에서 작동하도록 최적화되어 있습니다. Container Runtime 외부에서 데이터 커넥터는 Apache Arrow 기반 데이터 교환 형식을 사용하여 Snowflake와 컨테이너 간에 데이터를 이동합니다. 동일한 코드가 Snowflake 내부와 외부에서 모두 작동합니다.

Data Connector는 Snowpark DataFrame 또는 Snowflake Dataset와 함께 사용할 수 있습니다. Snowpark DataFrames 에서는 Snowflake 테이블의 데이터에 직접 액세스할 수 있습니다. 개발 중에 사용하는 것이 가장 좋습니다.

Snowflake Datasets는 버전이 지정된 스키마 수준의 오브젝트입니다. 프로덕션 워크플로에 가장 적합합니다. 데이터 세트에 대한 자세한 내용은 Snowflake 데이터 세트 섹션을 참조하십시오.

다음 코드를 사용하여 Snowpark DataFrame 을 Container Runtime으로 가져올 수 있습니다.

connector = DataConnector.from_dataframe(snowpark_df)
Copy

다음 코드를 사용하여 Snowflake 데이터 세트를 Container Runtime으로 가져올 수 있습니다.

connector = DataConnector.from_dataset(snowflake_dataset)
Copy

데이터 커넥터는 분산 처리를 사용하여 pandas 데이터 프레임, PyTorch 데이터 세트 또는 TensorFlow 데이터 세트와 같은 오픈 소스 데이터 오브젝트로의 데이터 로딩을 가속화합니다. to_pandas 가 데이터 프레임에 데이터를 충분히 빠르게 로드하지 못하는 경우 데이터 커넥터를 사용하여 프로세스 속도를 높일 수 있습니다.

데이터 커넥터를 Snowflake 분산 워크플로에 직접 전달하면 워크플로에서 데이터를 사용하는 실행 시간이 향상됩니다. 자세한 내용은 Snowflake ML Container Runtime 참고(Python) 섹션을 참조하십시오.

참고

이 항목에서는 Snowpark ML 모듈이 설치되어 있다고 가정합니다. 설치되어 있지 않으면 로컬에서 Snowflake ML 사용하기 섹션을 참조하십시오.

pandas 데이터 프레임에 대한 Data Connector

다음 코드를 사용하여 Snowflake 테이블의 데이터를 pandas 데이터 프레임으로 로드할 수 있습니다.

from snowflake.ml.data.data_connector import DataConnector
from snowflake.snowpark.context import get_active_session

session = get_active_session()

# Specify training table location
table_name = "TRAINING_TABLE"
# Load tabel into DataConnector
data_connector = DataConnector.from_dataframe(session.table(table_name))


import xgboost as xgb

pandas_df = data_connector.to_pandas()
label_column_name = 'TARGET'
X, y = pandas_df.drop(label_column_name, axis=1), pandas_df[label_column_name]

clf = xgb.Classifier()
clf.fit(X, y)
Copy

PyTorch 데이터 세트에 대한 Data Connector

다음 코드는 데이터 커넥터를 사용하여 Snowflake 테이블에서 PyTorch 데이터 세트로 데이터를 로드하는 방법을 보여줍니다.

import torch
import torch.nn as nn
from torch.utils.data import DataLoader

torch_dataset = data_connector.to_torch_dataset(batch_size=32)
dataloader = DataLoader(torch_dataset, batch_size=None)
label_col = 'TARGET'
feature_cols = ['FEATURE1', 'FEATURE2']

for batch_idx, batch in enumerate(dataloader):
  y = batch_data.pop(label_col).squeeze()
  X = torch.stack(
      [tensor.squeeze() for key, tensor in batch.items() if key in feature_cols]
  )
Copy

TensorFlow 데이터 세트에 대한 Data Connector

TensorFlow 와 함께 사용하려면 to_tf_dataset() 메서드를 사용하여 텐서플로 데이터 세트를 가져옵니다. 데이터 세트를 반복하면 일괄 처리된 TensorFlow 텐서를 얻을 수 있습니다. 데이터 로딩은 효율성을 극대화하기 위해 스트림 방식으로 이루어집니다.

tf_ds = connector.to_tf_dataset(
    batch_size=4,
    shuffle=True,
    drop_last_batch=True
)

for batch in tf_ds:
    print(batch)
Copy

Snowflake의 최적화된 분산 학습 API에 데이터 커넥터 전달하기

최상의 성능을 위해 데이터 커넥터 오브젝트를 Snowflake의 최적화된 분산 학습 API에 전달할 수 있습니다.

다음 코드는 다음을 수행하는 방법을 보여줍니다.

  1. Snowflake 테이블에서 데이터 커넥터 오브젝트로 데이터를 로드합니다.

  2. Snowflake의XGBoost 분류기 인스턴스를 생성합니다.

  3. 데이터 커넥터 오브젝트를 사용하여 분류기를 학습시킵니다.

from sklearn.datasets import make_regression
from snowflake.ml.data.data_connector import DataConnector


from snowflake.ml.modeling.distributors.xgboost.xgboost_estimator import (
  XGBEstimator,
  XGBScalingConfig,
)

from snowflake.snowpark.context import get_active_session

session = get_active_session()

snowflake_df = session.create_dataframe(session.table(table_name))

# Create the data connector from a Snowpark dataframe.
# Data connectors can also be created from Snowflake Datasets.
data_connector = DataConnector.from_dataframe(snowflake_df)

snowflake_est = XGBEstimator(
  n_estimators=1,
  objective="reg:squarederror",
  scaling_config=XGBScalingConfig(use_gpu=False),
)

# If you use a data connector, input_cols and label_col must also be provided.
fit_booster = snowflake_est.fit(
  data_connector,
  input_cols = NUMERICAL_COLS,
  label_col = LABEL_COL
)
Copy

PyTorch 배포자와 함께 분할 사용하기

데이터 커넥터를 사용하여 Container Runtime에서 여러 노드에 걸쳐 데이터를 분할할 수 있습니다. Snowflake PyTorch 배포자를 통해 분할을 사용하여 모델을 학습할 수 있습니다. PyTorch 배포자에 대한 자세한 내용은 PyTorch 배포자 섹션을 참조하십시오.

다음 코드는 데이터 커넥터를 사용하여 자릿수 데이터 세트에 대해 PyTorch 모델을 학습시킵니다. 노드에서 4개의 프로세스가 실행되는 하나의 노드를 지정합니다. 이 코드는 PyTorch 모델, 학습 함수, PyTorch 트레이너를 정의합니다. 학습 함수는 여러 노드에서 실행되며 각 프로세스는 고유한 데이터 샤드를 수신합니다. 이 예에서는 GPU를 사용하지 않았지만 num_gpus 매개 변수의 값을 사용 중인 GPU의 수로 설정할 수 있습니다.

from sklearn import datasets
from snowflake.ml.data.sharded_data_connector import ShardedDataConnector

from snowflake.ml.modeling.pytorch import (
  PyTorchTrainer,
  ScalingConfig,
  WorkerResourceConfig,
  getContext,
)
from torch import nn
from snowflake.snowpark.context import get_active_session

session = get_active_session()

# Create the Snowflake data. We'll start with a Snowpark dataframe
digits = datasets.load_digits(as_frame=True).frame
digits_df = session.create_dataframe(digits)

# Create the data connector from a Snowpark dataframe.
# Data connectors can also be created from Snowflake Datasets.
sharded_data_connector = ShardedDataConnector.from_dataframe(digits_df)

# Define the model. This is OSS PyTorch code.
class DigitsModel(nn.Module):
    def __init__(self):
        super(DigitsModel, self).__init__()
        self.flatten = nn.Flatten()
        self.linear_relu_stack = nn.Sequential(
            nn.Linear(8 * 8, 512),
            nn.ReLU(),
            nn.Linear(512, 512),
            nn.ReLU(),
            nn.Linear(512, 10)
        )

    def forward(self, x):
        x = self.flatten(x)
        logits = self.linear_relu_stack(x)
        return logits

# Define the training function that will run across multiple nodes or devices.
# Each train_func process will receive a unique data shard.
def train_func():
    import os
    import torch
    import torch.distributed as dist
    from torch.utils.data import DataLoader
    from torch import nn
    from torch.nn.parallel import DistributedDataParallel as DDP

    # Context provides relevant information to training process, like data shards and model directory.
    context = getContext()
    dataset_map = context.get_dataset_map()
    model_dir = context.get_model_dir()
    training_data = dataset_map["train"].get_shard().to_torch_dataset()
    train_dataloader = DataLoader(training_data, batch_size=batch_size, drop_last=True)

    # The rest of this code is OSS pytorch code.
    dist.init_process_group()
    device = "cpu"
    label_col = '"target"'
    batch_size = 64

    model = DDP(DigitsModel())

    loss_fn = nn.CrossEntropyLoss()
    optimizer = torch.optim.SGD(model.parameters(), lr=1e-3)

    # Training loop
    for epoch in range(5):
        for batch, batch_data in enumerate(train_dataloader):
            y = batch_data.pop(label_col).flatten().type(torch.LongTensor).to(device)
            X = torch.concat(
                [tensor.to(torch.float32) for tensor in batch_data.values()],
                dim=-1,
          )  .to(device)
            pred = model(X)
            loss = loss_fn(pred, y)

            optimizer.zero_grad()
            loss.backward()
            optimizer.step()

            if batch % 100 == 0:
                print(f"Epoch {epoch}, Batch {batch}, Loss: {loss.item()}")

    # Save the model
    if dist.get_rank() == 0:
        torch.save(model.state_dict(), os.path.join(model_dir, "digits_model.pth"))


pytroch_trainer = PyTorchTrainer(
    train_func=train_func,
    scaling_config=ScalingConfig(
        num_nodes=1,
        num_workers_per_node=4,
        resource_requirements_per_worker=WorkerResourceConfig(num_cpus=1, num_gpus=0),
    ),
)
response = pytroch_trainer.run(
    dataset_map=dict(
        train=sharded_data_connector,
    )
)
Copy

앞의 코드에서 API를 여러 번 사용했습니다. 자세한 내용은 Snowflake ML Container Runtime 참고(Python) 섹션을 참조하십시오.