Kafka用Snowflakeコネクタを使用したprotobufデータのロード¶
このトピックでは、Kafka用Snowflakeコネクタ(「Kafkaコネクタ」)でプロトコルバッファ(protobuf)サポートをインストールおよび構成する手順について説明します。Protobufのサポートには、Kafkaコネクタ1.5.0(またはそれ以上)が必要です。
Kafkaコネクタは、次のバージョンのprotobufコンバーターをサポートしています。
- Confluentバージョン:
このバージョンをサポートしているのは、KafkaのConfluentパッケージバージョンのみです。
- コミュニティバージョン:
このバージョンは、オープンソースソフトウェア(OSS)ApacheKafkaパッケージでサポートされています。このバージョンは、KafkaのConfluentパッケージバージョンでもサポートされています。ただし、使いやすさのために、代わりにConfluentバージョンを使用することをお勧めします。
これらのprotobufコンバーターの1つだけをインストールしてください。
前提条件:Python用Snowflakeコネクタのインストール¶
Kafkaコネクタのインストールと構成 の手順を使用して、Kafkaコネクタをインストールします。
ProtobufコンバーターのConfluentバージョンの設定¶
注釈
ProtobufコンバーターのConfluentバージョンは、Confluentバージョン5.5.0(またはそれ以上)で使用できます。
Kafka構成ファイル(例:
<kafkaディレクトリ>/config/connect-distributed.properties)をテキストエディターで開きます。ファイルでコンバーターのプロパティを構成します。一般的なKafkaコネクタのプロパティについては、 Kafka構成プロパティ をご参照ください。
例:
ファイルを保存します。
Confluentコンソールprotobufプロデューサー、ソースprotobufプロデューサー、またはPythonプロデューサーを使用して、Kafkaからprotobufデータを生成します。
GitHub にあるPythonコードの例は、Kafkaからprotobufデータを生成する方法を示しています。
Protobufコンバーターのコミュニティバージョンの設定¶
このセクションでは、protobufコンバーターのコミュニティバージョンをインストールおよび構成する手順について説明します。
ステップ1: Community Protobufコンバーターをインストールする¶
ターミナルウィンドウで、protobufコンバーターの GitHub リポジトリのクローンを保存するディレクトリに移動します。
次のコマンドを実行して、 GitHubリポジトリ のクローンを作成します。
次のコマンドを実行して、 Apache Maven を使用してコンバーターの3.1.0バージョンをビルドします。コンバーターのバージョン2.3.0、3.0.0、および3.1.0が、Kafkaコネクタでサポートされています。
注釈
Mavenは、ローカルマシンにあらかじめインストールされている必要があります。
Mavenは、現在のフォルダーに
kafka-connect-protobuf-converter-<バージョン>-jar-with-dependencies.jarという名前のファイルを作成します。これはコンバーターの JAR ファイルです。コンパイルされた
kafka-connect-protobuf-converter-<バージョン>-jar-with-dependencies.jarファイルをKafkaパッケージバージョンのディレクトリにコピーします。- Confluent:
<Confluenctディレクトリ>/share/java/kafka-serde-tools- Apache Kafka:
<Apache Kafkaディレクトリ>/libs
ステップ2: .protoファイルをコンパイルする¶
メッセージを定義するprotobuf .proto ファイルを java ファイルにコンパイルします。
たとえば、メッセージが sensor.proto という名前のファイルで定義されているとします。ターミナルウィンドウから次のコマンドを実行して、プロトコルバッファファイルをコンパイルします。アプリケーションのソースコードのソースディレクトリ、宛先ディレクトリ( .java ファイルの場合)、および .proto ファイルへのパスを指定します。
サンプルの .proto ファイルは、https://github.com/snowflakedb/snowflake-kafka-connector/blob/master/test/test_data/sensor.protoから入手できます。
このコマンドは、指定された宛先ディレクトリに SensorReadingImpl.java という名前のファイルを生成します。
詳細については、 Google開発者向けドキュメント をご参照ください。
ステップ3: SensorReadingImpl.javaファイルをコンパイルする¶
ステップ2:.protoファイルをコンパイルする から生成された SensorReadingImpl.java ファイルをprotobufプロジェクト構造のプロジェクトオブジェクトモデルとともにコンパイルします。
ステップ2:.protoファイルをコンパイルする からの
.pomファイルをテキストエディターで開きます。それ以外の場合は空のディレクトリを次の構造で作成します。
src/main/javaの下のディレクトリ構造は、.protoファイルにあるパッケージ名を反映しています(行3)。ステップ2:.protoファイルをコンパイルする から生成された
SensorReadingImpl.javaファイルをディレクトリ構造の一番下のフォルダーにコピーします。protobuf_folderディレクトリのルートにpom.xmlという名前のファイルを作成します。テキストエディターで、空の
pom.xmlファイルを開きます。次のサンプルプロジェクトモデルをファイルにコピーして変更します。条件:
<グループID>.protoファイルで指定されたパッケージ名のグループ ID セグメント。たとえば、パッケージ名がcom.foo.bar.buzの場合、グループ ID はcom.fooです。<アーティファクトID>選択したパッケージのアーティファクト ID。アーティファクト ID はランダムに選択できます。
<バージョン>選択したパッケージのバージョン。バージョンはランダムに選択できます。
<Javaバージョン>ローカルマシンにインストールされているJava Runtime Environment(JRE)のバージョン。
例:
ターミナルウィンドウで、
protobuf_folderディレクトリのルートに移動します。次のコマンドを実行して、ディレクトリ内のファイルからprotobufデータ JAR ファイルをコンパイルします。Mavenは、
protobuf_folder/targetフォルダー(例:kafka-test-protobuf-1.0.0-jar-with-dependencies.jar)に<アーティファクトID>-<バージョン>-jar-with-dependencies.jarという名前のファイルを生成します。コンパイルされた
kafka-test-protobuf-1.0.0-jar-with-dependencies.jarファイルをKafkaパッケージバージョンのディレクトリにコピーします。- Confluent:
<Confluenctディレクトリ>/share/java/kafka-serde-tools- Apache Kafka:
ファイルを
$CLASSPATH環境変数のディレクトリにコピーします。
ステップ4: Kafkaコネクタを構成する¶
Kafka構成ファイル(例:
<kafkaディレクトリ>/config/connect-distributed.properties)をテキストエディターで開きます。value.converter.protoClassNameプロパティをファイルに追加します。このプロパティは、メッセージの逆シリアル化に使用するプロトコルバッファクラスを指定します(例:com.google.protobuf.Int32Value)。注釈
ネストされたクラスは、
$表記(例:com.blueapron.connect.protobuf.NestedTestProtoOuterClass$NestedTestProto)を使用して指定する必要があります。例:
一般的なKafkaコネクタのプロパティについては、 Kafka構成プロパティ をご参照ください。
プロトコルバッファクラスの詳細については、このトピックで前述したGoogle開発者向けドキュメントをご参照ください。
ファイルを保存します。