You are viewing documentation about an older version (1.14.0). View latest version

snowflake.snowpark.Table.merge

Table.merge(source: DataFrame, join_expr: Column, clauses: Iterable[Union[WhenMatchedClause, WhenNotMatchedClause]], *, statement_params: Optional[Dict[str, str]] = None, block: bool = True) MergeResult[source]
Table.merge(source: DataFrame, join_expr: Column, clauses: Iterable[Union[WhenMatchedClause, WhenNotMatchedClause]], *, statement_params: Optional[Dict[str, str]] = None, block: bool = False) snowflake.snowpark.AsyncJob

Merges this Table with DataFrame source on the specified join expression and a list of matched or not-matched clauses, and returns a MergeResult, representing the number of rows inserted, updated and deleted by this merge action. See MERGE for details.

Parameters:
  • source – A DataFrame to join with this Table. It can also be another Table.

  • join_expr – A Column object representing the expression on which to join this Table and source.

  • clauses – A list of matched or not-matched clauses specifying the actions to perform when the values from this Table and source match or not match on join_expr. These actions can only be instances of WhenMatchedClause and WhenNotMatchedClause, and will be performed sequentially in this list.

  • statement_params – Dictionary of statement level parameters to be set while executing this action.

  • block – A bool value indicating whether this function will wait until the result is available. When it is False, this function executes the underlying queries of the dataframe asynchronously and returns an AsyncJob.

Example:

>>> from snowflake.snowpark.functions import when_matched, when_not_matched
>>> from snowflake.snowpark.types import IntegerType, StringType, StructField, StructType
>>> schema = StructType([StructField("key", IntegerType()), StructField("value", StringType())])
>>> target_df = session.create_dataframe([(10, "old"), (10, "too_old"), (11, "old")], schema=schema)
>>> target_df.write.save_as_table("my_table", mode="overwrite", table_type="temporary")
>>> target = session.table("my_table")
>>> source = session.create_dataframe([(10, "new"), (12, "new"), (13, "old")], schema=schema)
>>> target.merge(source, (target["key"] == source["key"]) & (target["value"] == "too_old"),
...              [when_matched().update({"value": source["value"]}), when_not_matched().insert({"key": source["key"]})])
MergeResult(rows_inserted=2, rows_updated=1, rows_deleted=0)
>>> target.sort("key", "value").collect()
[Row(KEY=10, VALUE='new'), Row(KEY=10, VALUE='old'), Row(KEY=11, VALUE='old'), Row(KEY=12, VALUE=None), Row(KEY=13, VALUE=None)]
Copy