Snowflake High Performance connector for Kafka:Kafkaを構成する¶
このトピックでは、 Snowflake High Performance connector for Kafka のためにKafkaをインストールして構成する手順について説明します。
Kafkaコネクタのインストール¶
Kafkaコネクタは JAR (Java実行可能ファイル)として提供されます。
Snowflakeは、次の2つのバージョンのコネクタを提供します。
Kafka ConnectのConfluent実装用バージョン。
open source software (OSS) Apache Kafka package https://mvnrepository.com/artifact/com.snowflake/snowflake-kafka-connector/ のバージョン。
コネクタのバージョンはどちらもSnowflake Private Previewで入手でき、Snowflakeから取得する必要があります。Snowflakeアカウントチームに問い合わせて、コネクタ JAR ファイルを取得してください。
使用すべきバージョンがわからない場合は、 コネクタのバージョンの選択 をご参照ください。Kafkaコネクタの構成 ==============================================================================
コネクタの構成はベンダー固有です。Amazon MSK Connectなどの一部の実装には、コネクタを構成するための UI があり、 JSON およびプロパティファイル形式の構成を受け入れます。
このセクションには、コネクタのパラメーター名と値の一般的な参照情報が含まれています。クラウドベンダーごとに、構成要件がやや異なる場合があることに注意してください。
重要
Kafka Connectフレームワークは、Kafkaコネクタの構成設定をマスターノードからワーカーノードにブロードキャストします。構成設定には、機密情報(具体的にはSnowflakeユーザー名と秘密キー)が含まれます。Kafka Connectノード間の通信チャネルを必ず保護してください。手順については、Apache Kafkaソフトウェアのドキュメントをご参照ください。
各構成ファイルは、1つのデータベースとそのデータベース内の1つのスキーマのトピックと対応するテーブルを指定します。1つのコネクタは任意の数のトピックからメッセージを取り込むことができますが、対応するテーブルはすべて単一のデータベースとスキーマにある必要があります。
構成フィールドの説明については、 コネクタ構成プロパティ をご参照ください。
重要
通常、構成ファイルには秘密キーなどのセキュリティ関連情報が含まれているため、ファイルに読み取り/書き込み権限を適切に設定してアクセスを制限してください。
さらに、構成ファイルを安全な外部の場所またはキー管理サービスに保存することを検討してください。
構成JSONファイルの例
{
"name":"XYZCompanySensorData",
"config":{
"connector.class": "com.snowflake.kafka.connector.SnowflakeStreamingSinkConnector",
"tasks.max": "1",
"snowflake.topic2table.map": "topic1:table_1,topic2:table_2",
"snowflake.url.name": "myorganization-myaccount.snowflakecomputing.com:443",
"snowflake.warehouse.name": "WH",
"snowflake.private.key": "-----BEGIN PRIVATE KEY-----\n .... \n-----END PRIVATE KEY-----\n",
"snowflake.schema.name": "MY_SCHEMA",
"snowflake.database.name": "MY_DATABASE",
"snowflake.role.name": "MY_ROLE",
"snowflake.user.name": "MY_USER",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"errors.log.enable": "true",
"topics": "topic1,topic2",
"value.converter.schemas.enable": "false",
"errors.tolerance": "all"
}
}
構成プロパティファイルの例
connector.class=com.snowflake.kafka.connector.SnowflakeStreamingSinkConnector
tasks.max=1
snowflake.topic2table.map=topic1:table_1,topic2:table_2
snowflake.url.name=myorganization-myaccount.snowflakecomputing.com:443
snowflake.warehouse.name=WH
snowflake.private.key=-----BEGIN PRIVATE KEY-----\n .... \n-----END PRIVATE KEY-----\n
snowflake.schema.name=MY_SCHEMA
snowflake.database.name=MY_DATABASE
snowflake.role.name=MY_ROLE
snowflake.user.name=MY_USER
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter=org.apache.kafka.connect.storage.StringConverter
errors.log.enable=true
topics=topic1,topic2
name=XYZCompanySensorData
value.converter.schemas.enable=false
errors.tolerance=all
コネクタ構成プロパティ¶
必須のプロパティ¶
nameアプリケーション名。これは、顧客が使用するすべてのKafkaコネクタで一意でなければなりません。この名前は、引用符で囲まれていないSnowflakeの有効な識別子である必要があります。有効な識別子については、 識別子の要件 をご参照ください。
connector.classcom.snowflake.kafka.connector.SnowflakeStreamingSinkConnectortopicsトピックのコンマ区切りリスト。デフォルトでは、Snowflakeはテーブル名がトピック名と同じであると想定します。テーブル名がトピック名と同じでない場合は、オプションの
topic2table.mapパラメーター(下記)を使用して、トピック名からテーブル名へのマッピングを指定します。このテーブル名は、引用符で囲まれていないSnowflakeの有効な識別子である必要があります。有効なテーブル名については、 識別子の要件 をご参照ください。注釈
両方ではなく、
topicsまたはtopics.regexの いずれか が必要です。topics.regexこれは、Snowflakeテーブルに読み込むメッセージを含むトピックを指定する正規表現(「regex」)です。コネクタは、正規表現に一致するトピック名からデータをロードします。正規表現は、Java正規表現のルールに従う必要があります(つまり、java.util.regex.Patternと互換性があります)。構成ファイルには、両方ではなく、
topicsまたはtopics.regexの いずれか を含める必要があります。snowflake.url.nameSnowflakeアカウントにアクセスするための URL。この URL には、使用する アカウント識別子 が含まれている必要があります。プロトコル(
https://)とポート番号はオプションです。snowflake.user.nameSnowflakeアカウントのユーザーログイン名。
snowflake.role.nameコネクタがテーブルにデータを挿入するために使用するロールの名前。
snowflake.private.keyユーザーを認証するための秘密キー。ヘッダーまたはフッターではなく、キーのみを含めます。キーが複数の行に分割されている場合、改行を削除します。暗号化されていないキーを提供するか、暗号化されたキーを提供して
snowflake.private.key.passphraseパラメーターを提供し、Snowflakeがキーを復号化できるようにします。:emph:` パラメーター値が暗号化されている時に限り``snowflake.private.key`、このパラメーターを使用します。これにより、 キーペア認証とキーペアローテーション の指示に従って暗号化された秘密キーが復号化されます。注釈
オプションのプロパティ の
snowflake.private.key.passphraseもご参照ください。snowflake.database.name行を挿入するテーブルを含むデータベースの名前。
snowflake.schema.name行を挿入するテーブルを含むスキーマの名前。
header.converterレコードがAvroでフォーマットされ、ヘッダーが含まれている場合にのみ必要です。値は
"org.apache.kafka.connect.storage.StringConverter"です。key.converterこれは、Kafkaの記録のキーコンバーターです(例:
"org.apache.kafka.connect.storage.StringConverter")。これはKafkaコネクタでは使用されませんが、Kafka Connectプラットフォームでは必要です。現在の制限については Kafkaコネクタの制限 をご参照ください。
value.converterコネクタは、標準のKafkaコミュニティコンバーターをサポートしています。データ形式に基づいて適切なコンバーターを選択します。
JSON 記録の場合:
"org.apache.kafka.connect.json.JsonConverter"スキーマレジストリを使用するAvro記録の場合:
"io.confluent.connect.avro.AvroConverter"
現在の制限については Kafkaコネクタの制限 をご参照ください。
オプションのプロパティ¶
snowflake.private.key.passphraseこのパラメーターの値が空でない場合、コネクタはこのフレーズを使用して秘密キーの復号化を試みます。
tasks.maxタスクの数。通常、Kafka Connectクラスタのワーカーノード全体の CPU コアの数と同じです。最適なパフォーマンスを実現するために、タスク数をKafkaパーティションの総数と等しく、CPU コアの数を超えないように設定することをお勧めします。タスク数が多いとメモリ消費量が増え、頻繁にリバランスが発生する可能性があります。
snowflake.topic2table.mapこのオプションのパラメータにより、ユーザーはどのトピックをどのテーブルにマッピングするかを指定できます。各トピックとそのテーブル名はコロンで区切る必要があります(以下を参照)。このテーブル名は、引用符で囲まれていないSnowflakeの有効な識別子である必要があります。有効なテーブル名については、 識別子の要件 をご参照ください。トピック設定では、
topics.regexの使用と同様に、正規表現を使ってトピックを定義することができます。正規表現はあいまいであってはならず、一致するトピックは単一のターゲットテーブルのみに一致しなければなりません。例:
topics="topic1,topic2,topic5,topic6" snowflake.topic2table.map="topic1:low_range,topic2:low_range,topic5:high_range,topic6:high_range"
次のように書くことができます:
topics.regex="topic[0-9]" snowflake.topic2table.map="topic[0-4]:low_range,topic[5-9]:high_range"
value.converter.schema.registry.url形式がAvroで、スキーマレジストリサービスを使用している場合、これはスキーマレジストリサービスの URL である必要があります。それ以外の場合、このフィールドは空でなければなりません。
value.converter.break.on.schema.registry.errorSchema Registry ServiceからAvroデータをロードする場合、このプロパティは、スキーマIDのフェッチ中にエラーが発生した場合にKafkaコネクタがレコードの消費を停止するかどうかを決定します。デフォルト値は
falseです。この動作を有効にするには、値をtrueに設定します。jvm.proxy.hostSnowflake Kafka Connectorがプロキシサーバー経由でSnowflakeにアクセスできるようにするには、このパラメーターを設定して、そのプロキシサーバーのホストを指定します。
jvm.proxy.portSnowflake Kafka Connectorがプロキシサーバーを介してSnowflakeにアクセスできるようにするには、このパラメーターを設定してそのプロキシサーバーのポートを指定します。
snowflake.streaming.max.client.lagSnowflake Ingest Java がSnowflakeにデータをフラッシュする頻度を秒単位で指定します。
- 値:
最小:
1秒最大:
600秒
- デフォルト:
1秒
jvm.proxy.usernameプロキシサーバーで認証するユーザー名。
jvm.proxy.passwordプロキシサーバーで認証するユーザー名のパスワード。
snowflake.jdbc.map例:
"snowflake.jdbc.map": "networkTimeout:20,tracing:WARNING"その他の JDBC プロパティ(JDBC ドライバーの接続パラメーター参照 参照)は検証されません。これらの追加プロパティは検証されず、以下のような必須プロパティをオーバーライドしたり、その代わりに使ったりしてはなりません:
jvm.proxy.xxx、snowflake.user.name、snowflake.private.key、snowflake.schema.nameなど。- 以下の組み合わせのいずれかを指定します:
tracingプロパティとJDBC_TRACE環境変数databaseプロパティとsnowflake.database.name
曖昧な動作となり、動作は JDBC ドライバーによって決定されます。
value.converter.basic.auth.credentials.sourceAvroデータ形式を使用していて、Kafkaスキーマレジストリへの安全なアクセスが必要な場合、このパラメーターを文字列「USER_INFO」に設定し、以下で説明する
value.converter.basic.auth.user.infoパラメーターを設定します。それ以外の場合は、このパラメーターを省略します。value.converter.basic.auth.user.infoAvroデータ形式を使用していて、Kafkaスキーマレジストリへの安全なアクセスが必要な場合は、上記のとおりこのパラメーターを文字列「<user_ID>:<password>」に設定し、value.converter.basic.auth.credentials.sourceパラメーターを設定します。それ以外の場合は、このパラメーターを省略します。
snowflake.metadata.createtime値が FALSE に設定されている場合、
CreateTimeプロパティ値は RECORD_METADATA 列のメタデータから省略されます。デフォルト値は TRUE です。snowflake.metadata.topic値が FALSE に設定されている場合、
topicプロパティ値は RECORD_METADATA 列のメタデータから省略されます。デフォルト値は TRUE です。snowflake.metadata.offset.and.partition値が FALSE に設定されている場合、
OffsetおよびPartitionプロパティ値は RECORD_METADATA 列のメタデータから省略されます。デフォルト値は TRUE です。snowflake.metadata.all値が FALSE に設定されている場合、 RECORD_METADATA 列のメタデータは完全に空です。デフォルト値は TRUE です。
transformsKafkaコネクタが検出したtombstoneレコードをスキップし、それらをターゲットテーブルにロードしないように指定します。tombstoneレコードは、値フィールド全体がnullであるレコードとして定義されます。
プロパティ値を
"tombstoneHandlerExample"に設定します。注釈
このプロパティは、Kafkaコミュニティコンバーター(つまり、
value.converterプロパティ値)でのみ使用します(例:org.apache.kafka.connect.json.JsonConverterまたはorg.apache.kafka.connect.json.AvroConverter)。Snowflakeコンバーターでtombstoneレコードの処理を管理するには、代わりにbehavior.on.null.valuesプロパティを使用します。transforms.tombstoneHandlerExample.typetransformsプロパティを設定するときに必要です。プロパティ値を
"io.confluent.connect.transforms.TombstoneHandler"に設定behavior.on.null.valuesKafkaコネクタがtombstoneレコードを処理する方法を指定します。tombstoneレコードは、値フィールド全体がnullであるレコードとして定義されます。 Snowpipe では、このプロパティは Kafkaコネクタのバージョン1.5.5以降でサポートされています。Snowpipeストリーミング では、このプロパティはKafkaコネクタのバージョン2.1.0以降でサポートされています。
このプロパティは、次の値をサポートします。
DEFAULTKafkaコネクタがtombstoneレコードを検出すると、コンテンツ列に空の JSON 文字列を挿入します。
IGNOREKafkaコネクタはtombstoneレコードをスキップし、これらのレコードの行を挿入しません。
デフォルト値は
DEFAULTです。注釈
Tombstone記録のインジェスチョンは、インジェスチョンメソッドによって異なります。
Snowpipeでは、KafkaコネクタはSnowflakeコンバーターのみを使用します。Kafkaコミュニティコンバーターを使用してtombstoneレコードの処理を管理するには、代わりに
transformプロパティとtransforms.tombstoneHandlerExample.typeプロパティを使用します。Snowpipe Streamingでは、Kafkaコネクタはコミュニティコンバーターのみを使用します。
Kafkaブローカーに送信された記録は、Kafkaコネクタによって削除され、オフセットが欠落するため、 NULL であってはなりません。オフセットの欠落は、特定のユースケースにおいてKafkaコネクタを破壊します。NULL の記録ではなく、tombstoneの記録を使用することをお勧めします。