Snowflake High Performance connector for Kafka:Kafkaを構成する

このトピックでは、 Snowflake High Performance connector for Kafka のためにKafkaをインストールして構成する手順について説明します。

Kafkaコネクタのインストール

Kafkaコネクタは JAR (Java実行可能ファイル)として提供されます。

Snowflakeは、次の2つのバージョンのコネクタを提供します。

コネクタのバージョンはどちらもSnowflake Private Previewで入手でき、Snowflakeから取得する必要があります。Snowflakeアカウントチームに問い合わせて、コネクタ JAR ファイルを取得してください。

使用すべきバージョンがわからない場合は、 コネクタのバージョンの選択 をご参照ください。Kafkaコネクタの構成 ==============================================================================

コネクタの構成はベンダー固有です。Amazon MSK Connectなどの一部の実装には、コネクタを構成するための UI があり、 JSON およびプロパティファイル形式の構成を受け入れます。

このセクションには、コネクタのパラメーター名と値の一般的な参照情報が含まれています。クラウドベンダーごとに、構成要件がやや異なる場合があることに注意してください。

重要

Kafka Connectフレームワークは、Kafkaコネクタの構成設定をマスターノードからワーカーノードにブロードキャストします。構成設定には、機密情報(具体的にはSnowflakeユーザー名と秘密キー)が含まれます。Kafka Connectノード間の通信チャネルを必ず保護してください。手順については、Apache Kafkaソフトウェアのドキュメントをご参照ください。

各構成ファイルは、1つのデータベースとそのデータベース内の1つのスキーマのトピックと対応するテーブルを指定します。1つのコネクタは任意の数のトピックからメッセージを取り込むことができますが、対応するテーブルはすべて単一のデータベースとスキーマにある必要があります。

構成フィールドの説明については、 コネクタ構成プロパティ をご参照ください。

重要

通常、構成ファイルには秘密キーなどのセキュリティ関連情報が含まれているため、ファイルに読み取り/書き込み権限を適切に設定してアクセスを制限してください。

さらに、構成ファイルを安全な外部の場所またはキー管理サービスに保存することを検討してください。

構成JSONファイルの例

{
  "name":"XYZCompanySensorData",
  "config":{
      "connector.class": "com.snowflake.kafka.connector.SnowflakeStreamingSinkConnector",
      "tasks.max": "1",
      "snowflake.topic2table.map": "topic1:table_1,topic2:table_2",
      "snowflake.url.name": "myorganization-myaccount.snowflakecomputing.com:443",
      "snowflake.warehouse.name": "WH",
      "snowflake.private.key": "-----BEGIN PRIVATE KEY-----\n .... \n-----END PRIVATE KEY-----\n",
      "snowflake.schema.name": "MY_SCHEMA",
      "snowflake.database.name": "MY_DATABASE",
      "snowflake.role.name": "MY_ROLE",
      "snowflake.user.name": "MY_USER",
      "value.converter": "org.apache.kafka.connect.json.JsonConverter",
      "key.converter": "org.apache.kafka.connect.storage.StringConverter",
      "errors.log.enable": "true",
      "topics": "topic1,topic2",
      "value.converter.schemas.enable": "false",
      "errors.tolerance": "all"
      }
}
Copy

構成プロパティファイルの例

connector.class=com.snowflake.kafka.connector.SnowflakeStreamingSinkConnector
tasks.max=1
snowflake.topic2table.map=topic1:table_1,topic2:table_2
snowflake.url.name=myorganization-myaccount.snowflakecomputing.com:443
snowflake.warehouse.name=WH
snowflake.private.key=-----BEGIN PRIVATE KEY-----\n .... \n-----END PRIVATE KEY-----\n
snowflake.schema.name=MY_SCHEMA
snowflake.database.name=MY_DATABASE
snowflake.role.name=MY_ROLE
snowflake.user.name=MY_USER
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter=org.apache.kafka.connect.storage.StringConverter
errors.log.enable=true
topics=topic1,topic2
name=XYZCompanySensorData
value.converter.schemas.enable=false
errors.tolerance=all
Copy

コネクタ構成プロパティ

必須のプロパティ

name

アプリケーション名。これは、顧客が使用するすべてのKafkaコネクタで一意でなければなりません。この名前は、引用符で囲まれていないSnowflakeの有効な識別子である必要があります。有効な識別子については、 識別子の要件 をご参照ください。

connector.class

com.snowflake.kafka.connector.SnowflakeStreamingSinkConnector

topics

トピックのコンマ区切りリスト。デフォルトでは、Snowflakeはテーブル名がトピック名と同じであると想定します。テーブル名がトピック名と同じでない場合は、オプションの topic2table.map パラメーター(下記)を使用して、トピック名からテーブル名へのマッピングを指定します。このテーブル名は、引用符で囲まれていないSnowflakeの有効な識別子である必要があります。有効なテーブル名については、 識別子の要件 をご参照ください。

注釈

両方ではなく、 topics または topics.regexいずれか が必要です。

topics.regex

これは、Snowflakeテーブルに読み込むメッセージを含むトピックを指定する正規表現(「regex」)です。コネクタは、正規表現に一致するトピック名からデータをロードします。正規表現は、Java正規表現のルールに従う必要があります(つまり、java.util.regex.Patternと互換性があります)。構成ファイルには、両方ではなく、 topics または topics.regexいずれか を含める必要があります。

snowflake.url.name

Snowflakeアカウントにアクセスするための URL。この URL には、使用する アカウント識別子 が含まれている必要があります。プロトコル(https://)とポート番号はオプションです。

snowflake.user.name

Snowflakeアカウントのユーザーログイン名。

snowflake.role.name

コネクタがテーブルにデータを挿入するために使用するロールの名前。

snowflake.private.key

ユーザーを認証するための秘密キー。ヘッダーまたはフッターではなく、キーのみを含めます。キーが複数の行に分割されている場合、改行を削除します。暗号化されていないキーを提供するか、暗号化されたキーを提供して snowflake.private.key.passphrase パラメーターを提供し、Snowflakeがキーを復号化できるようにします。:emph:` パラメーター値が暗号化されている 時に限り``snowflake.private.key` 、このパラメーターを使用します。これにより、 キーペア認証とキーペアローテーション の指示に従って暗号化された秘密キーが復号化されます。

注釈

オプションのプロパティsnowflake.private.key.passphrase もご参照ください。

snowflake.database.name

行を挿入するテーブルを含むデータベースの名前。

snowflake.schema.name

行を挿入するテーブルを含むスキーマの名前。

header.converter

レコードがAvroでフォーマットされ、ヘッダーが含まれている場合にのみ必要です。値は "org.apache.kafka.connect.storage.StringConverter" です。

key.converter

これは、Kafkaの記録のキーコンバーターです(例: "org.apache.kafka.connect.storage.StringConverter")。これはKafkaコネクタでは使用されませんが、Kafka Connectプラットフォームでは必要です。

現在の制限については Kafkaコネクタの制限 をご参照ください。

value.converter

コネクタは、標準のKafkaコミュニティコンバーターをサポートしています。データ形式に基づいて適切なコンバーターを選択します。

  • JSON 記録の場合: "org.apache.kafka.connect.json.JsonConverter"

  • スキーマレジストリを使用するAvro記録の場合: "io.confluent.connect.avro.AvroConverter"

現在の制限については Kafkaコネクタの制限 をご参照ください。

オプションのプロパティ

snowflake.private.key.passphrase

このパラメーターの値が空でない場合、コネクタはこのフレーズを使用して秘密キーの復号化を試みます。

tasks.max

タスクの数。通常、Kafka Connectクラスタのワーカーノード全体の CPU コアの数と同じです。最適なパフォーマンスを実現するために、タスク数をKafkaパーティションの総数と等しく、CPU コアの数を超えないように設定することをお勧めします。タスク数が多いとメモリ消費量が増え、頻繁にリバランスが発生する可能性があります。

snowflake.topic2table.map

このオプションのパラメータにより、ユーザーはどのトピックをどのテーブルにマッピングするかを指定できます。各トピックとそのテーブル名はコロンで区切る必要があります(以下を参照)。このテーブル名は、引用符で囲まれていないSnowflakeの有効な識別子である必要があります。有効なテーブル名については、 識別子の要件 をご参照ください。トピック設定では、 topics.regex の使用と同様に、正規表現を使ってトピックを定義することができます。正規表現はあいまいであってはならず、一致するトピックは単一のターゲットテーブルのみに一致しなければなりません。

例:

topics="topic1,topic2,topic5,topic6"
snowflake.topic2table.map="topic1:low_range,topic2:low_range,topic5:high_range,topic6:high_range"
Copy

次のように書くことができます:

topics.regex="topic[0-9]"
snowflake.topic2table.map="topic[0-4]:low_range,topic[5-9]:high_range"
Copy
value.converter.schema.registry.url

形式がAvroで、スキーマレジストリサービスを使用している場合、これはスキーマレジストリサービスの URL である必要があります。それ以外の場合、このフィールドは空でなければなりません。

value.converter.break.on.schema.registry.error

Schema Registry ServiceからAvroデータをロードする場合、このプロパティは、スキーマIDのフェッチ中にエラーが発生した場合にKafkaコネクタがレコードの消費を停止するかどうかを決定します。デフォルト値は false です。この動作を有効にするには、値を true に設定します。

jvm.proxy.host

Snowflake Kafka Connectorがプロキシサーバー経由でSnowflakeにアクセスできるようにするには、このパラメーターを設定して、そのプロキシサーバーのホストを指定します。

jvm.proxy.port

Snowflake Kafka Connectorがプロキシサーバーを介してSnowflakeにアクセスできるようにするには、このパラメーターを設定してそのプロキシサーバーのポートを指定します。

snowflake.streaming.max.client.lag

Snowflake Ingest Java がSnowflakeにデータをフラッシュする頻度を秒単位で指定します。

:
  • 最小: 1

  • 最大: 600

デフォルト:

1

jvm.proxy.username

プロキシサーバーで認証するユーザー名。

jvm.proxy.password

プロキシサーバーで認証するユーザー名のパスワード。

snowflake.jdbc.map

例: "snowflake.jdbc.map": "networkTimeout:20,tracing:WARNING"

その他の JDBC プロパティ(JDBC ドライバーの接続パラメーター参照 参照)は検証されません。これらの追加プロパティは検証されず、以下のような必須プロパティをオーバーライドしたり、その代わりに使ったりしてはなりません: jvm.proxy.xxxsnowflake.user.namesnowflake.private.keysnowflake.schema.name など。

以下の組み合わせのいずれかを指定します:
  • tracing プロパティと JDBC_TRACE 環境変数

  • database プロパティと snowflake.database.name

曖昧な動作となり、動作は JDBC ドライバーによって決定されます。

value.converter.basic.auth.credentials.source

Avroデータ形式を使用していて、Kafkaスキーマレジストリへの安全なアクセスが必要な場合、このパラメーターを文字列「USER_INFO」に設定し、以下で説明する value.converter.basic.auth.user.info パラメーターを設定します。それ以外の場合は、このパラメーターを省略します。

value.converter.basic.auth.user.info

Avroデータ形式を使用していて、Kafkaスキーマレジストリへの安全なアクセスが必要な場合は、上記のとおりこのパラメーターを文字列「<user_ID>:<password>」に設定し、value.converter.basic.auth.credentials.sourceパラメーターを設定します。それ以外の場合は、このパラメーターを省略します。

snowflake.metadata.createtime

値が FALSE に設定されている場合、 CreateTime プロパティ値は RECORD_METADATA 列のメタデータから省略されます。デフォルト値は TRUE です。

snowflake.metadata.topic

値が FALSE に設定されている場合、 topic プロパティ値は RECORD_METADATA 列のメタデータから省略されます。デフォルト値は TRUE です。

snowflake.metadata.offset.and.partition

値が FALSE に設定されている場合、 Offset および Partition プロパティ値は RECORD_METADATA 列のメタデータから省略されます。デフォルト値は TRUE です。

snowflake.metadata.all

値が FALSE に設定されている場合、 RECORD_METADATA 列のメタデータは完全に空です。デフォルト値は TRUE です。

transforms

Kafkaコネクタが検出したtombstoneレコードをスキップし、それらをターゲットテーブルにロードしないように指定します。tombstoneレコードは、値フィールド全体がnullであるレコードとして定義されます。

プロパティ値を "tombstoneHandlerExample" に設定します。

注釈

このプロパティは、Kafkaコミュニティコンバーター(つまり、 value.converter プロパティ値)でのみ使用します(例: org.apache.kafka.connect.json.JsonConverter または org.apache.kafka.connect.json.AvroConverter)。Snowflakeコンバーターでtombstoneレコードの処理を管理するには、代わりに behavior.on.null.values プロパティを使用します。

transforms.tombstoneHandlerExample.type

transforms プロパティを設定するときに必要です。

プロパティ値を "io.confluent.connect.transforms.TombstoneHandler" に設定

behavior.on.null.values

Kafkaコネクタがtombstoneレコードを処理する方法を指定します。tombstoneレコードは、値フィールド全体がnullであるレコードとして定義されます。 Snowpipe では、このプロパティは Kafkaコネクタのバージョン1.5.5以降でサポートされています。Snowpipeストリーミング では、このプロパティはKafkaコネクタのバージョン2.1.0以降でサポートされています。

このプロパティは、次の値をサポートします。

DEFAULT

Kafkaコネクタがtombstoneレコードを検出すると、コンテンツ列に空の JSON 文字列を挿入します。

IGNORE

Kafkaコネクタはtombstoneレコードをスキップし、これらのレコードの行を挿入しません。

デフォルト値は DEFAULT です。

注釈

Tombstone記録のインジェスチョンは、インジェスチョンメソッドによって異なります。

  • Snowpipeでは、KafkaコネクタはSnowflakeコンバーターのみを使用します。Kafkaコミュニティコンバーターを使用してtombstoneレコードの処理を管理するには、代わりに transform プロパティと transforms.tombstoneHandlerExample.type プロパティを使用します。

  • Snowpipe Streamingでは、Kafkaコネクタはコミュニティコンバーターのみを使用します。

Kafkaブローカーに送信された記録は、Kafkaコネクタによって削除され、オフセットが欠落するため、 NULL であってはなりません。オフセットの欠落は、特定のユースケースにおいてKafkaコネクタを破壊します。NULL の記録ではなく、tombstoneの記録を使用することをお勧めします。

次のステップ

コネクタをテストします