Openflow Connector for Kinesis の JSON データ形式を設定する

注釈

このコネクタは、 Snowflakeコネクタ規約 に従うものとします。

このトピックでは、 JSON データ形式用に Openflow Connector for Kinesis を設定する方法について説明します。これは、スキーマ進化機能を備えた基本的なメッセージの取り込みに最適化された簡素化されたコネクタです。

Openflow Connector for Kinesis の JSON データ形式は、KinesisストリームからSnowflakeテーブルへの JSON メッセージの簡単な取り込みを目的として設計されています。

前提条件

  1. Openflow Connector for Kinesis について を確認します。

  2. BYOCを使用してOpenflowを設定 しているか、 Snowflakeデプロイメントを使用してOpenflowを設定 していることを確認します。

  3. |OFSFSPCS-plural|を使用する場合、必要なドメインの構成 を精査し、 Kinesis コネクタに必要なドメインへのアクセス許可を付与していることを確認してください。

注釈

DLQ などその他のデータ形式や機能のサポートが必要な場合、Snowflakeの担当者にお問い合わせください。

Kinesisストリームを設定する

AWS 管理者として、 AWS アカウントで以下のアクションを実行します。

  1. Kinesis Streamsと DynamoDB <https://docs.aws.amazon.com/streams/latest/dev/kcl-iam-permissions.html>`_ にアクセスするための `IAM 権限を持つ AWS ユーザーがいることを確認します。

  2. AWS ユーザーが `アクセスキーの認証情報<https://docs.aws.amazon.com/IAM/latest/UserGuide/id_credentials_access-keys.html>`_ を構成したことを確認します。

Snowflakeアカウントを設定する

Snowflakeアカウント管理者として、以下のタスクを実行します。

  1. 新しいロールを作成するか、既存のロールを使用して データベース権限 を付与します。

  2. データを格納する宛先テーブルを作成するために使用する宛先データベースと宛先スキーマを作成します。

    1. 宛先テーブルがまだ存在しない場合に自動的に作成するコネクタの機能を使用する予定の場合は、ユーザーがSnowflakeオブジェクトの作成および管理に必要な権限を持っていることを確認してください。

      オブジェクト

      権限

      注意

      データベース

      USAGE

      スキーマ

      USAGE . CREATE TABLE .

      スキーマレベルのオブジェクトが作成された後に、 CREATE object 権限を取り消すことができます。

      テーブル

      OWNERSHIP

      Kinesisコネクタを使用して 既存の テーブルにデータを取り込む場合にのみ必要です。. コネクタがKinesisストリームからの記録用に新しいターゲットテーブルを作成する場合、構成で指定されたユーザーの既定のロールがテーブル所有者になります。

      以下のスクリプトを使用して、カスタムロールを作成および構成できます(SECURITYADMIN または同等のロールが必要です)。

      USE ROLE SECURITYADMIN;
      
      CREATE ROLE kinesis_connector_role;
      GRANT USAGE ON DATABASE kinesis_db TO ROLE kinesis_connector_role;
      GRANT USAGE ON SCHEMA kinesis_schema TO ROLE kinesis_connector_role;
      GRANT CREATE TABLE ON SCHEMA kinesis_schema TO ROLE kinesis_connector_role;
      
      -- Only for existing tables.
      GRANT OWNERSHIP ON TABLE existing_table TO ROLE kinesis_connector_role;
      
      Copy
  3. タイプを SERVICE として、新しいSnowflakeサービスユーザーを作成します。

  4. Snowflakeサービスユーザーに、前の手順で作成したロールを付与します。

    GRANT ROLE kinesis_connector_role TO USER kinesis_connector_user;
    ALTER USER kinesis_connector_user SET DEFAULT_ROLE = kinesis_connector_role;
    
    Copy
  5. ステップ3のSnowflake SERVICE ユーザー向けの キーペア認証 で構成します。

  6. Snowflakeではこの手順を強く推奨します。Openflowがサポートするシークレットマネージャ(AWS、Azure、Hashicorpなど)を構成し、公開キーと秘密キーを秘密ストアに格納します。

    注釈

    何らかの理由でシークレットマネージャを使用したくない場合は、組織のセキュリティポリシーに従って、キーペア認証に使用する公開キーと秘密キーファイルを保護する責任があります。

    1. シークレットマネージャーを構成した後、認証する方法を決定します。AWS では、他のシークレットを保持する必要がないため、Openflowに関連付けられた EC2 インスタンスロールを使用することをお勧めします。

    2. Openflowで、右上のハンバーガーメニューから、このシークレットマネージャーに関連付けられたパラメータープロバイダーを構成します。Controller Settings » Parameter Provider に移動してから、パラメーター値を取得します。

    3. この時点で、すべての認証情報を関連するパラメーターパスで参照することができるため、機密性の高い値をOpenflow内で永続化する必要はありません。

  7. 他のSnowflakeユーザーがインジェストされたデータと作成されたテーブルへのアクセスを必要とする場合(たとえば、Snowflakeのカスタム処理のため)、ステップ2で作成したロールをそれらのユーザーに付与します。

コネクタを設定する

データエンジニアとして、以下のタスクを実行してコネクタをインストールおよび構成します。

コネクタをインストールする

  1. Openflow概要ページに移動します。Featured connectors セクションで、 View more connectors を選択します。

  2. Openflowのコネクタページでコネクタを探し、 Add to runtime を選択します。

  3. Select runtime ダイアログで、Available runtimes ドロップダウンリストからランタイムを選択して Add をクリックします。

    注釈

    コネクタをインストールする前に、コネクタが取り込んだデータを格納するためのデータベースとスキーマをSnowflakeで作成したことを確認します。

  4. Snowflakeアカウント認証情報でデプロイメントを認証し、Snowflakeアカウントへのランタイムアプリケーションのアクセスを許可するよう求められたられたら、 Allow を選択します。コネクタのインストールプロセスは数分で完了します。

  5. Snowflakeアカウント認証情報でランタイムを認証します。

コネクタプロセスグループが追加されたOpenflowキャンバスが表示されます。

コネクタを構成する

  1. インポートされたプロセスグループを右クリックし、 Parameters を選択します。

  2. パラメーター の説明に従って、必要なパラメーター値を入力します。

パラメーター

このセクションでは、 Openflow Connector for Kinesis の JSON データ形式のすべてのパラメーターについて説明します。

コネクタはいくつかのモジュールで構成されています。セットを表示するには、コネクタプロセスグループをダブルクリックします。モジュールのパラメーターコンテキストで各モジュールのパラメーターを設定できます。

Snowflake宛先パラメーター

パラメーター

説明

必須

宛先データベース

データが永続化されるデータベース。Snowflakeにすでに存在している必要があります。名前は大文字と小文字を区別します。引用符で囲まれていない識別子の場合、名前を大文字で指定します。

有り

宛先スキーマ

データが永続化されるスキーマ。これはSnowflakeにすでに存在している必要があります。名前は大文字と小文字を区別します。引用符で囲まれていない識別子の場合、名前を大文字で指定します。

次の例をご参照ください。

  • CREATE SCHEMA SCHEMA_NAME または CREATE SCHEMA schema_name: SCHEMA_NAME を使用します

  • CREATE SCHEMA "schema_name" または CREATE SCHEMA "SCHEMA_NAME": それぞれ schema_name または SCHEMA_NAME を使用します

有り

Iceberg Enabled

Icebergがテーブル操作で有効になっているかどうか。次のいずれか: true/ false

有り

スキーマ進化が有効です

コネクタレベルでスキーマの進化を有効または無効にします。有効にすると、テーブルのスキーマの自動変更が許可されます。スキーマの進化は、テーブル固有のパラメーターを介して、個々のテーブルレベルでも制御できることに注意してください。次のいずれか: true/ false

有り

有効化された新しいテーブルのスキーマ進化

新しいテーブルを作成するときにスキーマの進化を有効にするかどうかを制御します。「true」に設定すると、 ENABLE_SCHEMA_EVOLUTION = TRUE パラメーターで新しいテーブルが作成されます。「false」に設定すると、 ENABLE_SCHEMA_EVOLUTION = FALSE パラメーターで新しいテーブルが作成されます。Icebergテーブルは自動的に作成されないため、適用されません。この設定はテーブルの作成にのみ影響し、既存のテーブルには影響しません。次のいずれか: true/ false

有り

Snowflakeアカウント識別子

以下を使用する場合:

  • Session Token Authentication Strategy: 空白にする必要があります。

  • KEY_PAIR: データが永続化される[organization-name]-[account-name]形式のSnowflakeアカウント名。

有り

Snowflake認証ストラテジー

以下を使用する場合:

  • Snowflake Openflow Deployment または BYOC:SNOWFLAKE_MANAGED_TOKEN を使用します。このトークンはSnowflakeによって自動的に管理されます。 BYOC デプロイメントでは、 SNOWFLAKE_MANAGED_TOKEN を使用するために、事前に ランタイムロール が構成されている必要があります。

  • BYOC: 代わりに、BYOC では認証戦略の値として KEY_PAIR を使用できます。

有り

Snowflake秘密キー

以下を使用する場合:

  • Session Token Authentication Strategy: 空白にする必要があります。

  • KEY_PAIR:認証に使用される RSA プライベートキーである必要があります。

    その RSA キーは PKCS8 標準に従ってフォーマットされ、標準の PEM ヘッダーとフッターを持つ必要があります。SnowflakeプライベートキーファイルまたはSnowflakeプライベートキーのいずれかを定義する必要があることに注意してください。

無し

Snowflake秘密キーファイル

以下を使用する場合:

  • Session token authentication strategy:プライベートキーファイルは空白である必要があります。

  • KEY_PAIR:Snowflakeへの認証に使用される RSA プライベートキーを含むファイルをアップロードします。これは、PKCS8 標準に従ってフォーマットされ、標準の PEM ヘッダーとフッターを含んでいる必要があります。ヘッダー行は で始まります -----BEGIN PRIVATE.プライベートキーファイルをアップロードするには、Reference asset チェックボックスを選択します。

無し

Snowflake秘密キーパスワード

以下を使用する場合

  • Session Token Authentication Strategy: 空白にする必要があります。

  • KEY_PAIR:Snowflakeプライベートキーファイルに関連付けられたパスワードを提供します。

無し

Snowflakeロール

以下を使用する場合

  • Session Token Authentication Strategy:Snowflakeロールを使用します。Openflow UI でSnowflakeのロールを見つけるには、ランタイムの View Details に移動してください。

  • KEY_PAIR Authentication Strategy:サービスユーザーのために構成された有効なロールを使用します。

有り

Snowflakeのユーザー名

以下を使用する場合

  • Session Token Authentication Strategy: 空白にする必要があります。

  • KEY_PAIR:Snowflakeインスタンスへの接続に使用するユーザー名を提供します。

有り

Kinesis JSON ソースパラメーター

パラメーター

説明

必須

AWS リージョンコード

us-west-2 など、Kinesis Stream がある AWS リージョン。

有り

AWS アクセスキー ID

Kinesis Stream、 DynamoDB、およびオプションで CloudWatch に接続するための AWS アクセスキー ID。

有り

AWS シークレットアクセスキー

Kinesis Stream、 DynamoDB、およびオプションで CloudWatch に接続するための AWS シークレットアクセスキー。

有り

Kinesisアプリケーション名

アプリケーションのKinesisストリーム消費の進捗を追跡するための DynamoDB テーブル名に使用される名前。

有り

Kinesisコンシューマータイプ

Kinesisストリームから記録を読み取るために使用される戦略。これは次のいずれかの値である必要があります: SHARED_THROUGHPUT または ``ENHANCED_FAN_OUT``詳細については、 強化されたファンアウトコンシューマーの開発 を参照してください。

有り

Kinesisストリーム初期位置

データの複製を開始するストリームの初期位置。

可能な値は次のとおりです。

  • LATEST: 最新の格納された記録

  • TRIM_HORIZON: 最も古い格納された記録

有り

Kinesisストリーム名

データを消費する AWS Kinesisストリーム名。

有り

メトリック公開

Kinesisクライアントライブラリのメトリックが公開される場所を指定します。可能な値: DISABLEDLOGSCLOUDWATCH

有り

フローを実行する

  1. 平面を右クリックして、 Enable all Controller Services を選択します。

  2. コネクタのプロセスグループを右クリックし、 Start を選択します。

コネクタがデータの取り込みを開始します。

テーブルスキーマ

コネクタによってロードされたSnowflakeテーブルには、Kinesisメッセージのキーの名前が付けられた列が含まれます。コネクタは、レコードに関するメタデータを格納する KINESISMETADATA 列も追加します。

以下は、コネクタによってロードされるSnowflakeテーブルの例です。

ACCOUNT

SYMBOL

SIDE

QUANTITY

KINESISMETADATA

1

ABC123

ZTEST

BUY

3572

{ ... KINESISMETADATA object ... }

2

XYZ789

ZABZX

SELL

3024

{ ... KINESISMETADATA object ... }

3

XYZ789

ZTEST

SELL

799

{ ... KINESISMETADATA object ... }

4

ABC123

ZABZX

BUY

2033

{ ... KINESISMETADATA object ... }

KINESISMETADATA 列には、次のフィールドを持つオブジェクトが含まれます。

フィールド名

フィールド型

値の例

説明

stream

String

stream-name

記録が由来するKinesisストリームの名前です。

shardId

String

shardId-000000000001

記録が由来するストリーム内のシャードの識別子です。

approximateArrival

String

2025-11-05T09:12:15.300

記録がストリームに挿入されたおおよその時間(ISO 8601形式)

partitionKey

String

key-1234

記録のデータプロデューサーによって指定されたパーティションキー。

sequenceNumber

String

123456789

Kinesis Data Streamsによってシャード内の記録に割り当てられた一意のシーケンス番号。

subSequenceNumber

2

記録のサブシーケンス番号(同じシーケンス番号を持つ集計記録に使用)

shardedSequenceNumber

String

12345678900002

記録のシーケンス番号とサブシーケンス番号の組み合わせ。

スキーマの進化

このコネクタは、スキーマの自動検出と進化をサポートしています。Snowflakeのテーブルの構造は、コネクタによってロードされた新しいデータの構造をサポートするために、自動的に定義され、進化します。

Snowflakeは受信データのスキーマを検出し、ユーザー定義スキーマに一致するテーブルにデータをロードします。Snowflakeでは、新しい列を追加したり、新しい受信レコードで欠落している列から NOT NULL 制約をドロップしたりすることもできます。

コネクタを使用したスキーマ検出は、提供された JSON データに基づいてデータ型を推測します。

コネクタがターゲットテーブルを作成する場合、スキーマ進化はデフォルトで有効になります。

既存のテーブルのスキーマ進化を有効または無効にする場合は、 ALTER TABLE コマンドを使用して ENABLE_SCHEMA_EVOLUTION パラメーターを設定します。テーブルで:code:OWNERSHIP`権限を持つロールを使用することも必要です。詳細については、 :doc:/user-guide/data-load-schema-evolution` をご参照ください。

ただし、既存のテーブルでスキーマ進化が無効になっている場合、コネクタは一致しないスキーマを持つ行を構成された失敗出力ポートに送信しようとします。

Icebergテーブルのサポート

Openflow Connector for Kinesis は、Iceberg有効true に設定されている場合、Snowflake管理の Apache Iceberg™ にデータを取り込むことができます。

要件と制限

Icebergテーブルの取り込み用に Openflow Connector for Kinesis を構成する前に、次の要件と制限に注意してください。

  • コネクタを実行する前にIcebergテーブルを作成する必要があります。

  • ユーザーが作成したテーブルにデータを挿入するアクセス権を持っていることを確認します。

構成とセットアップ

Icebergテーブルの取り込み用に Openflow Connector for Kinesis を構成するには、次のセクションに記載されているいくつかの違いを踏まえて、 Openflow Connector for Kinesis の JSON データ形式を設定する のステップに従います。

Icebergテーブルへの取り込みを有効にする

Icebergテーブルへの取り込みを有効にするには、 Iceberg Enabled パラメーターを true に設定する必要があります。

インジェスト用にIcebergテーブルを作成

コネクタを実行する前に、Icebergテーブルを作成する必要があります。初期のテーブルスキーマは、コネクタの スキーマ進化が有効です プロパティ設定によって異なります。

スキーマの進化を有効にすると、 kinesisMetadata という列を持つテーブルを作成する必要があります。コネクタはメッセージフィールドの列を自動的に作成し、 kinesisMetadata 列スキーマを変更します。

CREATE OR REPLACE ICEBERG TABLE my_iceberg_table (
    kinesisMetadata OBJECT()
  )
  EXTERNAL_VOLUME = 'my_volume'
  CATALOG = 'SNOWFLAKE'
  BASE_LOCATION = 'my_location/my_iceberg_table'
  ENABLE_SCHEMA_EVOLUTION = true;
Copy

スキーマの進化が無効になっている場合は、Kinesisメッセージが含むすべてのフィールドでテーブルを作成する必要があります。Icebergテーブルを作成する場合、Icebergデータタイプまたは 互換性のあるSnowflakeデータタイプ を使用できます。半構造化 VARIANT タイプはサポートされていません。代わりに、 構造化された OBJECT または MAP を使用してください。

例えば、次のようなメッセージを考えてみましょう:

{
    "id": 1,
    "name": "Steve",
    "body_temperature": 36.6,
    "approved_coffee_types": ["Espresso", "Doppio", "Ristretto", "Lungo"],
    "animals_possessed":
    {
        "dogs": true,
        "cats": false
    },
    "date_added": "2024-10-15"
}
Copy

次のステートメントは、Kinesisメッセージに含まれるすべてのフィールドを持つテーブルを作成します。

CREATE OR REPLACE ICEBERG TABLE my_iceberg_table (
    kinesisMetadata OBJECT(
        stream STRING,
        shardId STRING,
        approximateArrival STRING,
        partitionKey STRING,
        sequenceNumber STRING,
        subSequenceNumber INTEGER,
        shardedSequenceNumber STRING
    ),
    id INT,
    body_temperature FLOAT,
    name STRING,
    approved_coffee_types ARRAY(STRING),
    animals_possessed OBJECT(dogs BOOLEAN, cats BOOLEAN),
    date_added DATE
  )
  EXTERNAL_VOLUME = 'my_volume'
  CATALOG = 'SNOWFLAKE'
  BASE_LOCATION = 'my_location/my_iceberg_table';
Copy

注釈

kinesisMetadata は常に作成される必要があります。dogscats のような入れ子構造内のフィールド名では、大文字と小文字が区別されます。