Openflow Connector for Kinesis を設定する

注釈

コネクタには、 コネクタ利用規約 が適用されます。

このトピックでは、 Openflow Connector for Kinesis を設定する手順について説明します。

前提条件

  1. Openflow Connector for Kinesis について を確認してください。

  2. Openflowを設定した ことを確認します。

Kinesisストリームを設定する

AWS 管理者として、 AWS アカウントで以下のアクションを実行します。

  1. IAM 権限を持つ AWS アカウントがあり、Kinesis Streamsおよび DynamoDB にアクセスできることを確認します。

  2. オプションで、デッドレターキュー(DLQ)Kinesisストリームを作成します。正常に解析できなかったメッセージは、指定した DLQ にリダイレクトすることができます。

Snowflakeアカウントを設定する

Snowflakeアカウント管理者として、以下のタスクを実行します。

  1. 新しいロールを作成するか、既存のロールを使用して データベース権限 を付与します。

  2. データを格納する宛先テーブルを作成するために使用する宛先データベースと宛先スキーマを作成します。

    1. 宛先テーブルがまだ存在しない場合に自動的に作成するコネクタの機能を使用する予定の場合は、ユーザーがSnowflakeオブジェクトの作成および管理に必要な権限を持っていることを確認してください。

      オブジェクト

      権限

      注意

      データベース

      USAGE

      スキーマ

      USAGE . CREATE TABLE .

      スキーマレベルのオブジェクトが作成された後に、 CREATE object 権限を取り消すことができます。

      テーブル

      OWNERSHIP

      Kinesisコネクタを使用して 既存の テーブルにデータを取り込む場合にのみ必要です。 . コネクタがKinesisストリームからの記録用に新しいターゲットテーブルを作成する場合、構成で指定されたユーザーの既定のロールがテーブル所有者になります。

      以下のスクリプトを使用して、カスタムロールを作成および構成できます(SECURITYADMIN または同等のロールが必要です)。

      USE ROLE SECURITYADMIN;
      
      CREATE ROLE kinesis_connector_role_1;
      GRANT USAGE ON DATABASE kinesis_db TO ROLE kinesis_connector_role_1;
      GRANT USAGE ON SCHEMA kinesis_schema TO ROLE kinesis_connector_role_1;
      GRANT CREATE TABLE ON SCHEMA kinesis_schema TO ROLE kinesis_connector_role_1;
      
      -- Only for existing tables
      GRANT OWNERSHIP ON TABLE existing_table1 TO ROLE kinesis_connector_role_1;
      
      Copy
  3. タイプを SERVICE として、新しいSnowflakeサービスユーザーを作成します。

  4. Snowflakeサービスユーザーに、前の手順で作成したロールを付与します。

    GRANT ROLE kinesis_connector_role_1 TO USER kinesis_connector_user_1;
    ALTER USER kinesis_connector_user_1 SET DEFAULT_ROLE = kinesis_connector_role_1;
    
    Copy
  5. ステップ3のSnowflake SERVICE ユーザー向けの キーペア認証 で構成します。

  6. Snowflakeではこの手順を強く推奨します。Openflowがサポートするシークレットマネージャ(AWS、Azure、Hashicorpなど)を構成し、公開キーと秘密キーを秘密ストアに格納します。

    注釈

    何らかの理由でシークレットマネージャを使用したくない場合は、組織のセキュリティポリシーに従って、キーペア認証に使用する公開キーと秘密キーファイルを保護する責任があります。

    1. シークレットマネージャを構成したら、その認証方法を決定します。AWS 上では、Openflowに関連付けられた EC2 インスタンスロールが推奨されます。こうすることで、他の秘密を永続化する必要がなくなるからです。

    2. Openflowで、右上のハンバーガーメニューから、このシークレットマネージャに関連付けられたParameter Providerを構成します。 Controller Settings » Parameter Provider に移動し、パラメーターの値をフェッチします。

    3. この時点で、すべての認証情報を関連するパラメーターパスで参照することができるため、機密性の高い値をOpenflow内で永続化する必要はありません。

  7. 他のSnowflakeユーザーが、コネクタによって取り込まれた生の取り込みドキュメントやとテーブルへのアクセスを必要とする場合は(Snowflakeでのカスタム処理のためなど)、それらのユーザーにステップ1で作成したロールを付与します。

  8. コネクタが使用するウェアハウスを指定します。まずは最小のウェアハウスサイズから始め、複製するテーブルの数や転送するデータ量に応じて異なるサイズを試してみてください。テーブル数が大きい場合は、通常、ウェアハウスのサイズを大きくするよりも、 マルチクラスターウェアハウス を使用した方がスケーリングが向上します。

コネクタを設定する

データエンジニアとして、以下のタスクを実行してコネクタをインストールおよび構成します。

コネクタをインストールする

  1. Openflowの概要ページに移動します。 Featured connectors セクションで、 View more connectors を選択します。

  2. Openflowのコネクタページでコネクタを探し、 Add to runtime を選択します。

  3. Select runtime ダイアログで、 Available runtimes ドロップダウンリストからランタイムを選択します。

  4. Add を選択します。

    注釈

    コネクタをインストールする前に、コネクタが取り込んだデータを格納するためのデータベースとスキーマをSnowflakeで作成したことを確認します。

  5. Snowflakeアカウント認証情報でデプロイメントを認証し、Snowflakeアカウントへのランタイムアプリケーションのアクセスを許可するよう求められたられたら、 Allow を選択します。コネクタのインストールプロセスは数分で完了します。

  6. Snowflakeアカウント認証情報でランタイムを認証します。

コネクタプロセスグループが追加されたOpenflowキャンバスが表示されます。

コネクタを構成する

  1. インポートしたプロセスグループを右クリックし、 Parameters を選択します。

  2. フローパラメーター の説明に従って、必要なパラメーター値を入力します。

フローパラメーター

このセクションでは、以下のパラメーターコンテキストに基づいて構成できるフローパラメーターについて説明します。

Kinesisソースパラメーター

パラメーター

説明

AWS リージョンコード

us-west-2 など、Kinesis Stream がある AWS リージョン。

AWS アクセスキー ID

Kinesisストリームおよび DynamoDB に接続するための AWS アクセスキー ID。

AWS シークレットアクセスキー

Kinesisストリーム DynamoDB に接続するための AWS シークレットアクセスキー。

スキーマレジストリ URL

AVRO スキーマレジストリの URL。これは、 AVRO Schema Access Strategyパラメーターが schema-reference-reader に設定されている場合に必要です。

スキーマレジストリの認証タイプ

AVRO スキーマレジストリが使用する認証タイプ。これは、 AVRO Schema Access Strategyパラメーターが schema-reference-reader に設定されている場合に必要です。

可能な値は次のとおりです。
  • NONE: 認証は使用しない

  • BASIC: ユーザー名/パスワードによる認証方法を使用

Schema Registry Username

AVRO スキーマレジストリの BASIC 認証に使用するユーザー名。これは、スキーマレジストリ認証タイプのパラメーターが BASIC に設定されている場合に必要です。

Schema Registry Password

AVRO スキーマレジストリへの BASIC 認証に使用するパスワード。これは、スキーマレジストリ認証タイプのパラメーターが BASIC に設定されている場合に必要です。

Kinesis宛先パラメーター

パラメーター

説明

宛先データベース

データが永続化されるデータベース。既にSnowflakeに存在している必要があります。

宛先スキーマ

データが永続化されるスキーマ。既にSnowflakeに存在している必要があります。このパラメーターは大文字と小文字を区別します。

Snowflakeアカウント識別子

データが永続化されるSnowflakeアカウント名(形式: [organization-name]- [account-name])。

Snowflake認証ストラテジー

Snowflakeへの認証のストラテジー。可能な値: SPCS でフローを実行している場合は SNOWFLAKE_SESSION_TOKEN、秘密キーを使用してアクセスを設定する場合は KEY_PAIR

Snowflake秘密キー

認証に使用される RSA 秘密キー。RSA キーは、 PKCS8 標準に従って書式設定され、標準的な PEM ヘッダーとフッターを持つ必要があります。Snowflake秘密キーファイルまたはSnowflake秘密キーのいずれかを定義する必要があることに注意してください。

Snowflake秘密キーファイル

Snowflakeへの認証に使用される RSA 秘密キーを含むファイル。 PKCS8 標準に従って書式設定され、標準的な PEM ヘッダーとフッターを持ちます。ヘッダー行は -----BEGIN PRIVATE で始まります。 Reference asset チェックボックスを選択し、秘密キーファイルをアップロードします。

Snowflake秘密キーパスワード

Snowflake秘密キーファイルに関連付けられたパスワード。

Snowflakeロール

クエリ実行時に使用されるSnowflakeロール。

Snowflakeのユーザー名

Snowflakeインスタンスへの接続に使用するユーザー名。

Snowflakeウェアハウス

クエリ実行に使用されるSnowflakeウェアハウス。このパラメーターは大文字と小文字を区別します。

Kinesis取り込みパラメーター

パラメーター

説明

Kinesisアプリケーション名

アプリケーションのKinesisストリーム消費の進捗を追跡するための DynamoDB テーブル名に使用される名前。

Kinesisストリーム名

データを消費する AWS Kinesisストリーム名。

Kinesisストリーム初期位置

データの複製を開始するストリームの初期位置。

可能な値は次のとおりです。
  • LATEST: 最新の格納された記録

  • TRIM_HORIZON: 最も古い格納された記録

Kinesis DLQ ストリーム名

処理に失敗したすべての記録が送信されるストリーム名。このパラメーターが追加されていない場合、Openflowキャンバスのコネクタの DLQ 関連部分に警告記号が表示されます。

メッセージ形式

Kinesisのメッセージ形式。

可能な値は次のとおりです。
  • JSON: JSON は、人間が読むことができ、メッセージ自体からスキーマを推測できるメッセージ形式です。

  • AVRO: AVRO は、メッセージのデータにアクセスするためにスキーマを必要とするメッセージ形式です。

AVRO Schema Access Strategy

AVRO メッセージ形式のデータにアクセスするには、スキーマが必要です。このパラメーターは、特定メッセージの AVRO スキーマにアクセスするためのストラテジーを定義します。メッセージ形式パラメーターが AVRO に設定されている場合、スキーマが使用されます。

可能な値:
  • embedded-avro-schema: スキーマは記録そのものに埋め込まれています。

  • schema-reference-reader: スキーマはConfluentスキーマレジストリに格納されます。

Kinesisストリームからテーブルへのマッピング

このオプションのパラメーターにより、ユーザーはどのストリームをどのテーブルにマッピングするかを指定できます。各ストリームとそのテーブル名はコロンで区切る必要があります。このテーブル名は、引用符で囲まれていないSnowflakeの有効な識別子である必要があります。正規表現はあいまいであってはならず、一致するストリームは単一のターゲットテーブルのみに一致しなければなりません。空または一致するものがない場合は、ストリーム名がテーブル名として使用されます。

例:
  • stream1:low_range,stream2:low_range,stream5:high_range,stream6:high_range

  • stream[0-4]:low_range,stream[5-9]:high_range

Iceberg Enabled

プロセッサーがデータをIcebergテーブルに取り込むかどうかを指定します。このプロパティが実際のテーブルタイプと一致しない場合、プロセッサーは失敗します。

可能な値:
  • true

  • false

フローを実行する

  1. プレーンを右クリックし、 Enable all Controller Services を選択します。

  2. インポートしたプロセスグループを右クリックし、 Start を選択します。

コネクタがデータの取り込みを開始します。

スキーマ

コネクタによってロードされたSnowflakeテーブルには、Kinesisメッセージのキーによって命名された列が含まれます。以下にそのようなテーブルの例を示します。

ACCOUNT

SYMBOL

SIDE

QUANTITY

1

ABC123

ZTEST

BUY

3572

2

XYZ789

ZABZX

SELL

3024

3

XYZ789

ZTEST

SELL

799

4

ABC123

ZABZX

BUY

2033

5

ABC123

ZTEST

BUY

1558

スキーマの進化

現在、 Iceberg Enabledfalse に設定されている場合。コネクタがターゲットテーブルを作成する場合、スキーマ進化はデフォルトで有効になります。既存のテーブルでスキーマ進化を有効または無効にしたい場合は、 ALTER TABLE コマンドを使用して、 ENABLE_SCHEMA_EVOLUTION パラメーターを設定します。テーブルで OWNERSHIP 権限を持つロールを使用することも必要です。詳細については、 テーブルスキーマの進化 をご参照ください。

ただし、既存のテーブルに対してスキーマの進化が無効になっている場合、コネクタは不一致のスキーマがある行を構成されたデッドレターキュー(DLQ)に送ろうとします。

Iceberg Enabledtrue に設定されているケースについては、段落 Apache Iceberg™ テーブルのスキーマの進化 をご参照ください。

Apache Iceberg™ テーブルと Openflow Connector for Kinesis の併用

Openflow Connector for Kinesis では、Snowflakeが管理する Apache Iceberg™テーブル にデータを取り込むことができます。

要件と制限

Icebergテーブルの取り込み用にコネクタを構成する前に、以下の要件と制限に注意してください。

  • コネクタを実行する前にIcebergテーブルを作成する必要があります。

  • ユーザーが作成したテーブルにデータを挿入するアクセス権を持っていることを確認します。

  • スキーマの進化はIcebergテーブルではサポートされていません。

構成とセットアップ

Icebergテーブルの取り込み用にコネクタを構成するには、 コネクタを構成する の指示に従いますが、以下のセクションで説明するいくつかの違いがあります。

Icebergテーブルへの取り込みを有効にする

Icebergテーブルへの取り込みを有効にするには、 Iceberg Enabled パラメーターを true に設定する必要があります。

インジェスト用にIcebergテーブルを作成

コネクタを実行する前に、Icebergテーブルを作成する必要があります。スキーマの進化はサポートされていないため、Kinesisメッセージが含むすべてのフィールドでテーブルを作成する必要があります。

Icebergテーブルを作成する場合、Icebergデータタイプまたは 互換性のあるSnowflakeデータタイプ を使用できます。半構造化 VARIANT タイプはサポートされていません。代わりに、 構造化された OBJECT または MAP を使用してください。

例えば、次のようなメッセージを考えてみましょう:

{
    "id": 1,
    "name": "Steve",
    "body_temperature": 36.6,
    "approved_coffee_types": ["Espresso", "Doppio", "Ristretto", "Lungo"],
    "animals_possessed":
    {
        "dogs": true,
        "cats": false
    },
    "date_added": "2024-10-15"
}
Copy

例のメッセージ用にIcebergテーブルを作成するには、以下のステートメントを使用します:

CREATE OR REPLACE ICEBERG TABLE my_iceberg_table (
    id INT,
    body_temperature FLOAT,
    name STRING,
    approved_coffee_types ARRAY(STRING),
    animals_possessed OBJECT(dogs BOOLEAN, cats BOOLEAN),
    date_added DATE
  )
  EXTERNAL_VOLUME = 'my_volume'
  CATALOG = 'SNOWFLAKE'
  BASE_LOCATION = 'my_location/my_iceberg_table';
Copy

注釈

dogscats のような入れ子構造内のフィールド名では、大文字と小文字が区別されます。

Apache Iceberg™ テーブルのスキーマの進化

現在、コネクタは Apache Iceberg™ テーブルのスキーマの進化をサポートしていません。

既知の問題

  • コネクタのプロセスグループには、「Upload Failure」という名前の出力ポートが1つあります。Snowflakeに正常にアップロードされなかった FlowFiles の処理に使用することができます。このポートがコネクタのプロセスグループの外部に接続されていないと警告サインが表示されますが、無視することができます。

  • すべてのプロセッサーは、停止しているときに、一度だけ実行するように命令することができます。ConsumeKinesisStream プロセッサーは、内部アーキテクチャのため、一度だけ実行を命じられても有意義な仕事はしません。プロセッサーが動き出すには、始動して約2分間作動させる必要があります。