snowflake.snowpark.dataframe.map_in_pandas
- snowflake.snowpark.dataframe.map_in_pandas(dataframe: DataFrame, func: Callable, schema: Union[StructType, str], *, partition_by: Optional[Union[Column, str, List[Union[Column, str]]]] = None, imports: Optional[List[Union[str, Tuple[str, str]]]] = None, packages: Optional[List[Union[str, module]]] = None, immutable: bool = False, max_batch_size: Optional[int] = None)
Returns a new DataFrame with the result of applying func to each batch of data in the dataframe. func is expected to be a Python function that takes an iterator of pandas DataFrames as input and yields pandas DataFrames as output. The number of input and output DataFrame batches can differ.
This function registers a temporary UDTF.
- Parameters:
dataframe – The DataFrame instance.
func – A function to be applied to the batches of rows.
schema – A StructType or type string that represents the expected output schema of the func parameter.
partition_by – A column or list of columns that will be used to partition the data before passing it to the func.
imports – A list of imports that are required to run the function. This argument is passed on when registering the UDTF.
packages – A list of packages that are required to run the function. This argument is passed on when registering the UDTF (see the additional sketch after Example 4).
immutable – A flag that specifies whether the result of func is deterministic for the same input.
max_batch_size – The maximum number of rows per input pandas DataFrame when using the vectorized option.
Example 1:
>>> from snowflake.snowpark.dataframe import map_in_pandas
>>> df = session.create_dataframe([(1, 21), (2, 30), (3, 30)], schema=["ID", "AGE"])
>>> def filter_func(iterator):
...     for pdf in iterator:
...         yield pdf[pdf.ID == 1]
...
>>> map_in_pandas(df, filter_func, df.schema).show()
----------------
|"ID"  |"AGE"  |
----------------
|1     |21     |
----------------
Example 2:
>>> def mean_age(iterator):
...     for pdf in iterator:
...         yield pdf.groupby("ID").mean().reset_index()
...
>>> map_in_pandas(df, mean_age, "ID: bigint, AGE: double").order_by("ID").show()
----------------
|"ID"  |"AGE"  |
----------------
|1     |21.0   |
|2     |30.0   |
|3     |30.0   |
----------------
Example 3:
>>> def double_age(iterator):
...     for pdf in iterator:
...         pdf["DOUBLE_AGE"] = pdf["AGE"] * 2
...         yield pdf
...
>>> map_in_pandas(df, double_age, "ID: bigint, AGE: bigint, DOUBLE_AGE: bigint").order_by("ID").show()
-------------------------------
|"ID"  |"AGE"  |"DOUBLE_AGE"  |
-------------------------------
|1     |21     |42            |
|2     |30     |60            |
|3     |30     |60            |
-------------------------------
Example 4:
>>> def count(iterator):
...     for pdf in iterator:
...         rows, _ = pdf.shape
...         pdf["COUNT"] = rows
...         yield pdf
...
>>> map_in_pandas(df, count, "ID: bigint, AGE: bigint, COUNT: bigint", partition_by="AGE", max_batch_size=2).order_by("ID").show()
--------------------------
|"ID"  |"AGE"  |"COUNT"  |
--------------------------
|1     |21     |1        |
|2     |30     |2        |
|3     |30     |2        |
--------------------------
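When func depends on a third-party library, the packages argument can be used to make that dependency available to the temporary UDTF. The following is a minimal sketch, not one of the official examples: it reuses df from Example 1 and a hypothetical normalize_age function that needs numpy.
>>> import numpy as np
>>> def normalize_age(iterator):
...     for pdf in iterator:
...         # numpy runs inside the UDTF, so it is declared via packages below
...         pdf["AGE"] = np.log1p(pdf["AGE"])
...         yield pdf
...
>>> result = map_in_pandas(df, normalize_age, "ID: bigint, AGE: double", packages=["numpy"])
>>> result.count()
3
The imports argument is passed through to the UDTF registration in the same way, for local files or modules that func requires.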