데이터 로드 및 쓰기¶
Snowflake ML을 사용하여 Snowflake 테이블과 스테이지에서 머신 러닝 워크플로로 데이터를 효율적으로 로드할 수 있습니다. Snowflake ML은 Snowflake의 분산 처리를 활용하여 학습 및 추론 워크플로를 위한 데이터 수집을 가속화하는 최적화된 데이터 로드 기능을 제공합니다.
다음을 사용하여 데이터를 로드하고 처리할 수 있습니다.
Snowflake Notebooks: 데이터를 탐색하고 ML 모델을 빌드하기 위한 대화형 개발 환경입니다. 자세한 내용은 Container Runtime for ML의 Notebooks 섹션을 참조하십시오.
Snowflake ML Jobs: 모든 개발 환경에서 비동기적으로 ML 워크로드를 실행합니다. 자세한 내용은 Snowflake ML 작업 섹션을 참조하십시오.
Notebooks 및 ML Jobs는 분산 처리 기능으로 머신 러닝 워크로드에 맞게 최적화된 미리 구성된 환경을 제공하는 ML용 Container Runtime에서 실행됩니다. Container Runtime은 분산 컴퓨팅을 위한 오픈 소스 프레임워크인 Ray를 사용하여 여러 컴퓨팅 노드에서 데이터를 효율적으로 처리합니다. ML용 Container Runtime에 대한 자세한 내용은 Container Runtime for ML 섹션을 참조하세요.
Snowflake ML은 정형 데이터와 비정형 데이터를 로드하기 위한 여러 다른 APIs를 제공합니다.
정형 데이터(테이블 및 데이터 세트)
DataConnector: Snowflake 테이블 및 Snowflake 데이터 세트에서 데이터를 로드합니다. 자세한 내용은 Snowflake 테이블에서 정형 데이터 로드 섹션을 참조하십시오.
DataSink: Snowflake 테이블에 데이터를 다시 씁니다. 자세한 내용은 정형 데이터를 Snowflake 테이블에 다시 씁니다. 섹션을 참조하십시오.
비정형 데이터(스테이지의 파일)
DataSource APIs: Snowflake 스테이지에서 다양한 파일 형식(CSV, Parquet, 이미지 등)의 데이터를 로드합니다. 자세한 내용은 Snowflake 스테이지에서 비정형 데이터 로드 섹션을 참조하십시오.
다음 테이블은 사용 사례에 맞는 올바른 API를 선택하는 데 도움이 될 수 있습니다.
데이터 타입 |
데이터 원본 |
로딩을 위한 API |
쓰기를 위한 API |
|---|---|---|---|
정형 |
Snowflake 테이블 |
DataConnector |
DataSink |
정형 |
Snowflake 데이터 세트 |
DataConnector |
DataSink |
비정형 |
CSV 파일(스테이지) |
DataSource API |
N/A |
비정형 |
Parquet 파일(스테이지) |
DataSource API |
N/A |
비정형 |
기타 스테이징된 파일 |
DataSource API |
N/A |
Snowflake 테이블에서 정형 데이터 로드¶
Snowflake DataConnector를 사용하여 Snowflake 테이블 및 Snowflake 데이터 세트의 정형 데이터를 Snowflake Notebook 또는 Snowflake ML Job으로 로드합니다. DataConnector는 여러 컴퓨팅 노드에서 읽기를 병렬화하여 데이터 로딩을 가속화합니다.
DataConnector는 Snowpark DataFrames 또는 Snowflake 데이터 세트에 작동합니다.
Snowpark DataFrames: Snowflake 테이블의 데이터에 대한 직접 액세스를 제공합니다. 개발 중에 사용하는 것이 가장 좋습니다.
Snowflake 데이터 세트: 버전이 지정된 스키마 수준 오브젝트입니다. 프로덕션 워크플로에 사용하는 것이 가장 좋습니다. 자세한 내용은 Snowflake 데이터 세트 섹션을 참조하십시오.
읽기를 병렬 처리한 후 DataConnector는 데이터를 다음 데이터 구조 중 하나로 변환할 수 있습니다.
pandas 데이터 프레임
PyTorch 데이터 세트
TensorFlow 데이터 세트
다음 경우에 DataConnector를 만듭니다.¶
Snowpark DataFrame 또는 Snowflake 데이터 세트에서 DataConnector를 만들 수 있습니다.
다음 코드를 사용하여 Snowpark DataFrame에서 DataConnector를 만듭니다.
from snowflake.ml.data.data_connector import DataConnector
from snowflake.snowpark.context import get_active_session
session = get_active_session()
# Create DataConnector from a Snowflake table
data_connector = DataConnector.from_dataframe(session.table("example-table-name"))
다음 코드를 사용하여 Snowpark 데이터 세트에서 DataConnector를 만듭니다.
from snowflake.ml.data.data_connector import DataConnector
# Create DataConnector from a Snowflake Dataset
data_connector = DataConnector.from_dataset(snowflake_dataset)
DataConnector를 다른 형식으로 변환¶
DataConnector를 만든 후 다양한 ML 프레임워크에서 사용할 수 있도록 다른 데이터 구조로 변환할 수 있습니다.
scikit-learn 및 기타 pandas 호환 라이브러리에서 사용할 수 있도록 DataConnector를 pandas 데이터프레임으로 변환할 수 있습니다.
다음 예에서는 Snowflake 테이블의 데이터를 pandas 데이터프레임으로 로드하고 XGBoost 분류자를 학습시킵니다.
from snowflake.ml.data.data_connector import DataConnector
from snowflake.snowpark.context import get_active_session
import xgboost as xgb
session = get_active_session()
# Specify training table location
table_name = "TRAINING_TABLE"
# Load table into DataConnector
data_connector = DataConnector.from_dataframe(session.table(table_name))
# Convert to pandas dataframe
pandas_df = data_connector.to_pandas()
# Prepare features and labels
label_column_name = 'TARGET'
X, y = pandas_df.drop(label_column_name, axis=1), pandas_df[label_column_name]
# Train classifier
clf = xgb.Classifier()
clf.fit(X, y)
PyTorch 모델 및 데이터 로더에서 사용할 수 있도록 DataConnector를 PyTorch 데이터 세트로 변환할 수 있습니다.
다음 예에서는 Snowflake 테이블의 데이터를 PyTorch 데이터 세트로 로드합니다.
import torch
import torch.nn as nn
from torch.utils.data import DataLoader
from snowflake.ml.data.data_connector import DataConnector
# Create DataConnector (see previous examples)
# data_connector = DataConnector.from_dataframe(...)
# Convert to PyTorch dataset
torch_dataset = data_connector.to_torch_dataset(batch_size=32)
dataloader = DataLoader(torch_dataset, batch_size=None)
label_col = 'TARGET'
feature_cols = ['FEATURE1', 'FEATURE2']
for batch_idx, batch in enumerate(dataloader):
y = batch_data.pop(label_col).squeeze()
X = torch.stack(
[tensor.squeeze() for key, tensor in batch.items() if key in feature_cols]
)
TensorFlow 모델에서 사용할 수 있도록 DataConnector를 TensorFlow 데이터 세트로 변환할 수 있습니다. 데이터 로딩은 효율성을 극대화하기 위해 스트림 방식으로 이루어집니다.
다음 예에서는 DataConnector를 TensorFlow 데이터 세트로 변환합니다.
from snowflake.ml.data.data_connector import DataConnector
# Create DataConnector (see previous examples)
# data_connector = DataConnector.from_dataframe(...)
# Convert to TensorFlow dataset
tf_ds = data_connector.to_tf_dataset(
batch_size=4,
shuffle=True,
drop_last_batch=True
)
for batch in tf_ds:
print(batch)
Snowflake의 분산 학습 APIs에서 사용¶
최상의 성능을 위해 DataConnector를 먼저 pandas, PyTorch 또는 TensorFlow 데이터 세트로 변환하는 대신, Snowflake의 최적화된 분산 학습 APIs로 직접 전달할 수 있습니다.
다음 예는 Snowflake의 분산 XGBoost 추정기를 사용하여 XGBoost 모델을 훈련시킵니다.
from snowflake.ml.data.data_connector import DataConnector
from snowflake.ml.modeling.distributors.xgboost.xgboost_estimator import (
XGBEstimator,
XGBScalingConfig,
)
from snowflake.snowpark.context import get_active_session
session = get_active_session()
# Create DataConnector from a Snowpark dataframe
snowflake_df = session.table("TRAINING_TABLE")
data_connector = DataConnector.from_dataframe(snowflake_df)
# Create Snowflake XGBoost estimator
snowflake_est = XGBEstimator(
n_estimators=1,
objective="reg:squarederror",
scaling_config=XGBScalingConfig(use_gpu=False),
)
# Train using the data connector
# When using a data connector, input_cols and label_col must be provided
fit_booster = snowflake_est.fit(
data_connector,
input_cols=NUMERICAL_COLS,
label_col=LABEL_COL
)
PyTorch 배포자에서 분할 사용¶
ShardedDataConnector를 사용하여 Snowflake PyTorch 배포자를 통한 분산 학습을 위해 여러 노드에 데이터를 분할할 수 있습니다.
다음 예는 여러 프로세스에서 분할된 데이터를 사용하여 PyTorch 모델에 숫자 데이터 세트를 학습시킵니다.
from sklearn import datasets
from snowflake.ml.data.sharded_data_connector import ShardedDataConnector
from snowflake.ml.modeling.pytorch import (
PyTorchTrainer,
ScalingConfig,
WorkerResourceConfig,
getContext,
)
from torch import nn
from snowflake.snowpark.context import get_active_session
session = get_active_session()
# Create the Snowflake data from a Snowpark dataframe
digits = datasets.load_digits(as_frame=True).frame
digits_df = session.create_dataframe(digits)
# Create sharded data connector
sharded_data_connector = ShardedDataConnector.from_dataframe(digits_df)
# Define the PyTorch model
class DigitsModel(nn.Module):
def __init__(self):
super(DigitsModel, self).__init__()
self.flatten = nn.Flatten()
self.linear_relu_stack = nn.Sequential(
nn.Linear(8 * 8, 512),
nn.ReLU(),
nn.Linear(512, 512),
nn.ReLU(),
nn.Linear(512, 10)
)
def forward(self, x):
x = self.flatten(x)
logits = self.linear_relu_stack(x)
return logits
# Define training function that runs across multiple nodes or devices
# Each process receives a unique data shard
def train_func():
import os
import torch
import torch.distributed as dist
from torch.utils.data import DataLoader
from torch import nn
from torch.nn.parallel import DistributedDataParallel as DDP
# Get context with data shards and model directory
context = getContext()
dataset_map = context.get_dataset_map()
model_dir = context.get_model_dir()
training_data = dataset_map["train"].get_shard().to_torch_dataset()
train_dataloader = DataLoader(training_data, batch_size=batch_size, drop_last=True)
dist.init_process_group()
device = "cpu"
label_col = '"target"'
batch_size = 64
model = DDP(DigitsModel())
loss_fn = nn.CrossEntropyLoss()
optimizer = torch.optim.SGD(model.parameters(), lr=1e-3)
# Training loop
for epoch in range(5):
for batch, batch_data in enumerate(train_dataloader):
y = batch_data.pop(label_col).flatten().type(torch.LongTensor).to(device)
X = torch.concat(
[tensor.to(torch.float32) for tensor in batch_data.values()],
dim=-1,
).to(device)
pred = model(X)
loss = loss_fn(pred, y)
optimizer.zero_grad()
loss.backward()
optimizer.step()
if batch % 100 == 0:
print(f"Epoch {epoch}, Batch {batch}, Loss: {loss.item()}")
# Save the model
if dist.get_rank() == 0:
torch.save(model.state_dict(), os.path.join(model_dir, "digits_model.pth"))
# Create PyTorch trainer with scaling configuration
pytorch_trainer = PyTorchTrainer(
train_func=train_func,
scaling_config=ScalingConfig(
num_nodes=1,
num_workers_per_node=4,
resource_requirements_per_worker=WorkerResourceConfig(num_cpus=1, num_gpus=0),
),
)
# Run distributed training
response = pytorch_trainer.run(
dataset_map=dict(
train=sharded_data_connector,
)
)
Snowflake 스테이지에서 비정형 데이터 로드¶
Snowflake DataSource APIs를 사용하여 Snowflake 스테이지에서 비정형 데이터를 읽습니다. 각 파일 형식에는 데이터를 읽는 방법을 정의하는 해당 데이터 소스 클래스가 있습니다.
다음은 파일 형식과 데이터를 로드하는 데 사용하는 해당 APIs를 보여줍니다.
이진 파일:
SFStageBinaryFileDataSource텍스트 파일:
SFStageTextDataSourceCSV 파일:
SFStageCSVDataSourceParquet 파일:
SFStageParquetDataSource이미지 파일:
SFStageImageDataSource
데이터 로드 및 처리¶
Snowflake 데이터 소스를 생성할 때 다음을 제공해야 합니다.
데이터를 읽는 원본 스테이지의 이름
스테이지가 있는 데이터베이스(기본값은 현재 세션)
스테이지가 있는 스키마(기본값은 현재 세션)
데이터 소스에서 읽는 필터 파일의 패턴(선택 사항)
데이터 API 또는 데이터 커넥터는 제공된 경로 내에서 파일 패턴과 일치하는 모든 파일을 검색합니다.
Snowflake 데이터 소스를 정의한 후 데이터를 Ray 데이터 세트에 로드할 수 있습니다. Ray 데이터 세트를 사용하면 다음을 수행할 수 있습니다.
Ray APIs에서 데이터 세트 사용
DataConnector에 데이터 세트 전달
필요한 경우 pandas 또는 PyTorch 데이터 세트로 변환합니다.
다음 예에서는 다음을 수행합니다.
Snowflake 스테이지에서 Parquet 파일을 Ray 데이터 세트로 읽어옵니다.
데이터 세트를 DataConnector로 변환합니다.
import ray
from snowflake.ml.ray.datasource.stage_parquet_file_datasource import SFStageParquetDataSource
from snowflake.ml.data.data_connector import DataConnector
data_source = SFStageParquetDataSource(
stage_location="@stage/path/",
database="DB_NAME", # optional
schema="SCHEMA_NAME", # optional
file_pattern='*.parquet', # optional
)
# Build Ray dataset from provided datasources
ray_ds = ray.data.read_datasource(data_source)
dc = DataConnector.from_ray_dataset(ray_ds)
정형 데이터를 Snowflake 테이블에 다시 씁니다.¶
Snowflake DataSink API를 사용하여 Notebook 또는 ML Job에서 Snowflake 테이블로 정형 데이터를 다시 씁니다. 추가 분석 또는 저장을 위해 변환된 데이터 세트 또는 예측 데이터 세트를 Snowflake에 쓸 수 있습니다.
데이터 싱크를 정의하려면 다음을 제공합니다.
스테이지 이름
데이터베이스 이름(기본값은 현재 세션)
스키마 이름(기본값은 현재 세션)
특정 파일과 일치시킬 파일 패턴(선택 사항)
다음 예에서는 데이터 싱크를 정의합니다.
from snowflake.ml.ray.datasink import SnowflakeTableDatasink
datasink = SnowflakeTableDatasink(
table_name="table_name",
database="db_name",
schema="schema_name",
auto_create_table=True, # create table if not exists
override=True # replace vs insert to table
)
데이터 싱크를 정의한 후 다음 코드를 사용하여 Ray 데이터 세트를 Snowflake 테이블에 쓸 수 있습니다.
import ray
# Get Ray dataset from sources
ray_ds = ray.data.read_datasource(data_source)
# Setup transform operations, not executed yet
transformed_ds = ray_ds.map_batches(example_transform_batch_function)
# Start writing to Snowflake distributedly
transformed_ds.write_datasink(datasink)
모범 사례 및 고려 사항¶
최적의 성능과 리소스 활용을 위해 다음 모범 사례를 고려하세요.
병렬 처리: Ray의 분산 특성을 활용하도록 데이터 소스 구현을 설계합니다. 사용 사례에 더 적합하도록 병렬 처리 및 동시성 인자를 사용자 지정합니다. 각 단계에서 작업당 할당할 리소스 수를 수동으로 정의할 수 있습니다.
분할: 기본적으로, Ray의 내부 논리는 리소스와 데이터 크기를 기준으로 데이터 세트를 분할합니다. ``ray_ds.repartition(X)``의 사용 사례에 따라 파티션 수를 사용자 지정하여 많은 수의 작은 작업과 적은 수의 큰 작업 중에서 선택할 수 있습니다.
모범 사례: 추가 지침은 `Ray 데이터 사용자 가이드<https://docs.ray.io/en/latest/data/user-guide.html>`_를 따르세요.
Ray API 세부 정보:
다음 단계¶
데이터를 로드한 후 다음을 수행할 수 있습니다.
기능 관리를 위한 기능 스토어 사용