pandas on Snowflake

pandas on Snowflake vous permet d’exécuter votre code Pandas de manière distribuée directement sur vos données dans Snowflake. Il suffit de modifier l’instruction d’importation et quelques lignes de code pour obtenir une expérience pandas similaire à ce que vous connaissez et appréciez, avec les avantages d’évolutivité et de sécurité de Snowflake. Avec pandas on Snowflake, vous pouvez travailler avec des ensembles de données beaucoup plus importants et éviter le temps et les dépenses liés au portage de vos pipelines pandas vers d’autres frameworks big data ou à l’approvisionnement de machines volumineuses et coûteuses. Il exécute des charges de travail de manière native dans Snowflake via la transpilation vers SQL, lui permettant de profiter de la parallélisation et des avantages de gouvernance et de sécurité des données de Snowflake. pandas on Snowflake est fourni via l’API Snowpark pandas dans le cadre de la Bibliothèque Python Snowpark, qui permet un traitement évolutif des données du code Python au sein de la plateforme Snowflake.

Avantages de l’utilisation de pandas on Snowflake

  • Rencontrer les développeurs Python là où ils se trouvent – pandas on Snowflake offre une interface familière aux développeurs Python en fournissant une couche compatible avec pandas qui peut fonctionner nativement dans Snowflake.

  • pandas distribués évolutifs – pandas on Snowflake allie la commodité de pandas à l’évolutivité de Snowflake en exploitant les techniques d’optimisation des requêtes existantes dans Snowflake. Des réécritures de code minimales sont requises, ce qui simplifie le parcours de migration et vous permet de passer en toute transparence du prototype à la production.

  • Sécurité et gouvernance – Les données ne quittent pas la plateforme sécurisée de Snowflake. pandas on Snowflake permet une uniformité au sein des organisations de données sur la manière dont les données sont accessibles et permet un audit et une gouvernance plus faciles.

  • Aucune infrastructure de calcul supplémentaire à gérer et à ajuster – pandas on Snowflake exploite le moteur de calcul puissant de Snowflake, et vous n’avez pas besoin de configurer ou de gérer une infrastructure de calcul supplémentaire.

Quand dois-je utiliser pandas on Snowflake

Vous devez utiliser pandas on Snowflake si l’une des conditions suivantes est remplie :

  • Vous connaissez les API pandas et l’écosystème plus large PyData

  • Vous travaillez en équipe avec d’autres personnes qui connaissent pandas et qui souhaitent collaborer sur la même base de code

  • Vous avez du code existant écrit en pandas

  • Votre flux de travail a des besoins liés aux commandes, ce qui est pris en charge par les DataFrames pandas. Par exemple, vous avez besoin que l’ensemble de données soit dans le même ordre pour l’ensemble du flux de travail.

  • Vous préférez une complétion de code plus précise à partir d’outils de copilote basés sur AI

Premiers pas avec pandas on Snowflake

Pour installer pandas on Snowflake, vous pouvez utiliser conda ou pip pour installer le paquet. Pour des instructions détaillées, voir Installation.

pip install "snowflake-snowpark-python[modin]"
Copy

Une fois pandas on Snowflake installé, au lieu d’importer pandas en tant que import pandas as pd, utilisez les deux lignes suivantes :

import modin.pandas as pd
import snowflake.snowpark.modin.plugin
Copy

Voici un exemple de la façon dont vous pouvez commencer à utiliser pandas on Snowflake via les pandas de la bibliothèque Snowpark Python avec Modin.

import modin.pandas as pd

# Import the Snowpark plugin for modin.
import snowflake.snowpark.modin.plugin

# Create a Snowpark session with a default connection.
from snowflake.snowpark.session import Session
session = Session.builder.create()

# Create a Snowpark pandas DataFrame from existing Snowflake table
df = pd.read_snowflake('SNOWFALL')

# Alternatively, create a Snowpark pandas DataFrame with sample data.
df = pd.DataFrame([[1, 'Big Bear', 8],[2, 'Big Bear', 10],[3, 'Big Bear', None],
                    [1, 'Tahoe', 3],[2, 'Tahoe', None],[3, 'Tahoe', 13],
                    [1, 'Whistler', None],['Friday', 'Whistler', 40],[3, 'Whistler', 25]],
                    columns=["DAY", "LOCATION", "SNOWFALL"])

# Inspect the DataFrame
df
Copy
      DAY  LOCATION  SNOWFALL
0       1  Big Bear       8.0
1       2  Big Bear      10.0
2       3  Big Bear       NaN
3       1     Tahoe       3.0
4       2     Tahoe       NaN
5       3     Tahoe      13.0
6       1  Whistler       NaN
7  Friday  Whistler      40.0
8       3  Whistler      25.0
# In-place point update to fix data error.
df.loc[df["DAY"]=="Friday","DAY"]=2

# Inspect the columns after update.
# Note how the data type is updated automatically after transformation.
df["DAY"]
Copy
0    1
1    2
2    3
3    1
4    2
5    3
6    1
7    2
8    3
Name: DAY, dtype: int64
# Drop rows with null values.
df.dropna()
Copy
  DAY  LOCATION  SNOWFALL
0   1  Big Bear       8.0
1   2  Big Bear      10.0
3   1     Tahoe       3.0
5   3     Tahoe      13.0
7   2  Whistler      40.0
8   3  Whistler      25.0
# Compute the average daily snowfall across locations.
df.groupby("LOCATION").mean()["SNOWFALL"]
Copy
LOCATION
Big Bear     9.0
Tahoe        8.0
Whistler    32.5
Name: SNOWFALL, dtype: float64

Utilisation de pandas on Snowflake avec des DataFrames Snowpark

L’API pandas on Snowflake et l’API DataFrame sont hautement interopérables, vous pouvez donc créer un pipeline qui exploite ces deux APIs.

Vous pouvez utiliser les opérations suivantes pour effectuer des conversions entre des DataFrames Snowpark et des DataFrames Snowpark pandas :

Fonctionnement

Entrée

Sortie

Remarques

to_snowpark_pandas

DataFrame Snowpark

DataFrame Snowpark Pandas

Cette opération attribue un ordre implicite à chaque ligne et maintient cet ordre pendant toute la durée de vie du DataFrame. Des frais d’E/S seront engagés lors de cette conversion.

to_snowpark

DataFrame Snowpark pandas ou Snowpark pandas Series

DataFrame Snowpark

Cette opération ne maintient pas l’ordre des lignes, et le DataFrame pandas Snowpark résultant opère sur un instantané des données du DataFrame pandas Snowpark source. Contrairement aux DataFrames Snowpark créés directement à partir de la table, ce comportement signifie que les modifications apportées à la table sous-jacente ne seront pas répercutées lors de l’évaluation des opérations de Snowpark. Les opérations hors DDL et limitées aux DML peuvent être appliquées sur le DataFrame. Aucun coût d’E/S ne sera encouru lors de cette conversion.

Dans la mesure du possible, nous vous conseillons d’utiliser read_snowflake pour lire la table directement depuis Snowflake au lieu de la convertir vers et depuis un DataFrame Snowpark pour éviter des coûts de conversion inutiles.

Pour plus d’informations, voir DataFrames Snowpark contre DataFrame pandas Snowpark : lequel dois-je choisir ?.

Comparaison de pandas on Snowflake et de pandas natif

pandas on Snowflake et pandas natif ont des APIs DataFramesimilaires avec des signatures correspondantes et une sémantique similaire. pandas on Snowflake fournit la même signature d’API que pandas natif (pandas 2.2.1) et fournit un calcul évolutif avec Snowflake. pandas on Snowflake respecte autant que possible la sémantique décrite dans la documentation native de pandas, mais il utilise le système de calcul et de type Snowflake. Cependant, lorsque pandas natif s’exécute sur une machine cliente, il utilise le système de calcul et de type de Python. Pour obtenir des informations sur le mappage des types entre pandas on Snowflake et Snowflake, consultez Types de données.

Comme pandas natif, pandas on Snowflake a également la notion d’index et maintient l’ordre des lignes. Cependant, leurs environnements d’exécution distincts entraînent certaines différences nuancées dans leur comportement. Cette section présente les différences clés à prendre en compte.

pandas on Snowflake est mieux utilisé avec des données qui sont déjà dans Snowflake, mais vous pouvez utiliser les opérations suivantes pour effectuer la conversion entre pandas natif et pandas on Snowflake :

Fonctionnement

Entrée

Sortie

Remarques

to_pandas

DataFrame Snowpark Pandas

DataFrame pandas natif

Matérialisez toutes les données dans l’environnement local. Si l’ensemble de données est volumineux, il peut en résulter une erreur de mémoire insuffisante.

pd.DataFrame(…)

DataFrame pandas natif, données brutes, objet pandas Snowpark

DataFrame Snowpark Pandas

Elle devrait être réservée aux petits DataFrames. La création d’un DataFrame contenant de grandes quantités de données locales peut créer une table temporaire et peut entraîner des problèmes de performance dus au téléchargement des données.

session.write_pandas

DataFrame pandas natif, objet pandas Snowpark

Table Snowflake

Le résultat peut ensuite être chargé dans pandas on Snowflake avec pd.read_snowflake en utilisant le nom de la table spécifié dans l’appel de write_pandas.

Environnement d’exécution

  • pandas : fonctionne sur une seule machine et traite les données en mémoire.

  • pandas on Snowflake : s’intègre à Snowflake, qui permet un calcul distribué sur un clustering de machines. Cette intégration permet de traiter des ensembles de données beaucoup plus importants qui dépassent la capacité de mémoire d’une seule machine. Notez que l’utilisation d’API pandas Snowpark nécessite une connexion à Snowflake.

Évaluation paresseuse ou exigeante

  • pandas : exécute les opérations immédiatement et matérialise les résultats intégralement dans la mémoire après chaque opération. Cette évaluation rapide des opérations peut entraîner une pression accrue sur la mémoire, car les données doivent être déplacées de manière intensive au sein d’une machine.

  • pandas on Snowflake : fournit la même expérience d’API que pandas. Cela imite le modèle d’évaluation exigeante de pandas, mais construit en interne un graphe de requête à évaluation paresseuse pour permettre l’optimisation entre les opérations.

    La fusion et la transposition des opérations dans un graphe de requêtes offrent des possibilités d’optimisation supplémentaires pour le moteur de calcul distribué Snowflake sous-jacent, ce qui réduit à la fois le coût et la durée d’exécution du pipeline de bout en bout par rapport à l’exécution de pandas directement dans Snowflake.

    Note

    Les E/S liées aux APIs et les APIs dont la valeur de retour n’est pas un objet pandas Snowpark (c’est-à-dire DataFrame, Series ou Index) s’évaluent toujours avec exigence. Par exemple :

    • read_snowflake

    • to_snowflake

    • to_pandas

    • to_dict

    • to_list

    • __repr__

    • La méthode dunder, __array__ qui peut être appelée automatiquement par certaines bibliothèques tierces telles que scikit-learn. Les appels à cette méthode matérialiseront les résultats sur la machine locale.

Source et stockage des données

  • pandas : prend en charge les différents lecteurs et rédacteurs listés dans la documentation de pandas dans les outils IO (texte, CSV, HDF5, …).

  • pandas on Snowflake : peut lire et écrire à partir de tables Snowflake et lire des fichiers CSV, JSON ou parquet locaux ou mis en zone de préparation. Pour plus d’informations, voir IO (lecture et écriture).

Types de données

  • pandas : possède un riche ensemble de types de données, tels que les entiers, les flottants, les chaînes, les types datetime et les types catégoriels. Prend également en charge les types de données définis par l’utilisateur. Les types de données dans pandas sont généralement dérivés des données sous-jacentes et sont appliqués de manière stricte.

  • pandas on Snowflake : contrainte par le système de type Snowflake, qui mappe les objets pandas à SQL en traduisant les types de données pandas en types SQL dans Snowflake. Une majorité de types de pandas ont un équivalent naturel dans Snowflake, mais le mappage n’est pas toujours univoque. Dans certains cas, plusieurs types pandas sont mappés vers le même type SQL.

Le tableau suivant liste les mappages de types entre pandas et Snowflake SQL :

type pandas

Type Snowflake

Tous les types d’entiers signés/non signés, y compris les types d’entiers étendus pandas

NUMBER(38, 0)

Tous les types de flottants, y compris les types de données flottants étendus de pandas

FLOAT

bool, BooleanDtype

BOOLEAN

str, StringDtype

STRING

datetime.time

TIME

datetime.date

DATE

Tous les types de datetime n’ayant pas de fuseau horaire

TIMESTAMP_NTZ

Tous les types de datetime tenant compte des fuseaux horaires

TIMESTAMP_TZ

list, tuple, array

ARRAY

dict, json

MAP

Colonne d’objets avec des types de données mixtes

VARIANT

Timedelta64[ns]

NUMBER(38, 0)

Note

Les types de données suivants ne sont pas pris en charge : données catégorielles, données de période, données d’intervalle, données éparses et données définies par l’utilisateur. Timedelta n’est actuellement pris en charge que sur le client pandas de Snowpark. Lors de la réécriture de Timedelta dans Snowflake, il sera stocké sous forme de type Nombre.

Le tableau suivant fournit le mappage des types SQL de Snowflake vers les types de pandas on Snowflake à l’aide de df.dtypes :

Type Snowflake

type pandas on Snowflake (df.dtypes)

NUMBER (scale = 0)

int64

NUMBER (scale > 0), REAL

float64

BOOLEAN

bool

STRING, TEXT

object (str)

VARIANT, BINARY, GEOMETRY, GEOGRAPHY

object

ARRAY

object (list)

OBJECT

object (dict)

TIME

object (datetime.time)

TIMESTAMP, TIMESTAMP_NTZ, TIMESTAMP_LTZ, TIMESTAMP_TZ

datetime64[ns]

DATE

object (datetime.date)

Lors de la conversion du DataFrame pandas Snowpark en DataFrame pandas natif avec to_pandas(), le DataFrame pandas natif aura des types de données affinés par rapport aux types de pandas on Snowflake, qui sont compatibles avec Mappages des types de données SQL-Python pour les fonctions et les procédures.

Casting et inférence de type

  • pandas : s’appuie sur NumPy et suit par défaut le système de type NumPy et Python pour le casting implicite des types et l’inférence. Par exemple, il traite les booléens comme des types entiers, de sorte que 1 + True renvoie 2.

  • pandas on Snowflake : mappe NumPy et les types Python en types Snowflake selon le tableau précédent, et utilise le système de types Snowflake sous-jacent pour le casting implicite des types et l’inférence. Par exemple, conformément à Types de données logiques, il ne convertit pas implicitement les booléens en types entiers, de sorte que 1 + True entraîne une erreur de conversion de type.

Traitement des valeurs nulles

  • pandas : dans les versions 1.x de pandas, pandas était flexible lorsqu’il traitait les données manquantes, il traitait donc toutes les valeurs manquantes Python None, np.nan, pd.NaN, pd.NA, et pd.NaT comme des valeurs manquantes. Dans les versions ultérieures de pandas (2.2.x), ces valeurs sont traitées comme des valeurs différentes.

  • pandas on Snowflake : adopte une approche similaire aux versions antérieures de pandas qui traite toutes les valeurs précédentes de la liste comme des valeurs manquantes. Snowpark réutilise NaN, NA, et NaT de pandas. Mais notez que toutes ces valeurs manquantes sont traitées de manière interchangeable et stockées en tant que valeurs SQL NULL dans la table Snowflake.

Alias de décalage/fréquence

  • pandas : les décalages de date dans pandas ont été modifiés dans la version 2.2.1. Les alias à une lettre 'M', 'Q', 'Y', et autres ont été abandonnés au profit de décalages à deux lettres.

  • pandas on Snowflake : utilise exclusivement les nouveaux décalages décrits dans la documentation sur les séries temporelles pandas.

Installation

Prérequis : Python 3.9, 3.10 ou 3.11, modin version 0.28.1, et pandas version 2.2.1 sont nécessaires.

Astuce

Pour utiliser pandas on Snowflake dans Snowflake Notebooks, voir les instructions d’installation dans Pandas on Snowflake dans des notebooks.

Pour installer pandas on Snowflake dans votre environnement de développement, suivez ces étapes :

  1. Allez dans le répertoire de votre projet et activez votre environnement virtuel Python.

    Note

    L’API étant en cours de développement, nous vous recommandons de l’installer dans un environnement virtuel Python plutôt que sur l’ensemble du système. Cette pratique permet à chaque projet que vous créez d’utiliser une version spécifique, ce qui vous protège des modifications apportées aux versions ultérieures.

    Vous pouvez créer un environnement virtuel Python pour une version particulière de Python à l’aide d’outils tels que Anaconda, Miniconda, ou virtualenv.

    Par exemple, pour utiliser conda afin de créer un environnement virtuel Python 3.9, tapez :

    conda create --name snowpark_pandas python=3.9
    conda activate snowpark_pandas
    
    Copy

    Note

    Si vous avez précédemment installé une version plus ancienne de pandas on Snowflake en utilisant Python 3.8 et pandas 1.5.3, vous devrez mettre à niveau vos versions de Python et de pandas comme décrit ci-dessus. Suivez ces étapes pour créer un nouvel environnement avec Python 3.9, 3.10 ou 3.11.

  2. Installez la bibliothèque Snowpark Python avec Modin.

    pip install "snowflake-snowpark-python[modin]"
    
    Copy

    ou

    conda install snowflake-snowpark-python modin==0.28.1
    
    Copy

Note

Assurez-vous que la version 1.17.0 ou ultérieure de snowflake-snowpark-python est installée.

Authentification à Snowflake

Avant d’utiliser pandas on Snowflake, vous devez établir une session avec la base de données Snowflake. Vous pouvez utiliser un fichier de configuration pour choisir les paramètres de connexion de votre session ou les énumérer dans votre code. Pour plus d’informations, consultez Création d’une session pour Snowpark Python. S’il existe une session active unique de Snowpark Python, pandas on Snowflake l’utilisera automatiquement. Par exemple :

import modin.pandas as pd
import snowflake.snowpark.modin.plugin
from snowflake.snowpark import Session

CONNECTION_PARAMETERS = {
    'account': '<myaccount>',
    'user': '<myuser>',
    'password': '<mypassword>',
    'role': '<myrole>',
    'database': '<mydatabase>',
    'schema': '<myschema>',
    'warehouse': '<mywarehouse>',
}
session = Session.builder.configs(CONNECTION_PARAMETERS).create()

# pandas on Snowflake will automatically pick up the Snowpark session created above.
# It will use that session to create new DataFrames.
df = pd.DataFrame([1, 2])
df2 = pd.read_snowflake('CUSTOMER')
Copy

pd.session est une session de Snowpark, vous pouvez donc faire tout ce que vous pouvez faire avec n’importe quelle autre session de Snowpark. Par exemple, vous pouvez l’utiliser pour exécuter une requête SQL arbitraire, qui produit un DataFrame Snowpark conformément à l’API de session, mais notez que le résultat sera un DataFrame Snowpark, et non un DataFrame Snowpark pandas.

# pd.session is the session that pandas on Snowflake is using for new DataFrames.
# In this case it is the same as the Snowpark session that we've created.
assert pd.session is session

# Run SQL query with returned result as Snowpark DataFrame
snowpark_df = pd.session.sql('select * from customer')
snowpark_df.show()
Copy

Vous pouvez également configurer vos paramètres de connexion Snowpark dans un fichier de configuration. Cela élimine le besoin d’énumérer les paramètres de connexion dans votre code, ce qui vous permet d’écrire votre code pandas on Snowflake presque comme vous écririez normalement du code pandas. Pour ce faire, créez un fichier de configuration à l’adresse ~/.snowflake/connections.toml qui ressemble à ceci :

default_connection_name = "default"

[default]
account = "<myaccount>"
user = "<myuser>"
password = "<mypassword>"
role="<myrole>"
database = "<mydatabase>"
schema = "<myschema>"
warehouse = "<mywarehouse>"
Copy

Ensuite, dans le code, il vous suffit d’utiliser snowflake.snowpark.Session.builder.create() pour créer une session à l’aide de ces identifiants de connexion.

import modin.pandas as pd
import snowflake.snowpark.modin.plugin
from snowflake.snowpark import Session

# Session.builder.create() will create a default Snowflake connection.
Session.builder.create()
# create a DataFrame.
df = pd.DataFrame([[1, 2], [3, 4]])
Copy

Vous pouvez également créer plusieurs sessions Snowpark, puis en attribuer une à pandas on Snowflake. pandas on Snowflake n’utilise qu’une seule session, vous devez donc attribuer explicitement l’une des sessions à pandas on Snowflake avec pd.session = pandas_session.

import modin.pandas as pd
import snowflake.snowpark.modin.plugin
from snowflake.snowpark import Session

pandas_session = Session.builder.configs({"user": "<user>", "password": "<password>", "account": "<account1>").create()
other_session = Session.builder.configs({"user": "<user>", "password": "<password>", "account": "<account2>").create()
pd.session = pandas_session
df = pd.DataFrame([1, 2, 3])
Copy

L’exemple suivant montre qu’en essayant d’utiliser pandas on Snowflake alors qu’il n’y a pas de session Snowpark active, vous obtiendrez SnowparkSessionException avec une erreur du type « pandas on Snowflake requiert une session snowpark active, mais il n’y en a aucune. » Une fois que vous avez créé une session, vous pouvez utiliser pandas on Snowflake. Par exemple :

import modin.pandas as pd
import snowflake.snowpark.modin.plugin

df = pd.DataFrame([1, 2, 3])
Copy

L’exemple suivant montre qu’en essayant d’utiliser pandas on Snowflake lorsqu’il y a plusieurs sessions Snowpark actives, vous obtiendrez SnowparkSessionException avec un message du type « Il y a plusieurs sessions snowpark actives, mais vous devez en choisir une pour pandas on Snowflake. »

import modin.pandas as pd
import snowflake.snowpark.modin.plugin
from snowflake.snowpark import Session

pandas_session = Session.builder.configs({"user": "<user>", "password": "<password>", "account": "<account1>"}).create()
other_session = Session.builder.configs({"user": "<user>", "password": "<password>", "account": "<account2>"}).create()
df = pd.DataFrame([1, 2, 3])
Copy

Note

Vous devez définir la session utilisée pour un nouveau DataFrame Snowpark pandas ou Series via modin.pandas.session. Cependant, la jointure ou la fusion de DataFrames créés avec des sessions différentes n’est pas prise en charge, vous devez donc éviter de définir plusieurs fois des sessions différentes et de créer des DataFrames avec des sessions différentes dans un flux de travail.

Référence API

Voir la référence d’API pandas on Snowflake pour une liste complète des APIs et des méthodes mises en œuvre actuellement disponibles.

Pour une liste complète des opérations prises en charge, consultez les tableaux suivants dans la référence de pandas on Snowflake :

Utiliser les pandas on Snowflake avec les Notebooks Snowflake

Pour utiliser pandas on Snowflake dans les Notebooks Snowflake, voir pandas on Snowflake dans les Notebooks.

Utilisation de pandas on Snowflake dans les procédures stockées

Vous pouvez utiliser pandas on Snowflake dans une procédure stockée pour créer un pipeline de données et planifier l’exécution de la procédure stockée avec des tâches.

from snowflake.snowpark.context import get_active_session
session = get_active_session()

from snowflake.snowpark import Session

def data_transformation_pipeline(session: Session) -> str:
  import modin.pandas as pd
  import snowflake.snowpark.modin.plugin
  from datetime import datetime
  # Create a Snowpark pandas DataFrame with sample data.
  df = pd.DataFrame([[1, 'Big Bear', 8],[2, 'Big Bear', 10],[3, 'Big Bear', None],
                     [1, 'Tahoe', 3],[2, 'Tahoe', None],[3, 'Tahoe', 13],
                     [1, 'Whistler', None],['Friday', 'Whistler', 40],[3, 'Whistler', 25]],
                      columns=["DAY", "LOCATION", "SNOWFALL"])
  # Drop rows with null values.
  df = df.dropna()
  # In-place point update to fix data error.
  df.loc[df["DAY"]=="Friday","DAY"]=2
  # Save Results as a Snowflake Table
  timestamp = datetime.now().strftime("%Y_%m_%d_%H_%M")
  save_path = f"OUTPUT_{timestamp}"
  df.to_snowflake(name=save_path, if_exists="replace", index=False)
  return f'Transformed DataFrame saved to {save_path}.'


  dt_pipeline_sproc = session.sproc.register(name="run_data_transformation_pipeline_sp",
                             func=data_transformation_pipeline,
                             replace=True,
                             packages=['modin', 'snowflake-snowpark-python'])
Copy

Pour appeler la procédure stockée, vous pouvez exécuter dt_pipeline_sproc() en Python ou CALL run_data_transformation_pipeline_sp() en SQL.

Pour planifier la procédure stockée en tant que tâche, vous pouvez utiliser l”API Snowflake Python pour créer une tâche.

Utilisation de pandas on Snowflake avec des bibliothèques tierces

Lors de l’appel d’APIs de bibliothèque tierce avec un DataFrame pandas Snowpark, nous recommandons de convertir le DataFrame Snowpark pandas en DataFrame pandas en appelant to_pandas() avant de passer le DataFrame dans l’appel de la bibliothèque tierce.

Note

L’appel de to_pandas() extrait vos données de Snowflake et les place en mémoire, alors gardez cela à l’esprit pour les grands ensembles de données et les cas d’utilisation sensibles.

pandas on Snowflake a actuellement une compatibilité limitée pour certaines APIs NumPy et Matplotlib, comme une implémentation distribuée pour np.where et l’interopérabilité avec df.plot. En convertissant des DataFrames pandas Snowpark via to_pandas() lorsque vous travaillez avec ces bibliothèques tierces, vous éviterez plusieurs appels d’E/S.

Voici un exemple avec Altaïr pour la visualisation et scikit-learn pour le machine learning.

# Create a Snowpark session with a default connection.
session = Session.builder.create()

train = pd.read_snowflake('TITANIC')

train[['Pclass', 'Parch', 'Sex', 'Survived']].head()
Copy
    Pclass  Parch     Sex       Survived
0       3      0     male               0
1       1      0   female               1
2       3      0   female               1
3       1      0   female               1
4       3      0     male               0
import altair as alt
# Convert to pandas DataFrame
train_df_pandas = train.to_pandas()
survived_per_age_plot = alt.Chart(train_df_pandas).mark_bar(
).encode(
    x=alt.X('Age', bin=alt.Bin(maxbins=25)),
    y='count()',
    column='Survived:N',
    color='Survived:N',
).properties(
    width=300,
    height=300
).configure_axis(
    grid=False
)
survived_per_age_plot
Copy
altair

Nous pouvons désormais utiliser scikit-learn pour entraîner un modèle simple après la conversion en pandas.

feature_cols = ['Pclass', 'Parch']
# Convert features DataFrame to pandas DataFrames
X_pandas = train_snowpark_pandas.loc[:, feature_cols].to_pandas()
# Convert labels Series to pandas Series
y_pandas = train_snowpark_pandas["Survived"].to_pandas()

from sklearn.linear_model import LogisticRegression

logreg = LogisticRegression()

logreg.fit(X_pandas, y_pandas)

y_pred_pandas = logreg.predict(X_pandas)

acc_eval = accuracy_score(y_pandas, y_pred_pandas)
Copy
modèle scikit

Limitations

pandas on Snowflake présente les limitations suivantes :

  • pandas on Snowflake n’offre aucune garantie de compatibilité avec les bibliothèques tierces OSS. À partir de la version 1.14.0a1, cependant, Snowpark pandas introduit une compatibilité limitée pour NumPy, en particulier pour l’utilisation de np.where. Pour plus d’informations, voir Intéropérabilité de NumPy.

    Lorsque vous appelez des APIs de la bibliothèque tierce avec un DataFrame Snowpark pandas, Snowflake vous recommande de convertir le DataFrame Snowpark pandas en DataFrame pandas en appelant to_pandas() avant de transmettre le DataFrame vers l’appel de la bibliothèque tierce. Pour plus d’informations, voir Utilisation de pandas on Snowflake avec des bibliothèques tierces.

  • pandas on Snowflake n’est pas intégré à Snowpark ML. Lorsque vous utilisez Snowpark ML, nous vous recommandons de convertir le DataFrame pandas de Snowpark en un DataFrame Snowpark à l’aide de to_snowpark() avant d’appeler Snowpark ML.

  • Les objets MultiIndex paresseux ne sont pas pris en charge. Lorsque MultiIndex est utilisé, il renvoie un objet MultiIndex pandas natif, ce qui nécessite d’extraire toutes les données du côté client.

  • Toutes les APIs pandas n’ont pas encore d’implémentation distribuée dans pandas on Snowflake. Pour les APIs non prises en charge, NotImplementedError est levé. Les opérations qui n’ont pas d’implémentation distribuée reviennent à une procédure stockée. Pour obtenir des informations sur la prise en charge d’APIs, reportez-vous à la documentation de référence de l’API.

  • pandas on Snowflake nécessite une version spécifique de pandas. pandas on Snowflake nécessite pandas 2.2.1 et fournit uniquement une compatibilité avec pandas 2.2.1.

  • pandas on Snowflake ne peut pas être référencé dans la fonction apply() pandas on Snowflake. Vous ne pouvez utiliser que des pandas natifs dans apply().

Résolution des problèmes

Cette section décrit des conseils de dépannage lors de l’utilisation de pandas on Snowflake.

  • Lors du dépannage, essayez d’exécuter la même opération sur un DataFrame pandas natif (ou un échantillon) pour voir si la même erreur persiste. Cette approche pourrait fournir des indications sur la manière de résoudre votre requête. Par exemple :

    df = pd.DataFrame({"a": [1,2,3], "b": ["x", "y", "z"]})
    # Running this in Snowpark pandas throws an error
    df["A"].sum()
    # Convert a small sample of 10 rows to pandas DataFrame for testing
    pandas_df = df.head(10).to_pandas()
    # Run the same operation. KeyError indicates that the column reference is incorrect
    pandas_df["A"].sum()
    # Fix the column reference to get the Snowpark pandas query working
    df["a"].sum()
    
    Copy
  • Si vous avez un notebook ouvert depuis longtemps, notez que, par défaut, les sessions Snowflake se terminent au bout de 240 minutes (4 heures) d’inactivité. Lorsque la session expire, vous obtiendrez l’erreur suivante si vous exécutez d’autres requêtes pandas on Snowflake : « Le jeton d’authentificateur a expiré. L’utilisateur doit se réauthentifier. » À ce stade, vous devez rétablir à nouveau la connexion avec Snowflake. Cela peut entraîner la perte de toute variable de session non persistante. Pour plus d’informations sur la configuration du paramètre de délai d’inactivité de la session, voir Politiques de la session.

Meilleures pratiques

Cette section décrit les meilleures pratiques à suivre lors de l’utilisation de pandas on Snowflake.

  • Évitez d’utiliser des modèles de code itératifs, tels que les boucles for, iterrows et iteritems. Les modèles de code itératifs augmentent rapidement la complexité de la requête générée. Laissez pandas on Snowflake effectuer la distribution des données et la parallélisation des calculs plutôt que le code client. Lorsqu’il s’agit de modèles de code itératifs, essayez de rechercher des opérations qui peuvent être effectuées sur l’ensemble du DataFrame et utilisez les opérations correspondantes à la place.

for i in np.arange(0, 50):
  if i % 2 == 0:
    data = pd.concat([data, pd.DataFrame({'A': i, 'B': i + 1}, index=[0])], ignore_index=True)
  else:
    data = pd.concat([data, pd.DataFrame({'A': i}, index=[0])], ignore_index=True)

# Instead of creating one DataFrame per row and concatenating them,
# try to directly create the DataFrame out of the data, like this:

data = pd.DataFrame(
      {
          "A": range(0, 50),
          "B": [i + 1 if i % 2 == 0 else None for i in range(50)],
      },
)
Copy
  • Évitez d’appeler apply, applymap et transform, qui sont finalement mis en œuvre avec des UDFs ou des UDTFs, qui peuvent ne pas être aussi performantes que les requêtes SQL habituelles. Si la fonction appliquée possède une opération équivalente sur un DataFrame ou des séries, utilisez cette opération à la place. Par exemple, au lieu de df.groupby('col1').apply('sum'), appelez directement df.groupby('col1').sum().

  • Appelez to_pandas() avant de passer le DataFrame ou une série vers un appel de bibliothèque tierce. pandas on Snowflake ne fournit pas de garantie de compatibilité avec les bibliothèques tierces.

  • Utilisez une table Snowflake matérialisée régulière pour éviter une surcharge d’E/S supplémentaire. pandas on Snowflake fonctionne sur un instantané de données qui ne fonctionne que pour les tables régulières. Pour les autres types, y compris les tables externes, les vues et les tables Apache Iceberg™, une table temporaire est créée avant de prendre l’instantané, ce qui entraîne une surcharge de matérialisation.

  • pandas on Snowflake fournit une capacité de clonage rapide et sans copie lors de la création de DataFrames à partir de tables de Snowflake avec read_snowflake. Cependant, la capacité d’instantané n’est fournie que pour les tables Snowflake standard dans les bases de données normales. Une matérialisation supplémentaire des tables Snowflake classiques sera introduite lors du chargement de tables avec des types tels que hybride, iceberg, etc., ou des tables sous des bases de données partagées. L’instantané est nécessaire pour assurer la cohérence des données et garantir le classement, et il n’existe actuellement aucun autre moyen de contourner la matérialisation supplémentaire. Veuillez essayer d’utiliser autant que possible des tables Snowflake normales lorsque vous utilisez pandas on Snowflake.

  • Vérifiez deux fois le type de résultat avant de procéder à d’autres opérations, et effectuez un casting de type explicite à l’aide de astype si nécessaire.

    En raison de la capacité limitée d’inférence de type, si aucune indication de type n’est donnée, df.apply renverra des résultats de type objet (variante) même si le résultat contient toutes les valeurs entières. Si d’autres opérations nécessitent que la colonne dtype soit int, vous pouvez procéder à un casting de type explicite en appelant la méthode astype pour corriger le type de la colonne avant de continuer.

  • Évitez d’appeler des APIs qui nécessitent une évaluation et une matérialisation si ce n’est pas nécessaire.

    Les APIs qui ne renvoient pas Series ou Dataframe nécessitent une évaluation exigeante et une matérialisation pour produire le résultat dans le bon type. Il en va de même pour les méthodes de traçage. Réduisez les appels vers les APIs afin de réduire les évaluations et la matérialisation inutiles.

  • Évitez d’appeler np.where(<cond>, <scalar>, n) sur les grands ensembles de données. <scalar > sera diffusé à un DataFrame de la taille de <cond>, ce qui peut être lent.

  • Lorsque vous travaillez avec des requêtes construites de manière itérative, df.cache_result peut être utilisé pour matérialiser des résultats intermédiaires afin de réduire l’évaluation répétée et d’améliorer la latence et de réduire la complexité de la requête globale. Par exemple :

    df = pd.read_snowflake('pandas_test')
    df2 = pd.pivot_table(df, index='index_col', columns='pivot_col') # expensive operation
    df3 = df.merge(df2)
    df4 = df3.where(df2 == True)
    
    Copy

    Dans l’exemple ci-dessus, la requête permettant de produire df2 est coûteuse à calculer et est réutilisée dans la création de df3 et de df4. La matérialisation de df2 dans une table temporaire (en faisant des opérations ultérieures impliquant df2 un balayage de table au lieu d’un pivot) peut réduire la latence globale du bloc de code :

    df = pd.read_snowflake('pandas_test')
    df2 = pd.pivot_table(df, index='index_col', columns='pivot_col') # expensive operation
    df2.cache_result(inplace=True)
    df3 = df.merge(df2)
    df4 = df3.where(df2 == True)
    
    Copy

Exemples

Voici un exemple de code avec des opérations pandas. Nous commençons par un DataFrame pandas Snowpark nommé pandas_test, qui contient trois colonnes : COL_STR, COL_FLOAT, et COL_INT. Pour voir le notebook associé à ces exemples, consultez les exemples de pandas on Snowflake dans le référentiel Snowflake-Labs.

import modin.pandas as pd
import snowflake.snowpark.modin.plugin

from snowflake.snowpark import Session

CONNECTION_PARAMETERS = {
    'account': '<myaccount>',
    'user': '<myuser>',
    'password': '<mypassword>',
    'role': '<myrole>',
    'database': '<mydatabase>',
    'schema': '<myschema>',
    'warehouse': '<mywarehouse>',
}
session = Session.builder.configs(CONNECTION_PARAMETERS).create()

df = pd.DataFrame([['a', 2.1, 1],['b', 4.2, 2],['c', 6.3, None]], columns=["COL_STR", "COL_FLOAT", "COL_INT"])

df
Copy
  COL_STR    COL_FLOAT    COL_INT
0       a          2.1        1.0
1       b          4.2        2.0
2       c          6.3        NaN

Nous enregistrons le DataFrame sous la forme d’une table Snowflake nommée pandas_test que nous utiliserons tout au long de nos exemples.

df.to_snowflake("pandas_test", if_exists='replace',index=False)
Copy

Ensuite, nous créons un DataFrame à partir de la table Snowflake. Nous supprimons la colonne COL_INT et sauvegardons le résultat dans Snowflake avec une colonne nommée row_position.

# Create a DataFrame out of a Snowflake table.
df = pd.read_snowflake('pandas_test')

df.shape
Copy
(3, 3)
df.head(2)
Copy
    COL_STR  COL_FLOAT  COL_INT
0         a        2.1        1
1         b        4.2        2
df.dropna(subset=["COL_FLOAT"], inplace=True)

df
Copy
    COL_STR  COL_FLOAT  COL_INT
0         a        2.1        1
1         c        6.3        2
df.shape
Copy
(2, 3)
df.dtypes
Copy
COL_STR       object
COL_FLOAT    float64
COL_INT        int64
dtype: object
# Save the result back to Snowflake with a row_pos column.
df.reset_index(drop=True).to_snowflake('pandas_test2', if_exists='replace', index=True, index_label=['row_pos'])
Copy

Vous obtenez une nouvelle table, pandas_test2, qui se présente comme suit :

     row_pos  COL_STR  COL_FLOAT  COL_INT
0          1         a       2.0        1
1          2         b       4.0        2

IO (lecture et écriture)

# Reading and writing to Snowflake
df = pd.DataFrame({"fruit": ["apple", "orange"], "size": [3.4, 5.4], "weight": [1.4, 3.2]})
df.to_snowflake("test_table", if_exists="replace", index=False )

df_table = pd.read_snowflake("test_table")


# Generate sample CSV file
with open("data.csv", "w") as f:
    f.write('fruit,size,weight\napple,3.4,1.4\norange,5.4,3.2')
# Read from local CSV file
df_csv = pd.read_csv("data.csv")

# Generate sample JSON file
with open("data.json", "w") as f:
    f.write('{"fruit":"apple", "size":3.4, "weight":1.4},{"fruit":"orange", "size":5.4, "weight":3.2}')
# Read from local JSON file
df_json = pd.read_json('data.json')

# Upload data.json and data.csv to Snowflake stage named @TEST_STAGE
# Read CSV and JSON file from stage
df_csv = pd.read_csv('@TEST_STAGE/data.csv')
df_json = pd.read_json('@TEST_STAGE/data.json')
Copy

Pour plus d’informations, voir Entrée/sortie.

Indexation

df = pd.DataFrame({"a": [1,2,3], "b": ["x", "y", "z"]})
df.columns
Copy
Index(['a', 'b'], dtype='object')
df.index
Copy
Index([0, 1, 2], dtype='int8')
df["a"]
Copy
0    1
1    2
2    3
Name: a, dtype: int8
df["b"]
Copy
0    x
1    y
2    z
Name: b, dtype: object
df.iloc[0,1]
Copy
'x'
df.loc[df["a"] > 2]
Copy
a  b
2  3  z
df.columns = ["c", "d"]
df
Copy
     c  d
0    1  x
1    2  y
2    3  z
df = df.set_index("c")
df
Copy
   d
c
1  x
2  y
3  z
df.rename(columns={"d": "renamed"})
Copy
    renamed
c
1       x
2       y
3       z

Valeurs manquantes

import numpy as np
df = pd.DataFrame([[np.nan, 2, np.nan, 0],
                [3, 4, np.nan, 1],
                [np.nan, np.nan, np.nan, np.nan],
                [np.nan, 3, np.nan, 4]],
                columns=list("ABCD"))
df
Copy
     A    B   C    D
0  NaN  2.0 NaN  0.0
1  3.0  4.0 NaN  1.0
2  NaN  NaN NaN  NaN
3  NaN  3.0 NaN  4.0
df.isna()
Copy
       A      B     C      D
0   True  False  True  False
1  False  False  True  False
2   True   True  True   True
3   True  False  True  False
df.fillna(0)
Copy
     A    B    C    D
0   0.0  2.0  0.0  0.0
1   3.0  4.0  0.0  1.0
2   0.0  0.0  0.0  0.0
3   0.0  3.0  0.0  4.0
df.dropna(how="all")
Copy
     A    B   C    D
0   NaN  2.0 NaN  0.0
1   3.0  4.0 NaN  1.0
3   NaN  3.0 NaN  4.0

Type de conversion

df = pd.DataFrame({"int": [1,2,3], "str": ["4", "5", "6"]})
df
Copy
   int str
0    1   4
1    2   5
2    3   6
df_float = df.astype(float)
df_float
Copy
   int  str
0  1.0  4.0
1  2.0  5.0
2  3.0  6.0
df_float.dtypes
Copy
int    float64
str    float64
dtype: object
pd.to_numeric(df.str)
Copy
0    4.0
1    5.0
2    6.0
Name: str, dtype: float64
df = pd.DataFrame({'year': [2015, 2016],
                'month': [2, 3],
                'day': [4, 5]})
pd.to_datetime(df)
Copy
0   2015-02-04
1   2016-03-05
dtype: datetime64[ns]

Opérations binaires

df_1 = pd.DataFrame([[1,2,3],[4,5,6]])
df_2 = pd.DataFrame([[6,7,8]])
df_1.add(df_2)
Copy
    0    1     2
0  7.0  9.0  11.0
1  NaN  NaN   NaN
s1 = pd.Series([1, 2, 3])
s2 = pd.Series([2, 2, 2])
s1 + s2
Copy
0    3
1    4
2    5
dtype: int64
df = pd.DataFrame({"A": [1,2,3], "B": [4,5,6]})
df["A+B"] = df["A"] + df["B"]
df
Copy
   A  B  A+B
0  1  4    5
1  2  5    7
2  3  6    9

Agrégation

df = pd.DataFrame([[1, 2, 3],
                [4, 5, 6],
                [7, 8, 9],
                [np.nan, np.nan, np.nan]],
                columns=['A', 'B', 'C'])
df.agg(['sum', 'min'])
Copy
        A     B     C
sum  12.0  15.0  18.0
min   1.0   2.0   3.0
df.median()
Copy
A    4.0
B    5.0
C    6.0
dtype: float64

Fusionner

df1 = pd.DataFrame({'lkey': ['foo', 'bar', 'baz', 'foo'],
                    'value': [1, 2, 3, 5]})
df1
Copy
  lkey  value
0  foo      1
1  bar      2
2  baz      3
3  foo      5
df2 = pd.DataFrame({'rkey': ['foo', 'bar', 'baz', 'foo'],
                    'value': [5, 6, 7, 8]})
df2
Copy
  rkey  value
0  foo      5
1  bar      6
2  baz      7
3  foo      8
df1.merge(df2, left_on='lkey', right_on='rkey')
Copy
  lkey  value_x rkey  value_y
0  foo        1  foo        5
1  foo        1  foo        8
2  bar        2  bar        6
3  baz        3  baz        7
4  foo        5  foo        5
5  foo        5  foo        8
df = pd.DataFrame({'key': ['K0', 'K1', 'K2', 'K3', 'K4', 'K5'],
                'A': ['A0', 'A1', 'A2', 'A3', 'A4', 'A5']})
df
Copy
  key   A
0  K0  A0
1  K1  A1
2  K2  A2
3  K3  A3
4  K4  A4
5  K5  A5
other = pd.DataFrame({'key': ['K0', 'K1', 'K2'],
                    'B': ['B0', 'B1', 'B2']})
df.join(other, lsuffix='_caller', rsuffix='_other')
Copy
  key_caller   A key_other     B
0         K0  A0        K0    B0
1         K1  A1        K1    B1
2         K2  A2        K2    B2
3         K3  A3      None  None
4         K4  A4      None  None
5         K5  A5      None  None

Groupby

df = pd.DataFrame({'Animal': ['Falcon', 'Falcon','Parrot', 'Parrot'],
               'Max Speed': [380., 370., 24., 26.]})

df
Copy
   Animal  Max Speed
0  Falcon      380.0
1  Falcon      370.0
2  Parrot       24.0
3  Parrot       26.0
df.groupby(['Animal']).mean()
Copy
        Max Speed
Animal
Falcon      375.0
Parrot       25.0

Pour plus d’informations, voir GroupBy.

Pivot

df = pd.DataFrame({"A": ["foo", "foo", "foo", "foo", "foo",
                        "bar", "bar", "bar", "bar"],
                "B": ["one", "one", "one", "two", "two",
                        "one", "one", "two", "two"],
                "C": ["small", "large", "large", "small",
                        "small", "large", "small", "small",
                        "large"],
                "D": [1, 2, 2, 3, 3, 4, 5, 6, 7],
                "E": [2, 4, 5, 5, 6, 6, 8, 9, 9]})
df
Copy
     A    B      C  D  E
0  foo  one  small  1  2
1  foo  one  large  2  4
2  foo  one  large  2  5
3  foo  two  small  3  5
4  foo  two  small  3  6
5  bar  one  large  4  6
6  bar  one  small  5  8
7  bar  two  small  6  9
8  bar  two  large  7  9
pd.pivot_table(df, values='D', index=['A', 'B'],
                   columns=['C'], aggfunc="sum")
Copy
    C    large  small
A   B
bar one    4.0      5
    two    7.0      6
foo one    4.0      1
    two    NaN      6
df = pd.DataFrame({'foo': ['one', 'one', 'one', 'two', 'two', 'two'],
                'bar': ['A', 'B', 'C', 'A', 'B', 'C'],
                'baz': [1, 2, 3, 4, 5, 6],
                'zoo': ['x', 'y', 'z', 'q', 'w', 't']})
df
Copy
   foo bar  baz zoo
0  one   A    1   x
1  one   B    2   y
2  one   C    3   z
3  two   A    4   q
4  two   B    5   w
5  two   C    6   t

Ressources