Kafkaコネクタのインストールと構成¶
Kafkaコネクタは JAR (Java実行可能ファイル)として提供されます。
Snowflakeは、次の2つのバージョンのコネクタを提供します。
KafkaのConfluentパッケージバージョン のバージョン。
注釈
Kafkaコネクタは、 コネクタ規約 に従います。
このトピックの手順では、どちらかのバージョンのコネクタにのみ適用される手順を指定します。
このトピックの内容:
Snowflakeオブジェクトのアクセス制御の構成¶
必要な権限¶
Kafkaコネクタで使用されるSnowflakeオブジェクトを作成および管理するには、次の最小限の権限を持つロールが必要です。
オブジェクト |
権限 |
注意 |
---|---|---|
データベース |
USAGE |
|
スキーマ |
USAGE . CREATE TABLE . CREATE STAGE . CREATE PIPE |
スキーマレベルのオブジェクトが作成された後に、 CREATE |
テーブル |
OWNERSHIP |
Kafkaコネクタを使用して 既存の テーブルにデータを取り込む場合にのみ必要です。 . コネクタがKafkaトピックの記録の新しいターゲットテーブルを作成する場合、Kafka構成ファイルで指定されたユーザーの既定のロールがテーブル所有者になります(つまり、テーブルに対する OWNERSHIP 権限があります)。 |
ステージ |
READ . WRITE |
Kafkaコネクタを使用してデータファイルをKafkaから 既存の 内部ステージにステージングする場合にのみ必要です(推奨されません)。 . コネクタがKafkaトピックから消費されたデータファイルを一時的に保存する新しいステージを作成する場合、Kafka構成ファイルで指定されたユーザーの既定のロールがステージ所有者になります(つまり、ステージに対する OWNERSHIP 権限があります)。 |
各Kafkaインスタンスに個別のユーザー( CREATE USER を使用)とロール( CREATE ROLE を使用)を作成して、必要に応じてアクセス権限を個別に取り消すことをSnowflakeはお勧めします。ロールは、ユーザーの既定のロールとして割り当てる必要があります。
Kafkaコネクタを使用するロールの作成¶
次のスクリプトは、Kafkaコネクタで使用するカスタムロールを作成します(例:KAFKA_CONNECTOR_ROLE_1)。権限を付与できるロール(例: SECURITYADMIN または MANAGE GRANTS 権限を持つロール)は、このカスタムロールを任意のユーザーに付与して、Kafkaコネクタが必要なSnowflakeオブジェクトを作成し、テーブルにデータを挿入できるようにします。スクリプトは、特定の 既存の データベースとスキーマ(kafka_db.kafka_schema
)およびユーザー(kafka_connector_user_1
)を参照します。
-- Use a role that can create and manage roles and privileges.
USE ROLE securityadmin;
-- Create a Snowflake role with the privileges to work with the connector.
CREATE ROLE kafka_connector_role_1;
-- Grant privileges on the database.
GRANT USAGE ON DATABASE kafka_db TO ROLE kafka_connector_role_1;
-- Grant privileges on the schema.
GRANT USAGE ON SCHEMA kafka_schema TO ROLE kafka_connector_role_1;
GRANT CREATE TABLE ON SCHEMA kafka_schema TO ROLE kafka_connector_role_1;
GRANT CREATE STAGE ON SCHEMA kafka_schema TO ROLE kafka_connector_role_1;
GRANT CREATE PIPE ON SCHEMA kafka_schema TO ROLE kafka_connector_role_1;
-- Only required if the Kafka connector will load data into an existing table.
GRANT OWNERSHIP ON TABLE existing_table1 TO ROLE kafka_connector_role_1;
-- Only required if the Kafka connector will stage data files in an existing internal stage: (not recommended).
GRANT READ, WRITE ON STAGE existing_stage1 TO ROLE kafka_connector_role_1;
-- Grant the custom role to an existing user.
GRANT ROLE kafka_connector_role_1 TO USER kafka_connector_user_1;
-- Set the custom role as the default role for the user.
-- If you encounter an 'Insufficient privileges' error, verify the role that has the OWNERSHIP privilege on the user.
ALTER USER kafka_connector_user_1 SET DEFAULT_ROLE = kafka_connector_role_1;
カスタムロールとロール階層の作成の詳細については、 アクセス制御の構成 をご参照ください。
インストールの前提条件¶
Kafkaコネクタは、次のパッケージバージョンをサポートしています。
パッケージ
Snowflake Kafkaコネクタバージョン
パッケージのサポート(Snowflakeによりテスト済み)
Apache Kafka
2.0.0(またはそれ以上)
Apache Kafka 2.5.1、2.8.1、3.2.1
Confluent
2.0.0(またはそれ以上)
Confluent 6.2.6、7.2.1
Kafkaコネクタは、Kafka Connect API 3.2.3で使用するために構築されています。これよりも新しいバージョンのKafka Connect API はテストされていません。3.2.3よりも古いバージョンは、コネクタとの互換性があります。詳細については、 Kafkaの互換性 をご参照ください。
Kafkaコネクタと JDBC ドライバーのjarファイルの両方が環境にある場合、 JDBC バージョンが、目的のKafkaコネクタバージョンの
pom.xml
ファイルで指定されたsnowflake-jdbc
バージョンと一致していることを確認します。お好みのKafkaコネクタのリリースバージョン、例えば v2.0.1 に移動できます。次にpom.xml
ファイルをブラウズしてsnowflake-jdbc
のバージョンを調べます。データのインジェストにAvro形式を使用する場合:
https://mvnrepository.com/artifact/org.apache.avroから入手可能なAvroパーサー、バージョン1.8.2(またはそれ以上)を使用します。
Avroでスキーマレジストリ機能を使用する場合は、https://mvnrepository.com/artifact/io.confluentで入手可能なバージョン5.0.0(またはそれ以上)のKafka Connect Avro Converterを使用してください。
スキーマレジストリ機能は OSS Apache Kafkaパッケージでは使用できないことに注意してください。
希望のデータ保持時間やストレージ制限でKafkaを構成します。
Kafka Connectクラスターをインストールして構成します。
各Kafka Connectクラスタノードには、Kafkaコネクタに十分な RAM を含める必要があります。推奨される最小量は、Kafkaパーティションごとに5 MB です。これは、Kafka Connectが行っている他の作業に必要な RAM に追加する分です。
Kafka BrokerとKafka Connect Runtimeで同じバージョンを使用することをお勧めします。
Snowflakeアカウントと同じクラウドプロバイダー 地域 でKafka Connectインスタンスを実行することを強くお勧めします。これは厳密な要件ではありませんが、通常はスループットが向上します。
Snowflakeクライアントでサポートされているオペレーティングシステムのリストについては、 オペレーティングシステムのサポート をご参照ください。
コネクタのインストール¶
このセクションでは、Confluent用のKafkaコネクタのインストールと構成の手順を説明します。Kafkaコネクタのバージョンについては、次のテーブルをご参照ください。
リリースシリーズ |
ステータス |
注意 |
---|---|---|
2.x.x |
正式にサポート |
最新バージョンで、強くお勧めします。 |
1.9.x |
正式にサポート |
アップグレードをお勧めします。 |
1.8.x |
サポート対象外 |
このリリースシリーズは使用しないでください。 |
1.7.x |
サポート対象外 |
このリリースシリーズは使用しないでください。 |
Confluent用のコネクタのインストール¶
Kafkaコネクタファイルのダウンロード¶
次のいずれかの場所からKafkaコネクタ JAR ファイルをダウンロードします。
- Confluentハブ:
-
このパッケージには、キーペア認証に暗号化または非暗号化秘密キーを使用するために必要なすべての依存関係が含まれています。詳細については、このトピックの キーペア認証およびキーローテーションの使用 (このトピック内)をご参照ください。
- Maven Central Repository:
https://mvnrepository.com/artifact/com.snowflake
JAR ファイルは、キーペア認証に 暗号化されていない 秘密キーを使用するための追加の依存関係を必要としません。暗号化された秘密キーを使用するには、 Bouncy Castle 暗号化ライブラリ( JAR ファイル)をダウンロードします。SnowflakeはBouncy Castleを使用して、ログインに使用される暗号化された RSA 秘密キーを復号化します。
これらのファイルをKafkaコネクタ JAR ファイルと同じローカルフォルダーにダウンロードします。
コネクタのソースコードはhttps://github.com/snowflakedb/snowflake-kafka-connectorから入手できます。
Kafkaコネクタのインストール¶
他のコネクタをインストールするために提供されている手順を使用して、Kafkaコネクタをインストールします。
オープンソースApache Kafka用のコネクタのインストール¶
このセクションでは、オープンソースのApache Kafka用にKafkaコネクタをインストールおよび構成する手順について説明します。
Apache Kafkaのインストール¶
Kafkaパッケージを公式ウェブサイトhttps://kafka.apache.org/downloadsからダウンロードします。
ターミナルウィンドウで、パッケージファイルをダウンロードしたディレクトリに移動します。
次のコマンドを実行して、
kafka_<scalaバージョン>-<kafkaバージョン>.tgz
ファイルを解凍します。tar xzvf kafka_<scala_version>-<kafka_version>.tgz
JDK のインストール¶
Java Development Kit(JDK)をインストールして構成します。Snowflakeは、 JDKのStandard Edition(SE)でテストします。Enterprise Edition(EE)は互換性があると予想されますが、テストされていません。
この手順を既に完了している場合は、このセクションをスキップできます。
https://www.oracle.com/technetwork/java/javase/downloads/index.htmlから JDK をダウンロードします。
JDKをインストールまたは解凍します。
ご使用のオペレーティングシステムの手順に従って、環境変数 JAVA_HOME が JDKを含むディレクトリを指すように設定します。
Kafka Connector JAR ファイルをダウンロードします¶
Maven Central RepositoryからKafkaコネクタ JAR ファイルをダウンロードします。
JAR ファイルは、キーペア認証に 暗号化されていない 秘密キーを使用するための追加の依存関係を必要としません。暗号化された秘密キーを使用するには、 Bouncy Castle 暗号化ライブラリ( JAR ファイル)をダウンロードします。SnowflakeはBouncy Castleを使用して、ログインに使用される暗号化された RSA 秘密キーを復号化します。
Kafkaデータが Apache Avro 形式でストリーミングされている場合、Avro JAR ファイルをダウンロードします。
コネクタのソースコードはhttps://github.com/snowflakedb/snowflake-kafka-connectorから入手できます。
Kafkaコネクタのインストール¶
Kafkaコネクタ JAR ファイルのダウンロード でダウンロードした JAR ファイルを <kafkaディレクトリ>/libs
フォルダーにコピーします。
Kafkaコネクタの構成¶
コネクタは、Snowflakeログイン認証情報、トピック名、Snowflakeテーブル名などのパラメーターを指定するファイルを作成することで構成されます。
重要
Kafka Connectフレームワークは、Kafkaコネクタの構成設定をマスターノードからワーカーノードにブロードキャストします。構成設定には、機密情報(具体的にはSnowflakeユーザー名と秘密キー)が含まれます。Kafka Connectノード間の通信チャネルを必ず保護してください。手順については、Apache Kafkaソフトウェアのドキュメントをご参照ください。
各構成ファイルは、1つのデータベースとそのデータベース内の1つのスキーマのトピックと対応するテーブルを指定します。1つのコネクタは任意の数のトピックからメッセージを取り込むことができますが、対応するテーブルはすべて単一のデータベースとスキーマに保存される必要があります。
このセクションでは、分散モードとスタンドアロンモードの両方について説明します。
構成フィールドの説明については、 Kafka構成プロパティ をご参照ください。
重要
通常、構成ファイルには秘密キーなどのセキュリティ関連情報が含まれているため、ファイルに読み取り/書き込み権限を適切に設定してアクセスを制限してください。
さらに、構成ファイルを安全な外部の場所またはキー管理サービスに保存することを検討してください。詳細については、このトピックの 秘密の外部化 をご参照ください。
分散モード¶
Kafka構成ファイルを作成します(例: <パス>/<Configファイル>.json
)。すべてのコネクタ構成情報をファイルに入力します。ファイルは JSON 形式である必要があります。
サンプル構成ファイル
{
"name":"XYZCompanySensorData",
"config":{
"connector.class":"com.snowflake.kafka.connector.SnowflakeSinkConnector",
"tasks.max":"8",
"topics":"topic1,topic2",
"snowflake.topic2table.map": "topic1:table1,topic2:table2",
"buffer.count.records":"10000",
"buffer.flush.time":"60",
"buffer.size.bytes":"5000000",
"snowflake.url.name":"myorganization-myaccount.snowflakecomputing.com:443",
"snowflake.user.name":"jane.smith",
"snowflake.private.key":"xyz123",
"snowflake.private.key.passphrase":"jkladu098jfd089adsq4r",
"snowflake.database.name":"mydb",
"snowflake.schema.name":"myschema",
"key.converter":"org.apache.kafka.connect.storage.StringConverter",
"value.converter":"com.snowflake.kafka.connector.records.SnowflakeAvroConverter",
"value.converter.schema.registry.url":"http://localhost:8081",
"value.converter.basic.auth.credentials.source":"USER_INFO",
"value.converter.basic.auth.user.info":"jane.smith:MyStrongPassword"
}
}
スタンドアロンモード¶
<kafkaディレクトリ>/config/SF_connect.properties
などの構成ファイルを作成します。すべてのコネクタ構成情報をファイルに入力します。
サンプル構成ファイル
connector.class=com.snowflake.kafka.connector.SnowflakeSinkConnector
tasks.max=8
topics=topic1,topic2
snowflake.topic2table.map= topic1:table1,topic2:table2
buffer.count.records=10000
buffer.flush.time=60
buffer.size.bytes=5000000
snowflake.url.name=myorganization-myaccount.snowflakecomputing.com:443
snowflake.user.name=jane.smith
snowflake.private.key=xyz123
snowflake.private.key.passphrase=jkladu098jfd089adsq4r
snowflake.database.name=mydb
snowflake.schema.name=myschema
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=com.snowflake.kafka.connector.records.SnowflakeAvroConverter
value.converter.schema.registry.url=http://localhost:8081
value.converter.basic.auth.credentials.source=USER_INFO
value.converter.basic.auth.user.info=jane.smith:MyStrongPassword
Kafka構成プロパティ¶
次のプロパティは、分散モードまたはスタンドアロンモードのいずれかのKafka構成ファイルで設定できます。
必須のプロパティ¶
name
アプリケーション名。これは、顧客が使用するすべてのKafkaコネクタで一意でなければなりません。この名前は、引用符で囲まれていないSnowflakeの有効な識別子である必要があります。有効な識別子については、 識別子の要件 をご参照ください。
connector.class
com.snowflake.kafka.connector.SnowflakeSinkConnector
topics
トピックのコンマ区切りリスト。デフォルトでは、Snowflakeはテーブル名がトピック名と同じであると想定します。テーブル名がトピック名と同じでない場合は、オプションの
topic2table.map
パラメーター(下記)を使用して、トピック名からテーブル名へのマッピングを指定します。このテーブル名は、引用符で囲まれていないSnowflakeの有効な識別子である必要があります。有効なテーブル名については、 識別子の要件 をご参照ください。注釈
両方ではなく、
topics
またはtopics.regex
の いずれか が必要です。topics.regex
これは、Snowflakeテーブルに読み込むメッセージを含むトピックを指定する正規表現(「regex」)です。コネクタは、正規表現に一致するトピック名からデータをロードします。正規表現は、Java正規表現のルールに従う必要があります(つまり、java.util.regex.Patternと互換性があります)。構成ファイルには、両方ではなく、
topics
またはtopics.regex
の いずれか を含める必要があります。snowflake.url.name
Snowflakeアカウントにアクセスするための URL。この URL には、使用する アカウント識別子 が含まれている必要があります。プロトコル(
https://
)とポート番号はオプションです。snowflake.user.name
Snowflakeアカウントのユーザーログイン名。
snowflake.private.key
ユーザーを認証するための秘密キー。ヘッダーまたはフッターではなく、キーのみを含めます。キーが複数の行に分割されている場合、改行を削除します。暗号化されていないキーを提供するか、暗号化されたキーを提供して
snowflake.private.key.passphrase
パラメーターを提供し、Snowflakeがキーを復号化できるようにします。snowflake.private.key
パラメーター値が暗号化されている 時に限り 、このパラメーターを使用します。これにより、 キーペア認証およびキーローテーションの使用 (このトピック内)の指示に従って暗号化された秘密キーが復号化されます。注釈
このトピックの オプションのプロパティ の
snowflake.private.key.passphrase
もご参照ください。snowflake.database.name
行を挿入するテーブルを含むデータベースの名前。
snowflake.schema.name
行を挿入するテーブルを含むスキーマの名前。
header.converter
記録がAvroでフォーマットされ、ヘッダーが含まれている場合にのみ必要です。値は
"org.apache.kafka.connect.storage.StringConverter"
です。key.converter
これは、Kafkaの記録のキーコンバーターです(例:
"org.apache.kafka.connect.storage.StringConverter"
)。これはKafkaコネクタでは使用されませんが、Kafka Connectプラットフォームでは必要です。現在の制限については Kafkaコネクタの制限 をご参照ください。
value.converter
記録が JSONでフォーマットされている場合、これは
"com.snowflake.kafka.connector.records.SnowflakeJsonConverter"
になります。記録がAvroでフォーマットされ、Kafkaのスキーマレジストリサービスを使用する場合、これは
"com.snowflake.kafka.connector.records.SnowflakeAvroConverter"
になります。記録がAvroでフォーマットされ、スキーマを含む(したがって、Kafkaのスキーマレジストリサービスを必要としない)場合、これは
"com.snowflake.kafka.connector.records.SnowflakeAvroConverterWithoutSchemaRegistry"
になります。記録がプレーンテキストでフォーマットされている場合、これは
"org.apache.kafka.connect.storage.StringConverter"
になります。現在の制限については Kafkaコネクタの制限 をご参照ください。
オプションのプロパティ¶
snowflake.private.key.passphrase
このパラメーターの値が空でない場合、Kafkaはこのフレーズを使用して秘密キーの復号化を試みます。
tasks.max
タスクの数。通常、Kafka Connectクラスタのワーカーノード全体の CPU コアの数と同じです。この数は、低くまたは高く設定できます。ただし、Snowflakeは高く設定することをお勧めしません。
snowflake.topic2table.map
このオプションのパラメーターにより、ユーザーはどのトピックをどのテーブルにマッピングするかを指定できます。各トピックとそのテーブル名はコロンで区切る必要があります(以下を参照)。このテーブル名は、引用符で囲まれていないSnowflakeの有効な識別子である必要があります。有効なテーブル名については、 識別子の要件 をご参照ください。
buffer.count.records
Snowflakeにインジェストされる前に、Kafkaパーティションごとにメモリにバッファされる記録の数。デフォルト値は
10000
記録です。buffer.flush.time
バッファーフラッシュ間の秒数。フラッシュはKafkaのメモリキャッシュから内部ステージまでです。デフォルト値は
120
秒です。buffer.size.bytes
データファイルとしてSnowflakeに取り込まれる前に、Kafkaパーティションごとにメモリーにバッファーされた記録の累積サイズ(バイト単位)。このデフォルト値は
5000000
(5 MB)です。記録は、データファイルに書き込まれるときに圧縮されます。その結果、バッファー内の記録のサイズは、記録から作成されたデータファイルのサイズよりも大きくなる可能性があります。
value.converter.schema.registry.url
形式がAvroで、スキーマレジストリサービスを使用している場合、これはスキーマレジストリサービスの URL である必要があります。それ以外の場合、このフィールドは空でなければなりません。
value.converter.break.on.schema.registry.error
Schema Registry ServiceからAvroデータをロードする場合、このプロパティは、スキーマIDのフェッチ中にエラーが発生した場合にKafkaコネクタが記録の消費を停止するかどうかを決定します。デフォルト値は
false
です。この動作を有効にするには、値をtrue
に設定します。Kafkaコネクタバージョン1.4.2以降でサポートされています。
jvm.proxy.host
Snowflake Kafka Connectorがプロキシサーバー経由でSnowflakeにアクセスできるようにするには、このパラメーターを設定して、そのプロキシサーバーのホストを指定します。
jvm.proxy.port
Snowflake Kafka Connectorがプロキシサーバーを介してSnowflakeにアクセスできるようにするには、このパラメーターを設定してそのプロキシサーバーのポートを指定します。
jvm.proxy.username
プロキシサーバーで認証するユーザー名。
Kafkaコネクタバージョン1.4.4以降でサポートされています。
jvm.proxy.password
プロキシサーバーで認証するユーザー名のパスワード。
Kafkaコネクタバージョン1.4.4以降でサポートされています。
value.converter.basic.auth.credentials.source
Avroデータ形式を使用していて、Kafkaスキーマレジストリへの安全なアクセスが必要な場合、このパラメーターを文字列「USER_INFO」に設定し、以下で説明する
value.converter.basic.auth.user.info
パラメーターを設定します。それ以外の場合は、このパラメーターを省略します。value.converter.basic.auth.user.info
Avroデータ形式を使用していて、Kafkaスキーマレジストリへの安全なアクセスが必要な場合は、上記のとおりこのパラメーターを文字列「<user_ID>:<password>」に設定し、value.converter.basic.auth.credentials.sourceパラメーターを設定します。それ以外の場合は、このパラメーターを省略します。
snowflake.metadata.createtime
値が FALSE に設定されている場合、
CreateTime
プロパティ値は RECORD_METADATA 列のメタデータから省略されます。デフォルト値は TRUE です。Kafkaコネクター1.2.0(およびそれ以上)でサポートされています。
snowflake.metadata.topic
値が FALSE に設定されている場合、
topic
プロパティ値は RECORD_METADATA 列のメタデータから省略されます。デフォルト値は TRUE です。Kafkaコネクター1.2.0(およびそれ以上)でサポートされています。
snowflake.metadata.offset.and.partition
値が FALSE に設定されている場合、
Offset
およびPartition
プロパティ値は RECORD_METADATA 列のメタデータから省略されます。デフォルト値は TRUE です。Kafkaコネクター1.2.0(およびそれ以上)でサポートされています。
snowflake.metadata.all
値が FALSE に設定されている場合、 RECORD_METADATA 列のメタデータは完全に空です。デフォルト値は TRUE です。
Kafkaコネクター1.2.0(およびそれ以上)でサポートされています。
transforms
Kafkaコネクタが検出したtombstone記録をスキップし、それらをターゲットテーブルにロードしないように指定します。tombstone記録は、値フィールド全体がnullである記録として定義されます。
プロパティ値を
"tombstoneHandlerExample"
に設定します。注釈
このプロパティは、Kafkaコミュニティコンバーター(つまり、
value.converter
プロパティ値)でのみ使用します(例:org.apache.kafka.connect.json.JsonConverter
またはorg.apache.kafka.connect.json.AvroConverter
)。Snowflakeコンバーターでtombstone記録の処理を管理するには、代わりにbehavior.on.null.values
プロパティを使用します。transforms.tombstoneHandlerExample.type
transforms
プロパティを設定するときに必要です。プロパティ値を
"io.confluent.connect.transforms.TombstoneHandler"
に設定behavior.on.null.values
Kafkaコネクタがtombstone記録を処理する方法を指定します。tombstone記録は、値フィールド全体がnullである記録として定義されます。 Snowpipe では、このプロパティは Kafkaコネクタのバージョン1.5.5以降でサポートされています。 Snowpipeストリーミング では、このプロパティはKafkaコネクタのバージョン2.1.0以降でサポートされています。
このプロパティは、次の値をサポートします。
DEFAULT
Kafkaコネクタがtombstone記録を検出すると、コンテンツ列に空の JSON 文字列を挿入します。
IGNORE
Kafkaコネクタはtombstone記録をスキップし、これらの記録の行を挿入しません。
デフォルト値は
DEFAULT
です。注釈
Tombstone記録のインジェスチョンは、インジェスチョンメソッドによって異なります。
Snowpipeでは、KafkaコネクタはSnowflakeコンバーターのみを使用します。Kafkaコミュニティコンバーターを使用してtombstone記録の処理を管理するには、代わりに
transform
プロパティとtransforms.tombstoneHandlerExample.type
プロパティを使用します。Snowpipe Streamingでは、Kafkaコネクタはコミュニティコンバーターのみを使用します。
Kafkaブローカーに送信された記録は、Kafkaコネクタによって削除され、オフセットが欠落するため、 NULL であってはなりません。オフセットの欠落は、特定のユースケースにおいてKafkaコネクタを破壊します。 NULL の記録ではなく、tombstoneの記録を使用することをお勧めします。
キーペア認証の使用およびキーローテーション¶
Kafkaコネクタは、基本認証(つまり、ユーザー名とパスワード)ではなく、キーペア認証に依存しています。この認証方法には、2048ビット(最小)の RSA キーペアが必要です。 OpenSSLを使用して公開キーと秘密キーのペアを生成します。公開キーは、構成ファイルで定義されたSnowflakeユーザーに割り当てられます。
このページのキーペア認証の手順と キーペアのローテーション の手順を完了したら、 秘密の外部化 (このトピック内)の推奨事項を評価します。
公開/秘密キーペアを構成するには、
ターミナルウィンドウのコマンドラインから、秘密キーを生成します。
秘密キーの暗号化バージョンまたは非暗号化バージョンを生成できます。
注釈
Kafkaコネクタは、連邦情報処理標準(140-2)(つまり、 FIPS 140-2)要件を満たすように検証された暗号化アルゴリズムをサポートしています。詳細については、 FIPS 140-2 をご参照ください。
非暗号化バージョンを生成するには、次のコマンドを使用します。
$ openssl genrsa -out rsa_key.pem 2048
暗号化バージョンを生成するには、次のコマンドを使用します。
$ openssl genrsa 2048 | openssl pkcs8 -topk8 -v2 <algorithm> -inform PEM -out rsa_key.p8
<アルゴリズム>
は、 FIPS 140-2準拠の暗号化アルゴリズムです。たとえば、暗号化アルゴリズムとして AES 256を指定するには、
$ openssl genrsa 2048 | openssl pkcs8 -topk8 -v2 aes256 -inform PEM -out rsa_key.p8
秘密キーの暗号化されたバージョンを生成する場合、パスフレーズを記録します。後で、Kafka構成ファイルの
snowflake.private.key.passphrase
プロパティでパスフレーズを指定します。サンプル PEM 秘密キー
-----BEGIN ENCRYPTED PRIVATE KEY----- MIIE6TAbBgkqhkiG9w0BBQMwDgQILYPyCppzOwECAggABIIEyLiGSpeeGSe3xHP1 wHLjfCYycUPennlX2bd8yX8xOxGSGfvB+99+PmSlex0FmY9ov1J8H1H9Y3lMWXbL ... -----END ENCRYPTED PRIVATE KEY-----
コマンドラインから、秘密キーを参照して公開キーを生成します。
秘密キーが暗号化され、「
rsa_key.p8
」という名前のファイルに含まれていると仮定して、次のコマンドを使用します。$ openssl rsa -in rsa_key.p8 -pubout -out rsa_key.pub
サンプル PEM 公開キー
-----BEGIN PUBLIC KEY----- MIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEAy+Fw2qv4Roud3l6tjPH4 zxybHjmZ5rhtCz9jppCV8UTWvEXxa88IGRIHbJ/PwKW/mR8LXdfI7l/9vCMXX4mk ... -----END PUBLIC KEY-----
公開キーファイルと秘密キーファイルを保存用のローカルディレクトリにコピーします。ファイルへのパスを記録します。秘密キーは PKCS#8(公開キー暗号化標準)形式を使用して格納され、前の手順で指定したパスフレーズを使用して暗号化されることに注意してください。ただし、オペレーティングシステムが提供するファイル許可メカニズムを使用して、ファイルを不正アクセスから保護する必要があります。ファイルが使用されていない場合、ファイルを保護するのはユーザーの責任です。
Snowflakeにログインします。 ALTER USER を使用して、Snowflakeユーザーに公開キーを割り当てます。例:
ALTER USER jsmith SET RSA_PUBLIC_KEY='MIIBIjANBgkqh...';
注釈
ユーザーを変更できるのは、セキュリティ管理者(つまり、 SECURITYADMIN ロールのユーザー)以上のみです。
SQL ステートメントで公開キーのヘッダーとフッターを除外します。
DESCRIBE USER を使用してユーザーの公開キーの指紋を検証します。
DESC USER jsmith; +-------------------------------+-----------------------------------------------------+---------+-------------------------------------------------------------------------------+ | property | value | default | description | |-------------------------------+-----------------------------------------------------+---------+-------------------------------------------------------------------------------| | NAME | JSMITH | null | Name | ... ... | RSA_PUBLIC_KEY_FP | SHA256:nvnONUsfiuycCLMXIEWG4eTp4FjhVUZQUQbNpbSHXiA= | null | Fingerprint of user's RSA public key. | | RSA_PUBLIC_KEY_2_FP | null | null | Fingerprint of user's second RSA public key. | +-------------------------------+-----------------------------------------------------+---------+-------------------------------------------------------------------------------+
注釈
RSA_PUBLIC_KEY_2_FP
プロパティは、 キーペアローテーションの構成 で説明されています。秘密キー全体をコピーして、構成ファイルの
snowflake.private.key
フィールドに貼り付けます。ファイルを保存します。
シークレットの外部化¶
Snowflakeは、秘密キーなどの秘密を外部化し、暗号化された形式、または AWS Key Management Service(KMS)、Microsoft Azure Key Vault、 HashiCorp Vaultなどのキー管理サービスで保存することを強くお勧めします。これは、Kafka Connectクラスターで ConfigProvider
実装を使用して実現できます。
詳細については、この サービス のConfluent説明をご参照ください。
Kafkaの開始¶
サードパーティのConfluentまたはApache Kafkaのドキュメントに記載されている手順を使用して、Kafkaを開始します。
Kafkaコネクタの開始¶
Kafkaコネクタは、分散モードまたはスタンドアロンモードで開始できます。それぞれの手順を以下に示します。
分散モード¶
ターミナルウィンドウから次のコマンドを実行します。
curl -X POST -H "Content-Type: application/json" --data @<path>/<config_file>.json http://localhost:8083/connectors
スタンドアロンモード¶
ターミナルウィンドウから次のコマンドを実行します。
<kafka_dir>/bin/connect-standalone.sh <kafka_dir>/<path>/connect-standalone.properties <kafka_dir>/config/SF_connect.properties
(Apache KafkaまたはConfluent Kafkaのデフォルトのインストールには、すでにファイル connect-standalone.properties
が含まれているはずです。)
Kafkaコネクタのテストと使用¶
本番システムでコネクタを使用する前に、少量のデータでKafkaコネクタをテストすることをお勧めします。テストのプロセスは、コネクタを通常使用するプロセスと同じです。
KafkaおよびKafka Connectが実行されていることを確認します。
適切なKafkaトピックを作成したことを確認します。
メッセージパブリッシャーを作成(または既存のメッセージパブリッシャーを使用)します。トピックにパブリッシュされたメッセージが正しい形式(JSON、Avro、またはプレーンテキスト)であることを確認します。
サブスクライブするトピックと、書き込むSnowflakeテーブルを指定する構成ファイルを作成します。手順については、 Kafkaコネクタの設定 (このトピック内)をご参照ください。
(オプション)データを書き込むテーブルを作成します。この手順はオプションです。テーブルを作成しない場合、Kafkaコネクタがテーブルを作成します。コネクタを使用して既存の空でないテーブルにデータを追加する予定がない場合、スキーマの不一致の可能性を最小限に抑えるために、コネクタでテーブルを作成することをお勧めします。
Snowflakeオブジェクト(データベース、スキーマ、ターゲットテーブルなど)に必要な最小限の権限を、データの取り込みに使用されるロールに付与します。
設定されたKafkaトピックにデータのサンプルセットを公開します。
データがシステム全体に伝播するまで数分待ってから、Snowflakeテーブルをチェックして、記録が挿入されたことを確認します。
Tip
テスト環境および運用環境でSnowflakeにデータをロードする前に、 SnowCD を使用してSnowflakeへのネットワーク接続を確認することを検討してください。