snowflake.snowpark.dataframe.map_in_pandas

snowflake.snowpark.dataframe.map_in_pandas(dataframe: DataFrame, func: Callable, schema: Union[StructType, str], *, partition_by: Optional[Union[Column, str, List[Union[Column, str]]]] = None, imports: Optional[List[Union[str, Tuple[str, str]]]] = None, packages: Optional[List[Union[str, module]]] = None, immutable: bool = False, max_batch_size: Optional[int] = None)

Returns a new DataFrame with the result of applying func to each batch of data in the dataframe. func is expected to be a Python function that takes an iterator of pandas DataFrames as input and yields an iterator of pandas DataFrames as output. The number of input and output DataFrame batches can differ.

This function registers a temporary UDTF.

Parameters:
  • dataframe – The DataFrame instance.

  • func – A function to be applied to the batches of rows.

  • schema – A StructType instance or a type string that represents the output schema of func.

  • partition_by – A column or list of columns used to partition the data before it is passed to func.

  • imports – A list of imports that are required to run the function. This argument is passed on when registering the UDTF.

  • packages – A list of packages that are required to run the function. This argument is passed on when registering the UDTF.

  • immutable – A flag that specifies whether func is deterministic, i.e., whether it returns the same result for the same input.

  • max_batch_size – The maximum number of rows per input pandas DataFrame when using the vectorized option.

Example 1:

>>> from snowflake.snowpark.dataframe import map_in_pandas
>>> df = session.create_dataframe([(1, 21), (2, 30), (3, 30)], schema=["ID", "AGE"])
>>> def filter_func(iterator):
...     for pdf in iterator:
...         yield pdf[pdf.ID == 1]
...
>>> map_in_pandas(df, filter_func, df.schema).show()
----------------
|"ID"  |"AGE"  |
----------------
|1     |21     |
----------------

Example 2:

>>> def mean_age(iterator):
...     for pdf in iterator:
...         yield pdf.groupby("ID").mean().reset_index()
...
>>> map_in_pandas(df, mean_age, "ID: bigint, AGE: double").order_by("ID").show()
----------------
|"ID"  |"AGE"  |
----------------
|1     |21.0   |
|2     |30.0   |
|3     |30.0   |
----------------

Example 3:

>>> def double_age(iterator):
...     for pdf in iterator:
...         pdf["DOUBLE_AGE"] = pdf["AGE"] * 2
...         yield pdf
...
>>> map_in_pandas(df, double_age, "ID: bigint, AGE: bigint, DOUBLE_AGE: bigint").order_by("ID").show()
-------------------------------
|"ID"  |"AGE"  |"DOUBLE_AGE"  |
-------------------------------
|1     |21     |42            |
|2     |30     |60            |
|3     |30     |60            |
-------------------------------

Example 4:

>>> def count(iterator):
...     for pdf in iterator:
...         rows, _ = pdf.shape
...         pdf["COUNT"] = rows
...         yield pdf
...
>>> map_in_pandas(df, count, "ID: bigint, AGE: bigint, COUNT: bigint", partition_by="AGE", max_batch_size=2).order_by("ID").show()
--------------------------
|"ID"  |"AGE"  |"COUNT"  |
--------------------------
|1     |21     |1        |
|2     |30     |2        |
|3     |30     |2        |
--------------------------
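
Example 5:

A minimal sketch, assuming the same df defined in Example 1, showing that partition_by also accepts a Column object and that immutable=True can be set when func is deterministic:

>>> from snowflake.snowpark.functions import col
>>> def min_age(iterator):
...     for pdf in iterator:
...         # Each partition (rows sharing the same AGE) is processed together.
...         pdf["MIN_AGE"] = pdf["AGE"].min()
...         yield pdf
...
>>> map_in_pandas(df, min_age, "ID: bigint, AGE: bigint, MIN_AGE: bigint", partition_by=col("AGE"), immutable=True).order_by("ID").show()
----------------------------
|"ID"  |"AGE"  |"MIN_AGE"  |
----------------------------
|1     |21     |21         |
|2     |30     |30         |
|3     |30     |30         |
----------------------------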