Set up Openflow Connector for Kinesis for JSON data format

注釈

This connector is subject to the Snowflake Connector Terms.

This topic describes the set up steps for the Openflow Connector for Kinesis for JSON data format. This is a simplified connector optimized for basic message ingestion with schema evolution capabilities.

The Openflow Connector for Kinesis for JSON data format is designed for straightforward JSON message ingestion from Kinesis streams to Snowflake tables.

前提条件

  1. Openflow Connector for Kinesis について を確認してください。

  2. Ensure that you have Openflowの設定 - BYOC or Set up Openflow - Snowflake Deployments.

  3. |OFSFSPCS-plural|を使用する場合、:doc:`必要なドメインの構成</user-guide/data-integration/openflow/setup-openflow-spcs-sf-allow-list>`を精査し、:ref:`label-openflow_domains_used_by_openflow_connectors_kinesis`コネクタに必要なドメインへのアクセス許可を付与していることを確認してください。

注釈

If you need the support of other data formats or features, such as DLQ, reach out to your Snowflake representative.

Kinesisストリームを設定する

AWS 管理者として、 AWS アカウントで以下のアクションを実行します。

  1. Ensure that you have an AWS User with IAM permissions to access Kinesis Streams and DynamoDB.

  2. Ensure that the AWS User has configured Access Key credentials.

Snowflakeアカウントを設定する

Snowflakeアカウント管理者として、以下のタスクを実行します。

  1. 新しいロールを作成するか、既存のロールを使用して データベース権限 を付与します。

  2. データを格納する宛先テーブルを作成するために使用する宛先データベースと宛先スキーマを作成します。

    1. 宛先テーブルがまだ存在しない場合に自動的に作成するコネクタの機能を使用する予定の場合は、ユーザーがSnowflakeオブジェクトの作成および管理に必要な権限を持っていることを確認してください。

      オブジェクト

      権限

      注意

      データベース

      USAGE

      スキーマ

      USAGE . CREATE TABLE .

      スキーマレベルのオブジェクトが作成された後に、 CREATE object 権限を取り消すことができます。

      テーブル

      OWNERSHIP

      Kinesisコネクタを使用して 既存の テーブルにデータを取り込む場合にのみ必要です。. コネクタがKinesisストリームからの記録用に新しいターゲットテーブルを作成する場合、構成で指定されたユーザーの既定のロールがテーブル所有者になります。

      以下のスクリプトを使用して、カスタムロールを作成および構成できます(SECURITYADMIN または同等のロールが必要です)。

      USE ROLE SECURITYADMIN;
      
      CREATE ROLE kinesis_connector_role;
      GRANT USAGE ON DATABASE kinesis_db TO ROLE kinesis_connector_role;
      GRANT USAGE ON SCHEMA kinesis_schema TO ROLE kinesis_connector_role;
      GRANT CREATE TABLE ON SCHEMA kinesis_schema TO ROLE kinesis_connector_role;
      
      -- Only for existing tables.
      GRANT OWNERSHIP ON TABLE existing_table TO ROLE kinesis_connector_role;
      
      Copy
  3. タイプを SERVICE として、新しいSnowflakeサービスユーザーを作成します。

  4. Snowflakeサービスユーザーに、前の手順で作成したロールを付与します。

    GRANT ROLE kinesis_connector_role TO USER kinesis_connector_user;
    ALTER USER kinesis_connector_user SET DEFAULT_ROLE = kinesis_connector_role;
    
    Copy
  5. ステップ3のSnowflake SERVICE ユーザー向けの キーペア認証 で構成します。

  6. Snowflakeではこの手順を強く推奨します。Openflowがサポートするシークレットマネージャ(AWS、Azure、Hashicorpなど)を構成し、公開キーと秘密キーを秘密ストアに格納します。

    注釈

    何らかの理由でシークレットマネージャを使用したくない場合は、組織のセキュリティポリシーに従って、キーペア認証に使用する公開キーと秘密キーファイルを保護する責任があります。

    1. シークレットマネージャを構成したら、その認証方法を決定します。AWS 上では、Openflowに関連付けられた EC2 インスタンスロールが推奨されます。こうすることで、他の秘密を永続化する必要がなくなるからです。

    2. Openflowで、右上のハンバーガーメニューから、このシークレットマネージャーに関連付けられたパラメータープロバイダーを構成します。Controller Settings » Parameter Provider に移動してから、パラメーター値を取得します。

    3. この時点で、すべての認証情報を関連するパラメーターパスで参照することができるため、機密性の高い値をOpenflow内で永続化する必要はありません。

  7. If any other Snowflake users require access to the ingested data and created tables (for example, for custom processing in Snowflake), then grant those users the role created in step 2.

コネクタを設定する

データエンジニアとして、以下のタスクを実行してコネクタをインストールおよび構成します。

コネクタをインストールする

  1. Navigate to the Openflow overview page. In the Featured connectors section, select View more connectors.

  2. Openflowのコネクタページでコネクタを探し、 Add to runtime を選択します。

  3. In the Select runtime dialog, select your runtime from the Available runtimes drop-down list and click Add.

    注釈

    コネクタをインストールする前に、コネクタが取り込んだデータを格納するためのデータベースとスキーマをSnowflakeで作成したことを確認します。

  4. Snowflakeアカウント認証情報でデプロイメントを認証し、Snowflakeアカウントへのランタイムアプリケーションのアクセスを許可するよう求められたられたら、 Allow を選択します。コネクタのインストールプロセスは数分で完了します。

  5. Snowflakeアカウント認証情報でランタイムを認証します。

コネクタプロセスグループが追加されたOpenflowキャンバスが表示されます。

コネクタを構成する

  1. インポートしたプロセスグループを右クリックし、 Parameters を選択します。

  2. Populate the required parameter values as described in Parameters section below.

Parameters

This section describes all parameters for the Openflow Connector for Kinesis for JSON data format.

The connector consists of a several modules. To see the set, double click on the connector process group. You will be able to set the parameters for each module in the module's parameter context.

Snowflake destination parameters

パラメーター

説明

必須

宛先データベース

データが永続化されるデータベース。Snowflakeにすでに存在している必要があります。名前は大文字と小文字を区別します。引用符で囲まれていない識別子の場合、名前を大文字で指定します。

有り

宛先スキーマ

データが永続化されるスキーマ。これはSnowflakeにすでに存在している必要があります。名前は大文字と小文字を区別します。引用符で囲まれていない識別子の場合、名前を大文字で指定します。

次の例をご参照ください。

  • CREATE SCHEMA SCHEMA_NAME または CREATE SCHEMA schema_name: SCHEMA_NAME を使用します

  • CREATE SCHEMA "schema_name" または CREATE SCHEMA "SCHEMA_NAME": それぞれ schema_name または SCHEMA_NAME を使用します

有り

Iceberg Enabled

Whether Iceberg is enabled for table operations. One of true / false.

有り

Schema Evolution Enabled

Enables or disables schema evolution at the connector level. When enabled, allows automatic schema changes for tables. Note that schema evolution can also be controlled at the individual table level through table-specific parameters. One of: true / false.

有り

Schema Evolution For New Tables Enabled

Controls whether schema evolution is enabled when creating new tables. When set to 'true', new tables will be created with ENABLE_SCHEMA_EVOLUTION = TRUE parameter. When set to 'false', new tables will be created with ENABLE_SCHEMA_EVOLUTION = FALSE parameter. Not applicable to Iceberg tables as they are not being created automatically. This setting only affects table creation, not existing tables. One of: true / false.

有り

Snowflakeアカウント識別子

以下を使用する場合:

  • Session Token Authentication Strategy: 空白にする必要があります。

  • KEY_PAIR: データが永続化される[organization-name]-[account-name]形式のSnowflakeアカウント名。

有り

Snowflake認証ストラテジー

以下を使用する場合:

  • Snowflake Openflow Deployment または BYOC:SNOWFLAKE_SESSION_TOKEN を使用します。このトークンはSnowflakeによって自動的に管理されます。BYOC デプロイメントでは、SNOWFLAKE_SESSION_TOKENを使用するために、事前に:ref:ランタイムロール <label-deployment_byoc_setup_runtime_role> が構成されている必要があります。

  • BYOC: Alternatively BYOC can use KEY_PAIR as the value for authentication strategy.

有り

Snowflake秘密キー

以下を使用する場合:

  • Session Token Authentication Strategy: 空白にする必要があります。

  • KEY_PAIR:認証に使用される RSA プライベートキーである必要があります。

    The RSA key must be formatted according to PKCS8 standards and have standard PEM headers and footers. Note that either a Snowflake Private Key File or a Snowflake Private Key must be defined.

無し

Snowflake秘密キーファイル

以下を使用する場合:

  • Session token authentication strategy:プライベートキーファイルは空白である必要があります。

  • KEY_PAIR:Snowflakeへの認証に使用される RSA プライベートキーを含むファイルをアップロードします。これは、PKCS8 標準に従ってフォーマットされ、標準の PEM ヘッダーとフッターを含んでいる必要があります。ヘッダー行は で始まります -----BEGIN PRIVATE.プライベートキーファイルをアップロードするには、Reference asset チェックボックスを選択します。

無し

Snowflake秘密キーパスワード

以下を使用する場合

  • Session Token Authentication Strategy: 空白にする必要があります。

  • KEY_PAIR:Snowflakeプライベートキーファイルに関連付けられたパスワードを提供します。

無し

Snowflakeロール

以下を使用する場合

  • Session Token Authentication Strategy: Use your Snowflake Role. You can find your Snowflake Role in the Openflow UI, by navigating to View Details for your Runtime.

  • KEY_PAIR Authentication Strategy:サービスユーザーのために構成された有効なロールを使用します。

有り

Snowflakeのユーザー名

以下を使用する場合

  • Session Token Authentication Strategy: 空白にする必要があります。

  • KEY_PAIR:Snowflakeインスタンスへの接続に使用するユーザー名を提供します。

有り

Kinesis JSON Source Parameters

パラメーター

説明

必須

AWS リージョンコード

us-west-2 など、Kinesis Stream がある AWS リージョン。

有り

AWS アクセスキー ID

The AWS Access Key ID to connect to your Kinesis Stream, DynamoDB, and, optionally, CloudWatch.

有り

AWS シークレットアクセスキー

The AWS Secret Access Key to connect to your Kinesis Stream, DynamoDB, and, optionally, CloudWatch.

有り

Kinesisアプリケーション名

アプリケーションのKinesisストリーム消費の進捗を追跡するための DynamoDB テーブル名に使用される名前。

有り

Kinesisストリーム初期位置

データの複製を開始するストリームの初期位置。

可能な値は次のとおりです。
  • LATEST: 最新の格納された記録

  • TRIM_HORIZON: 最も古い格納された記録

有り

Kinesisストリーム名

データを消費する AWS Kinesisストリーム名。

有り

Metrics Publishing

Specifies where Kinesis Client Library metrics are published to. Possible values: DISABLED, LOGS, CLOUDWATCH.

有り

フローを実行する

  1. プレーンを右クリックし、 Enable all Controller Services を選択します。

  2. Right-click on the connector's process group and select Start.

コネクタがデータの取り込みを開始します。

Table Schema

The Snowflake table loaded by the connector contains columns named by the keys of your Kinesis messages. The connector also adds a KINESISMETADATA column which stores metadata about the record.

Below is an example of a Snowflake table loaded by the connector:

ACCOUNT

SYMBOL

SIDE

QUANTITY

KINESISMETADATA

1

ABC123

ZTEST

BUY

3572

{ ... KINESISMETADATA object ... }

2

XYZ789

ZABZX

SELL

3024

{ ... KINESISMETADATA object ... }

3

XYZ789

ZTEST

SELL

799

{ ... KINESISMETADATA object ... }

4

ABC123

ZABZX

BUY

2033

{ ... KINESISMETADATA object ... }

The KINESISMETADATA column contains an object with the following fields:

Field Name

Field Type

Example Value

説明

stream

String

stream-name

The name of the Kinesis stream the record came from.

shardId

String

shardId-000000000001

The identifier of the shard in the stream the record came from.

approximateArrival

String

2025-11-05T09:12:15.300

The approximate time that the record was inserted into the stream (ISO 8601 format).

partitionKey

String

key-1234

The partition key specified by the data producer for the record.

sequenceNumber

String

123456789

The unique sequence number assigned by Kinesis Data Streams to the record in the shard.

subSequenceNumber

Number

2

The subsequence number for the record (used for aggregated records with the same sequence number).

shardedSequenceNumber

String

12345678900002

A combination of the sequence number and the subsequence number for the record.

スキーマの進化

This connector supports automatic schema detection and evolution. The structure of tables in Snowflake is defined and evolved automatically to support the structure of new data loaded by the connector.

Snowflake detects the schema of the incoming data and loads data into tables that match any user-defined schema. Snowflake also allows adding new columns or dropping the NOT NULL constraint from columns missing in new incoming records.

Schema detection with the connector infers data types based on the JSON data provided.

If the connector creates the target table, schema evolution is enabled by default.

If you want to enable or disable schema evolution on an existing table, use the ALTER TABLE command to set the ENABLE_SCHEMA_EVOLUTION parameter. You must also use a role that has the OWNERSHIP privilege on the table. For more information, see テーブルスキーマの進化.

However, if schema evolution is disabled for an existing table, then the connector will try to send the rows with mismatched schemas to the configured failure output port.

Iceberg table support

Openflow Connector for Kinesis can ingest data into a Snowflake-managed Apache Iceberg™ table when Iceberg Enabled is set to true.

要件と制限

Before you configure the Openflow Connector for Kinesis for Iceberg table ingestion, note the following requirements and limitations:

  • コネクタを実行する前にIcebergテーブルを作成する必要があります。

  • ユーザーが作成したテーブルにデータを挿入するアクセス権を持っていることを確認します。

構成とセットアップ

To configure the Openflow Connector for Kinesis for Iceberg table ingestion, follow the steps in Set up Openflow Connector for Kinesis for JSON data format with a few differences noted in the following sections.

Icebergテーブルへの取り込みを有効にする

Icebergテーブルへの取り込みを有効にするには、 Iceberg Enabled パラメーターを true に設定する必要があります。

インジェスト用にIcebergテーブルを作成

Before you run the connector, you must create an Iceberg table. The initial table schema depends on your connector Schema Evolution Enabled property settings.

With enabled schema evolution, you must create a table with a column named kinesisMetadata. The connector automatically creates the columns for message fields and alters the kinesisMetadata column schema.

CREATE OR REPLACE ICEBERG TABLE my_iceberg_table (
    kinesisMetadata OBJECT()
  )
  EXTERNAL_VOLUME = 'my_volume'
  CATALOG = 'SNOWFLAKE'
  BASE_LOCATION = 'my_location/my_iceberg_table'
  ENABLE_SCHEMA_EVOLUTION = true;
Copy

If schema evolution is disabled, you must create the table with all fields the Kinesis message contains. When you create an Iceberg table, you can use Iceberg data types or compatible Snowflake types. The semi-structured VARIANT type isn't supported. Instead, use a structured OBJECT or MAP.

例えば、次のようなメッセージを考えてみましょう:

{
    "id": 1,
    "name": "Steve",
    "body_temperature": 36.6,
    "approved_coffee_types": ["Espresso", "Doppio", "Ristretto", "Lungo"],
    "animals_possessed":
    {
        "dogs": true,
        "cats": false
    },
    "date_added": "2024-10-15"
}
Copy

The following statement creates a table with all fields the Kinesis message contains:

CREATE OR REPLACE ICEBERG TABLE my_iceberg_table (
    kinesisMetadata OBJECT(
        stream STRING,
        shardId STRING,
        approximateArrival STRING,
        partitionKey STRING,
        sequenceNumber STRING,
        subSequenceNumber INTEGER,
        shardedSequenceNumber STRING
    ),
    id INT,
    body_temperature FLOAT,
    name STRING,
    approved_coffee_types ARRAY(STRING),
    animals_possessed OBJECT(dogs BOOLEAN, cats BOOLEAN),
    date_added DATE
  )
  EXTERNAL_VOLUME = 'my_volume'
  CATALOG = 'SNOWFLAKE'
  BASE_LOCATION = 'my_location/my_iceberg_table';
Copy

注釈

kinesisMetadata must always be created. Field names inside nested structures such as dogs or cats are case sensitive.