Snowflake Connector for Kafka:インストールと構成

このトピックでは、 Snowflake Connector for Kafka をインストールして構成する手順について説明します。

Kafkaコネクタのインストール

Kafkaコネクタは JAR (Java実行可能ファイル)として提供されます。

Snowflakeは、次の2つのバージョンのコネクタを提供します。

このトピックの手順では、どちらかのバージョンのコネクタにのみ適用される手順を指定します。

インストールの前提条件

  • Kafkaコネクタは、次のパッケージバージョンをサポートしています。

    パッケージ

    Snowflake Kafkaコネクタバージョン

    パッケージのサポート(Snowflakeによりテスト済み)

    Apache Kafka

    2.0.0以降

    Apache Kafka 2.8.2、3.7.2、4.1.1

    Confluent

    2.0.0以降

    Confluent 6.2.15、7.8.2、8.2.0

  • Kafkaコネクタは、Kafka Connect API 3.9.0で使用するために構築されています。Kafka Connect API のそれ以降のバージョンはテストされていません。3.9.0以前のバージョンはコネクタと互換性があります。詳細については、 Kafkaの互換性 をご参照ください。

  • Kafkaコネクタと JDBC ドライバーのjarファイルの両方が環境にある場合、 JDBC バージョンが、目的のKafkaコネクタバージョンの pom.xml ファイルで指定された snowflake-jdbc バージョンと一致していることを確認します。任意のKafkaコネクタのリリースバージョン(例: v4.0.0)に移動できます。次に pom.xml ファイルをブラウズして snowflake-jdbc のバージョンを調べます。

  • データのインジェストにAvro形式を使用している場合:

    • https://mvnrepository.com/artifact/org.apache.avroから入手可能なAvroパーサー、バージョン1.8.2(またはそれ以上)を使用します。

    • Avroでスキーマレジストリ機能を使用する場合は、https://mvnrepository.com/artifact/io.confluentで入手可能なバージョン5.0.0(またはそれ以上)のKafka Connect Avro Converterを使用してください。

      スキーマレジストリ機能は OSS Apache Kafkaパッケージでは使用できないことに注意してください。

  • 希望のデータ保持時間やストレージ制限でKafkaを構成します。

  • Kafka Connectクラスターをインストールして構成します。

    各Kafka Connectクラスタノードには、Kafkaコネクタに十分な RAM を含める必要があります。推奨される最小量は、Kafkaパーティションごとに5 MB です。これは、Kafka Connectが行っている他の作業に必要な RAM に追加する分です。

    重要

    v4コネクタは、バッファー用にオフヒープ(システム)メモリを割り当てるRustベースのSnowpipe Streaming SDKを使用します。JVMのヒープサイズは、SDK用の残余分を確保するために使用可能なメモリの約50%に制限します。たとえば、8GBのRAMを搭載するワーカーの場合は、``-Xmx4g``を設定します。

  • Snowflakeは、Kafka BrokerとKafka Connect Runtimeで同じバージョンを使用することをお勧めしています。

  • Snowflakeは、Snowflakeアカウントと同じクラウドプロバイダー リージョン でKafka Connectインスタンスを実行することを強くお勧めしています。これは厳密な要件ではありませんが、通常はスループットが向上します。

Snowflakeクライアントでサポートされているオペレーティングシステムのリストについては、 オペレーティングシステムのサポート をご参照ください。

コネクタのインストール

このセクションでは、Confluent用のKafkaコネクタのインストールと構成の手順を説明します。次のテーブルは、サポートされているコネクタのバージョンを示しています。

リリースシリーズ

ステータス

メモ

4.x.x

一般公開

最新バージョン。3.xおよび2.xからの移行は手動で行う必要があります。

3.x.x

正式にサポート

v4へのアップグレードをお勧めします。

2.x.x

正式にサポート

アップグレードをお勧めします。

1.x.x

サポート対象外

Confluent用のコネクタのインストール

Kafkaコネクタファイルのダウンロード

次のいずれかの場所からKafkaコネクタ JAR ファイルをダウンロードします。

Confluentハブ:

https://www.confluent.io/hub/

このパッケージには、キーペア認証に暗号化または非暗号化秘密キーを使用するために必要なすべての依存関係が含まれています。詳細については、このトピックの後半にある キーペア認証とキーローテーションの使用 をご参照ください。

Maven Central Repository:

https://mvnrepository.com/artifact/com.snowflake

このバージョンを使用する場合は、`Bouncy Castle <https://www.bouncycastle.org/>`_暗号化ライブラリ(JARファイル)をダウンロードする必要があります。

これらのファイルをKafkaコネクタ JAR ファイルと同じローカルフォルダーにダウンロードします。

コネクタのソースコードはhttps://github.com/snowflakedb/snowflake-kafka-connectorから入手できます。

Kafkaコネクタのインストール

他のコネクタをインストールするために提供されている手順を使用して、Kafkaコネクタをインストールします。

オープンソースApache Kafka用のコネクタのインストール

このセクションでは、オープンソースのApache Kafka用にKafkaコネクタをインストールおよび構成する手順について説明します。

Apache Kafkaのインストール

  1. Kafka公式ウェブサイト からKafkaパッケージをダウンロードします。

  2. ターミナルウィンドウで、パッケージファイルをダウンロードしたディレクトリに移動します。

  3. 次のコマンドを実行して、:file:`kafka_<scala_version>-<kafka_version>.tgz`ファイルを解凍します。

    tar xzvf kafka_<scala_version>-<kafka_version>.tgz
    

JDK のインストール

Java Development Kit(JDK)バージョン11以降をインストールして構成します。Snowflakeは、 SEのStandard Edition(JDK)でテストします。Enterprise Edition(EE)は互換性があると予想されますが、テストされていません。

以前に JDK をインストールしている場合 、このセクションをスキップできます。

  1. `Oracle JDK ウェブサイト<https://www.oracle.com/technetwork/java/javase/downloads/index.html>`_ から JDK をダウンロードします。

  2. JDKをインストールまたは解凍します。

  3. ご使用のオペレーティングシステムの手順に従って、環境変数 JAVA_HOME が JDKを含むディレクトリを指すように設定します。

Kafka Connector JAR ファイルをダウンロードします

  1. Maven Central RepositoryからKafkaコネクタ JAR ファイルをダウンロードします。

    https://mvnrepository.com/artifact/com.snowflake

  2. `Bouncy Castle<https://www.bouncycastle.org/>`_ 暗号化ライブラリjarファイルをダウンロードします。

  3. Kafkaデータが Apache Avro 形式でストリーミングされている場合、Avro JAR ファイル(1.11.4)をダウンロードします。

コネクタのソースコードはhttps://github.com/snowflakedb/snowflake-kafka-connectorから入手できます。

Kafkaコネクタのインストール

オープンソースApache Kafka用のコネクタのインストール でダウンロードした JAR ファイルを <kafka_dir>/libs フォルダーにコピーします。

Kafkaコネクタの構成

スタンドアロンモードでデプロイされる場合、コネクタは、Snowflakeログイン認証情報、トピック名、Snowflakeテーブル名などのパラメーターを指定するファイルを作成して構成されます。分散モードでデプロイすると、コネクタはKafka接続クラスターの RESTAPI エンドポイントを呼び出すことで構成されます。

重要

Kafka Connectフレームワークは、Kafkaコネクタの構成設定をプライマリノードからワーカーノードにブロードキャストします。構成設定には、機密情報(具体的にはSnowflakeユーザー名とプライベートキー)が含まれます。Kafka Connectノード間の通信チャネルを必ず保護してください。詳細については、Apache Kafkaソフトウェア用ドキュメントをご参照ください。

各設定は、1つのデータベースとそのデータベース内の1つのスキーマのトピックと対応するテーブルを指定します。1つのコネクタは任意の数のトピックからメッセージを取り込むことができますが、対応するテーブルはすべて単一のデータベースとスキーマに保存される必要があります。

このセクションでは、分散モードとスタンドアロンモードの両方について説明します。

構成フィールドの説明については、 コネクタ構成プロパティ をご参照ください。

重要

通常、構成ファイルには秘密キーなどのセキュリティ関連情報が含まれているため、ファイルに読み取り/書き込み権限を適切に設定してアクセスを制限してください。

さらに、構成ファイルを安全な外部の場所またはキー管理サービスに保存することを検討してください。詳細については、このトピックの 秘密の外部化 をご参照ください。

分散モード

Kafka構成ファイル(例: <path>/<config_file>.json)を作成します。すべてのコネクタ構成情報をファイルに入力します。ファイルは JSON 形式にする必要があります。

サンプル構成ファイル

{
  "name":"XYZCompanySensorData",
  "config":{
      "connector.class": "com.snowflake.kafka.connector.SnowflakeStreamingSinkConnector",
      "snowflake.topic2table.map": "topic1:table_1,topic2:table_2",
      "snowflake.url.name": "myorganization-myaccount.snowflakecomputing.com:443",
      "snowflake.private.key": "-----BEGIN PRIVATE KEY-----\n .... \n-----END PRIVATE KEY-----\n",
      "snowflake.schema.name": "MY_SCHEMA",
      "snowflake.database.name": "MY_DATABASE",
      "snowflake.role.name": "MY_ROLE",
      "snowflake.user.name": "MY_USER",
      "value.converter": "org.apache.kafka.connect.json.JsonConverter",
      "key.converter": "org.apache.kafka.connect.storage.StringConverter",
      "errors.log.enable": "true",
      "topics": "topic1,topic2",
      "value.converter.schemas.enable": "false",
      "errors.tolerance": "none",
      "snowflake.streaming.validate.compatibility.with.classic": "false"
      }
}

スタンドアロンモード

設定ファイルを作成します。例: :file:`<kafka_dir>/config/SF_connect.properties`すべてのコネクタ構成情報をファイルに入力します。

サンプル構成ファイル

connector.class=com.snowflake.kafka.connector.SnowflakeStreamingSinkConnector
snowflake.topic2table.map=topic1:table_1,topic2:table_2
snowflake.url.name=myorganization-myaccount.snowflakecomputing.com:443
snowflake.private.key=-----BEGIN PRIVATE KEY-----\n .... \n-----END PRIVATE KEY-----\n
snowflake.schema.name=MY_SCHEMA
snowflake.database.name=MY_DATABASE
snowflake.role.name=MY_ROLE
snowflake.user.name=MY_USER
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter=org.apache.kafka.connect.storage.StringConverter
errors.log.enable=true
topics=topic1,topic2
name=XYZCompanySensorData
value.converter.schemas.enable=false
errors.tolerance=none
snowflake.streaming.validate.compatibility.with.classic=false

テストとプロトタイピングのためのキャッシュに関する考慮事項

コネクタはテーブルとパイプの存在チェックをキャッシュし、パーティションの再バランス調整中のパフォーマンスを向上させます。ただし、テストおよびプロトタイピング中は、このキャッシュ動作により、コネクタが手動で作成されたテーブルまたはパイプをすぐに検出しない可能性があります。

問題: コネクタの実行中にテーブルやパイプを手動で作成した場合、コネクタはデフォルトで最大5分間、キャッシュされた存在チェック結果(オブジェクトが存在しないことを示すこともあります)を使用し続ける可能性があります。これにより、テスト中に予期しないエラーや動作が発生する可能性があります。

テストに関する推奨: テスト中やプロトタイピング中にキャッシュ関連の問題を回避するには、キャッシュを無効にします。

snowflake.cache.table.exists=false
snowflake.cache.pipe.exists=false

この構成により、パーティションの再バランスごとにコネクタが最新の存在チェックを実行でき、手動で作成されたテーブルやパイプの効果をすぐに確認できるようになります。

重要

これらの最小限のキャッシュ設定は、 テストとプロトタイプ作成のみ に推奨されます。実稼働環境では、デフォルトのキャッシュ有効期限値(5分以上)を使用して、Snowflakeへのメタデータクエリを最小限に抑え、特に多数のパーティションを処理する場合に再バランスのパフォーマンスを向上させます。

コネクタ構成プロパティ

新規インストールのための最小構成

以下に示すのは、コネクタをv4のデフォルトで実行するための最小限の構成です。この例では、分散モードでJSON形式を使用しています。

{
  "name": "my_kafka_connector",
  "config": {
    "connector.class": "com.snowflake.kafka.connector.SnowflakeStreamingSinkConnector",
    "topics": "my_topic",
    "snowflake.url.name": "https://myaccount.snowflakecomputing.com",
    "snowflake.user.name": "my_user",
    "snowflake.private.key": "<base64-encoded-private-key>",
    "snowflake.database.name": "MY_DB",
    "snowflake.schema.name": "MY_SCHEMA",
    "snowflake.role.name": "MY_ROLE",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": "false",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "snowflake.streaming.validate.compatibility.with.classic": "false"
  }
}

この構成は、サーバー側の検証、スキーマ化された列、大文字と小文字を区別する識別子という、v4のデフォルトをすべて使用します。コネクタは、必要に応じてテーブルとパイプを自動作成します。

注釈

新規インストールの場合は``snowflake.streaming.validate.compatibility.with.classic``を``false``に設定します。この設定は、v3から移行する場合にのみ必要です。

必須のプロパティ

name

アプリケーション名。これは、顧客が使用するすべてのKafkaコネクタで一意でなければなりません。この名前は、引用符で囲まれていないSnowflakeの有効な識別子である必要があります。有効な識別子については、 識別子の要件 をご参照ください。

connector.class

com.snowflake.kafka.connector.SnowflakeStreamingSinkConnector

topics

トピックのコンマ区切りリスト。デフォルトでは、Snowflakeはテーブル名がトピック名と同じであると想定します。テーブル名がトピック名と同じでない場合は、オプションの topic2table.map パラメーター(下記)を使用して、トピック名からテーブル名へのマッピングを指定します。このテーブル名は、引用符で囲まれていないSnowflakeの有効な識別子である必要があります。有効なテーブル名については、 識別子の要件 をご参照ください。

注釈

両方ではなく、 topics または topics.regexいずれか が必要です。

topics.regex

これは、Snowflakeテーブルに読み込むメッセージを含むトピックを指定する正規表現(「regex」)です。コネクタは、正規表現に一致するトピック名からデータをロードします。正規表現は、Java正規表現のルールに従う必要があります(つまり、java.util.regex.Patternと互換性があります)。構成ファイルには、両方ではなく、 :emph:` または ` の topics``いずれか``topics.regex を含める必要があります。

snowflake.url.name

Snowflakeアカウントにアクセスするための URL。この URL には、使用する アカウント識別子 が含まれている必要があります。プロトコル(https://)とポート番号はオプションです。

snowflake.user.name

Snowflakeアカウントのユーザーログイン名。

snowflake.role.name

コネクタがテーブルにデータを挿入するために使用するロールの名前。

snowflake.private.key

ユーザーを認証するための秘密キー。ヘッダーまたはフッターではなく、キーのみを含めます。キーが複数の行に分割されている場合、改行を削除します。暗号化されていないキーを提供するか、暗号化されたキーを提供して snowflake.private.key.passphrase パラメーターを提供し、Snowflakeがキーを復号化できるようにします。:emph:` パラメーター値が暗号化されている 時に限り``snowflake.private.key` 、このパラメーターを使用します。これにより、 キーペア認証とキーペアローテーション の指示に従って暗号化された秘密キーが復号化されます。

注釈

オプションのプロパティsnowflake.private.key.passphrase もご参照ください。

snowflake.database.name

行を挿入するテーブルを含むデータベースの名前。

snowflake.schema.name

行を挿入するテーブルを含むスキーマの名前。

header.converter

レコードがAvroでフォーマットされ、ヘッダーが含まれている場合にのみ必要です。値は "org.apache.kafka.connect.storage.StringConverter" です。

key.converter

Kafkaの記録のキーコンバーター(例: "org.apache.kafka.connect.storage.StringConverter")。これはKafkaコネクタでは使用されませんが、Kafka Connectプラットフォームでは必須です。

value.converter

コネクタは、標準のKafkaコミュニティコンバーターをサポートしています。データ形式に基づいて適切なコンバーターを選択します。

  • JSON 記録の場合: "org.apache.kafka.connect.json.JsonConverter"

  • スキーマレジストリを使用するAvro記録の場合: "io.confluent.connect.avro.AvroConverter"

注釈

``snowflake.enable.schematization=true``(デフォルト)の場合は、``StringConverter``および``ByteArrayConverter``が値コンバーターとしてサポートされていません。詳細については、 Snowflake Connector for Kafka のトラブルシューティング をご参照ください。

オプションのプロパティ

スキーマ化と検証プロパティ

これらのプロパティは、コネクタがデータを処理する方法と検証する方法を制御します。新規インストールの場合は、デフォルトが適切に機能します。v3から移行する場合は、使用する値のガイダンスについて:doc:`migrate-v3-to-v4`を確認してください。

snowflake.enable.schematization

受信レコードを個別のテーブル列にスキーマ化するか、レガシーVARIANT列にラップするかを制御します。

true``(デフォルト)の場合は、記録フィールドが名前別に個別のテーブル列にマッピングされます。``false``の場合、コネクタは記録を2つのVARIANT列(``RECORD_CONTENT``および``RECORD_METADATA)に保存します。これはv3の動作と一致します。

デフォルト:

true

snowflake.validation

データ検証とスキーマ進化を実行する場所を制御します。

``server_side``(デフォルト):検証はCOPYおよびSnowpipeの動作と整合するようにSnowflakeバックエンドによって実行されます。無効な記録は:doc:`エラーテーブル</user-guide/snowpipe-streaming/snowpipe-streaming-high-performance-error-handling>`にキャプチャされます。デフォルトのパイプモードとユーザー定義のパイプモードの両方をサポートします。

client_side:コネクタは、Snowflakeに行を送信する前にデータ型とスキーマの互換性を検証します。無効なレコードについて配信不能キュー(DLQ)をサポートします。デフォルトのパイプモードでのみ動作します。

詳細については、 Validation and error handling をご参照ください。

デフォルト:

server_side

移行および互換性のプロパティ

これらのプロパティは、v3から移行する場合に関連します。新規インストールの場合は、これらをスキップして``snowflake.streaming.validate.compatibility.with.classic=false``を設定できます。

snowflake.compatibility.enable.autogenerated.table.name.sanitization

自動生成されたテーブル名がトピック名から派生される方法を制御します。

``false``(デフォルト)の場合、トピック名はテーブル名にそのまま使用され、大文字および小文字の区別と特殊文字が保持されます。テーブル名は引用符で囲まれた識別子として作成されます。

``true``の場合、無効なSnowflake識別子文字はアンダースコアに置き換えられ、名前は大文字で表記されて、一意性を確保するためにハッシュコードが追加されます。これはv3の動作と一致します。

デフォルト:

false

snowflake.compatibility.enable.column.identifier.normalization

列識別子の処理方法を制御します。

``false``(デフォルト)の場合、列識別子は大文字および小文字の区別と特殊文字をそのまま保持します。

``true``の場合、列識別子は大文字に正規化され、v3の動作と一致します。

デフォルト:

false

snowflake.streaming.validate.compatibility.with.classic

すべての移行関連の構成が明示的に設定されているかどうかを確認する起動検証を有効にします。``true``の場合は、以下の構成のいずれかが欠落しているかv3の動作との互換性がない場合に、コネクタは起動時に失敗して状態を説明するエラーが表示されます。

  • ``snowflake.validation``は``client_side``であることが必要です

  • ``snowflake.compatibility.enable.column.identifier.normalization``は``true``であることが必要です

  • ``snowflake.compatibility.enable.autogenerated.table.name.sanitization``は``true``であることが必要です

  • ``snowflake.enable.schematization``は明示的に``true``または``false``(デフォルトはv3とv4の間で変更されました)に設定されている必要があります

  • ``snowflake.streaming.classic.offset.migration``を明示的に設定する必要があります

  • ``snowflake.streaming.classic.offset.migration.include.connector.name``を明示的に設定する必要があります(オフセットの移行が``strict``または``best_effort``の場合)

これにより、互換性を損う変更を確認せずに、コピーされたv3構成でv4を意図せず実行することが防止されます。構成を確認した後にこのチェックをスキップするには、``false``に設定します。

注釈

新規インストール(v3からの移行なし)の場合は、これを``false``に設定します。互換性検証ツールは、既存のv3デプロイからアップグレードする場合にのみ必要です。

詳細については、 Migrate from Kafka connector v3 to v4 をご参照ください。

デフォルト:

true

オフセット移行プロパティ

これらのプロパティは、v4がv3 Snowpipe Streaming(SSv1)チャネルからコミットされたオフセットを移行する方法を制御します。``snowflake.ingestion.method=SNOWPIPE_STREAMING``を使用したv3コネクタから移行する場合にのみ関連します。v3 Snowpipeモード(ファイルベースのインジェスチョン)から移行する場合は、``snowflake.streaming.classic.offset.migration``を``skip``に設定します。

snowflake.streaming.classic.offset.migration

v4がv3 Snowpipe Streaming(SSv1)チャネルからオフセットを移行する方法を制御します。

strict: v4は、v3 SSv1チャネルからコミットされたオフセットを検索して、そのポイントから再開します。SSv1チャネルが見つからない場合、コネクタは失敗してエラーが表示されます。

best_effort: v4は、v3 SSv1チャネルからコミットされたオフセットを検索しようとします。チャネルが見つからない場合、v4はKafkaコンシューマーグループのオフセットにフォールバックします。

``skip``(デフォルト):SSv1オフセットの移行は実行されません。v4は、Kafkaコンシューマーグループのオフセットを使用します。v3 Snowpipeモード(Snowpipe Streamingではありません)から移行する場合は、これを使用します。

デフォルト:

skip

snowflake.streaming.classic.offset.migration.include.connector.name

SSv1チャネル名の検索にコネクタ名が含まれるかどうかを制御します。これは、v3コネクタの構成方法と一致する必要があります。v3では、``snowflake.streaming.channel.name.include.connector.name``プロパティがチャネル名にコネクタ名が含まれるようにするかどうかを制御していました。

v3コネクタに``snowflake.streaming.channel.name.include.connector.name=true``が設定されている場合、またはKafkaコネクタバージョン2.1.0または2.1.1(これらのバージョンにはデフォルトでコネクタ名が含まれていました)を実行している場合は``true``に設定します。それ以外の場合は``false``に設定します。

``snowflake.streaming.classic.offset.migration``が``strict``または``best_effort``である場合にのみ必須です。

デフォルト:

なし(オフセット移行がアクティブな場合は明示的に設定する必要があります)

エラー処理プロパティ

errors.tolerance

インジェスチョン中にコネクタがエラーにどのように応答するかを制御します。

``none``(デフォルト):コネクタタスクは最初のエラーで失敗します。サーバー側の検証では、エラー検出は非同期的であるため、破損した記録の後のいくつかの記録は、タスクが失敗する前に取り込まれる可能性があります。

all:コネクタはデータの取り込みを続行します。クライアント側の検証により、無効な記録はDLQ(構成されている場合)にルーティングされるか、警告が表示されることなくドロップされます。

警告

DLQトピックを構成せずに``errors.tolerance=all``を設定すると、クライアント側の検証を使用する際にトピックの無効な記録が警告が表示されることなくドロップされます。これにより、データが失われる可能性があります。

デフォルト:

none

errors.deadletterqueue.topic.name

配信不能キューのKafkaトピック名。``snowflake.validation=client_side``および``errors.tolerance=all``の場合にのみ有効です。

デフォルト:

空(DLQが無効)

errors.log.enable

``true``の場合、エラーは失敗した操作の詳細と記録プロパティとともにログに記録されます。

デフォルト:

false

enable.task.fail.on.authorization.errors

``true``の場合、Snowflakeからの認証エラーが発生した際にコネクタタスクは直ちに失敗します。``false``の場合、コネクタは再試行します。

デフォルト:

false

プロパティのキャッシュ保存

snowflake.cache.table.exists

テーブルの存在チェックのキャッシュ保存を有効にし、Snowflakeへのメタデータクエリの数を削減します。

デフォルト:

true

snowflake.cache.table.exists.expire.ms

テーブル存在チェックに関するキャッシュ有効期限(ミリ秒単位)。

デフォルト:

300000 (5分間)

snowflake.cache.pipe.exists

パイプの存在チェックに関するキャッシュ保存を有効にします。

デフォルト:

true

snowflake.cache.pipe.exists.expire.ms

パイプ存在チェックに関するキャッシュ有効期限(ミリ秒単位)。

デフォルト:

300000 (5分間)

モニタリングと診断のプロパティ

jmx

コネクタメトリックのJMX MBeansを有効にします。詳細については、 Snowflake Connector for Kafka をモニターする をご参照ください。

デフォルト:

true

enable.mdc.logging

MDC(Mapped Diagnostic Context)を有効にして、コネクタのコンテキストをログメッセージに追加します。これは、複数のコネクタインスタンスを実行する場合に活用できます。

デフォルト:

false

snowflake.streaming.metadata.connectorPushTime

``true``の場合、``RECORD_METADATA``に``SnowflakeConnectorPushTime``のタイムスタンプが含まれます。このフィールドは、コネクタが取り込みのために記録をバッファしたタイミングを記録し、エンドツーエンドのレイテンシを推定するのに役立ちます。

デフォルト:

true

高度なプロパティ

snowflake.streaming.client.provider.override.map

Snowpipe Streamingクライアントプロパティのオーバーライド。形式: key1:value1,key2:value2。Snowflakeサポートに相談した後にのみ使用してください。

デフォルト:

empty

その他のプロパティ

snowflake.private.key.passphrase

このパラメーターの値が空でない場合、コネクタはこのフレーズを使用して秘密キーの復号化を試みます。

tasks.max

タスクの数。通常、Kafka Connectクラスタのワーカーノード全体の CPU コアの数と同じです。最適なパフォーマンスを実現するために、タスク数をKafkaパーティションの総数と等しく、CPU コアの数を超えないように設定することをお勧めします。タスク数が多いとメモリ消費量が増え、頻繁にリバランスが発生する可能性があります。

snowflake.topic2table.map

``topic:table``形式のトピックからテーブルへのマッピングのコンマ区切りリスト。トピック名の正規表現パターンをサポートします。正規表現はあいまいにはできず、一致するトピックは単一のターゲットテーブルのみに一致しなければなりません。

トピック名とテーブル名は両方とも、二重引用符で囲んで、特殊文字(コロン、コンマ、スペース)をサポートできます。引用符で囲まれていないテーブル名は大文字になります。引用符で囲まれたテーブル名は大文字と小文字の区別を保持します。

正規表現パターン、多対一のマッピング、引用などの詳細な例については、:ref:`明示的なトピックからテーブルへのマッピング<label-kafkahp_explicit_topic_to_table_mapping>`をご参照ください。

例:

snowflake.topic2table.map=topic1:low_range,topic2:low_range,"my:topic":"My_Table"
value.converter.schema.registry.url

形式がAvroで、スキーマレジストリサービスを使用している場合、これはスキーマレジストリサービスの URL である必要があります。それ以外の場合、このフィールドは空でなければなりません。

value.converter.break.on.schema.registry.error

Schema Registry ServiceからAvroデータをロードする場合、このプロパティは、スキーマIDのフェッチ中にエラーが発生した場合にKafkaコネクタがレコードの消費を停止するかどうかを決定します。デフォルト値は false です。この動作を有効にするには、値を true に設定します。

jvm.proxy.host

Snowflake Kafka Connectorがプロキシサーバー経由でSnowflakeにアクセスできるようにするには、このパラメーターを設定して、そのプロキシサーバーのホストを指定します。

jvm.proxy.port

Snowflake Kafka Connectorがプロキシサーバーを介してSnowflakeにアクセスできるようにするには、このパラメーターを設定してそのプロキシサーバーのポートを指定します。

jvm.proxy.username

プロキシサーバーで認証するユーザー名。

jvm.proxy.password

プロキシサーバーで認証するユーザー名のパスワード。

snowflake.jdbc.map

例: "snowflake.jdbc.map": "networkTimeout:20,tracing:WARNING"

その他の JDBC プロパティ(JDBC ドライバーの接続パラメーター参照 参照)は検証されません。これらの追加プロパティは検証されず、jvm.proxy.xxxsnowflake.user.namesnowflake.private.keysnowflake.schema.name、類似したプロパティなどの必須プロパティをオーバーライドする、またはそれらの代わりに使用されることがあってはなりません。

以下の組み合わせのいずれかを指定します:
  • tracing プロパティと JDBC_TRACE 環境変数

  • database プロパティと snowflake.database.name

曖昧な動作となり、動作は JDBC ドライバーによって決定されます。

value.converter.basic.auth.credentials.source

Avroデータ形式を使用していて、Kafkaスキーマレジストリへの安全なアクセスが必要な場合、このパラメーターを文字列「USER_INFO」に設定し、以下で説明する value.converter.basic.auth.user.info パラメーターを設定します。それ以外の場合は、このパラメーターを省略します。

value.converter.basic.auth.user.info

Avroデータ形式を使用していて、Kafkaスキーマレジストリへの安全なアクセスが必要な場合は、上記のとおりこのパラメーターを文字列「<user_ID>:<password>」に設定し、value.converter.basic.auth.credentials.sourceパラメーターを設定します。それ以外の場合は、このパラメーターを省略します。

snowflake.metadata.createtime

値が FALSE に設定されている場合、 CreateTime プロパティ値は RECORD_METADATA 列のメタデータから省略されます。デフォルト値は TRUE です。

snowflake.metadata.topic

値が FALSE に設定されている場合、 topic プロパティ値は RECORD_METADATA 列のメタデータから省略されます。デフォルト値は TRUE です。

snowflake.metadata.offset.and.partition

値が FALSE に設定されている場合、 Offset および Partition プロパティ値は RECORD_METADATA 列のメタデータから省略されます。デフォルト値は TRUE です。

snowflake.metadata.all

値が FALSE に設定されている場合、 RECORD_METADATA 列のメタデータは完全に空です。デフォルト値は TRUE です。

transforms

Kafkaコネクタが検出したtombstoneレコードをスキップし、それらをターゲットテーブルにロードしないように指定します。tombstoneレコードは、値フィールド全体がnullであるレコードとして定義されます。

プロパティ値を "tombstoneHandlerExample" に設定します。

注釈

このプロパティは、Kafkaコミュニティコンバーター(つまり、value.converter`プロパティ値)でのみ使用します(例: ``org.apache.kafka.connect.json.JsonConverter``または``org.apache.kafka.connect.json.AvroConverter`)。Snowflakeコンバーターでtombstoneレコードの処理を管理するには、代わりに behavior.on.null.values プロパティを使用します。

transforms.tombstoneHandlerExample.type

transforms プロパティを設定するときに必要です。

プロパティ値を "io.confluent.connect.transforms.TombstoneHandler" に設定

behavior.on.null.values

Kafkaコネクタがtombstoneレコードを処理する方法を指定します。tombstoneレコードは、値フィールド全体がnullであるレコードとして定義されます。 Snowpipe では、このプロパティは Kafkaコネクタのバージョン1.5.5以降でサポートされています。Snowpipeストリーミング では、このプロパティはKafkaコネクタのバージョン2.1.0以降でサポートされています。

このプロパティは、次の値をサポートします。

DEFAULT

Kafkaコネクタがtombstoneレコードを検出すると、コンテンツ列に空の JSON 文字列を挿入します。

IGNORE

Kafkaコネクタはtombstoneレコードをスキップし、これらのレコードの行を挿入しません。

デフォルト値は DEFAULT です。

注釈

Tombstone記録のインジェスチョンは、インジェスチョンメソッドによって異なります。

  • Snowpipeでは、KafkaコネクタはSnowflakeコンバーターのみを使用します。Kafkaコミュニティコンバーターを使用してtombstoneレコードの処理を管理するには、代わりに transform プロパティと transforms.tombstoneHandlerExample.type プロパティを使用します。

  • Snowpipe Streamingでは、Kafkaコネクタはコミュニティコンバーターのみを使用します。

Kafkaブローカーに送信された記録は、Kafkaコネクタによって削除され、オフセットが欠落するため、 NULL であってはなりません。オフセットの欠落は、特定のユースケースにおいてKafkaコネクタを破壊します。NULL の記録ではなく、tombstoneの記録を使用することをお勧めします。

キーペア認証とキーローテーションの使用

Kafkaコネクタは、ユーザー名とパスワード認証の代わりに、キーペア認証に依存しています。この認証方法には、2048ビット(最小)の RSA キーペアが必要です。OpenSSLを使用して公開キーと秘密キーのペアを生成します。公開キーは、構成ファイルで定義されたSnowflakeユーザーに割り当てられます。

このページのキーペア認証タスクと キーペアのローテーション</user-guide/key-pair-auth>`のタスクを完了した後、このトピックの後半にある :ref:`label-kafkahp_externalize_secrets の推奨事項を評価します。

公開/秘密キーペアを構成するには、

  1. ターミナルウィンドウのコマンドラインから、秘密キーを生成します。

    秘密キーの暗号化バージョンまたは非暗号化バージョンを生成できます。

    注釈

    Kafkaコネクタは、連邦情報処理標準(140-2)(つまり、FIPS 140-2)要件を満たすように検証された暗号化アルゴリズムをサポートしています。詳細については、 FIPS 140-2 をご参照ください。

    非暗号化バージョンを生成するには、次のコマンドを使用します。

    $ openssl genrsa -out rsa_key.pem 2048
    

    暗号化バージョンを生成するには、次のコマンドを使用します。

    $ openssl genrsa 2048 | openssl pkcs8 -topk8 -v2 <algorithm> -inform PEM -out rsa_key.p8
    

    <アルゴリズム> は、 FIPS 140-2準拠の暗号化アルゴリズムです。

    たとえば、暗号化アルゴリズムとして AES 256を指定するには、

    $ openssl genrsa 2048 | openssl pkcs8 -topk8 -v2 aes256 -inform PEM -out rsa_key.p8
    

    秘密キーの暗号化されたバージョンを生成する場合、パスフレーズを記録します。後で、Kafka構成ファイルの snowflake.private.key.passphrase プロパティでパスフレーズを指定します。

    サンプル PEM 秘密キー

    -----BEGIN ENCRYPTED PRIVATE KEY-----
    MIIE6TAbBgkqhkiG9w0BBQMwDgQILYPyCppzOwECAggABIIEyLiGSpeeGSe3xHP1
    wHLjfCYycUPennlX2bd8yX8xOxGSGfvB+99+PmSlex0FmY9ov1J8H1H9Y3lMWXbL
    ...
    -----END ENCRYPTED PRIVATE KEY-----
    
  2. コマンドラインから、秘密キーを参照して公開キーを生成します。

    秘密キーが暗号化され、「 rsa_key.p8 」という名前のファイルに含まれていると仮定して、次のコマンドを使用します。

    $ openssl rsa -in rsa_key.p8 -pubout -out rsa_key.pub
    

    サンプル PEM 公開キー

    -----BEGIN PUBLIC KEY-----
    MIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEAy+Fw2qv4Roud3l6tjPH4
    zxybHjmZ5rhtCz9jppCV8UTWvEXxa88IGRIHbJ/PwKW/mR8LXdfI7l/9vCMXX4mk
    ...
    -----END PUBLIC KEY-----
    
  3. 公開キーファイルと秘密キーファイルを保存用のローカルディレクトリにコピーします。ファイルへのパスを記録します。秘密キーは PKCS#8(公開キー暗号化標準)形式を使用して格納され、前の手順で指定したパスフレーズを使用して暗号化されることに注意してください。ただし、オペレーティングシステムが提供するファイル権限メカニズムを使用して、ファイルを不正アクセスから保護する必要があります。ファイルが使用されていない場合、ファイルを保護するのはユーザーの責任です。

  4. Snowflakeにログインします。ALTER USER を使用して、Snowflakeユーザーに公開キーを割り当てます。

    例:

    ALTER USER jsmith SET RSA_PUBLIC_KEY='MIIBIjANBgkqh...';
    

    注釈

    • ユーザーを変更できるのは、セキュリティ管理者(つまり、SECURITYADMINロールを付与されているユーザー)以上のみです。

    • SQL ステートメントで公開キーのヘッダーとフッターを除外します。

    DESCRIBE USER を使用してユーザーの公開キーの指紋を検証します。

    DESC USER jsmith;
    +-------------------------------+-----------------------------------------------------+---------+-------------------------------------------------------------------------------+
    | property                      | value                                               | default | description                                                                   |
    |-------------------------------+-----------------------------------------------------+---------+-------------------------------------------------------------------------------|
    | NAME                          | JSMITH                                              | null    | Name                                                                          |
    ...
    ...
    | RSA_PUBLIC_KEY_FP             | SHA256:nvnONUsfiuycCLMXIEWG4eTp4FjhVUZQUQbNpbSHXiA= | null    | Fingerprint of user's RSA public key.                                         |
    | RSA_PUBLIC_KEY_2_FP           | null                                                | null    | Fingerprint of user's second RSA public key.                                  |
    +-------------------------------+-----------------------------------------------------+---------+-------------------------------------------------------------------------------+
    

    注釈

    RSA_PUBLIC_KEY_2_FP プロパティは、 キーペアローテーションの構成 で説明されています。

  5. 秘密キー全体をコピーして、構成ファイルの snowflake.private.key フィールドに貼り付けます。ファイルを保存します。

シークレットの外部化

Snowflakeは、秘密キーなどの秘密を外部化し、暗号化された形式、または AWS Key Management Service(KMS)、Microsoft Azure Key Vault、 HashiCorp Vaultなどのキー管理サービスで保存することを強くお勧めします。これは、Kafka Connectクラスターで ConfigProvider 実装を使用して実現できます。

詳細については、この サービス のConfluent説明をご参照ください。

コネクタの開始

サードパーティのConfluentまたはApache Kafkaのドキュメントに記載されている手順を使用して、Kafkaを開始します。Kafkaコネクタは、分散モードまたはスタンドアロンモードで開始できます。それぞれの手順を以下に示します。

分散モード

ターミナルウィンドウで、次のコマンドを実行します。

curl -X POST -H "Content-Type: application/json" --data @<path>/<config_file>.json http://localhost:8083/connectors

スタンドアロンモード

ターミナルウィンドウで、次のコマンドを実行します。

<kafka_dir>/bin/connect-standalone.sh <kafka_dir>/<path>/connect-standalone.properties <kafka_dir>/config/SF_connect.properties

注釈

Apache KafkaまたはConfluent Kafkaのデフォルトのインストールには、すでにファイル connect-standalone.properties が含まれているはずです)

次のステップ

コネクタをテストします