This page documents an older version (1.15.0).

snowflake.snowpark.DataFrameAnalyticsFunctions.moving_agg

DataFrameAnalyticsFunctions.moving_agg(aggs: Dict[str, List[str]], window_sizes: List[int], order_by: List[str], group_by: List[str], col_formatter: Callable[[str, str, int], str] = <function DataFrameAnalyticsFunctions._default_col_formatter>) → DataFrame

Applies moving aggregations to the specified columns of the DataFrame using defined window sizes, and grouping and ordering criteria.

Parameters:
  • aggs – A dictionary where keys are column names and values are lists of the aggregation functions to apply to that column. See https://docs.snowflake.com/en/sql-reference/functions-analytic#list-of-functions-that-support-windows for the supported aggregations.

  • window_sizes – A list of positive integers, each representing the size of the window for which to calculate the moving aggregate.

  • order_by – A list of column names that specify the order in which rows are processed.

  • group_by – A list of column names on which the DataFrame is partitioned for separate window calculations.

  • col_formatter – An optional function for formatting output column names, defaulting to the format ‘<input_col>_<agg>_<window>’. This function takes three arguments: ‘input_col’ (str) for the column name, ‘operation’ (str) for the applied operation, and ‘value’ (int) for the window size, and returns a formatted string for the column name.
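The col_formatter contract described above (three arguments in, one column name out) can be sketched in plain Python. Both function names below are hypothetical and chosen only for illustration; the second one merely mirrors the documented default pattern and is not the library's own implementation.

```python
def verbose_formatter(input_col: str, operation: str, value: int) -> str:
    """Hypothetical custom formatter, e.g. 'SUM_SALESAMOUNT_LAST_2_ROWS'."""
    return f"{operation}_{input_col}_LAST_{value}_ROWS"


def default_like_formatter(input_col: str, operation: str, value: int) -> str:
    """Mirrors the documented default pattern '<input_col>_<agg>_<window>'."""
    return f"{input_col}_{operation}_{value}"


# Column names each formatter would yield for one column, two
# aggregations, and two window sizes.
verbose_names = [
    verbose_formatter("SALESAMOUNT", agg, size)
    for size in (2, 3)
    for agg in ("SUM", "AVG")
]
default_names = [
    default_like_formatter("SALESAMOUNT", agg, size)
    for size in (2, 3)
    for agg in ("SUM", "AVG")
]
print(verbose_names)
print(default_names)
```

A function with this shape would be passed as col_formatter=verbose_formatter in the moving_agg call.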

Returns:

A Snowpark DataFrame with additional columns corresponding to each specified moving aggregation.

Raises:
  • ValueError – If an unsupported value is specified in arguments.

  • TypeError – If an unsupported type is specified in arguments.

  • SnowparkSQLException – If an unsupported aggregation is specified.
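As a rough illustration of the argument shapes the parameter list asks for, the following is a hypothetical client-side pre-check. The helper name check_moving_agg_args and the exact mapping of each condition to TypeError or ValueError are assumptions made for this sketch; they are not taken from the library's own validation code.

```python
from typing import Dict, List


def check_moving_agg_args(
    aggs: Dict[str, List[str]],
    window_sizes: List[int],
    order_by: List[str],
    group_by: List[str],
) -> None:
    """Hypothetical pre-check; NOT the library's actual validation."""
    if not isinstance(aggs, dict):
        raise TypeError("aggs must be a dict of column name -> list of aggregations")
    if not aggs:
        raise ValueError("aggs must not be empty")
    for col, funcs in aggs.items():
        if not isinstance(col, str) or not isinstance(funcs, list):
            raise TypeError("aggs keys must be str and values must be lists")
        if not funcs:
            raise ValueError(f"no aggregations given for column {col!r}")

    if not isinstance(window_sizes, list):
        raise TypeError("window_sizes must be a list")
    for size in window_sizes:
        # bool is a subclass of int, so reject it explicitly.
        if isinstance(size, bool) or not isinstance(size, int):
            raise TypeError("window sizes must be integers")
        if size <= 0:
            raise ValueError("window sizes must be positive")

    for name, cols in (("order_by", order_by), ("group_by", group_by)):
        if not isinstance(cols, list) or not all(isinstance(c, str) for c in cols):
            raise TypeError(f"{name} must be a list of column names")


# Arguments shaped like the documented example pass the check silently.
check_moving_agg_args(
    {"SALESAMOUNT": ["SUM", "AVG"]}, [2, 3], ["ORDERDATE"], ["PRODUCTKEY"]
)
```

The sketch deliberately does not inspect aggregation names, since the list above ties unsupported aggregations to SnowparkSQLException rather than to an argument check.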

Example

>>> data = [
...     ["2023-01-01", 101, 200],
...     ["2023-01-02", 101, 100],
...     ["2023-01-03", 101, 300],
...     ["2023-01-04", 102, 250],
... ]
>>> df = session.create_dataframe(data).to_df(
...     "ORDERDATE", "PRODUCTKEY", "SALESAMOUNT"
... )
>>> result = df.analytics.moving_agg(
...     aggs={"SALESAMOUNT": ["SUM", "AVG"]},
...     window_sizes=[2, 3],
...     order_by=["ORDERDATE"],
...     group_by=["PRODUCTKEY"],
... )
>>> result.show()
--------------------------------------------------------------------------------------------------------------------------------------
|"ORDERDATE"  |"PRODUCTKEY"  |"SALESAMOUNT"  |"SALESAMOUNT_SUM_2"  |"SALESAMOUNT_AVG_2"  |"SALESAMOUNT_SUM_3"  |"SALESAMOUNT_AVG_3"  |
--------------------------------------------------------------------------------------------------------------------------------------
|2023-01-04   |102           |250            |250                  |250.000              |250                  |250.000              |
|2023-01-01   |101           |200            |200                  |200.000              |200                  |200.000              |
|2023-01-02   |101           |100            |300                  |150.000              |300                  |150.000              |
|2023-01-03   |101           |300            |400                  |200.000              |600                  |200.000              |
--------------------------------------------------------------------------------------------------------------------------------------
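The figures in the table are consistent with a trailing window that covers the current row plus up to window_size - 1 preceding rows within each group. That reading is inferred from the output above rather than stated on this page, and the plain-Python sketch below (with a hypothetical helper named trailing_aggs) only reproduces the arithmetic; it does not use Snowpark.

```python
from collections import defaultdict

rows = [
    ("2023-01-01", 101, 200),
    ("2023-01-02", 101, 100),
    ("2023-01-03", 101, 300),
    ("2023-01-04", 102, 250),
]


def trailing_aggs(rows, window_sizes):
    """Return {(date, key): {"SUM_n": ..., "AVG_n": ...}} for trailing windows."""
    # Partition by PRODUCTKEY (the group_by column in the example).
    groups = defaultdict(list)
    for order_date, product_key, amount in rows:
        groups[product_key].append((order_date, amount))

    result = {}
    for product_key, items in groups.items():
        items.sort()  # order by ORDERDATE within each group
        amounts = [amount for _, amount in items]
        for i, (order_date, _) in enumerate(items):
            out = {}
            for size in window_sizes:
                # Current row plus up to (size - 1) preceding rows.
                window = amounts[max(0, i - size + 1): i + 1]
                out[f"SUM_{size}"] = sum(window)
                out[f"AVG_{size}"] = sum(window) / len(window)
            result[(order_date, product_key)] = out
    return result


computed = trailing_aggs(rows, [2, 3])
print(computed[("2023-01-03", 101)])
```

For the 2023-01-03 row this gives sums of 400 and 600 and an average of 200 for both window sizes, matching the corresponding row in the table.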