Set up Openflow Connector for Kinesis for JSON data format¶
注釈
This connector is subject to the Snowflake Connector Terms.
This topic describes the set up steps for the Openflow Connector for Kinesis for JSON data format. This is a simplified connector optimized for basic message ingestion with schema evolution capabilities.
The Openflow Connector for Kinesis for JSON data format is designed for straightforward JSON message ingestion from Kinesis streams to Snowflake tables.
前提条件¶
Openflow Connector for Kinesis について を確認してください。
Ensure that you have Openflowの設定 - BYOC or Set up Openflow - Snowflake Deployments.
|OFSFSPCS-plural|を使用する場合、:doc:`必要なドメインの構成</user-guide/data-integration/openflow/setup-openflow-spcs-sf-allow-list>`を精査し、:ref:`label-openflow_domains_used_by_openflow_connectors_kinesis`コネクタに必要なドメインへのアクセス許可を付与していることを確認してください。
注釈
If you need the support of other data formats or features, such as DLQ, reach out to your Snowflake representative.
Kinesisストリームを設定する¶
AWS 管理者として、 AWS アカウントで以下のアクションを実行します。
Ensure that you have an AWS User with IAM permissions to access Kinesis Streams and DynamoDB.
Ensure that the AWS User has configured Access Key credentials.
Snowflakeアカウントを設定する¶
Snowflakeアカウント管理者として、以下のタスクを実行します。
新しいロールを作成するか、既存のロールを使用して データベース権限 を付与します。
データを格納する宛先テーブルを作成するために使用する宛先データベースと宛先スキーマを作成します。
宛先テーブルがまだ存在しない場合に自動的に作成するコネクタの機能を使用する予定の場合は、ユーザーがSnowflakeオブジェクトの作成および管理に必要な権限を持っていることを確認してください。
オブジェクト
権限
注意
データベース
USAGE
スキーマ
USAGE . CREATE TABLE .
スキーマレベルのオブジェクトが作成された後に、 CREATE
object権限を取り消すことができます。テーブル
OWNERSHIP
Kinesisコネクタを使用して 既存の テーブルにデータを取り込む場合にのみ必要です。. コネクタがKinesisストリームからの記録用に新しいターゲットテーブルを作成する場合、構成で指定されたユーザーの既定のロールがテーブル所有者になります。
以下のスクリプトを使用して、カスタムロールを作成および構成できます(SECURITYADMIN または同等のロールが必要です)。
USE ROLE SECURITYADMIN; CREATE ROLE kinesis_connector_role; GRANT USAGE ON DATABASE kinesis_db TO ROLE kinesis_connector_role; GRANT USAGE ON SCHEMA kinesis_schema TO ROLE kinesis_connector_role; GRANT CREATE TABLE ON SCHEMA kinesis_schema TO ROLE kinesis_connector_role; -- Only for existing tables. GRANT OWNERSHIP ON TABLE existing_table TO ROLE kinesis_connector_role;
タイプを SERVICE として、新しいSnowflakeサービスユーザーを作成します。
Snowflakeサービスユーザーに、前の手順で作成したロールを付与します。
GRANT ROLE kinesis_connector_role TO USER kinesis_connector_user; ALTER USER kinesis_connector_user SET DEFAULT_ROLE = kinesis_connector_role;
ステップ3のSnowflake SERVICE ユーザー向けの キーペア認証 で構成します。
Snowflakeではこの手順を強く推奨します。Openflowがサポートするシークレットマネージャ(AWS、Azure、Hashicorpなど)を構成し、公開キーと秘密キーを秘密ストアに格納します。
注釈
何らかの理由でシークレットマネージャを使用したくない場合は、組織のセキュリティポリシーに従って、キーペア認証に使用する公開キーと秘密キーファイルを保護する責任があります。
シークレットマネージャを構成したら、その認証方法を決定します。AWS 上では、Openflowに関連付けられた EC2 インスタンスロールが推奨されます。こうすることで、他の秘密を永続化する必要がなくなるからです。
Openflowで、右上のハンバーガーメニューから、このシークレットマネージャーに関連付けられたパラメータープロバイダーを構成します。Controller Settings » Parameter Provider に移動してから、パラメーター値を取得します。
この時点で、すべての認証情報を関連するパラメーターパスで参照することができるため、機密性の高い値をOpenflow内で永続化する必要はありません。
If any other Snowflake users require access to the ingested data and created tables (for example, for custom processing in Snowflake), then grant those users the role created in step 2.
コネクタを設定する¶
データエンジニアとして、以下のタスクを実行してコネクタをインストールおよび構成します。
コネクタをインストールする¶
Navigate to the Openflow overview page. In the Featured connectors section, select View more connectors.
Openflowのコネクタページでコネクタを探し、 Add to runtime を選択します。
In the Select runtime dialog, select your runtime from the Available runtimes drop-down list and click Add.
注釈
コネクタをインストールする前に、コネクタが取り込んだデータを格納するためのデータベースとスキーマをSnowflakeで作成したことを確認します。
Snowflakeアカウント認証情報でデプロイメントを認証し、Snowflakeアカウントへのランタイムアプリケーションのアクセスを許可するよう求められたられたら、 Allow を選択します。コネクタのインストールプロセスは数分で完了します。
Snowflakeアカウント認証情報でランタイムを認証します。
コネクタプロセスグループが追加されたOpenflowキャンバスが表示されます。
コネクタを構成する¶
インポートしたプロセスグループを右クリックし、 Parameters を選択します。
Populate the required parameter values as described in Parameters section below.
Parameters¶
This section describes all parameters for the Openflow Connector for Kinesis for JSON data format.
The connector consists of a several modules. To see the set, double click on the connector process group. You will be able to set the parameters for each module in the module's parameter context.
Snowflake destination parameters¶
パラメーター |
説明 |
必須 |
|---|---|---|
宛先データベース |
データが永続化されるデータベース。Snowflakeにすでに存在している必要があります。名前は大文字と小文字を区別します。引用符で囲まれていない識別子の場合、名前を大文字で指定します。 |
有り |
宛先スキーマ |
データが永続化されるスキーマ。これはSnowflakeにすでに存在している必要があります。名前は大文字と小文字を区別します。引用符で囲まれていない識別子の場合、名前を大文字で指定します。 次の例をご参照ください。
|
有り |
Iceberg Enabled |
Whether Iceberg is enabled for table operations. One of |
有り |
Schema Evolution Enabled |
Enables or disables schema evolution at the connector level. When enabled, allows automatic schema changes for tables.
Note that schema evolution can also be controlled at the individual table level through table-specific parameters.
One of: |
有り |
Schema Evolution For New Tables Enabled |
Controls whether schema evolution is enabled when creating new tables. When set to 'true', new tables will be created with ENABLE_SCHEMA_EVOLUTION = TRUE parameter.
When set to 'false', new tables will be created with ENABLE_SCHEMA_EVOLUTION = FALSE parameter.
Not applicable to Iceberg tables as they are not being created automatically.
This setting only affects table creation, not existing tables. One of: |
有り |
Snowflakeアカウント識別子 |
以下を使用する場合:
|
有り |
Snowflake認証ストラテジー |
以下を使用する場合:
|
有り |
Snowflake秘密キー |
以下を使用する場合:
|
無し |
Snowflake秘密キーファイル |
以下を使用する場合:
|
無し |
Snowflake秘密キーパスワード |
以下を使用する場合
|
無し |
Snowflakeロール |
以下を使用する場合
|
有り |
Snowflakeのユーザー名 |
以下を使用する場合
|
有り |
Kinesis JSON Source Parameters¶
パラメーター |
説明 |
必須 |
|---|---|---|
AWS リージョンコード |
|
有り |
AWS アクセスキー ID |
The AWS Access Key ID to connect to your Kinesis Stream, DynamoDB, and, optionally, CloudWatch. |
有り |
AWS シークレットアクセスキー |
The AWS Secret Access Key to connect to your Kinesis Stream, DynamoDB, and, optionally, CloudWatch. |
有り |
Kinesisアプリケーション名 |
アプリケーションのKinesisストリーム消費の進捗を追跡するための DynamoDB テーブル名に使用される名前。 |
有り |
Kinesisストリーム初期位置 |
データの複製を開始するストリームの初期位置。
|
有り |
Kinesisストリーム名 |
データを消費する AWS Kinesisストリーム名。 |
有り |
Metrics Publishing |
Specifies where Kinesis Client Library metrics are published to. Possible values: |
有り |
フローを実行する¶
プレーンを右クリックし、 Enable all Controller Services を選択します。
Right-click on the connector's process group and select Start.
コネクタがデータの取り込みを開始します。
Table Schema¶
The Snowflake table loaded by the connector contains columns named by the keys of your Kinesis messages.
The connector also adds a KINESISMETADATA column which stores metadata about the record.
Below is an example of a Snowflake table loaded by the connector:
行 |
ACCOUNT |
SYMBOL |
SIDE |
QUANTITY |
KINESISMETADATA |
|---|---|---|---|---|---|
1 |
ABC123 |
ZTEST |
BUY |
3572 |
{ ... KINESISMETADATA object ... } |
2 |
XYZ789 |
ZABZX |
SELL |
3024 |
{ ... KINESISMETADATA object ... } |
3 |
XYZ789 |
ZTEST |
SELL |
799 |
{ ... KINESISMETADATA object ... } |
4 |
ABC123 |
ZABZX |
BUY |
2033 |
{ ... KINESISMETADATA object ... } |
The KINESISMETADATA column contains an object with the following fields:
Field Name |
Field Type |
Example Value |
説明 |
|---|---|---|---|
|
String |
|
The name of the Kinesis stream the record came from. |
|
String |
|
The identifier of the shard in the stream the record came from. |
|
String |
|
The approximate time that the record was inserted into the stream (ISO 8601 format). |
|
String |
|
The partition key specified by the data producer for the record. |
|
String |
|
The unique sequence number assigned by Kinesis Data Streams to the record in the shard. |
|
Number |
|
The subsequence number for the record (used for aggregated records with the same sequence number). |
|
String |
|
A combination of the sequence number and the subsequence number for the record. |
スキーマの進化¶
This connector supports automatic schema detection and evolution. The structure of tables in Snowflake is defined and evolved automatically to support the structure of new data loaded by the connector.
Snowflake detects the schema of the incoming data and loads data into tables
that match any user-defined schema. Snowflake also allows adding
new columns or dropping the NOT NULL constraint from columns missing in new incoming records.
Schema detection with the connector infers data types based on the JSON data provided.
If the connector creates the target table, schema evolution is enabled by default.
If you want to enable or disable schema evolution on an existing table,
use the ALTER TABLE command to set the ENABLE_SCHEMA_EVOLUTION parameter.
You must also use a role that has the OWNERSHIP privilege on the table. For more information, see テーブルスキーマの進化.
However, if schema evolution is disabled for an existing table, then the connector will try to send the rows with mismatched schemas to the configured failure output port.
Iceberg table support¶
Openflow Connector for Kinesis can ingest data into a Snowflake-managed Apache Iceberg™ table when Iceberg Enabled is set to true.
要件と制限¶
Before you configure the Openflow Connector for Kinesis for Iceberg table ingestion, note the following requirements and limitations:
コネクタを実行する前にIcebergテーブルを作成する必要があります。
ユーザーが作成したテーブルにデータを挿入するアクセス権を持っていることを確認します。
構成とセットアップ¶
To configure the Openflow Connector for Kinesis for Iceberg table ingestion, follow the steps in Set up Openflow Connector for Kinesis for JSON data format with a few differences noted in the following sections.
Icebergテーブルへの取り込みを有効にする¶
Icebergテーブルへの取り込みを有効にするには、 Iceberg Enabled パラメーターを true に設定する必要があります。
インジェスト用にIcebergテーブルを作成¶
Before you run the connector, you must create an Iceberg table.
The initial table schema depends on your connector Schema Evolution Enabled property settings.
With enabled schema evolution, you must create a table with a column named kinesisMetadata.
The connector automatically creates the columns for message fields and alters the kinesisMetadata column schema.
CREATE OR REPLACE ICEBERG TABLE my_iceberg_table (
kinesisMetadata OBJECT()
)
EXTERNAL_VOLUME = 'my_volume'
CATALOG = 'SNOWFLAKE'
BASE_LOCATION = 'my_location/my_iceberg_table'
ENABLE_SCHEMA_EVOLUTION = true;
If schema evolution is disabled, you must create the table with all fields the Kinesis message contains. When you create an Iceberg table, you can use Iceberg data types or compatible Snowflake types. The semi-structured VARIANT type isn't supported. Instead, use a structured OBJECT or MAP.
例えば、次のようなメッセージを考えてみましょう:
{
"id": 1,
"name": "Steve",
"body_temperature": 36.6,
"approved_coffee_types": ["Espresso", "Doppio", "Ristretto", "Lungo"],
"animals_possessed":
{
"dogs": true,
"cats": false
},
"date_added": "2024-10-15"
}
The following statement creates a table with all fields the Kinesis message contains:
CREATE OR REPLACE ICEBERG TABLE my_iceberg_table (
kinesisMetadata OBJECT(
stream STRING,
shardId STRING,
approximateArrival STRING,
partitionKey STRING,
sequenceNumber STRING,
subSequenceNumber INTEGER,
shardedSequenceNumber STRING
),
id INT,
body_temperature FLOAT,
name STRING,
approved_coffee_types ARRAY(STRING),
animals_possessed OBJECT(dogs BOOLEAN, cats BOOLEAN),
date_added DATE
)
EXTERNAL_VOLUME = 'my_volume'
CATALOG = 'SNOWFLAKE'
BASE_LOCATION = 'my_location/my_iceberg_table';
注釈
kinesisMetadata must always be created. Field names inside nested structures such as dogs or cats are case sensitive.