Kafkaコネクタのインストールと構成

Kafkaコネクタは JAR (Java実行可能ファイル)として提供されます。

Snowflakeは、次の2つのバージョンのコネクタを提供します。

  • Kafkaの Confluent パッケージバージョン。

  • オープンソースソフトウェア(OSS)用のApache Kafkaパッケージバージョン。

このトピックの手順では、どちらかのバージョンのコネクタにのみ適用される手順を指定します。

このトピックの内容:

Snowflakeオブジェクトのアクセス制御の構成

必要な権限

Kafkaコネクタで使用されるSnowflakeオブジェクトを作成および管理するには、次の最小限の権限を持つロールが必要です。

オブジェクト

権限

注意

データベース

USAGE

スキーマ

USAGE . CREATE TABLE . CREATE STAGE . CREATE PIPE

スキーマレベルのオブジェクトが作成された後、 CREATE オブジェクト 権限を取り消すことができます。

テーブル

INSERT . SELECT

Kafkaコネクタを使用して 既存の テーブルにデータを取り込む場合にのみ必要です。 . コネクタがKafkaトピックのレコードの新しいターゲットテーブルを作成する場合、Kafka構成ファイルで指定されたユーザーの既定のロールがテーブル所有者になります(つまり、テーブルに対する OWNERSHIP 権限があります)。

ステージ

READ . WRITE

Kafkaコネクタを使用してデータファイルをKafkaから 既存の 内部ステージにステージングする場合にのみ必要です(推奨されません)。 . コネクタがKafkaトピックから消費されたデータファイルを一時的に保存する新しいステージを作成する場合、Kafka構成ファイルで指定されたユーザーの既定のロールがステージ所有者になります(つまり、ステージに対する OWNERSHIP 権限があります)。

各Kafkaインスタンスに個別のユーザー( CREATE USER を使用)とロール( CREATE ROLE を使用)を作成して、必要に応じてアクセス権限を個別に取り消すことをSnowflakeはお勧めします。ロールは、ユーザーの既定のロールとして割り当てる必要があります。

Kafkaコネクタを使用するロールの作成

次のスクリプトは、Kafkaコネクタで使用するカスタムロールを作成します(例:KAFKA_CONNECTOR_ROLE_1)。権限を付与できるロール(例: SECURITYADMIN または MANAGE GRANTS 権限を持つロール)は、このカスタムロールを任意のユーザーに付与して、Kafkaコネクタが必要なSnowflakeオブジェクトを作成し、テーブルにデータを挿入できるようにします。スクリプトは、特定の 既存の データベースとスキーマ(kafka_db.kafka_schema)およびユーザー(kafka_connector_user_1)を参照します。

-- Use a role that can create and manage roles and privileges:
USE ROLE securityadmin;

-- Create a Snowflake role with the privileges to work with the connector
CREATE ROLE kafka_connector_role_1;

-- Grant privileges on the database:
GRANT USAGE ON DATABASE kafka_db TO ROLE kafka_connector_role_1;

-- Grant privileges on the schema:
GRANT USAGE ON SCHEMA kafka_schema TO ROLE kafka_connector_role_1;
GRANT CREATE TABLE ON SCHEMA kafka_schema TO ROLE kafka_connector_role_1;
GRANT CREATE STAGE ON SCHEMA kafka_schema TO ROLE kafka_connector_role_1;
GRANT CREATE PIPE ON SCHEMA kafka_schema TO ROLE kafka_connector_role_1;

-- Only required if the Kafka connector will load data into an existing table:
GRANT SELECT, INSERT ON TABLE existing_table1 TO ROLE kafka_connector_role_1;

-- Only required if the Kafka connector will stage data files in an existing internal stage: (not recommended):
GRANT READ, WRITE ON STAGE existing_stage1 TO ROLE kafka_connector_role_1;

-- Grant the custom role to an existing user:
GRANT ROLE kafka_connector_role_1 TO USER kafka_connector_user_1;

-- Set the custom role as the default role for the user:
ALTER USER kafka_connector_user_1 SET DEFAULT_ROLE = kafka_connector_role_1;

カスタムロールとロール階層の作成の詳細については、 アクセス制御の構成 をご参照ください。

インストールの前提条件

  • Kafkaコネクタは、次のパッケージバージョンをサポートしています。

    パッケージ

    Snowflake Kafkaコネクタバージョン

    パッケージサポート

    Apache Kafka

    0.5.3 ~ 1.2.2

    Apache Kafka 2.0.x ~ 2.4.x

    1.2.3(またはそれ以上)

    Apache Kafka 2.0.x ~ 2.5.x

    Confluent

    0.5.3 ~ 1.2.2

    Confluent 5.0.x ~ 5.4.x

    1.2.3(またはそれ以上)

    Confluent 5.0.x ~ 5.5.x

  • Kafkaコネクタは、次のKafka Connect API 2.0.0で使用するために構築されています。2.0.0から2.3.0までのKafka Connect API バージョンを使用することを強くお勧めします。以前のバージョンはコネクタと互換性がなく、新しいバージョンはテストされていません。

  • データの取り込みにAvro形式を使用する場合、

    • https://mvnrepository.com/artifact/org.apache.avroから入手可能なAvroパーサー、バージョン1.8.2(またはそれ以上)を使用します。

    • Avroでスキーマレジストリ機能を使用する場合は、https://mvnrepository.com/artifact/io.confluentで入手可能なバージョン5.0.0(またはそれ以上)のKafka Connect Avro Converterを使用してください。

      スキーマレジストリ機能は OSS Apache Kafkaパッケージでは使用できないことに注意してください。

  • 希望のデータ保持時間やストレージ制限でKafkaを構成します。

  • Kafka Connectクラスターをインストールして構成します。

    各Kafka Connectクラスタノードには、Kafkaコネクタに十分な RAM を含める必要があります。推奨される最小量は、Kafkaパーティションごとに5 MB です。これは、Kafka Connectが行っている他の作業に必要な RAM に追加する分です。

  • Snowflakeアカウントと同じクラウドプロバイダー 地域 でKafka Connectインスタンスを実行することを強くお勧めします。これは厳密な要件ではありませんが、通常はスループットが向上します。

Snowflakeクライアントでサポートされているオペレーティングシステムのリストについては、 オペレーティングシステムのサポート をご参照ください。

コネクタのインストール

このセクションでは、Confluent用のKafkaコネクタのインストールと構成の手順を説明します。

Confluent用のコネクタのインストール

Kafkaコネクタファイルのダウンロード

次のいずれかの場所からKafkaコネクタ JAR ファイルをダウンロードします。

Confluentハブ

https://www.confluent.io/hub/

このパッケージには、キーペア認証に暗号化または非暗号化プライベートキーを使用するために必要なすべての依存関係が含まれています。詳細については、このトピックの キーペア認証の使用 をご参照ください。

Maven Central Repository

https://mvnrepository.com/artifact/com.snowflake

JAR ファイルは、キーペア認証に 暗号化されていない 秘密キーを使用するための追加の依存関係を必要としません。暗号化された秘密キーを使用するには、 Bouncy Castle 暗号化ライブラリ( JAR ファイル)をダウンロードします。SnowflakeはBouncy Castleを使用して、ログインに使用される暗号化された RSA 秘密キーを復号化します。

これらのファイルをKafkaコネクタ JAR ファイルと同じローカルフォルダーにダウンロードします。

コネクタのソースコードはhttps://github.com/snowflakedb/snowflake-kafka-connectorから入手できます。

Kafkaコネクタのインストール

他のコネクタをインストールするために提供されている手順を使用して、Kafkaコネクタをインストールします。

オープンソースApache Kafka用のコネクタのインストール

このセクションでは、オープンソースのApache Kafka用にKafkaコネクタをインストールおよび構成する手順について説明します。

Apache Kafkaのインストール

  1. Kafkaパッケージを公式ウェブサイトhttps://kafka.apache.org/downloadsからダウンロードします

  2. ターミナルウィンドウで、パッケージファイルをダウンロードしたディレクトリに移動します。

  3. 次のコマンドを実行して、 kafka_<scalaバージョン>-<kafkaバージョン>.tgz ファイルを解凍します。

    tar xzvf kafka_<scala_version>-<kafka_version>.tgz
    

JDK のインストール

Java Development Kit(JDK)をインストールして構成します。Snowflakeは、 JDKのStandard Edition(SE)でテストします。Enterprise Edition(EE)は互換性があると予想されますが、テストされていません。

この手順を既に完了している場合は、このセクションをスキップできます。

  1. https://www.oracle.com/technetwork/java/javase/downloads/index.htmlから JDK をダウンロードします。

  2. JDKをインストールまたは解凍します。

  3. ご使用のオペレーティングシステムの手順に従って、環境変数 JAVA_HOME が JDKを含むディレクトリを指すように設定します。

Kafka Connector JAR ファイルをダウンロードします

  1. Maven Central RepositoryからKafkaコネクタ JAR ファイルをダウンロードします。

    https://mvnrepository.com/artifact/com.snowflake

  2. JAR ファイルは、キーペア認証に 暗号化されていない 秘密キーを使用するための追加の依存関係を必要としません。暗号化された秘密キーを使用するには、 Bouncy Castle 暗号化ライブラリ( JAR ファイル)をダウンロードします。SnowflakeはBouncy Castleを使用して、ログインに使用される暗号化された RSA 秘密キーを復号化します。

  3. Kafkaデータが Apache Avro 形式でストリーミングされている場合、Avro JAR ファイルをダウンロードします。

    https://mvnrepository.com/artifact/org.apache.avro/avro

コネクタのソースコードはhttps://github.com/snowflakedb/snowflake-kafka-connectorから入手できます。

Kafkaコネクタのインストール

Kafkaコネクタ JAR ファイルのダウンロード でダウンロードした JAR ファイルを <kafkaディレクトリ>/libs フォルダーにコピーします。

Kafkaコネクタの構成

コネクタは、Snowflakeログイン認証情報、トピック名、Snowflakeテーブル名などのパラメーターを指定するファイルを作成することで構成されます。

重要

Kafka Connectフレームワークは、Kafkaコネクタの構成設定をマスターノードからワーカーノードにブロードキャストします。構成設定には、機密情報(具体的にはSnowflakeユーザー名と秘密キー)が含まれます。Kafka Connectノード間の通信チャネルを必ず保護してください。手順については、Apache Kafkaソフトウェアのドキュメントをご参照ください。

各構成ファイルは、1つのデータベースとそのデータベース内の1つのスキーマのトピックと対応するテーブルを指定します。1つのコネクタは任意の数のトピックからメッセージを取り込むことができますが、対応するテーブルはすべて単一のデータベースとスキーマに保存される必要があります。

このセクションでは、分散モードとスタンドアロンモードの両方について説明します。

構成フィールドの説明については、 Kafka構成プロパティ をご参照ください。

重要

通常、構成ファイルには秘密キーなどのセキュリティ関連情報が含まれているため、ファイルに読み取り/書き込み権限を適切に設定してアクセスを制限してください。

さらに、構成ファイルを安全な外部の場所またはキー管理サービスに保存することを検討してください。詳細については、このトピックの 秘密の外部化 をご参照ください。

分散モード

<kafkaディレクトリ>/config/connect-distributed.properties などのKafka構成ファイルを作成します。すべてのコネクタ構成情報をファイルに入力します。

サンプル構成ファイル

{
  "name":"XYZCompanySensorData",
  "config":{
    "connector.class":"com.snowflake.kafka.connector.SnowflakeSinkConnector",
    "tasks.max":"8",
    "topics":"topic1,topic2",
    "snowflake.topic2table.map": "topic1:table1,topic2:table2",
    "buffer.count.records":"10000",
    "buffer.flush.time":"60",
    "buffer.size.bytes":"5000000",
    "snowflake.url.name":"myaccount.us-west-2.snowflakecomputing.com:443",
    "snowflake.user.name":"jane.smith",
    "snowflake.private.key":"xyz123",
    "snowflake.private.key.passphrase":"jkladu098jfd089adsq4r",
    "snowflake.database.name":"mydb",
    "snowflake.schema.name":"myschema",
    "key.converter":"org.apache.kafka.connect.storage.StringConverter",
    "value.converter":"com.snowflake.kafka.connector.records.SnowflakeAvroConverter",
    "value.converter.schema.registry.url":"http://localhost:8081",
    "value.converter.basic.auth.credentials.source":"USER_INFO",
    "value.converter.basic.auth.user.info":"jane.smith:MyStrongPassword"
  }
}

スタンドアロンモード

<kafkaディレクトリ>/config/connect-standalone.properties などの構成ファイルを作成します。すべてのコネクタ構成情報をファイルに入力します。

サンプル構成ファイル

connector.class=com.snowflake.kafka.connector.SnowflakeSinkConnector
tasks.max=8
topics=topic1,topic2
snowflake.topic2table.map= topic1:table1,topic2:table2
buffer.count.records=10000
buffer.flush.time=60
buffer.size.bytes=5000000
snowflake.url.name=myaccount.us-west-2.snowflakecomputing.com:443
snowflake.user.name=jane.smith
snowflake.private.key=xyz123
snowflake.private.key.passphrase=jkladu098jfd089adsq4r
snowflake.database.name=mydb
snowflake.schema.name=myschema
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=com.snowflake.kafka.connector.records.SnowflakeAvroConverter
value.converter.schema.registry.url=http://localhost:8081
value.converter.basic.auth.credentials.source=USER_INFO
value.converter.basic.auth.user.info=jane.smith:MyStrongPassword

Kafka構成プロパティ

次のプロパティは、Kafka構成ファイルで設定できます。

必須のプロパティ

name

アプリケーション名。これは、顧客が使用するすべてのKafkaコネクタで一意でなければなりません。この名前は、引用符で囲まれていないSnowflakeの有効な識別子である必要があります。有効な識別子については、 識別子の要件 をご参照ください。

connector.class

com.snowflake.kafka.connector.SnowflakeSinkConnector

topics

トピックのコンマ区切りリスト。デフォルトでは、Snowflakeはテーブル名がトピック名と同じであると想定します。テーブル名がトピック名と同じでない場合は、オプションの topic2table.map パラメーター(下記)を使用して、トピック名からテーブル名へのマッピングを指定します。このテーブル名は、引用符で囲まれていないSnowflakeの有効な識別子である必要があります。有効なテーブル名については、 識別子の要件 をご参照ください。

注釈

両方ではなく、 topics または topics.regexいずれか が必要です。

topics.regex

これは、Snowflakeテーブルに読み込むメッセージを含むトピックを指定する正規表現(「regex」)です。コネクタは、正規表現に一致するトピック名からデータをロードします。正規表現は、Java正規表現のルールに従う必要があります(つまり、java.util.regex.Patternと互換性があります)。構成ファイルには、両方ではなく、 topics または topics.regexいずれか を含める必要があります。

snowflake.url.name

https://<アカウント名>.<地域ID>.snowflakecomputing.com:443 形式の、Snowflakeアカウントにアクセスするための URL。 https:// とポート番号はオプションです。アカウントが AWS US 西部地域にあり、 AWS PrivateLink を使用していない場合、地域 ID は使用されません。Snowflakeアカウント名と地域名の詳細については、 サポートされている地域 をご参照ください。

snowflake.user.name

Snowflakeアカウントのユーザーログイン名。

snowflake.private.key

ユーザーを認証するための秘密キー。ヘッダーまたはフッターではなく、キーのみを含めます。キーが複数の行に分割されている場合、改行を削除します。暗号化されていないキーを提供するか、暗号化されたキーを提供して snowflake.private.key.passphrase パラメーターを提供し、Snowflakeがキーを復号化できるようにします。 snowflake.private.key パラメーター値が暗号化されている 場合のみ 、このパラメーターを使用します。これにより、 キーペア認証の使用 (このトピック内)の指示に従って暗号化された秘密キーが復号化されます。

注釈

このトピックの オプションのプロパティsnowflake.private.key.passphrase もご参照ください。

snowflake.database.name

行を挿入するテーブルを含むデータベースの名前。

snowflake.schema.name

行を挿入するテーブルを含むスキーマの名前。

header.converter

レコードがAvroでフォーマットされ、ヘッダーが含まれている場合にのみ必要です。値は "org.apache.kafka.connect.storage.StringConverter" です。

key.converter

これは、Kafkaの記録のキーコンバーターです(例: "org.apache.kafka.connect.storage.StringConverter")。これはKafkaコネクタでは使用されませんが、Kafka Connectプラットフォームでは必要です。

現在の制限については Kafkaコネクタの制限 を参照してください。

value.converter

レコードが JSONでフォーマットされている場合、これは "com.snowflake.kafka.connector.records.SnowflakeJsonConverter" になります。

レコードがAvroでフォーマットされ、Kafkaのスキーマレジストリサービスを使用する場合、これは "com.snowflake.kafka.connector.records.SnowflakeAvroConverter" になります。

レコードがAvroでフォーマットされ、スキーマを含む(したがって、Kafkaのスキーマレジストリサービスを必要としない)場合、これは "com.snowflake.kafka.connector.records.SnowflakeAvroConverterWithoutSchemaRegistry" になります。

現在の制限については Kafkaコネクタの制限 を参照してください。

オプションのプロパティ

snowflake.private.key.passphrase

このパラメーターの値が空でない場合、Kafkaはこのフレーズを使用して秘密キーの復号化を試みます。

tasks.max

タスクの数。通常、Kafka Connectクラスタのワーカーノード全体の CPU コアの数と同じです。この数は、低くまたは高く設定できます。ただし、Snowflakeは高く設定することをお勧めしません。

snowflake.topic2table.map

このオプションのパラメーターにより、ユーザーはどのトピックをどのテーブルにマッピングするかを指定できます。各トピックとそのテーブル名はコロンで区切る必要があります(以下を参照)。このテーブル名は、引用符で囲まれていないSnowflakeの有効な識別子である必要があります。有効なテーブル名については、 識別子の要件 をご参照ください。

buffer.count.records

Snowflakeにインジェストされる前に、Kafkaパーティションごとにメモリにバッファされる記録の数。デフォルト値は 10000 レコードです。

buffer.flush.time

バッファーフラッシュ間の秒数。フラッシュはKafkaのメモリキャッシュから内部ステージまでです。デフォルト値は 120 秒です。

buffer.size.bytes

データファイルとしてSnowflakeに取り込まれる前に、Kafkaパーティションごとにメモリーにバッファーされたレコードの累積サイズ(バイト単位)。このデフォルト値は 5000000 (5 MB)です。

レコードは、データファイルに書き込まれるときに圧縮されます。その結果、バッファー内のレコードのサイズは、レコードから作成されたデータファイルのサイズよりも大きくなる可能性があります。

value.converter.schema.registry.url

形式がAvroで、スキーマレジストリサービスを使用している場合、これはスキーマレジストリサービスの URL である必要があります。それ以外の場合、このフィールドは空でなければなりません。

value.converter.break.on.schema.registry.error

Schema Registry ServiceからAvroデータをロードする場合、このプロパティは、スキーマIDのフェッチ中にエラーが発生した場合にKafkaコネクタがレコードの消費を停止するかどうかを決定します。デフォルト値は false です。この動作を有効にするには、値を true に設定します。

Kafkaコネクタバージョン1.4.2以降でサポートされています。

jvm.proxy.host

Snowflake Kafka Connectorがプロキシサーバー経由でSnowflakeにアクセスできるようにするには、このパラメーターを設定して、そのプロキシサーバーのホストを指定します。

jvm.proxy.port

Snowflake Kafka Connectorがプロキシサーバーを介してSnowflakeにアクセスできるようにするには、このパラメーターを設定してそのプロキシサーバーのポートを指定します。

jvm.proxy.username

プロキシサーバーで認証するユーザー名。

Kafkaコネクタバージョン1.4.4以降でサポートされています。

jvm.proxy.password

プロキシサーバーで認証するユーザー名のパスワード。

Kafkaコネクタバージョン1.4.4以降でサポートされています。

value.converter.basic.auth.credentials.source

Avroデータ形式を使用していて、Kafkaスキーマレジストリへの安全なアクセスが必要な場合、このパラメーターを文字列「USER_INFO」に設定し、以下で説明する value.converter.basic.auth.user.info パラメーターを設定します。それ以外の場合は、このパラメーターを省略します。

value.converter.basic.auth.user.info

Avroデータ形式を使用していて、Kafkaスキーマレジストリへの安全なアクセスが必要な場合は、上記のとおりこのパラメーターを文字列「<user_ID>:<password>」に設定し、value.converter.basic.auth.credentials.sourceパラメーターを設定します。それ以外の場合は、このパラメーターを省略します。

snowflake.metadata.createtime

値が FALSE に設定されている場合、 CreateTime プロパティ値は RECORD_METADATA 列のメタデータから省略されます。デフォルト値は TRUE です。

Kafkaコネクター1.2.0(およびそれ以上)でサポートされています。

snowflake.metadata.topic

値が FALSE に設定されている場合、 topic プロパティ値は RECORD_METADATA 列のメタデータから省略されます。デフォルト値は TRUE です。

Kafkaコネクター1.2.0(およびそれ以上)でサポートされています。

snowflake.metadata.offset.and.partition

値が FALSE に設定されている場合、 Offset および Partition プロパティ値は RECORD_METADATA 列のメタデータから省略されます。デフォルト値は TRUE です。

Kafkaコネクター1.2.0(およびそれ以上)でサポートされています。

snowflake.metadata.all

値が FALSE に設定されている場合、 RECORD_METADATA 列のメタデータは完全に空です。デフォルト値は TRUE です。

Kafkaコネクター1.2.0(およびそれ以上)でサポートされています。

キーペア認証の使用

Kafkaコネクタは、一般的なユーザー名/パスワード認証ではなく、キーペア認証に依存しています。この認証方法には、2048ビット(最小)の RSA キーペアが必要です。 OpenSSLを使用して公開キーと秘密キーのペアを生成します。公開キーは、構成ファイルで定義されたSnowflakeユーザーに割り当てられます。

公開/秘密キーペアを構成するには:

  1. ターミナルウィンドウのコマンドラインから、秘密キーを生成します。

    秘密キーの暗号化バージョンまたは非暗号化バージョンを生成できます。

    注釈

    Kafkaコネクタは、連邦情報処理標準(140-2)(FIPS 140-2)要件を満たすように検証された暗号化アルゴリズムをサポートしています。 FIPS 140-2の詳細については、https://csrc.nist.gov/publications/detail/fips/140/2/finalをご覧参照ください

    非暗号化バージョンを生成するには、次のコマンドを使用します。

    $ openssl genrsa -out rsa_key.pem 2048
    

    暗号化バージョンを生成するには、次のコマンドを使用します。

    $ openssl genrsa 2048 | openssl pkcs8 -topk8 -v2 <algorithm> -inform PEM -out rsa_key.p8
    

    <アルゴリズム> は、 FIPS 140-2準拠の暗号化アルゴリズムです。

    たとえば、暗号化アルゴリズムとして AES 256を指定するには、

    $ openssl genrsa 2048 | openssl pkcs8 -topk8 -v2 aes256 -inform PEM -out rsa_key.p8
    

    秘密キーの暗号化されたバージョンを生成する場合、パスフレーズを記録します。後で、Kafka構成ファイルの snowflake.private.key.passphrase プロパティでパスフレーズを指定します。

    サンプル PEM 秘密キー

    -----BEGIN ENCRYPTED PRIVATE KEY-----
    MIIE6TAbBgkqhkiG9w0BBQMwDgQILYPyCppzOwECAggABIIEyLiGSpeeGSe3xHP1
    wHLjfCYycUPennlX2bd8yX8xOxGSGfvB+99+PmSlex0FmY9ov1J8H1H9Y3lMWXbL
    ...
    -----END ENCRYPTED PRIVATE KEY-----
    
  2. コマンドラインから、秘密キーを参照して公開キーを生成します。

    秘密キーが暗号化され、「 rsa_key.p8 」という名前のファイルに含まれていると仮定して、次のコマンドを使用します。

    $ openssl rsa -in rsa_key.p8 -pubout -out rsa_key.pub
    

    サンプル PEM 公開キー

    -----BEGIN PUBLIC KEY-----
    MIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEAy+Fw2qv4Roud3l6tjPH4
    zxybHjmZ5rhtCz9jppCV8UTWvEXxa88IGRIHbJ/PwKW/mR8LXdfI7l/9vCMXX4mk
    ...
    -----END PUBLIC KEY-----
    
  3. 公開キーファイルと秘密キーファイルを保存用のローカルディレクトリにコピーします。ファイルへのパスを記録します。秘密キーは PKCS#8(公開キー暗号化標準)形式を使用して格納され、前の手順で指定したパスフレーズを使用して暗号化されることに注意してください。ただし、オペレーティングシステムが提供するファイル許可メカニズムを使用して、ファイルを不正アクセスから保護する必要があります。ファイルが使用されていない場合、ファイルを保護するのはユーザーの責任です。

  4. Snowflakeにログインします。 ALTER USER を使用して、Snowflakeユーザーに公開キーを割り当てます。例:

    ALTER USER jsmith SET RSA_PUBLIC_KEY='MIIBIjANBgkqh...';
    

    注釈

    • ユーザーを変更できるのは、セキュリティ管理者(つまり、 SECURITYADMIN ロールのユーザー)以上のみです。

    • SQL ステートメントで公開キーのヘッダーとフッターを除外します。

    DESCRIBE USER を使用してユーザーの公開キーの指紋を検証します:

    DESC USER jsmith;
    +-------------------------------+-----------------------------------------------------+---------+-------------------------------------------------------------------------------+
    | property                      | value                                               | default | description                                                                   |
    |-------------------------------+-----------------------------------------------------+---------+-------------------------------------------------------------------------------|
    | NAME                          | JSMITH                                              | null    | Name                                                                          |
    ...
    ...
    | RSA_PUBLIC_KEY_FP             | SHA256:nvnONUsfiuycCLMXIEWG4eTp4FjhVUZQUQbNpbSHXiA= | null    | Fingerprint of user's RSA public key.                                         |
    | RSA_PUBLIC_KEY_2_FP           | null                                                | null    | Fingerprint of user's second RSA public key.                                  |
    +-------------------------------+-----------------------------------------------------+---------+-------------------------------------------------------------------------------+
    

    注釈

    RSA_PUBLIC_KEY_2_FP プロパティについては、このトピックの キーローテーション で説明しています。

  5. 秘密キー全体をコピーして、構成ファイルの snowflake.private.key フィールドに貼り付けます。ファイルを保存します。

秘密の外部化

Snowflakeは、秘密キーなどの秘密を外部化し、暗号化された形式、または AWS Key Management Service(KMS)、Microsoft Azure Key Vault、 HashiCorp Vaultなどのキー管理サービスで保存することを強くお勧めします。これは、Kafka Connectクラスターで ConfigProvider 実装を使用して実現できます。

詳細については、Confluentのこのサービスの説明ttps://docs.confluent.io/current/connect/security.html#externalizing-secretsをご参照ください。

キーローテーション

Snowflakeは、複数のアクティブキーをサポートして、連続したローテーションを可能にします。内部的に従う有効期限のスケジュールに基づいて、公開キーと秘密キーをローテーションして交換します。

現在、 ALTER USERRSA_PUBLIC_KEY および RSA_PUBLIC_KEY_2 パラメーターを使用して、最大2個の公開キーを1人のユーザーに関連付けることができます。

キーをローテーションするには:

  1. キーペア認証の使用 の手順を完了して:

    • 新しい秘密キーと公開キーのセットを生成します。

    • ユーザーに公開キーを割り当てます。公開キーの値を RSA_PUBLIC_KEY または RSA_PUBLIC_KEY_2 (現在使用されていないキーの値)に設定します。例:

      alter user jsmith set rsa_public_key_2='JERUEHtcve...';
      
  2. Snowflakeに接続するようにコードを更新します。新しい秘密キーを指定します。

    Snowflakeは、接続情報とともに送信された秘密キーに基づいて、認証用の正しいアクティブな公開キーを検証します。

  3. ユーザープロファイルから古い公開キーを削除します。例:

    alter user jsmith unset rsa_public_key;
    

Kafkaの開始

サードパーティのConfluentまたはApache Kafkaのドキュメントに記載されている手順を使用して、Kafkaを開始します。

Kafkaコネクタの開始

Kafkaコネクタは、分散モードまたはスタンドアロンモードで開始できます。それぞれの手順を以下に示します。

分散モード

ターミナルウィンドウから次のコマンドを実行します。

curl -X POST -H "Content-Type: application/json" --data @<config_file>.json http://localhost:8083/connectors

スタンドアロンモード

ターミナルウィンドウから次のコマンドを実行します。

<kafka_dir>/bin/connect-standalone.sh <kafka_dir>/<path>/connect-standalone.properties <kafka_dir>/config/connect-standalone.properties

Kafkaコネクタのテストと使用

本番システムでコネクタを使用する前に、少量のデータでKafkaコネクタをテストすることをお勧めします。テストのプロセスは、コネクタを通常使用するプロセスと同じです。

  1. KafkaおよびKafka Connectが実行されていることを確認します。

  2. 適切なKafkaトピックを作成したことを確認します。

  3. メッセージパブリッシャーを作成(または既存のメッセージパブリッシャーを使用)します。トピックにパブリッシュされたメッセージが正しい形式(JSON またはAvro)であることを確認します。

  4. サブスクライブするトピックと、書き込むSnowflakeテーブルを指定する構成ファイルを作成します。次のステップもご参照ください。

  5. (オプション)データを書き込むテーブルを作成します。この手順はオプションです。テーブルを作成しない場合、Kafkaコネクタがテーブルを作成します。コネクタを使用して既存の空でないテーブルにデータを追加する予定がない場合、スキーマの不一致の可能性を最小限に抑えるために、コネクタでテーブルを作成することをお勧めします。

  6. Snowflakeオブジェクト(データベース、スキーマ、ターゲットテーブルなど)に必要な最小限の権限を、データの取り込みに使用されるロールに付与します。

  7. 設定されたKafkaトピックにデータのサンプルセットを公開します。

  8. データがシステム全体に伝播するまで数分待ってから、Snowflakeテーブルをチェックして、記録が挿入されたことを確認します。

ちなみに

テスト環境および運用環境でSnowflakeにデータをロードする前に、 SnowCD を使用してSnowflakeへのネットワーク接続を確認することを検討してください。