Kafka用Snowflakeコネクタを使用したprotobufデータのロード¶
このトピックでは、Kafka用Snowflakeコネクタ(「Kafkaコネクタ」)でプロトコルバッファ(protobuf)サポートをインストールおよび構成する手順について説明します。Protobufのサポートには、Kafkaコネクタ1.5.0(またはそれ以上)が必要です。
Kafkaコネクタは、次のバージョンのprotobufコンバーターをサポートしています。
- Confluentバージョン:
このバージョンをサポートしているのは、KafkaのConfluentパッケージバージョンのみです。
- コミュニティバージョン:
このバージョンは、オープンソースソフトウェア(OSS)ApacheKafkaパッケージでサポートされています。このバージョンは、KafkaのConfluentパッケージバージョンでもサポートされています。ただし、使いやすさのために、代わりにConfluentバージョンを使用することをお勧めします。
これらのprotobufコンバーターの1つだけをインストールしてください。
このトピックの内容:
前提条件:Python用Snowflakeコネクタのインストール¶
Kafkaコネクタのインストールと構成 の手順を使用して、Kafkaコネクタをインストールします。
ProtobufコンバーターのConfluentバージョンの設定¶
注釈
ProtobufコンバーターのConfluentバージョンは、Confluentバージョン5.5.0(またはそれ以上)で使用できます。
Kafka構成ファイル(例:
<kafkaディレクトリ>/config/connect-distributed.properties
)をテキストエディターで開きます。ファイルでコンバーターのプロパティを構成します。一般的なKafkaコネクタのプロパティについては、 Kafka構成プロパティ をご参照ください。
{ "name":"XYZCompanySensorData", "config":{ .. "key.converter":"io.confluent.connect.protobuf.ProtobufConverter", "key.converter.schema.registry.url":"CONFLUENT_SCHEMA_REGISTRY", "value.converter":"io.confluent.connect.protobuf.ProtobufConverter", "value.converter.schema.registry.url":"http://localhost:8081" } }
例:
{ "name":"XYZCompanySensorData", "config":{ "connector.class":"com.snowflake.kafka.connector.SnowflakeSinkConnector", "tasks.max":"8", "topics":"topic1,topic2", "snowflake.topic2table.map": "topic1:table1,topic2:table2", "buffer.count.records":"10000", "buffer.flush.time":"60", "buffer.size.bytes":"5000000", "snowflake.url.name":"myorganization-myaccount.snowflakecomputing.com:443", "snowflake.user.name":"jane.smith", "snowflake.private.key":"xyz123", "snowflake.private.key.passphrase":"jkladu098jfd089adsq4r", "snowflake.database.name":"mydb", "snowflake.schema.name":"myschema", "key.converter":"io.confluent.connect.protobuf.ProtobufConverter", "key.converter.schema.registry.url":"CONFLUENT_SCHEMA_REGISTRY", "value.converter":"io.confluent.connect.protobuf.ProtobufConverter", "value.converter.schema.registry.url":"http://localhost:8081" } }
ファイルを保存します。
Confluentコンソールprotobufプロデューサー、ソースprotobufプロデューサー、またはPythonプロデューサーを使用して、Kafkaからprotobufデータを生成します。
GitHub にあるPythonコードの例は、Kafkaからprotobufデータを生成する方法を示しています。
Protobufコンバーターのコミュニティバージョンの設定¶
このセクションでは、protobufコンバーターのコミュニティバージョンをインストールおよび構成する手順について説明します。
ステップ1: Community Protobufコンバーターをインストールする¶
ターミナルウィンドウで、protobufコンバーターの GitHub リポジトリのクローンを保存するディレクトリに移動します。
次のコマンドを実行して、 GitHubリポジトリ のクローンを作成します。
git clone https://github.com/blueapron/kafka-connect-protobuf-converter
次のコマンドを実行して、 Apache Maven を使用してコンバーターの3.1.0バージョンをビルドします。コンバーターのバージョン2.3.0、3.0.0、および3.1.0が、Kafkaコネクタでサポートされています。
注釈
Mavenは、ローカルマシンにあらかじめインストールされている必要があります。
cd kafka-connect-protobuf-converter git checkout tags/v3.1.0 mvn clean package
Mavenは、現在のフォルダーに
kafka-connect-protobuf-converter-<バージョン>-jar-with-dependencies.jar
という名前のファイルを作成します。これはコンバーターの JAR ファイルです。コンパイルされた
kafka-connect-protobuf-converter-<バージョン>-jar-with-dependencies.jar
ファイルをKafkaパッケージバージョンのディレクトリにコピーします。- Confluent:
<Confluenctディレクトリ>/share/java/kafka-serde-tools
- Apache Kafka:
<Apache Kafkaディレクトリ>/libs
ステップ2: .protoファイルをコンパイルする¶
メッセージを定義するprotobuf .proto
ファイルを java
ファイルにコンパイルします。
たとえば、メッセージが sensor.proto
という名前のファイルで定義されているとします。ターミナルウィンドウから次のコマンドを実行して、プロトコルバッファファイルをコンパイルします。アプリケーションのソースコードのソースディレクトリ、宛先ディレクトリ( .java
ファイルの場合)、および .proto
ファイルへのパスを指定します。
protoc -I=$SRC_DIR --java_out=$DST_DIR $SRC_DIR/sensor.proto
サンプルの .proto
ファイルは、https://github.com/snowflakedb/snowflake-kafka-connector/blob/master/test/test_data/sensor.protoから入手できます。
このコマンドは、指定された宛先ディレクトリに SensorReadingImpl.java
という名前のファイルを生成します。
詳細については、 Google開発者向けドキュメント をご参照ください。
ステップ3: SensorReadingImpl.javaファイルをコンパイルする¶
ステップ2:.protoファイルをコンパイルする から生成された SensorReadingImpl.java
ファイルをprotobufプロジェクト構造のプロジェクトオブジェクトモデルとともにコンパイルします。
ステップ2:.protoファイルをコンパイルする からの
.pom
ファイルをテキストエディターで開きます。それ以外の場合は空のディレクトリを次の構造で作成します。
protobuf_folder ├── pom.xml └── src └── main └── java └── com └── ..
src
/main
/java
の下のディレクトリ構造は、.proto
ファイルにあるパッケージ名を反映しています(行3)。ステップ2:.protoファイルをコンパイルする から生成された
SensorReadingImpl.java
ファイルをディレクトリ構造の一番下のフォルダーにコピーします。protobuf_folder
ディレクトリのルートにpom.xml
という名前のファイルを作成します。テキストエディターで、空の
pom.xml
ファイルを開きます。次のサンプルプロジェクトモデルをファイルにコピーして変更します。<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId><group_id></groupId> <artifactId><artifact_id></artifactId> <version><version></version> <properties> <java.version><java_version></java.version> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> </properties> <dependencies> <dependency> <groupId>com.google.protobuf</groupId> <artifactId>protobuf-java</artifactId> <version>3.11.1</version> </dependency> </dependencies> <build> <plugins> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-compiler-plugin</artifactId> <version>3.3</version> <configuration> <source>${java.version}</source> <target>${java.version}</target> </configuration> </plugin> <plugin> <artifactId>maven-assembly-plugin</artifactId> <version>3.1.0</version> <configuration> <descriptorRefs> <descriptorRef>jar-with-dependencies</descriptorRef> </descriptorRefs> </configuration> <executions> <execution> <id>make-assembly</id> <phase>package</phase> <goals> <goal>single</goal> </goals> </execution> </executions> </plugin> </plugins> </build> </project>
条件:
<グループID>
.proto
ファイルで指定されたパッケージ名のグループ ID セグメント。たとえば、パッケージ名がcom.foo.bar.buz
の場合、グループ ID はcom.foo
です。<アーティファクトID>
選択したパッケージのアーティファクト ID。アーティファクト ID はランダムに選択できます。
<バージョン>
選択したパッケージのバージョン。バージョンはランダムに選択できます。
<Javaバージョン>
ローカルマシンにインストールされているJava Runtime Environment(JRE)のバージョン。
例:
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>com.snowflake</groupId> <artifactId>kafka-test-protobuf</artifactId> <version>1.0.0</version> <properties> <java.version>1.8</java.version> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> </properties> <dependencies> <dependency> <groupId>com.google.protobuf</groupId> <artifactId>protobuf-java</artifactId> <version>3.11.1</version> </dependency> </dependencies> <build> <plugins> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-compiler-plugin</artifactId> <version>3.3</version> <configuration> <source>${java.version}</source> <target>${java.version}</target> </configuration> </plugin> <plugin> <artifactId>maven-assembly-plugin</artifactId> <version>3.1.0</version> <configuration> <descriptorRefs> <descriptorRef>jar-with-dependencies</descriptorRef> </descriptorRefs> </configuration> <executions> <execution> <id>make-assembly</id> <phase>package</phase> <goals> <goal>single</goal> </goals> </execution> </executions> </plugin> </plugins> </build> </project>
ターミナルウィンドウで、
protobuf_folder
ディレクトリのルートに移動します。次のコマンドを実行して、ディレクトリ内のファイルからprotobufデータ JAR ファイルをコンパイルします。mvn clean package
Mavenは、
protobuf_folder/target
フォルダー(例:kafka-test-protobuf-1.0.0-jar-with-dependencies.jar
)に<アーティファクトID>-<バージョン>-jar-with-dependencies.jar
という名前のファイルを生成します。コンパイルされた
kafka-test-protobuf-1.0.0-jar-with-dependencies.jar
ファイルをKafkaパッケージバージョンのディレクトリにコピーします。- Confluent:
<Confluenctディレクトリ>/share/java/kafka-serde-tools
- Apache Kafka:
ファイルを
$CLASSPATH
環境変数のディレクトリにコピーします。
ステップ4: Kafkaコネクタを構成する¶
Kafka構成ファイル(例:
<kafkaディレクトリ>/config/connect-distributed.properties
)をテキストエディターで開きます。value.converter.protoClassName
プロパティをファイルに追加します。このプロパティは、メッセージの逆シリアル化に使用するプロトコルバッファクラスを指定します(例:com.google.protobuf.Int32Value
)。注釈
ネストされたクラスは、
$
表記(例:com.blueapron.connect.protobuf.NestedTestProtoOuterClass$NestedTestProto
)を使用して指定する必要があります。例:
{ "name":"XYZCompanySensorData", "config":{ .. "value.converter.protoClassName":"com.snowflake.kafka.test.protobuf.SensorReadingImpl$SensorReading" } }
一般的なKafkaコネクタのプロパティについては、 Kafka構成プロパティ をご参照ください。
プロトコルバッファクラスの詳細については、このトピックで前述したGoogle開発者向けドキュメントをご参照ください。
ファイルを保存します。