Openflow Connector for Amazon Kinesis Data Streams を設定する

注釈

このコネクタは、 Snowflakeコネクタ規約 に従うものとします。

このトピックでは、Openflow Connector for Amazon Kinesis Data Streams を設定する方法について説明します。

Openflow Connector for Amazon Kinesis Data Streams は、スキーマ進化機能を使用してKinesisストリームからSnowflakeテーブルに JSON メッセージを取り込むために設計されています。

Openflow Connector for Kinesisの設定

前提条件

  1. Openflow Connector for Amazon Kinesis Data Streams を確認します。

  2. :doc:`/user-guide/data-integration/openflow/setup-openflow-byoc`または:doc:`Openflow のセットアップ - Snowflake デプロイメント</user-guide/data-integration/openflow/setup-openflow-spcs>`があることを確認してください。

  3. Openflow - Snowflakeデプロイを使用している場合は、必要なドメインの構成 を確認して、Kinesisコネクタに必要なドメインへのアクセス権を付与していることを確認してください。

AWS での IAM のロールとポリシーの設定

AWS 管理者として、 AWS アカウントで以下のアクションを実行します。

  1. KinesisデータストリームにアクセスするためにOpenflowが使用する AWS IAM のユーザーまたはロールを作成します。詳細については、AWS ドキュメントの IAM ユーザーの作成 をご参照ください。

  2. AWS ユーザーが アクセスキーの認証情報 を構成していることを確認します。

  3. AWS ユーザーに次の IAM 権限を付与します。

    サービス

    アクション

    リソース(ARNs)

    目的

    Amazon Kinesis Data Streams

    kinesis:DescribeStreamkinesis:DescribeStreamConsumerkinesis:GetRecordskinesis:GetShardIteratorkinesis:ListShardskinesis:RegisterStreamConsumer

    arn:aws:kinesis:${REGION}:${ACCOUNT_ID}:stream/${STREAM_NAME}

    シャードを検出し、共有スループットポーリングを通じて記録を読み取り、ストリーム ARN を解決します。また、強化されたファンアウトコンシューマーを登録し、登録中にコンシューマーのステータスをポーリングします。

    Amazon Kinesis Data Streams

    kinesis:DeregisterStreamConsumerkinesis:DescribeStreamConsumerkinesis:SubscribeToShard

    arn:aws:kinesis:${REGION}:${ACCOUNT_ID}:stream/${STREAM_NAME}/consumer/*

    コンシューマー ARN ごとに強化されたファンアウトコンシューマーの説明、サブスクライブ、および登録解除。

    Amazon DynamoDB

    dynamodb:CreateTabledynamodb:DeleteTabledynamodb:DescribeTabledynamodb:GetItemdynamodb:PutItemdynamodb:Querydynamodb:Scandynamodb:UpdateItem

    arn:aws:dynamodb:${REGION}:${ACCOUNT_ID}:table/${APPLICATION_NAME}arn:aws:dynamodb:${REGION}:${ACCOUNT_ID}:table/${APPLICATION_NAME}_migration

    チェックポイント/リーステーブル(シャードリース、ノードハートビート、チェックポイント)と、レガシーチェックポイントテーブルからの1回限りの移行時に使用される仮移行テーブルを作成および管理します。

    IAM ポリシーの例:

    {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Sid": "KinesisStreamAccess",
                "Effect": "Allow",
                "Action": [
                    "kinesis:DescribeStream",
                    "kinesis:DescribeStreamConsumer",
                    "kinesis:GetRecords",
                    "kinesis:GetShardIterator",
                    "kinesis:ListShards",
                    "kinesis:RegisterStreamConsumer"
                ],
                "Resource": "arn:aws:kinesis:${REGION}:${ACCOUNT_ID}:stream/${STREAM_NAME}"
            },
            {
                "Sid": "KinesisConsumerAccess",
                "Effect": "Allow",
                "Action": [
                    "kinesis:DeregisterStreamConsumer",
                    "kinesis:DescribeStreamConsumer",
                    "kinesis:SubscribeToShard"
                ],
                "Resource": "arn:aws:kinesis:${REGION}:${ACCOUNT_ID}:stream/${STREAM_NAME}/consumer/*"
            },
            {
                "Sid": "DynamoDBTableAccess",
                "Effect": "Allow",
                "Action": [
                    "dynamodb:CreateTable",
                    "dynamodb:DeleteTable",
                    "dynamodb:DescribeTable",
                    "dynamodb:GetItem",
                    "dynamodb:PutItem",
                    "dynamodb:Query",
                    "dynamodb:Scan",
                    "dynamodb:UpdateItem"
                ],
                "Resource": [
                    "arn:aws:dynamodb:${REGION}:${ACCOUNT_ID}:table/${APPLICATION_NAME}",
                    "arn:aws:dynamodb:${REGION}:${ACCOUNT_ID}:table/${APPLICATION_NAME}_migration"
                ]
            }
        ]
    }
    

    ポリシーの例を使用する前に、次のプレースホルダーを置き換えます。

    プレースホルダー

    説明

    ${REGION}

    ご自身の AWS リージョン(例: us-east-1

    ${ACCOUNT_ID}

    ご自身の AWS アカウント ID(例: 123456789012

    ${STREAM_NAME}

    **AWS ストリーム名**コネクタパラメーターの値

    ${APPLICATION_NAME}

    **AWS Kinesisアプリケーション名**コネクタパラメーターの値。DynamoDB チェックポイントテーブル名および強化されたファンアウト登録済みコンシューマー名として使用されます。

    注釈

    • ${APPLICATION_NAME}_migration テーブルはレガシーチェックポイントテーブルから新しいスキーマへの1回限りの移行中にのみ作成された DynamoDB の仮テーブルです。移行が完了すると自動的に削除されます。デプロイによりレガシー KCL ベースのコネクタが使用されたことがない場合は、移行テーブルの ARN をポリシーから省略できます。

    • dynamodb:DeleteTable アクションは移行プロセス中に使用され、移行の完了が確認された後にポリシーから削除できます。

    • kinesis:DeregisterStreamConsumer アクションは、プロセッサーがキャンバスから削除されたときに呼び出されます。IAM プリンシパルにこの権限が付与されていない場合、コンシューマーは AWS コンソールまたは CLI を使用して手動で登録解除される必要があります。

Snowflakeアカウントを設定する

Snowflakeアカウント管理者として、以下のタスクを実行します。

  1. タイプを SERVICE として、新しいSnowflakeサービスユーザーを作成します。

  2. 新しいロールを作成するか、既存のロールを使用して データベース権限 を付与します。

    コネクタでは、ユーザーが宛先テーブルを作成する必要があります。ユーザーにSnowflakeオブジェクトの管理に必要な権限が付与されていることを確認します。

    オブジェクト

    権限

    注意

    データベース

    USAGE

    スキーマ

    USAGE

    テーブル

    OWNERSHIP

    コネクタがデータをテーブルに取り込むために必要です。

    Snowflakeは、アクセス制御を改善するために、各Kinesisストリームに個別のユーザーとロールを作成することをお勧めします。

    以下のスクリプトを使用して、カスタムロールを作成および構成できます(SECURITYADMIN または同等のロールが必要です)。

    USE ROLE securityadmin;
    
    CREATE ROLE openflow_kinesis_connector_role_1;
    GRANT USAGE ON DATABASE kinesis_db TO ROLE openflow_kinesis_connector_role_1;
    GRANT USAGE ON SCHEMA kinesis_schema TO ROLE openflow_kinesis_connector_role_1;
    

    注釈

    権限はコネクタロールに直接付与する必要があり、継承することはできません。

  3. 宛先テーブルの構成

    スキーマの変更にサーバー側のスキーマ進化を使用し、DMLエラーのロギングにエラーテーブル を使用することを強くお勧めします。

    以下の例は、テーブルを作成し、OWNERSHIP 権限を追加する方法を示しています。

    USE ROLE openflow_kinesis_connector_role_1;
    
    CREATE TABLE kinesis_db.kinesis_schema.<DESTINATION_TABLE_NAME> (
      kinesisMetadata object
    )
    ENABLE_SCHEMA_EVOLUTION = TRUE
    ERROR_LOGGING = TRUE;
    
    USE ROLE securityadmin;
    GRANT OWNERSHIP ON TABLE <DESTINATION_TABLE_NAME> TO ROLE openflow_kinesis_connector_role_1;
    

    これらのコネクタは、スキーマの自動検出と進化をサポートします。Snowflakeのテーブルの構造は、コネクタによってロードされた新しいデータの構造をサポートするために、自動的に定義され、進化します。記録コンテンツの第1レベルのキーを、名前で一致するテーブル列に自動的にマッピングします(大文字と小文字の区別なし)。

    スキーマ進化を有効にすると、Snowflakeは受信ストリームで検出された新しい列を追加し、NOT NULL 制約をドロップすることで、宛先テーブルを自動的に拡張して、新しいデータパターンに対応することができます。詳細については、 テーブルスキーマの進化 をご参照ください。

    ENABLE_SCHEMA_EVOLUTION が有効になっていない場合は、テーブル定義を拡張してスキーマを手動で作成する必要があります。コネクタは、記録コンテンツの第1レベルのキーを名前でテーブル列と一致させようとします。JSONからのキーがテーブル列と一致しない場合、コネクタはキーを無視します。

  4. (オプション)シークレットマネージャーの構成

    Snowflakeではこの手順を強く推奨します。Openflowがサポートするシークレットマネージャ(AWS、Azure、Hashicorpなど)を構成し、公開キーと秘密キーを秘密ストアに格納します。

    1. シークレットマネージャを構成したら、その認証方法を決定します。AWS では、他のシークレットを保持する必要がないため、Openflowに関連付けられた EC2 インスタンスロールを使用することをお勧めします。

    2. Openflowキャンバスで、右上のハンバーガーメニューから、このシークレットマネージャーに関連付けられたパラメータープロバイダーを構成します。Controller Settings » Parameter Provider に移動してから、パラメーター値を取得します。

    3. この時点で、すべての認証情報を関連するパラメーターパスで参照することができるため、機密性の高い値をOpenflow内で永続化する必要はありません。

  5. ユーザーへのアクセス権の付与

    コネクタによって取り込まれた未加工のデータへのアクセスを必要とする(たとえば、Snowflakeでのカスタム処理のため)他のSnowflakeユーザーについては、ステップ2で作成したロールを付与する必要があります。

コネクタを設定する

データエンジニアとして、以下のタスクを実行してコネクタをインストールおよび構成します。

コネクタをインストールする

  1. Openflow概要ページに移動します。特集コネクタ セクションで、その他のコネクタを表示 を選択します。

  2. Openflowコネクタページで、Amazon Kinesis Data Streams用Openflowコネクタ を見つけて、ランタイムに追加 を選択します。

  3. ランタイムを選択ダイアログで、利用可能なランタイム ドロップダウンリストからランタイムを選択し、追加 をクリックします。

    注釈

    コネクタをインストールする前に、コネクタが取り込んだデータを格納するためのデータベース、スキーマおよびテーブルをSnowflakeで作成したことを確認します。

  4. Snowflakeアカウント認証情報でデプロイに対する認証を行い、Snowflakeアカウントへのランタイムアプリケーションのアクセスを許可するよう求められたら、許可する を選択します。コネクタのインストールプロセスは数分で完了します。

  5. Snowflakeアカウント認証情報でランタイムを認証します。

    コネクタプロセスグループが追加されたOpenflowキャンバスが表示されます。

コネクタを構成する

  1. 必要に応じて、組み込みパラメーターを構成する前に、コネクタ構成をカスタマイズします。

  2. プロセスグループパラメーターを入力する

    1. インポートしたプロセスグループを右クリックし、 Parameters を選択します。

    2. 必要なパラメーター値を入力します。

共通パラメーター

パラメーター

説明

必須

AWS アクセスキー ID

Kinesis Streamおよび DynamoDB に接続するための AWS アクセスキー ID。

有り

AWS Kinesisのリージョン

接続先の AWS リージョン。通常の AWS リージョン形式(例: us-west-2ap-southeast-1eu-west-1)。AWS リージョン ページをご参照ください。

有り

AWS シークレットアクセスキー

Kinesis Streamおよび DynamoDB に接続するための AWS シークレットアクセスキー。

有り

AWS Kinesisアプリケーション名

Kinesis Stream消費に対するアプリケーションの進捗状況を追跡するための DynamoDB テーブル名として使用される名前。

有り

AWS Kinesisコンシューマータイプ

Kinesisストリームから記録を読み取るために使用される戦略。

次のいずれかの値であることが必要: SHARED_THROUGHPUTENHANCED_FAN_OUT

詳細については、共有スループットコンシューマーと強化されたファンアウトコンシューマーの違い をご参照ください。

有り

AWS Kinesisの初期ストリーム位置

データが複製を開始する初期ストリーム位置。これは、指定された AWS Kinesisアプリケーション名の最初の開始時にのみ有効になります。

可能な値は次のとおりです。

LATEST: 保存された最新のレコード。

TRIM_HORIZON: 最も古い保存済みレコード。

有り

AWS Kinesisストリーム名

データの消費元となる AWS Kinesisストリーム名。

有り

Snowflake宛先データベース

データが永続化されるデータベース。Snowflakeにすでに存在している必要があります。名前は大文字と小文字を区別します。引用符で囲まれていない識別子の場合、名前を大文字で指定します。

有り

Snowflake宛先スキーマ

データが永続化されるスキーマ。これはSnowflakeにすでに存在している必要があります。名前は大文字と小文字を区別します。引用符で囲まれていない識別子の場合、名前を大文字で指定します。

次の例をご参照ください。

CREATE SCHEMA SCHEMA_NAME または CREATE SCHEMA schema_name: SCHEMA_NAME を使用します。

CREATE SCHEMA "schema_name" または CREATE SCHEMA "SCHEMA_NAME": それぞれ schema_name または SCHEMA_NAME を使用します。

有り

Snowflake宛先テーブル

データが永続化されるテーブル。Snowflakeにすでに存在している必要があります。名前は大文字と小文字を区別します。名前は大文字と小文字を区別します。引用符で囲まれていない識別子の場合、名前を大文字で指定します。

有り

コネクタを起動します。

  1. プレーンを右クリックし、 Enable all Controller Services を選択します。

  2. 平面を右クリックして、Start を選択します。コネクタがデータの取り込みを開始します。

KINESISMETADATA 列についての理解

コネクタは、Kinesis記録に関するメタデータを含む KINESISMETADATA 構造を入力します。構造には次の情報が含まれています。

フィールド名

フィールド型

値の例

説明

ストリーム

String

stream-name

記録が由来するKinesisストリームの名前です。

shardId

String

shardId-000000000001

記録が由来するストリーム内のシャードの識別子です。

approximateArrival

String

2025-11-05T09:12:15.300

記録がストリームに挿入されたおおよその時間(ISO 8601形式)

partitionKey

String

key-1234

記録のデータプロデューサーによって指定されたパーティションキー。

sequenceNumber

String

123456789

Kinesis Data Streamsによってシャード内の記録に割り当てられた一意のシーケンス番号。

subSequenceNumber

2

記録のサブシーケンス番号(同じシーケンス番号を持つ集計記録に使用)

shardedSequenceNumber

String

12345678900002

記録のシーケンス番号とサブシーケンス番号の組み合わせ。

取り込みのレイテンシの測定

行の変更時間に基づいて変更の追跡、増分処理、およびTime Travelクエリを実行するには、ROW_TIMESTAMP 機能を使用します。

宛先テーブルで次のコマンドを実行すると有効にできます。

ALTER TABLE <DESTINATION_TABLE> SET ROW_TIMESTAMP = TRUE;

行のタイムスタンプが有効になると、テーブルは METADATA$ROW_LAST_COMMIT_TIME 列を公開します。これは、各行が最後に変更された時点のタイムスタンプを返します。

詳細については、行のタイムスタンプ をご参照ください。

注釈

行のタイムスタンプはインタラクティブテーブルでは使用できません。詳細については、 インタラクティブテーブルの制限 をご参照ください。

Apache Iceberg™テーブルでのコネクタの使用

コネクタは、Snowflakeが管理するApache Iceberg™テーブルにデータを取り込むことができますが、以下の要件を満たしている必要があります。

  • Apache Iceberg™テーブルに関連付けられた外部ボリュームに対する USAGE 権限を付与されている必要があります。

  • コネクタを実行する前にApache Iceberg™テーブルを作成する必要があります。

外部ボリュームの使用許可

たとえば、Icebergテーブルが kinesis_external_volume 外部ボリュームを使用し、コネクタがロール openflow_kinesis_connector_role_1 を使用する場合、次のステートメントを実行します:

USE ROLE ACCOUNTADMIN;
GRANT USAGE ON EXTERNAL VOLUME kinesis_external_volume TO ROLE openflow_kinesis_connector_role_1;

取り込み用のApache Iceberg™テーブルの作成

コネクタは自動的にIcebergテーブルを作成せず、スキーマの進化もサポートしません。コネクタを実行する前に、Icebergテーブルを手動で作成する必要があります。

Icebergテーブルを作成する場合、Icebergデータタイプ(VARIANTなど)または 互換性のあるSnowflakeデータタイプ を使用できます。

例えば、次のようなメッセージを考えてみましょう:

{
  "id": 1,
  "name": "Steve",
  "body_temperature": 36.6,
  "approved_coffee_types": ["Espresso", "Doppio", "Ristretto", "Lungo"],
  "animals_possessed": {
    "dogs": true,
    "cats": false
  },
  "options": {
    "can_walk": true,
    "can_talk": false
  },
  "date_added": "2024-10-15"
}

例のメッセージ用にIcebergテーブルを作成するには、以下のステートメントの1つを使用します:

CREATE OR REPLACE ICEBERG TABLE my_iceberg_table (
  kinesisMetadata OBJECT(
    stream STRING,
    shardId STRING,
    approximateArrival STRING,
    partitionKey STRING,
    sequenceNumber STRING,
    subSequenceNumber INTEGER,
    shardedSequenceNumber STRING
  ),
  id INT,
  name string,
  body_temperature float,
  approved_coffee_types array(string),
  animals_possessed variant,
  date_added date,
  options object(can_walk boolean, can_talk boolean)
)
EXTERNAL_VOLUME = 'my_volume'
CATALOG = 'SNOWFLAKE'
BASE_LOCATION = 'my_location/my_iceberg_table'
ICEBERG_VERSION = 3;

インタラクティブテーブルでのコネクタの使用

インタラクティブテーブルは、低レイテンシの高同時実行性クエリ向けに最適化されたSnowflakeテーブルの特別なタイプです。インタラクティブテーブルの詳細については、 インタラクティブテーブルドキュメント をご参照ください。

  1. インタラクティブテーブルを作成します。

    CREATE INTERACTIVE TABLE REALTIME_METRICS (
      metric_name VARCHAR,
      metric_value NUMBER,
      source_topic VARCHAR,
      timestamp TIMESTAMP_NTZ
    ) CLUSTER BY (metric_name)
    AS (SELECT
      $1:M_NAME::VARCHAR,
      $1:M_VALUE::NUMBER,
      $1:RECORD_METADATA.topic::VARCHAR,
      $1:RECORD_METADATA.timestamp::TIMESTAMP_NTZ
    from TABLE(DATA_SOURCE(TYPE => 'STREAMING')));
    

重要な考慮事項:

  • インタラクティブテーブルには特定の制限とクエリ制限があります。コネクタで使用する前に インタラクティブテーブルドキュメント をご確認ください。

  • インタラクティブテーブルの場合、必要な変換はテーブル定義で処理する必要があります。

  • インタラクティブテーブルを効率的にクエリするには、インタラクティブウェアハウスが必要です。

宛先テーブルに対する顧客定義スキーマでコネクタを使用

コネクタは、各Kinesis記録をSnowflakeテーブルに挿入される行として扱います。たとえば、以下の JSON のように構造化されたメッセージのコンテンツを持つKinesisトピックがある場合:

{
  "order_id": 12345,
  "customer_name": "John",
  "order_total": 100.00,
  "isPaid": true
}

デフォルトでは、JSON からすべてのフィールドを指定する必要はありません。スキーマの進化によって処理されます。ただし、静的スキーマを優先的に使用する場合は、次の処理を実行して作成できます。

CREATE TABLE ORDERS (
  kinesisMetadata OBJECT,
  order_id NUMBER,
  customer_name VARCHAR,
  order_total FLOAT,
  ispaid BOOLEAN
);

顧客定義の PIPE でのコネクタの使用

独自のパイプを作成することを選択した場合は、パイプの COPY INTO ステートメントでデータ変換ロジックを定義できます。必要に応じて列の名前を変更しデータ型をキャストすることができます。例:

CREATE TABLE ORDERS (
  order_id VARCHAR,
  customer_name VARCHAR,
  order_total VARCHAR,
  ispaid VARCHAR
);

CREATE PIPE ORDERS AS
COPY INTO ORDERS
FROM (
  SELECT
    $1:order_id::STRING,
    $1:customer_name,
    $1:order_total::STRING,
    $1:isPaid::STRING
  FROM TABLE(DATA_SOURCE(TYPE => 'STREAMING'))
);

独自のパイプを定義する場合は、宛先テーブルの列が JSON キーと一致する必要はありません。列の名前を希望の名前に変更し、必要に応じてデータ型をキャストできます。

カスタムパイプで動作するようにコネクタを調整するには、次のタスクを実行します。

  1. OpenflowキャンバスのKinesis取り込みフローで使用される PublishSnowpipeStreaming プロセッサーを右クリックします。

  2. コンテキストメニューから Configure を選択します。

  3. Properties タブに移動します。

  4. 宛先タイプフィールドで、Pipe を選択します。

  5. パイプフィールドに、パイプの名前を入力します。

  6. Apply を選択して構成を保存します。

エラー処理のカスタマイズ

エラー処理は、Snowpipe Streamingサービス内のOpenflow側の失敗とサーバー側の失敗に分割されます。

  • Openflowエラー(クライアント側の失敗):記録がSnowflakeに到達する前に、解析不可のペイロードやカスタム変換の失敗などのエラーが発生します。デフォルトでは、これらの記録は破棄されます。Openflowでこれらのエラーを処理することが可能です。そのためには、ConsumeKinesis プロセッサーで解析失敗関係から FlowFiles を使用します。

  • Snowpipe Streamingエラー(サーバー側の失敗):Snowflakeには正常に到達したものの、宛先テーブルのスキーマと互換性のない(例: 型の不一致)記録のエラーは、Snowflakeインフラストラクチャによってキャプチャされます。宛先テーブルでエラーのロギングが有効になっている(error_logging = true)場合、これらの失敗した行は宛先のエラーテーブルに自動的に取り込まれます。

次のステップ