Kafkaコネクタのインストールと構成¶
Kafkaコネクタは JAR (Java実行可能ファイル)として提供されます。
Snowflakeは、次の2つのバージョンのコネクタを提供します。
KafkaのConfluentパッケージバージョン のバージョン。
このトピックの手順では、どちらかのバージョンのコネクタにのみ適用される手順を指定します。
このトピックの内容:
Snowflakeオブジェクトのアクセス制御の構成¶
必要な権限¶
Kafkaコネクタで使用されるSnowflakeオブジェクトを作成および管理するには、次の最小限の権限を持つロールが必要です。
オブジェクト |
権限 |
注意 |
|---|---|---|
データベース |
USAGE |
|
スキーマ |
USAGE . CREATE TABLE . CREATE STAGE . CREATE PIPE |
スキーマレベルのオブジェクトが作成された後に、 CREATE |
テーブル |
OWNERSHIP |
Kafkaコネクタを使用して 既存の テーブルにデータを取り込む場合にのみ必要です。. コネクタがKafkaトピックのレコードの新しいターゲットテーブルを作成する場合、Kafka構成ファイルで指定されたユーザーの既定のロールがテーブル所有者になります(つまり、テーブルに対する OWNERSHIP 権限があります)。 |
ステージ |
READ . WRITE |
Kafkaコネクタを使用してデータファイルをKafkaから 既存の 内部ステージにステージングする場合にのみ必要です(推奨されません)。. コネクタがKafkaトピックから消費されたデータファイルを一時的に保存する新しいステージを作成する場合、Kafka構成ファイルで指定されたユーザーの既定のロールがステージ所有者になります(つまり、ステージに対する OWNERSHIP 権限があります)。 |
各Kafkaインスタンスに個別のユーザー( CREATE USER を使用)とロール( CREATE ROLE を使用)を作成して、必要に応じてアクセス権限を個別に取り消すことをSnowflakeはお勧めします。ロールは、ユーザーの既定のロールとして割り当てる必要があります。
Kafkaコネクタを使用するロールの作成¶
次のスクリプトは、Kafkaコネクタで使用するカスタムロールを作成します(例:KAFKA_CONNECTOR_ROLE_1)。権限を付与できるロール(例: SECURITYADMIN または MANAGE GRANTS 権限を持つロール)は、このカスタムロールを任意のユーザーに付与して、Kafkaコネクタが必要なSnowflakeオブジェクトを作成し、テーブルにデータを挿入できるようにします。スクリプトは、特定の 既存の データベースとスキーマ(kafka_db.kafka_schema)およびユーザー(kafka_connector_user_1)を参照します。
-- Use a role that can create and manage roles and privileges.
USE ROLE securityadmin;
-- Create a Snowflake role with the privileges to work with the connector.
CREATE ROLE kafka_connector_role_1;
-- Grant privileges on the database.
GRANT USAGE ON DATABASE kafka_db TO ROLE kafka_connector_role_1;
-- Grant privileges on the schema.
GRANT USAGE ON SCHEMA kafka_schema TO ROLE kafka_connector_role_1;
GRANT CREATE TABLE ON SCHEMA kafka_schema TO ROLE kafka_connector_role_1;
GRANT CREATE STAGE ON SCHEMA kafka_schema TO ROLE kafka_connector_role_1;
GRANT CREATE PIPE ON SCHEMA kafka_schema TO ROLE kafka_connector_role_1;
-- Only required if the Kafka connector will load data into an existing table.
GRANT OWNERSHIP ON TABLE existing_table1 TO ROLE kafka_connector_role_1;
-- Only required if the Kafka connector will stage data files in an existing internal stage: (not recommended).
GRANT READ, WRITE ON STAGE existing_stage1 TO ROLE kafka_connector_role_1;
-- Grant the custom role to an existing user.
GRANT ROLE kafka_connector_role_1 TO USER kafka_connector_user_1;
-- Set the custom role as the default role for the user.
-- If you encounter an 'Insufficient privileges' error, verify the role that has the OWNERSHIP privilege on the user.
ALTER USER kafka_connector_user_1 SET DEFAULT_ROLE = kafka_connector_role_1;
どの権限も、コネクタで使用されるロールに直接付与すべきことに注意してください。付与はロール階層から継承することはできません。
カスタムロールとロール階層の作成の詳細については、 アクセス制御の構成 をご参照ください。
インストールの前提条件¶
Kafkaコネクタは、次のパッケージバージョンをサポートしています。
パッケージ
Snowflake Kafkaコネクタバージョン
パッケージのサポート(Snowflakeによりテスト済み)
Apache Kafka
2.0.0(またはそれ以上)
Apache Kafka 2.8.2、3.7.2
Confluent
2.0.0(またはそれ以上)
Confluent 6.2.15、7.8.2
Kafkaコネクタは、Kafka Connect API 3.9.0で使用するために構築されています。これよりも新しいバージョンのKafka Connect API はテストされていません。3.9.0よりも古いバージョンは、コネクタとの互換性があります。詳細については、 Kafkaの互換性 をご参照ください。
Kafkaコネクタと JDBC ドライバーのjarファイルの両方が環境にある場合、 JDBC バージョンが、目的のKafkaコネクタバージョンの
pom.xmlファイルで指定されたsnowflake-jdbcバージョンと一致していることを確認します。お好みのKafkaコネクタのリリースバージョン、例えば v2.0.1 に移動できます。次にpom.xmlファイルをブラウズしてsnowflake-jdbcのバージョンを調べます。データのインジェストにAvro形式を使用する場合:
https://mvnrepository.com/artifact/org.apache.avroから入手可能なAvroパーサー、バージョン1.8.2(またはそれ以上)を使用します。
Avroでスキーマレジストリ機能を使用する場合は、https://mvnrepository.com/artifact/io.confluentで入手可能なバージョン5.0.0(またはそれ以上)のKafka Connect Avro Converterを使用してください。
スキーマレジストリ機能は OSS Apache Kafkaパッケージでは使用できないことに注意してください。
希望のデータ保持時間やストレージ制限でKafkaを構成します。
Kafka Connectクラスターをインストールして構成します。
各Kafka Connectクラスタノードには、Kafkaコネクタに十分な RAM を含める必要があります。推奨される最小量は、Kafkaパーティションごとに5 MB です。これは、Kafka Connectが行っている他の作業に必要な RAM に追加する分です。
Kafka BrokerとKafka Connect Runtimeで同じバージョンを使用することをお勧めします。
Snowflakeアカウントと同じクラウドプロバイダー 地域 でKafka Connectインスタンスを実行することを強くお勧めします。これは厳密な要件ではありませんが、通常はスループットが向上します。
Snowflakeクライアントでサポートされているオペレーティングシステムのリストについては、 オペレーティングシステムのサポート をご参照ください。
コネクタのインストール¶
このセクションでは、Confluent用のKafkaコネクタのインストールと構成の手順を説明します。Kafkaコネクタのバージョンについては、次のテーブルをご参照ください。
リリースシリーズ |
ステータス |
注意 |
|---|---|---|
4.x.x |
Private Preview |
Early access. Currently the migration from 3.x and 2.x versions is not supported. |
3.x.x |
正式にサポート |
最新バージョンで、強くお勧めします。 |
2.x.x |
正式にサポート |
アップグレードをお勧めします。 |
1.x.x |
サポート対象外 |
このリリースシリーズは使用しないでください。 |
Confluent用のコネクタのインストール¶
Kafkaコネクタファイルのダウンロード¶
次のいずれかの場所からKafkaコネクタ JAR ファイルをダウンロードします。
- Confluentハブ:
-
このパッケージには、キーペア認証に暗号化または非暗号化秘密キーを使用するために必要なすべての依存関係が含まれています。詳細については、このトピックの キーペア認証およびキーローテーションの使用 (このトピック内)をご参照ください。
- Maven Central Repository:
https://mvnrepository.com/artifact/com.snowflake
JAR ファイルは、キーペア認証に 暗号化されていない 秘密キーを使用するための追加の依存関係を必要としません。暗号化された秘密キーを使用するには、 Bouncy Castle 暗号化ライブラリ( JAR ファイル)をダウンロードします。SnowflakeはBouncy Castleを使用して、ログインに使用される暗号化された RSA 秘密キーを復号化します。
Kafka Connectorのバージョンが3.1.1より前の場合は、代わりに以下のBouncy Castleバージョンを使用してください。
これらのファイルをKafkaコネクタ JAR ファイルと同じローカルフォルダーにダウンロードします。
コネクタのソースコードはhttps://github.com/snowflakedb/snowflake-kafka-connectorから入手できます。
Kafkaコネクタのインストール¶
他のコネクタをインストールするために提供されている手順を使用して、Kafkaコネクタをインストールします。
オープンソースApache Kafka用のコネクタのインストール¶
このセクションでは、オープンソースのApache Kafka用にKafkaコネクタをインストールおよび構成する手順について説明します。
Apache Kafkaのインストール¶
Kafkaパッケージを公式ウェブサイトhttps://kafka.apache.org/downloadsからダウンロードします。
ターミナルウィンドウで、パッケージファイルをダウンロードしたディレクトリに移動します。
次のコマンドを実行して、
kafka_<scalaバージョン>-<kafkaバージョン>.tgzファイルを解凍します。tar xzvf kafka_<scala_version>-<kafka_version>.tgz
JDK のインストール¶
Java Development Kit(JDK)をインストールして構成します。Snowflakeは、 JDKのStandard Edition(SE)でテストします。Enterprise Edition(EE)は互換性があると予想されますが、テストされていません。
この手順を既に完了している場合は、このセクションをスキップできます。
https://www.oracle.com/technetwork/java/javase/downloads/index.htmlから JDK をダウンロードします。
JDKをインストールまたは解凍します。
ご使用のオペレーティングシステムの手順に従って、環境変数 JAVA_HOME が JDKを含むディレクトリを指すように設定します。
Kafka Connector JAR ファイルをダウンロードします¶
Maven Central RepositoryからKafkaコネクタ JAR ファイルをダウンロードします。
JAR ファイルは、キーペア認証に 暗号化されていない 秘密キーを使用するための追加の依存関係を必要としません。暗号化された秘密キーを使用するには、 Bouncy Castle 暗号化ライブラリ( JAR ファイル)をダウンロードします。SnowflakeはBouncy Castleを使用して、ログインに使用される暗号化された RSA 秘密キーを復号化します。
Kafkaデータが Apache Avro 形式でストリーミングされている場合、Avro JAR ファイルをダウンロードします。
コネクタのソースコードはhttps://github.com/snowflakedb/snowflake-kafka-connectorから入手できます。
Kafkaコネクタのインストール¶
Kafkaコネクタ JAR ファイルのダウンロード でダウンロードした JAR ファイルを <kafkaディレクトリ>/libs フォルダーにコピーします。
Kafkaコネクタの構成¶
コネクタは、Snowflakeログイン認証情報、トピック名、Snowflakeテーブル名などのパラメーターを指定するファイルを作成することで構成されます。
重要
Kafka Connectフレームワークは、Kafkaコネクタの構成設定をマスターノードからワーカーノードにブロードキャストします。構成設定には、機密情報(具体的にはSnowflakeユーザー名と秘密キー)が含まれます。Kafka Connectノード間の通信チャネルを必ず保護してください。手順については、Apache Kafkaソフトウェアのドキュメントをご参照ください。
各構成ファイルは、1つのデータベースとそのデータベース内の1つのスキーマのトピックと対応するテーブルを指定します。1つのコネクタは任意の数のトピックからメッセージを取り込むことができますが、対応するテーブルはすべて単一のデータベースとスキーマに保存される必要があります。
このセクションでは、分散モードとスタンドアロンモードの両方について説明します。
構成フィールドの説明については、 Kafka構成プロパティ をご参照ください。
重要
通常、構成ファイルには秘密キーなどのセキュリティ関連情報が含まれているため、ファイルに読み取り/書き込み権限を適切に設定してアクセスを制限してください。
さらに、構成ファイルを安全な外部の場所またはキー管理サービスに保存することを検討してください。詳細については、このトピックの 秘密の外部化 をご参照ください。
分散モード¶
Kafka構成ファイルを作成します(例: <パス>/<Configファイル>.json)。すべてのコネクタ構成情報をファイルに入力します。ファイルは JSON 形式である必要があります。
サンプル構成ファイル
{
"name":"XYZCompanySensorData",
"config":{
"connector.class":"com.snowflake.kafka.connector.SnowflakeSinkConnector",
"tasks.max":"8",
"topics":"topic1,topic2",
"snowflake.topic2table.map": "topic1:table1,topic2:table2",
"buffer.count.records":"10000",
"buffer.flush.time":"60",
"buffer.size.bytes":"5000000",
"snowflake.url.name":"myorganization-myaccount.snowflakecomputing.com:443",
"snowflake.user.name":"jane.smith",
"snowflake.private.key":"xyz123",
"snowflake.private.key.passphrase":"jkladu098jfd089adsq4r",
"snowflake.database.name":"mydb",
"snowflake.schema.name":"myschema",
"key.converter":"org.apache.kafka.connect.storage.StringConverter",
"value.converter":"com.snowflake.kafka.connector.records.SnowflakeAvroConverter",
"value.converter.schema.registry.url":"http://localhost:8081",
"value.converter.basic.auth.credentials.source":"USER_INFO",
"value.converter.basic.auth.user.info":"jane.smith:MyStrongPassword"
}
}
スタンドアロンモード¶
<kafkaディレクトリ>/config/SF_connect.properties などの構成ファイルを作成します。すべてのコネクタ構成情報をファイルに入力します。
サンプル構成ファイル
connector.class=com.snowflake.kafka.connector.SnowflakeSinkConnector
tasks.max=8
topics=topic1,topic2
snowflake.topic2table.map= topic1:table1,topic2:table2
buffer.count.records=10000
buffer.flush.time=60
buffer.size.bytes=5000000
snowflake.url.name=myorganization-myaccount.snowflakecomputing.com:443
snowflake.user.name=jane.smith
snowflake.private.key=xyz123
snowflake.private.key.passphrase=jkladu098jfd089adsq4r
snowflake.database.name=mydb
snowflake.schema.name=myschema
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=com.snowflake.kafka.connector.records.SnowflakeAvroConverter
value.converter.schema.registry.url=http://localhost:8081
value.converter.basic.auth.credentials.source=USER_INFO
value.converter.basic.auth.user.info=jane.smith:MyStrongPassword
Kafka構成プロパティ¶
次のプロパティは、分散モードまたはスタンドアロンモードのいずれかのKafka構成ファイルで設定できます。
必須のプロパティ¶
nameアプリケーション名。これは、顧客が使用するすべてのKafkaコネクタで一意でなければなりません。この名前は、引用符で囲まれていないSnowflakeの有効な識別子である必要があります。有効な識別子については、 識別子の要件 をご参照ください。
connector.classcom.snowflake.kafka.connector.SnowflakeSinkConnectortopicsトピックのコンマ区切りリスト。デフォルトでは、Snowflakeはテーブル名がトピック名と同じであると想定します。テーブル名がトピック名と同じでない場合は、オプションの
topic2table.mapパラメーター(下記)を使用して、トピック名からテーブル名へのマッピングを指定します。このテーブル名は、引用符で囲まれていないSnowflakeの有効な識別子である必要があります。有効なテーブル名については、 識別子の要件 をご参照ください。注釈
両方ではなく、
topicsまたはtopics.regexの いずれか が必要です。topics.regexこれは、Snowflakeテーブルに読み込むメッセージを含むトピックを指定する正規表現(「regex」)です。コネクタは、正規表現に一致するトピック名からデータをロードします。正規表現は、Java正規表現のルールに従う必要があります(つまり、java.util.regex.Patternと互換性があります)。構成ファイルには、両方ではなく、
topicsまたはtopics.regexの いずれか を含める必要があります。snowflake.url.nameSnowflakeアカウントにアクセスするための URL。この URL には、使用する アカウント識別子 が含まれている必要があります。プロトコル(
https://)とポート番号はオプションです。snowflake.user.nameSnowflakeアカウントのユーザーログイン名。
snowflake.private.keyユーザーを認証するための秘密キー。ヘッダーまたはフッターではなく、キーのみを含めます。キーが複数の行に分割されている場合、改行を削除します。暗号化されていないキーを提供するか、暗号化されたキーを提供して
snowflake.private.key.passphraseパラメーターを提供し、Snowflakeがキーを復号化できるようにします。snowflake.private.keyパラメーター値が暗号化されている 時に限り 、このパラメーターを使用します。これにより、 キーペア認証およびキーローテーションの使用 (このトピック内)の指示に従って暗号化された秘密キーが復号化されます。注釈
このトピックの オプションのプロパティ の
snowflake.private.key.passphraseもご参照ください。snowflake.database.name行を挿入するテーブルを含むデータベースの名前。
snowflake.schema.name行を挿入するテーブルを含むスキーマの名前。
header.converterレコードがAvroでフォーマットされ、ヘッダーが含まれている場合にのみ必要です。値は
"org.apache.kafka.connect.storage.StringConverter"です。key.converterこれは、Kafkaの記録のキーコンバーターです(例:
"org.apache.kafka.connect.storage.StringConverter")。これはKafkaコネクタでは使用されませんが、Kafka Connectプラットフォームでは必要です。現在の制限については Kafkaコネクタの制限 をご参照ください。
value.converterレコードが JSONでフォーマットされている場合、これは
"com.snowflake.kafka.connector.records.SnowflakeJsonConverter"になります。注釈
"com.snowflake.kafka.connector.records.SnowflakeJsonConverter"は、記録をそのまま逆シリアル化します。すべてのJSONフィールドは記録フィールドとみなされ、スキーマやメタデータを含む他のフィールドに特別な扱いは行われません。レコードがAvroでフォーマットされ、Kafkaのスキーマレジストリサービスを使用する場合、これは
"com.snowflake.kafka.connector.records.SnowflakeAvroConverter"になります。記録がAvroでフォーマットされ、スキーマを含む(したがって、Kafkaのスキーマレジストリサービスを必要としない)場合、これは
"com.snowflake.kafka.connector.records.SnowflakeAvroConverterWithoutSchemaRegistry"になります。記録がプレーンテキストでフォーマットされている場合、これは
"org.apache.kafka.connect.storage.StringConverter"になります。現在の制限については Kafkaコネクタの制限 をご参照ください。
オプションのプロパティ¶
snowflake.private.key.passphraseこのパラメーターの値が空でない場合、Kafkaはこのフレーズを使用して秘密キーの復号化を試みます。
tasks.maxタスクの数。通常、Kafka Connectクラスタのワーカーノード全体の CPU コアの数と同じです。最適なパフォーマンスを実現するために、タスク数をKafkaパーティションの総数と等しく、CPU コアの数を超えないように設定することをお勧めします。タスク数が多いとメモリ消費量が増え、頻繁にリバランスが発生する可能性があります。
snowflake.topic2table.mapこのオプションのパラメータにより、ユーザーはどのトピックをどのテーブルにマッピングするかを指定できます。各トピックとそのテーブル名はコロンで区切る必要があります(以下を参照)。このテーブル名は、引用符で囲まれていないSnowflakeの有効な識別子である必要があります。有効なテーブル名については、 識別子の要件 をご参照ください。トピック設定では、
topics.regexの使用と同様に、正規表現を使ってトピックを定義することができます。正規表現はあいまいであってはならず、一致するトピックは単一のターゲットテーブルのみに一致しなければなりません。重要
snowflake.topic2table.mapパラメーターが構成されている場合、Snowflake はコネクタをバージョン 3.1.0 にアップグレードすることを強く推奨します。Snowflake Connector for Kafkaリリースの詳細情報については、 Snowflake Connector for Kafka リリースノート を参照してください。例:
topics="topic1,topic2,topic5,topic6" snowflake.topic2table.map="topic1:low_range,topic2:low_range,topic5:high_range,topic6:high_range"
次のように書くことができます:
topics.regex="topic[0-9]" snowflake.topic2table.map="topic[0-4]:low_range,topic[5-9]:high_range"
buffer.count.recordsSnowflakeにインジェストされる前に、Kafkaパーティションごとにメモリにバッファされる記録の数。デフォルト値は
10000レコードです。buffer.flush.timeバッファーフラッシュ間の秒数。フラッシュはKafkaのメモリキャッシュから内部ステージまでです。デフォルト値は
120秒です。buffer.size.bytesデータファイルとしてSnowflakeに取り込まれる前に、Kafkaパーティションごとにメモリーにバッファーされたレコードの累積サイズ(バイト単位)。このデフォルト値は
5000000(5 MB)です。レコードは、データファイルに書き込まれるときに圧縮されます。その結果、バッファー内のレコードのサイズは、レコードから作成されたデータファイルのサイズよりも大きくなる可能性があります。
value.converter.schema.registry.url形式がAvroで、スキーマレジストリサービスを使用している場合、これはスキーマレジストリサービスの URL である必要があります。それ以外の場合、このフィールドは空でなければなりません。
value.converter.break.on.schema.registry.errorSchema Registry ServiceからAvroデータをロードする場合、このプロパティは、スキーマIDのフェッチ中にエラーが発生した場合にKafkaコネクタがレコードの消費を停止するかどうかを決定します。デフォルト値は
falseです。この動作を有効にするには、値をtrueに設定します。Kafkaコネクタバージョン1.4.2以降でサポートされています。
jvm.proxy.hostSnowflake Kafka Connectorがプロキシサーバー経由でSnowflakeにアクセスできるようにするには、このパラメーターを設定して、そのプロキシサーバーのホストを指定します。
jvm.proxy.portSnowflake Kafka Connectorがプロキシサーバーを介してSnowflakeにアクセスできるようにするには、このパラメーターを設定してそのプロキシサーバーのポートを指定します。
jvm.proxy.usernameプロキシサーバーで認証するユーザー名。
Kafkaコネクタバージョン1.4.4以降でサポートされています。
jvm.proxy.passwordプロキシサーバーで認証するユーザー名のパスワード。
Kafkaコネクタバージョン1.4.4以降でサポートされています。
snowflake.jdbc.map例:
"snowflake.jdbc.map": "networkTimeout:20,tracing:WARNING"その他の JDBC プロパティ(JDBC ドライバーの接続パラメーター参照 参照)は検証されません。これらの追加プロパティは検証されず、以下のような必須プロパティをオーバーライドしたり、その代わりに使ったりしてはなりません:
jvm.proxy.xxx、snowflake.user.name、snowflake.private.key、snowflake.schema.nameなど。- 以下の組み合わせのいずれかを指定します:
tracingプロパティとJDBC_TRACE環境変数databaseプロパティとsnowflake.database.name
曖昧な動作となり、動作は JDBC ドライバーによって決定されます。
value.converter.basic.auth.credentials.sourceAvroデータ形式を使用していて、Kafkaスキーマレジストリへの安全なアクセスが必要な場合、このパラメーターを文字列「USER_INFO」に設定し、以下で説明する
value.converter.basic.auth.user.infoパラメーターを設定します。それ以外の場合は、このパラメーターを省略します。value.converter.basic.auth.user.infoAvroデータ形式を使用していて、Kafkaスキーマレジストリへの安全なアクセスが必要な場合は、上記のとおりこのパラメーターを文字列「<user_ID>:<password>」に設定し、value.converter.basic.auth.credentials.sourceパラメーターを設定します。それ以外の場合は、このパラメーターを省略します。
snowflake.metadata.createtime値が FALSE に設定されている場合、
CreateTimeプロパティ値は RECORD_METADATA 列のメタデータから省略されます。デフォルト値は TRUE です。Kafkaコネクター1.2.0(およびそれ以上)でサポートされています。
snowflake.metadata.topic値が FALSE に設定されている場合、
topicプロパティ値は RECORD_METADATA 列のメタデータから省略されます。デフォルト値は TRUE です。Kafkaコネクター1.2.0(およびそれ以上)でサポートされています。
snowflake.metadata.offset.and.partition値が FALSE に設定されている場合、
OffsetおよびPartitionプロパティ値は RECORD_METADATA 列のメタデータから省略されます。デフォルト値は TRUE です。Kafkaコネクター1.2.0(およびそれ以上)でサポートされています。
snowflake.metadata.all値が FALSE に設定されている場合、 RECORD_METADATA 列のメタデータは完全に空です。デフォルト値は TRUE です。
Kafkaコネクター1.2.0(およびそれ以上)でサポートされています。
transformsKafkaコネクタが検出したtombstoneレコードをスキップし、それらをターゲットテーブルにロードしないように指定します。tombstoneレコードは、値フィールド全体がnullであるレコードとして定義されます。
プロパティ値を
"tombstoneHandlerExample"に設定します。注釈
このプロパティは、Kafkaコミュニティコンバーター(つまり、
value.converterプロパティ値)でのみ使用します(例:org.apache.kafka.connect.json.JsonConverterまたはorg.apache.kafka.connect.json.AvroConverter)。Snowflakeコンバーターでtombstoneレコードの処理を管理するには、代わりにbehavior.on.null.valuesプロパティを使用します。transforms.tombstoneHandlerExample.typetransformsプロパティを設定するときに必要です。プロパティ値を
"io.confluent.connect.transforms.TombstoneHandler"に設定behavior.on.null.valuesKafkaコネクタがtombstoneレコードを処理する方法を指定します。tombstoneレコードは、値フィールド全体がnullであるレコードとして定義されます。 Snowpipe では、このプロパティは Kafkaコネクタのバージョン1.5.5以降でサポートされています。Snowpipeストリーミング では、このプロパティはKafkaコネクタのバージョン2.1.0以降でサポートされています。
このプロパティは、次の値をサポートします。
DEFAULTKafkaコネクタがtombstoneレコードを検出すると、コンテンツ列に空の JSON 文字列を挿入します。
IGNOREKafkaコネクタはtombstoneレコードをスキップし、これらのレコードの行を挿入しません。
デフォルト値は
DEFAULTです。注釈
Tombstone記録のインジェスチョンは、インジェスチョンメソッドによって異なります。
Snowpipeでは、KafkaコネクタはSnowflakeコンバーターのみを使用します。Kafkaコミュニティコンバーターを使用してtombstoneレコードの処理を管理するには、代わりに
transformプロパティとtransforms.tombstoneHandlerExample.typeプロパティを使用します。Snowpipe Streamingでは、Kafkaコネクタはコミュニティコンバーターのみを使用します。
Kafkaブローカーに送信された記録は、Kafkaコネクタによって削除され、オフセットが欠落するため、 NULL であってはなりません。オフセットの欠落は、特定のユースケースにおいてKafkaコネクタを破壊します。NULL の記録ではなく、tombstoneの記録を使用することをお勧めします。
snowflake.snowpipe.v2CleanerEnabledSnowpipeインジェスチョンメソッド用にステージファイルクリーナーの改良バージョンを実行するかどうかを指定します。古いクリーナーにはいくつかの制限があり、一部のファイルがステージ上に残りました。
このプロパティは、Kafkaコネクタのバージョン2.2.2以降でサポートされています。
- 値:
truefalse
- デフォルト:
バージョン2.3.0以降の場合は
true、バージョン2.2.2の場合は:code:false
snowflake.snowpipe.v2CleanerIntervalSeconds新しいファイルクリーナーの実行頻度を指定します。コスト最適化の目的で、少数のメッセージが処理される場合は、パラメーター値を大幅に増やすことをお勧めします。たとえば、30分です。
このプロパティは、Kafkaコネクタのバージョン2.2.2以降でサポートされています。
- 値:
最小:
1最大: 上限なし
- デフォルト:
61秒
snowflake.streaming.channel.name.include.connector.nameWhen enabled, Snowflake Streaming channel names are prefixed with the connector name. This option enables or disables usage of channel names that were used in Kafka Connector versions 2.1.0 and 2.1.1 and are intended for users that previously used these versions and have not updated the connector.
Supported by the Kafka connector 3.4.0 (and higher).
重要
Enabling this option when updating from versions other than 2.1.0 or 2.1.1 may result in data duplication. Cannot be used together with
enable.streaming.channel.offset.migration=true- 値:
truefalse
- デフォルト:
false
enable.streaming.channel.offset.migrationThis option is used to enable or disable streaming channel offset migration logic. When
true, offset tokens are migrated from V2 channel name format V2 to V1 channel name format. The V2 channel name format was used in Kafka Connector versions 2.1.0 and 2.1.1 only and is deprecated. V1 format name format is used unless V2 format is enabled usingsnowflake.streaming.channel.name.include.connector.name = true. Disabling this option might have side effects. Please consult Snowflake support before disabling this option.- Channel name formats:
V1 -
[topic]_[partition], used in all versions except 2.1.0 and 2.1.1V2 -
[connectorName]_[topic]_[partition], used in versions 2.1.0 and 2.1.1. Can be used in 3.4.0 and later — Please seesnowflake.streaming.channel.name.include.connector.name.
- 値:
truefalse
- デフォルト:
truefor versions from 2.1.2 until 3.4.0,falsefor version 3.4.0 and later
キーペア認証の使用およびキーローテーション¶
Kafkaコネクタは、基本認証(つまり、ユーザー名とパスワード)ではなく、キーペア認証に依存しています。この認証方法には、2048ビット(最小)の RSA キーペアが必要です。OpenSSLを使用して公開キーと秘密キーのペアを生成します。公開キーは、構成ファイルで定義されたSnowflakeユーザーに割り当てられます。
このページのキーペア認証の手順と キーペアのローテーション の手順を完了したら、 秘密の外部化 (このトピック内)の推奨事項を評価します。
公開/秘密キーペアを構成するには、
ターミナルウィンドウのコマンドラインから、秘密キーを生成します。
秘密キーの暗号化バージョンまたは非暗号化バージョンを生成できます。
注釈
Kafkaコネクタは、連邦情報処理標準(140-2)(つまり、 FIPS 140-2)要件を満たすように検証された暗号化アルゴリズムをサポートしています。詳細については、 FIPS 140-2 をご参照ください。
非暗号化バージョンを生成するには、次のコマンドを使用します。
$ openssl genrsa -out rsa_key.pem 2048
暗号化バージョンを生成するには、次のコマンドを使用します。
$ openssl genrsa 2048 | openssl pkcs8 -topk8 -v2 <algorithm> -inform PEM -out rsa_key.p8
<アルゴリズム>は、 FIPS 140-2準拠の暗号化アルゴリズムです。たとえば、暗号化アルゴリズムとして AES 256を指定するには、
$ openssl genrsa 2048 | openssl pkcs8 -topk8 -v2 aes256 -inform PEM -out rsa_key.p8
秘密キーの暗号化されたバージョンを生成する場合、パスフレーズを記録します。後で、Kafka構成ファイルの
snowflake.private.key.passphraseプロパティでパスフレーズを指定します。サンプル PEM 秘密キー
-----BEGIN ENCRYPTED PRIVATE KEY----- MIIE6TAbBgkqhkiG9w0BBQMwDgQILYPyCppzOwECAggABIIEyLiGSpeeGSe3xHP1 wHLjfCYycUPennlX2bd8yX8xOxGSGfvB+99+PmSlex0FmY9ov1J8H1H9Y3lMWXbL ... -----END ENCRYPTED PRIVATE KEY-----
コマンドラインから、秘密キーを参照して公開キーを生成します。
秘密キーが暗号化され、「
rsa_key.p8」という名前のファイルに含まれていると仮定して、次のコマンドを使用します。$ openssl rsa -in rsa_key.p8 -pubout -out rsa_key.pub
サンプル PEM 公開キー
-----BEGIN PUBLIC KEY----- MIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEAy+Fw2qv4Roud3l6tjPH4 zxybHjmZ5rhtCz9jppCV8UTWvEXxa88IGRIHbJ/PwKW/mR8LXdfI7l/9vCMXX4mk ... -----END PUBLIC KEY-----
公開キーファイルと秘密キーファイルを保存用のローカルディレクトリにコピーします。ファイルへのパスを記録します。秘密キーは PKCS#8(公開キー暗号化標準)形式を使用して格納され、前の手順で指定したパスフレーズを使用して暗号化されることに注意してください。ただし、オペレーティングシステムが提供するファイルアクセス権メカニズムを使用して、ファイルを不正アクセスから保護する必要があります。ファイルが使用されていない場合、ファイルを保護するのはユーザーの責任です。
Snowflakeにログインします。ALTER USER を使用して、Snowflakeユーザーに公開キーを割り当てます。例:
ALTER USER jsmith SET RSA_PUBLIC_KEY='MIIBIjANBgkqh...';
注釈
ユーザーを変更できるのは、セキュリティ管理者(つまり、 SECURITYADMIN ロールのユーザー)以上のみです。
SQL ステートメントで公開キーのヘッダーとフッターを除外します。
DESCRIBE USER を使用してユーザーの公開キーの指紋を検証します。
DESC USER jsmith; +-------------------------------+-----------------------------------------------------+---------+-------------------------------------------------------------------------------+ | property | value | default | description | |-------------------------------+-----------------------------------------------------+---------+-------------------------------------------------------------------------------| | NAME | JSMITH | null | Name | ... ... | RSA_PUBLIC_KEY_FP | SHA256:nvnONUsfiuycCLMXIEWG4eTp4FjhVUZQUQbNpbSHXiA= | null | Fingerprint of user's RSA public key. | | RSA_PUBLIC_KEY_2_FP | null | null | Fingerprint of user's second RSA public key. | +-------------------------------+-----------------------------------------------------+---------+-------------------------------------------------------------------------------+
注釈
RSA_PUBLIC_KEY_2_FPプロパティは、 キーペアローテーションの構成 で説明されています。秘密キー全体をコピーして、構成ファイルの
snowflake.private.keyフィールドに貼り付けます。ファイルを保存します。
外部 OAuthの使用¶
このコネクタは、 OAuth のセクションで説明されている外部 外部 OAuth の概要 をサポートしていますが、 Snowpipe Streaming でのみ動作します。
コネクタを構成するには、以下の構成プロパティを使用します。以下のすべてが必要です。
snowflake.authenticatoroauthを使用することを指定します。oauth値を使用します。snowflake.oauth.client.idOAuth アプリに関連付けられたクライアント ID 。
snowflake.oauth.client.secretOAuth アプリに関連付けられたクライアントシークレット。
snowflake.oauth.refresh.tokenアクセストークンとの交換に使用されるリフレッシュトークン。
snowflake.oauth.token.endpointアクセストークンとリフレッシュトークンの交換に使用されるエンドポイント。OAuth サーバの認証エンドポイントを指定する必要があります。
シークレットの外部化¶
Snowflakeは、秘密キーなどの秘密を外部化し、暗号化された形式、または AWS Key Management Service(KMS)、Microsoft Azure Key Vault、 HashiCorp Vaultなどのキー管理サービスで保存することを強くお勧めします。これは、Kafka Connectクラスターで ConfigProvider 実装を使用して実現できます。
詳細については、この サービス のConfluent説明をご参照ください。
Kafkaの開始¶
サードパーティのConfluentまたはApache Kafkaのドキュメントに記載されている手順を使用して、Kafkaを開始します。
Kafkaコネクタの開始¶
Kafkaコネクタは、分散モードまたはスタンドアロンモードで開始できます。それぞれの手順を以下に示します。
分散モード¶
ターミナルウィンドウから次のコマンドを実行します。
curl -X POST -H "Content-Type: application/json" --data @<path>/<config_file>.json http://localhost:8083/connectors
スタンドアロンモード¶
ターミナルウィンドウから次のコマンドを実行します。
<kafka_dir>/bin/connect-standalone.sh <kafka_dir>/<path>/connect-standalone.properties <kafka_dir>/config/SF_connect.properties
(Apache KafkaまたはConfluent Kafkaのデフォルトのインストールには、すでにファイル connect-standalone.properties が含まれているはずです。)
Kafkaコネクタのテストと使用¶
本番システムでコネクタを使用する前に、少量のデータでKafkaコネクタをテストすることをお勧めします。テストのプロセスは、コネクタを通常使用するプロセスと同じです。
KafkaおよびKafka Connectが実行されていることを確認します。
適切なKafkaトピックを作成したことを確認します。
メッセージパブリッシャーを作成(または既存のメッセージパブリッシャーを使用)します。トピックにパブリッシュされたメッセージが正しい形式(JSON、Avro、またはプレーンテキスト)であることを確認します。
サブスクライブするトピックと、書き込むSnowflakeテーブルを指定する構成ファイルを作成します。手順については、 Kafkaコネクタの設定 (このトピック内)をご参照ください。
(オプション)データを書き込むテーブルを作成します。この手順はオプションです。テーブルを作成しない場合、Kafkaコネクタがテーブルを作成します。コネクタを使用して既存の空でないテーブルにデータを追加する予定がない場合、スキーマの不一致の可能性を最小限に抑えるために、コネクタでテーブルを作成することをお勧めします。
Snowflakeオブジェクト(データベース、スキーマ、ターゲットテーブルなど)に必要な最小限の権限を、データの取り込みに使用されるロールに付与します。
設定されたKafkaトピックにデータのサンプルセットを公開します。
データがシステム全体に伝播するまで数分待ってから、Snowflakeテーブルをチェックして、記録が挿入されたことを確認します。
Tip
テスト環境および運用環境でSnowflakeにデータをロードする前に、 SnowCD を使用してSnowflakeへのネットワーク接続を確認することを検討してください。