Openflow Connector for Kafkaを設定する¶
注釈
このコネクタは、 Snowflakeコネクタ規約 に従うものとします。
前提条件¶
Kafka用Snowflake Openflowコネクタ を確認してください。
:doc:`/user-guide/data-integration/openflow/setup-openflow-byoc`または:doc:`Openflow のセットアップ - Snowflake デプロイメント</user-guide/data-integration/openflow/setup-openflow-spcs>`があることを確認してください。
Openflow - Snowflakeデプロイを使用する場合は、必要なドメインの構成 を確認して、Kafkaコネクタに必要なドメインへのアクセス権を付与していることを確認してください。コネクタは、クラスタ内のすべてのKafkaブローカーに接続できる必要があります。
Snowflakeアカウントを設定する¶
Snowflakeアカウント管理者として、以下のタスクを実行します。
タイプを SERVICE として、新しいSnowflakeサービスユーザーを作成します。
新しいロールを作成するか、既存のロールを使用して:ref:
データベース権限 <label-database_privileges>を付与します。コネクタでは、ユーザーが宛先テーブルを作成する必要があります。ユーザーにSnowflakeオブジェクトの管理に必要な権限が付与されていることを確認します。
オブジェクト
権限
メモ
データベース
USAGE
スキーマ
USAGE
テーブル
OWNERSHIP
コネクタがデータをテーブルに取り込むために必要です。
Snowflakeは、アクセス制御を改善するために、各Kafkaクラスターに個別のユーザーとロールを作成することをお勧めします。
以下のスクリプトを使用して、カスタムロールを作成および構成できます(SECURITYADMIN または同等のロールが必要です)。
注釈
権限はコネクタロールに直接付与する必要があり、継承することはできません。
宛先テーブルの構成
Snowflakeは、スキーマの変更にサーバー側のスキーマ進化を使用し、DML エラーのロギングにエラーテーブル を使用することを強くお勧めします。次の例は、テーブルを作成し、適切な OWNERSHIP 権限を追加する方法を示しています。
このコネクタは、スキーマの自動検出と進化をサポートしています。Snowflakeのテーブルの構造は、コネクタによってロードされた新しいデータの構造をサポートするために、自動的に定義され、進化します。記録コンテンツの第1レベルのキーを、名前で一致するテーブル列に自動的にマッピングします(大文字と小文字の区別なし)。
スキーマ進化を有効にすると、Snowflakeは受信ストリームで検出された新しい列を追加し、NOT NULL 制約をドロップすることで、宛先テーブルを自動的に拡張して、新しいデータパターンに対応することができます。詳細については、 テーブルスキーマの進化 をご参照ください。
ENABLE_SCHEMA_EVOLUTION が有効になっていない場合は、テーブル定義を拡張してスキーマを手動で作成する必要があります。コネクタは、記録コンテンツの第1レベルのキーを名前でテーブル列と一致させようとします。JSON のキーがテーブル列と一致しない場合、コネクタはキーを無視します。
(オプション)シークレットマネージャーの構成
Snowflakeではこの手順を強く推奨します。Openflowがサポートするシークレットマネージャ(AWS、Azure、Hashicorpなど)を構成し、公開キーと秘密キーを秘密ストアに格納します。
シークレットマネージャーを構成した後に、構成したマネージャーに対してどのように認証するかを決定します。AWS では、SnowflakeはOpenflowに関連付けられている EC2 インスタンスロールを使用して、他のシークレットを永続化する必要がないようにすることをお勧めします。
Openflowで、右上のハンバーガーメニューから、このシークレットマネージャーに関連付けられたパラメータープロバイダーを構成します。コントローラー設定 > パラメータープロバイダーに移動し、パラメーター値を取得します。
関連付けられているパラメーターパスですべての認証情報を参照するため、Openflow内で機密性の高い値を永続化する必要はありません。
ユーザーへのアクセス権の付与
コネクタによって取り込まれた未加工のデータへのアクセスを必要とする(たとえば、Snowflakeでのカスタム処理のため)他のSnowflakeユーザーについては、ステップ1で作成したロールをそれらのユーザーに付与します。
コネクタを設定する¶
データエンジニアとして、以下のタスクを実行してコネクタをインストールおよび構成します。
コネクタをインストールする¶
コネクタをインストールするには、次を実行します。
Openflow概要ページに移動します。特集コネクタセクション で、その他のコネクタを表示 を選択します。
Openflowのコネクタページでコネクタを探し、**ランタイムに追加**を選択します。
ランタイムを選択ダイアログで、**利用可能なランタイム**ドロップダウンリストからランタイムを選択し、**追加**を選択します。
注釈
コネクタをインストールする前に、コネクタが取り込んだデータを格納するためのデータベース、スキーマおよびテーブルをSnowflakeで作成したことを確認します。
Snowflakeアカウント認証情報でデプロイに対する認証を行い、Snowflakeアカウントへのランタイムアプリケーションのアクセスを許可するよう求められたら、許可する を選択します。コネクタのインストールプロセスは数分で完了します。
Snowflakeアカウント認証情報でランタイムを認証します。
コネクタプロセスグループが追加されたOpenflowキャンバスが表示されます。
コネクタを構成する¶
必要に応じて、組み込みパラメーターを構成する前に、コネクタ構成をカスタマイズします。
プロセスグループパラメーターを入力する
インポートしたプロセスグループを右クリックし、パラメーターを選択します。
必要なパラメーター値を入力する
パラメーター¶
次のテーブルは、Openflow Connector for Kafkaのパラメーターを示しています。
パラメーター |
説明 |
必須 |
|---|---|---|
Kafka自動オフセットリセット |
Kafkaの 設定可能な値: 最も古い: オフセットを以前のオフセットに自動的にリセット、最新: オフセットを最新のオフセットに自動的にリセット、なし: コンシューマーグループに対して以前のオフセットが見つからない場合に、コンシューマーに例外をスロー。 デフォルト: 最新 |
有り |
Kafkaブートストラップサーバー |
Kafkaブートストラップサーバーのコンマ区切りリストには、ポート(例: |
有り |
Kafkaコンシューマーグループ ID |
コネクタが使用するコンシューマーグループの ID。任意ですが、一意でなければなりません。 |
有り |
Kafka SASL パスワード |
SASL512 SCRAM メカニズムを使用するときに、構成されたパスワードによって指定されるパスワード |
|
Kafka SASL ユーザー名 |
SASL512 SCRAM メカニズムを使用するときに、構成されたパスワードによって指定されるユーザー名 |
|
Kafkaトピック形式 |
次のいずれか: 名前/パターン。指定される「Kafkaトピック」が、名前のコンマ区切りリストまたは単一の正規表現であるかどうかを指定します。 |
有り |
Kafkaトピック |
Kafkaトピックのコンマ区切りリスト、または正規表現。 |
有り |
Snowflake宛先データベース |
データが永続化されるデータベース。Snowflakeにすでに存在している必要があります。名前は大文字と小文字を区別します。名前は大文字と小文字を区別します。引用符で囲まれていない識別子の場合、名前を大文字で指定します。 |
有り |
Snowflake宛先スキーマ |
データが永続化されるスキーマ。Snowflakeにすでに存在している必要があります。これはSnowflakeにすでに存在している必要があります。名前は大文字と小文字を区別します。引用符で囲まれていない識別子の場合、名前を大文字で指定します。 次の例をご参照ください。
|
有り |
Snowflake宛先テーブル |
データが永続化されるテーブル。Snowflakeにすでに存在している必要があります。名前は大文字と小文字を区別します。名前は大文字と小文字を区別します。引用符で囲まれていない識別子の場合、名前を大文字で指定します。 |
有り |
コネクタを起動します。¶
プレーンを右クリックし、 Enable all Controller Services を選択します。
プレーンを右クリックし、 Start を選択します。コネクタがデータの取り込みを開始します。
KAFKAMETADATA 列についての理解¶
コネクタは、Kafka記録に関するメタデータを含む KAFKAMETADATA 構造を入力します。構造には次の情報が含まれています。
フィールド |
データ型 |
説明 |
|---|---|---|
topic |
String |
レコードが由来するKafkaトピックの名前です。 |
partition |
number |
トピック内のパーティションの番号です。(これはSnowflakeマイクロパーティションではなく、Kafkaパーティションであることに注意してください。) |
offset |
number |
そのパーティションのオフセットです。 |
timestamp |
number |
記録がKafkaに追加された時点のタイムスタンプ。 |
key |
String |
メッセージがKafka KeyedMessageの場合、これはそのメッセージのキーです。コネクタが RECORD_METADATA にキーを保存するためには、Kafka構成の |
headers |
オブジェクト |
ヘッダーは、レコードに関連付けられたユーザー定義のキーと値のペアです。各レコードには、0、1、または複数のヘッダーを含めることができます。 |
取り込みのレイテンシの測定¶
行の変更時間に基づいて変更の追跡、増分処理、およびTime Travelクエリを実行するには、ROW_TIMESTAMP 機能を使用します。
宛先テーブルで次のコマンドを実行して有効にしてください。
行のタイムスタンプが有効になると、テーブルは METADATA$ROW_LAST_COMMIT_TIME 列を公開します。これは、各行が最後に変更された時点のタイムスタンプを返します。
詳細については、 METADATA$ROW_LAST_COMMIT_TIME をご参照ください。
注釈
行のタイムスタンプはインタラクティブテーブルでは使用できません。詳細については、 Snowflakeインタラクティブテーブルとインタラクティブウェアハウス をご参照ください。
Apache Iceberg™テーブルでのコネクタの使用¶
コネクタは、Snowflakeが管理するApache Iceberg™テーブルにデータを取り込むことができますが、以下の要件を満たしている必要があります。
Apache Iceberg™テーブルに関連付けられた外部ボリュームに対する USAGE 権限を付与されている必要があります。
コネクタを実行する前にApache Iceberg™テーブルを作成する必要があります。
外部ボリュームの使用許可¶
たとえば、Icebergテーブルが kafka_external_volume 外部ボリュームを使用し、コネクタがロール openflow_kafka_connector_role を使用する場合、次のステートメントを実行します:
取り込み用のApache Iceberg™テーブルの作成¶
コネクタは自動的にIcebergテーブルを作成せず、スキーマの進化をサポートしません。コネクタを実行する前に、Icebergテーブルを手動で作成する必要があります。
Icebergテーブルを作成する場合、Icebergデータタイプ(VARIANTなど)または 互換性のあるSnowflakeデータタイプ を使用できます。
例えば、次のようなメッセージを考えてみましょう:
例のメッセージ用にIcebergテーブルを作成するには、以下のステートメントの1つを使用します:
インタラクティブテーブルでのコネクタの使用¶
インタラクティブテーブルは、低レイテンシの高同時実行性クエリ向けに最適化されたSnowflakeテーブルの特別なタイプです。詳細については、 Snowflakeインタラクティブテーブルとインタラクティブウェアハウス をご参照ください。
インタラクティブテーブルを作成します。
重要な考慮事項:
インタラクティブテーブルには特定の制限とクエリ制限があります。コネクタで使用する前に Snowflakeインタラクティブテーブルとインタラクティブウェアハウス を確認してください。
インタラクティブテーブルの場合、必要な変換はテーブル定義で処理する必要があります。
インタラクティブテーブルを効率的にクエリするには、インタラクティブウェアハウスが必要です。
宛先テーブルに対する顧客定義スキーマでコネクタを使用¶
コネクタは、各Kafka記録をSnowflakeテーブルに挿入される行として扱います。たとえば、以下のJSONのように構造化されたメッセージのコンテンツを持つKafkaトピックがある場合:
デフォルトでは、ENABLE_SCHEMA_EVOLUTION = TRUE 機能のメリットにより、JSON からすべてのフィールドを指定する必要はありません。ただし、静的スキーマを優先的に使用する場合は、次の処理を実行して作成できます。
顧客定義の PIPE でのコネクタの使用¶
独自のパイプを作成することを選択した場合は、パイプの COPY INTO ステートメントでデータ変換ロジックを定義できます。必要に応じて列の名前を変更しデータ型をキャストすることができます。例:
独自のパイプを定義する場合は、宛先テーブルの列が JSON キーと一致する必要はありません。列の名前を希望の名前に変更し、必要に応じてデータ型をキャストできます。
カスタムパイプで動作するようにコネクタを調整するには、次のタスクを実行します。
OpenflowキャンバスのKafka取り込みフローで使用される PublishSnowpipeStreaming プロセッサーを右クリックします。
コンテキストメニューから[構成する]を選択します。
プロパティタブに移動します。
宛先タイプフィールドで、[パイプ]を選択します。
パイプフィールドに、PIPE の名前を入力します。
[適用]を選択し、構成を保存します。
エラー処理のカスタマイズ¶
エラー処理は、Snowpipe Streamingサービス内のOpenflow側の失敗とサーバー側の失敗に分割されます。
Openflowエラー(クライアント側の失敗):記録がSnowflakeに到達する前に、解析不可のペイロードやカスタム変換の失敗などのエラーが発生します。デフォルトでは、これらの記録は破棄されます。Openflowでこれらのエラーを処理することが可能です。そのためには、ConsumeKafka プロセッサーで解析失敗関係から FlowFiles を使用します。
Snowpipe Streamingエラー(サーバー側の失敗):Snowflakeには正常に到達したものの、宛先テーブルのスキーマと互換性のない(例: 型の不一致)記録のエラーは、Snowflakeインフラストラクチャによってキャプチャされます。宛先テーブルでエラーのロギングが有効になっている(
error_logging = true)場合、これらの失敗した行は宛先のエラーテーブルに自動的に取り込まれます。