Snowpark ML FileSystem und FileSet — Veraltet

Die Snowpark ML-Bibliothek enthält FileSystem, eine Abstraktion, die einem Dateisystem für einen internen, serverseitig verschlüsselten Snowflake-Stagingbereich ähnelt. Genauer gesagt, handelt es sich um eine fsspec AbstractFileSystem-Implementierung. Die Bibliothek enthält auch die verwandte Klasse FileSet, mit der Sie die Daten des maschinellen Lernens aus einer Snowflake-Tabelle in den Stagingbereich verschieben und von dort PyTorch oder TensorFlow zuführen können (siehe Snowpark ML Data Connector).

Tipp

Die meisten Benutzer sollten die neuere Dataset-API verwenden, um unveränderliche, geregelte Daten-Snapshots in Snowflake zu erstellen und sie in End-to-End-Workflows für maschinelles Lernen zu verwenden.

Installation

FileSystem- und FileSet-APIs sind Teil des Snowpark ML-Python-Pakets snowflake-ml-python. Eine Installationsanleitung finden Sie unter Lokale Verwendung von Snowflake ML.

Erstellen und Verwenden eines Dateisystems

Das Erstellen eines Snowpark ML-Dateisystems erfordert entweder ein Snowflake-Konnektor für Python-Connection-Objekt oder ein Snowpark Python-Session-Objekt. Eine Anleitung dazu finden Sie unter Verbinden mit Snowflake.

Nachdem Sie entweder eine Verbindung oder eine Sitzung eingerichtet haben, können Sie eine Snowpark ML-SFFileSystem-Instanz erstellen, über die Sie auf Daten in Ihrem internen Stagingbereich zugreifen können.

Wenn Sie eine Verbindung zum Snowflake-Konnektor für Python haben, übergeben Sie diese als sf_connection-Argument:

import fsspec
from snowflake.ml.fileset import sfcfs

sf_fs1 = sfcfs.SFFileSystem(sf_connection=sf_connection)
Copy

Wenn Sie eine Snowpark Python-Sitzung haben, übergeben Sie diese als snowpark_session-Argument:

import fsspec
from snowflake.ml.fileset import sfcfs

sf_fs2 = sfcfs.SFFileSystem(snowpark_session=sp_session)
Copy

SFFileSystem erbt viele Features von fsspec.FileSystem, wie z. B. das lokale Caching von Dateien. Sie können dieses und andere Features aktivieren, indem Sie ein Snowflake-Dateisystem über die fsspec.filesystem factory-Funktion instanziieren und target_protocol="sfc" übergeben, um die Snowflake-FileSystem-Implementierung zu verwenden:

local_cache_path = "/tmp/sf_files/"
cached_fs = fsspec.filesystem("cached", target_protocol="sfc",
                    target_options={"sf_connection": sf_connection,
                                    "cache_types": "bytes",
                                    "block_size": 32 * 2**20},
                    cache_storage=local_cache_path)
Copy

Das Snowflake-Dateisystem unterstützt die meisten Nur-Lese-Methoden, die für ein fsspec-FileSystem definiert sind, einschließlich find, info, isdir, isfile und exists.

Spezifizieren der Dateien

Um die im Stagingbereich befindlichen Dateien anzugeben, verwenden Sie einen Pfad im Format @database.schema.stage/file_path.

Auflisten der Dateien

Die Methode ls des Dateisystems wird verwendet, um eine Liste der Dateien im Stagingbereich zu erhalten:

print(*cached_fs.ls("@ML_DATASETS.public.my_models/sales_predict/"), end='\n')
Copy

Öffnen und Lesen von Dateien

Sie können Dateien im Stagingbereich öffnen, indem Sie die Methode open des Dateisystems verwenden. Sie können die Dateien dann mit denselben Methoden lesen, die Sie auch für normale Python-Dateien verwenden. Das Dateiobjekt ist auch ein Kontextmanager, der mit der Python-Anweisung with verwendet werden kann, sodass es automatisch geschlossen wird, wenn es nicht mehr benötigt wird.

path = '@ML_DATASETS.public.my_models/test/data_7_7_3.snappy.parquet'

with sf_fs1.open(path, mode='rb') as f:
    print(f.read(16))
Copy

Sie können die SFFileSystem-Instanz auch mit anderen Komponenten verwenden, die fsspec-Dateisysteme akzeptieren. Hier wird die im vorherigen Codeblock erwähnte Parquet-Datendatei an die PyArrow-Methode read_table übergeben:

import pyarrow.parquet as pq

table = pq.read_table(path, filesystem=sf_fs1)
table.take([1, 3])
Copy

Python-Komponenten, die Dateien (oder dateiähnliche Objekte) akzeptieren, können an ein vom Snowflake-Dateisystem geöffnetes Dateiobjekte übergeben werden. Wenn Sie in Ihrem Stagingbereich zum Beispiel eine gzip-komprimierte Datei haben, können Sie diese mit dem Python-Modul gzip verwenden, indem Sie die Datei als fileobj-Parameter an gzip.GzipFile übergeben:

path = "sfc://@ML_DATASETS.public.my_models/dataset.csv.gz"

with cached_fs.open(path, mode='rb', sf_connection=sf_connection) as f:
    g = gzip.GzipFile(fileobj=f)
    for i in range(3):
        print(g.readline())
Copy

Erstellen und Verwenden eines FileSet

Ein Snowflake-FileSet ein unveränderlicher Snapshot eines SQL-Abfrageergebnisses in Form von Dateien in einem internen, serverseitigen verschlüsselten Stagingbereich. Auf diese Dateien kann über ein FileSystem zugegriffen werden, um die Daten Tools wie PyTorch und TensorFlow zuzuführen, sodass Sie Modelle in großem Umfang und innerhalb Ihres bestehenden Data Governance-Modells trainieren können. Um ein FileSet zu erstellen, verwenden Sie die Methode FileSet.make.

Zum Erstellen eines FileSet benötigen Sie eine Snowflake-Python-Verbindung oder eine Snowpark-Sitzung. Eine Anleitung dazu finden Sie unter Verbinden mit Snowflake. Sie müssen auch den Pfad zu einem bestehenden internen, serverseitig verschlüsselten Stagingbereich oder zu einem Unterverzeichnis in einem solchen Stagingbereich angeben, in dem das FileSet gespeichert wird.

Um ein FileSet aus einem Snowpark-DataFrame zu erstellen, müssen Sie einen DataFrame erstellen und diesen an FileSet.make als snowpark_dataframe übergeben. Rufen Sie nicht die DataFrame-Methode collect auf:

# Snowpark Python equivalent of "SELECT * FROM MYDATA LIMIT 5000000"
df = snowpark_session.table('mydata').limit(5000000)
fileset_df = fileset.FileSet.make(
    target_stage_loc="@ML_DATASETS.public.my_models/",
    name="from_dataframe",
    snowpark_dataframe=df,
    shuffle=True,
)
Copy

Um ein FileSet mit einer Verbindung über den Snowflake-Konnektor für Python zu erstellen, übergeben Sie die Verbindung an Fileset.make als sf_connection und die SQL-Abfrage als query:

fileset_sf = fileset.FileSet.make(
    target_stage_loc="@ML_DATASETS.public.my_models/",
    name="from_connector",
    sf_connection=sf_connection,
    query="SELECT * FROM MYDATA LIMIT 5000000",
    shuffle=True,           # see later section about shuffling
)
Copy

Bemerkung

Unter Umverteilen von Daten in FileSets finden Sie Informationen für das zufällige Anordnen Ihrer Daten mithilfe des Parameters shuffle.

Verwenden Sie die Methode files, um eine Liste der im FileSet enthaltenen Dateien zu erhalten:

print(*fileset_df.files())
Copy

Informationen zur Zuführung der Daten im FileSet zu PyTorch oder TensorFlow finden Sie unter Zuführen der Daten eines FileSet zu PyTorch bzw. Zuführen der Daten eines FileSet zu TensorFlow.

Zuführen der Daten eines FileSet zu PyTorch

Von einem Snowflake-FileSet erhalten Sie eine PyTorch-DataPipe, die an einen PyTorch-DataLoader übergegeben werden kann. Der DataLoader iteriert über die FileSet-Daten und liefert PyTorch-Tensoren in Batches. Erstellen Sie die DataPipe mit der FileSet-Methode to_torch_datapipe, und übergeben Sie dann DataPipe an den PyTorch-DataLoader:

from torch.utils.data import DataLoader

# See later sections about shuffling and batching
pipe = fileset_df.to_torch_datapipe(
    batch_size=4,
    shuffle=True,
    drop_last_batch=True)

for batch in DataLoader(pipe, batch_size=None, num_workers=0):
    print(batch)
    break
Copy

Zuführen der Daten eines FileSet zu TensorFlow

Sie erhalten ein TensorFlow-Dataset von einem Snowflake-FileSet, indem Sie die FileSet-Methode to_tf_dataset verwenden:

import tensorflow as tf

# See following sections about shuffling and batching
ds = fileset_df.to_tf_dataset(
    batch_size=4,
    shuffle=True,
    drop_last_batch=True)

for batch in ds:
    print(batch)
    break
Copy

Durch Iterieren über dem Dataset erhalten Sie Tensoren in Batches.

Umverteilen von Daten in FileSets

Es ist oft sinnvoll, die Trainingsdaten zufällig neu anzuordnen (Shuffling), um eine Überanpassung und andere Probleme zu vermeiden. Eine Diskussion über den Wert des Shufflings finden Sie unter Warum sollten Daten für maschinelles Lernen neu angeordnet werden? (auf Englisch)

Wenn die Daten nicht bereits durch Ihre Abfrage ausreichend umverteilt wurden, kann ein FileSet die Daten an zwei Punkten umverteilen:

  • Wenn das FileSet mit FileSet.make erstellt wird.

    Alle Zeilen in Ihrer Abfrage werden umverteilt, bevor sie in das FileSet geschrieben werden. Dies ist eine hochwertige globale Durchmischung, die bei großen Datensets teuer werden kann. Daher wird sie nur einmal ausgeführt, nämlich beim Materialisieren des FileSet. Übergeben Sie shuffle=True als Schlüsselwortargument an FileSet.make.

  • Wenn Sie aus einem FileSet eine PyTorch-DataPipe oder ein TensorFlow-Dataset erstellen.

    Zu diesem Zeitpunkt ist die Reihenfolge der Dateien im FileSet zufällig, ebenso wie die Reihenfolge der Zeilen in jeder Datei. Dies kann als eine „annährend“ globale Umverteilung betrachtet werden. Sie ist von geringerer Qualität als eine echte globale Umverteilung, aber sie ist deutlich preiswerter. Um das Shuffling an dieser Stelle auszuführen, übergeben Sie shuffle=True als Schlüsselwortargument an die FileSet-Methode to_torch_datapipe oder to_tf_dataset.

Die besten Ergebnisse erzielen Sie, wenn Sie das Shuffling zweimal ausführen: beim Erstellen des FileSet und beim Zuführen der Daten zu PyTorch bzw. TensorFlow.

Batchverarbeitung von Daten in FileSets

FileSets verfügen über ein Feature zur Batchverarbeitung, das genauso funktioniert wie die Batchverarbeitungsfunktionalität in PyTorch und TensorFlow, nur dass sie effizienter ist. Snowflake empfiehlt Ihnen, den Parameter batch_size in den FileSet-Methoden to_torch_datapipe und to_tf_dataset zu verwenden, anstatt die Batchverarbeitung von PyTorch oder TensorFlow ausführen zu lassen. Um die Batchverarbeitungsfunktionalität von PyTorch zu deaktivieren, müssen Sie bei der Instanziierung von DataLoader explizit batch_size=None übergeben.

Sie können den letzten Batch auch löschen, wenn dieser unvollständig ist, indem Sie drop_last_batch=True an to_torch_datapipe oder to_tf_dataset übergeben.