Snowpark Migration Accelerator: マージ

説明

MERGE ステートメントは、1つまたは複数のソーステーブルのデータをターゲットテーブルに結合し、更新と挿入を1回の操作で実行できます。定義した条件に基づいて、ターゲットテーブルの既存の行を更新するか、新しい行を挿入するかを決定します。このため、 INSERTUPDATEDELETE のステートメントを別々に使うよりも効率的です。MERGE ステートメントは、同じデータで複数回実行しても、常に一貫した結果が得られます。

Sparkでは、 MERGE 構文は Sparkドキュメント に記載されています。

MERGE INTO target_table_name [target_alias]
   USING source_table_reference [source_alias]
   ON merge_condition
   { WHEN MATCHED [ AND matched_condition ] THEN matched_action |
     WHEN NOT MATCHED [BY TARGET] [ AND not_matched_condition ] THEN not_matched_action |
     WHEN NOT MATCHED BY SOURCE [ AND not_matched_by_source_condition ] THEN not_matched_by_source_action } [...]

matched_action
 { DELETE |
   UPDATE SET * |
   UPDATE SET { column = { expr | DEFAULT } } [, ...] }

not_matched_action
 { INSERT * |
   INSERT (column1 [, ...] ) VALUES ( expr | DEFAULT ] [, ...] )

not_matched_by_source_action
 { DELETE |
   UPDATE SET { column = { expr | DEFAULT } } [, ...] }
Copy

Snowflakeでは、 MERGE ステートメントはこの構文に従います(詳細については、 (Snowflakeドキュメント) をご参照ください)。

MERGE INTO <target_table> USING <source> ON <join_expr> { matchedClause | notMatchedClause } [ ... ]

matchedClause ::=
  WHEN MATCHED [ AND <case_predicate> ] THEN { UPDATE SET <col_name> = <expr> [ , <col_name2> = <expr2> ... ] | DELETE } [ ... ]
  
notMatchedClause ::=
   WHEN NOT MATCHED [ AND <case_predicate> ] THEN INSERT [ ( <col_name> [ , ... ] ) ] VALUES ( <expr> [ , ... ] )
Copy

主要な違いは、Snowflakeには WHEN NOT MATCHED BY SOURCE 句に直接相当するものがないことです。Snowflakeで同様の関数を実現するには、回避策が必要です。

サンプルソースパターン

サンプル補助データ

注釈

以下のコード例は、どのように動作するかをより理解しやすくするために実行されています。

CREATE OR REPLACE people_source ( 
  person_id  INTEGER NOT NULL PRIMARY KEY, 
  first_name STRING NOT NULL, 
  last_name  STRING NOT NULL, 
  title      STRING NOT NULL,
);

CREATE OR REPLACE TABLE people_target ( 
  person_id  INTEGER NOT NULL PRIMARY KEY, 
  first_name STRING NOT NULL, 
  last_name  STRING NOT NULL, 
  title      STRING NOT NULL DEFAULT 'NONE'
);


INSERT INTO people_target VALUES (1, 'John', 'Smith', 'Mr');
INSERT INTO people_target VALUES (2, 'alice', 'jones', 'Mrs');
INSERT INTO people_source VALUES (2, 'Alice', 'Jones', 'Mrs.');
INSERT INTO people_source VALUES (3, 'Jane', 'Doe', 'Miss');
INSERT INTO people_source VALUES (4, 'Dave', 'Brown', 'Mr');
Copy
CREATE OR REPLACE TABLE people_source (
    person_id  INTEGER NOT NULL PRIMARY KEY,
    first_name VARCHAR(20) NOT NULL,
    last_name VARCHAR(20) NOT NULL,
    title VARCHAR(10) NOT NULL
);

CREATE OR REPLACE TABLE people_target (
    person_id  INTEGER NOT NULL PRIMARY KEY,
    first_name VARCHAR(20) NOT NULL,
    last_name VARCHAR(20) NOT NULL,
    title VARCHAR(10) NOT NULL DEFAULT 'NONE'
);


INSERT INTO people_target VALUES (1, 'John', 'Smith', 'Mr');
INSERT INTO people_target VALUES (2, 'alice', 'jones', 'Mrs');
INSERT INTO people_source VALUES (2, 'Alice', 'Jones', 'Mrs.');
INSERT INTO people_source VALUES (3, 'Jane', 'Doe', 'Miss');
INSERT INTO people_source VALUES (4, 'Dave', 'Brown', 'Mr');
Copy

MERGE ステートメント - 挿入および更新の場合

Spark

MERGE INTO people_target pt 
USING people_source ps 
ON    (pt.person_id = ps.person_id) 
WHEN MATCHED THEN UPDATE 
  SET pt.first_name = ps.first_name, 
      pt.last_name = ps.last_name, 
      pt.title = DEFAULT 
WHEN NOT MATCHED THEN INSERT 
  (pt.person_id, pt.first_name, pt.last_name, pt.title) 
  VALUES (ps.person_id, ps.first_name, ps.last_name, ps.title);


SELECT * FROM people_target;
Copy
PERSON_ID|FIRST_NAME|LAST_NAME|TITLE|
---------+----------+---------+-----+
        1|John      |Smith    |Mr   |
        2|Alice     |Jones    |NONE |
        3|Jane      |Doe      |Miss |
        4|Dave      |Brown    |Mr   |
Copy

Snowflake

MERGE INTO people_target2 pt 
USING people_source ps 
ON    (pt.person_id = ps.person_id) 
WHEN MATCHED THEN UPDATE 
  SET pt.first_name = ps.first_name, 
      pt.last_name = ps.last_name, 
      pt.title = DEFAULT 
WHEN NOT MATCHED THEN INSERT 
  (pt.person_id, pt.first_name, pt.last_name, pt.title) 
  VALUES (ps.person_id, ps.first_name, ps.last_name, ps.title);


SELECT * FROM PUBLIC.people_target ORDER BY person_id;
Copy
PERSON_ID|FIRST_NAME|LAST_NAME|TITLE|
---------+----------+---------+-----+
        1|John      |Smith    |Mr   |
        2|Alice     |Jones    |NONE |
        3|Jane      |Doe      |Miss |
        4|Dave      |Brown    |Mr   |
Copy

INSERTUPDATE の操作はSnowflakeでも同じように動作します。どちらの SQL 方言でも、 DEFAULT を式として使用すると、列をデフォルト値に設定できます。

Sparkでは、列を明示的にリストすることなく、挿入や更新の操作を行うことができます。列が指定されていない場合、操作はテーブル内のすべての列に影響します。これが正しく機能するためには、ソーステーブルと宛先テーブルが同一の列構造を持っている必要があります。列構造が一致しない場合、解析エラーが発生します。

UPDATE SET *
-- This is equivalent to UPDATE SET col1 = source.col1 [, col2 = source.col2 ...]

INSERT * 
-- This command copies all columns from the source table to the target table, matching columns by name. It is the same as explicitly listing all columns in both the INSERT and VALUES clauses.

Since Snowflake doesn't support these options, the migration process will instead list all columns from the target table.

### MERGE Statement - Delete Case

 
```{code} sql
:force:
MERGE INTO people_target pt 
USING people_source ps 
ON    (pt.person_id = ps.person_id)
WHEN MATCHED AND pt.person_id < 3 THEN DELETE
WHEN NOT MATCHED BY TARGET THEN INSERT *;

SELECT * FROM people_target;
Copy
PERSON_ID|FIRST_NAME|LAST_NAME|TITLE|
---------+----------+---------+-----+
        1|John      |Smith    |Mr   |
        3|Jane      |Doe      |Miss |
        4|Dave      |Brown    |Mr   |
Copy

Snowflake

MERGE INTO people_target pt 
USING people_source ps 
ON    (pt.person_id = ps.person_id)
WHEN MATCHED AND pt.person_id < 3 THEN DELETE
WHEN NOT MATCHED THEN INSERT 
  (pt.person_id, pt.first_name, pt.last_name, pt.title) 
  VALUES (ps.person_id, ps.first_name, ps.last_name, ps.title);


SELECT * FROM people_target;
Copy
PERSON_ID|FIRST_NAME|LAST_NAME|TITLE|
---------+----------+---------+-----+
        1|John      |Smith    |Mr   |
        3|Jane      |Doe      |Miss |
        4|Dave      |Brown    |Mr   |
Copy

Snowflakeの DELETE アクションは、他のデータベースと同じように動作します。また、 MATCHEDNOT MATCHED 句に条件を追加することもできます。

WHEN NOT MATCHED BY TARGETWHEN NOT MATCHED は同等の句で、 SQL マージステートメントで互換的に使用できます。

MERGE ステートメント - WHEN NOT MATCHED BY SOURCE

WHEN NOT MATCHED BY SOURCE 句は、ターゲットテーブルの行がソーステーブルに一致する行がない場合にトリガーされます。これは、 merge_condition とオプションの not_match_by_source_condition の両方がtrueと評価された場合に発生します。詳細については、 Sparkドキュメント をご参照ください。

Snowflakeはこの句を直接サポートしていません。この制限に対処するには、 DELETEUPDATE の両方のアクションに対して、以下の回避策を使用することができます。

MERGE INTO people_target pt 
USING people_source ps 
ON pt.person_id = ps.person_id
WHEN NOT MATCHED BY SOURCE THEN DELETE;


SELECT * FROM people_target;
Copy
PERSON_ID|FIRST_NAME|LAST_NAME|TITLE|
---------+----------+---------+-----+
        2|Alice     |Jones    |NONE |
Copy

Snowflake

MERGE INTO people_target pt 
USING (
    SELECT 
        pt.person_id 
    FROM
        people_target pt LEFT 
    JOIN people_source ps ON pt.person_id = ps.person_id
    WHERE 
        ps.person_id is null
) s_src
    ON s_src.person_id = pt.person_id
WHEN MATCHED THEN DELETE;

SELECT * FROM people_target;
Copy
PERSON_ID|FIRST_NAME|LAST_NAME|TITLE|
---------+----------+---------+-----+
        2|Alice     |Jones    |NONE |
Copy

Snowflakeの DELETE アクションは、他のデータベースと同じように動作します。また、 MATCHEDNOT MATCHED 句に条件を追加することもできます。

既知の問題

1.MERGE はどちらの言語でもよく似ています。

Apache Sparkは追加機能を提供しますが、前の例で示したように、別のアプローチを使用してSnowflakeで同様の機能を実現できます。