Kafka用Snowflakeコネクタを使用したprotobufデータのロード

このトピックでは、Kafka用Snowflakeコネクタ(「Kafkaコネクタ」)でプロトコルバッファ(protobuf)サポートをインストールおよび構成する手順について説明します。Protobufのサポートには、Kafkaコネクタ1.5.0(またはそれ以上)が必要です。

Kafkaコネクタは、次のバージョンのprotobufコンバーターをサポートしています。

Confluentバージョン:

このバージョンをサポートしているのは、KafkaのConfluentパッケージバージョンのみです。

コミュニティバージョン:

このバージョンは、オープンソースソフトウェア(OSS)ApacheKafkaパッケージでサポートされています。このバージョンは、KafkaのConfluentパッケージバージョンでもサポートされています。ただし、使いやすさのために、代わりにConfluentバージョンを使用することをお勧めします。

これらのprotobufコンバーターの1つだけをインストールしてください。

このトピックの内容:

前提条件:Python用Snowflakeコネクタのインストール

Kafkaコネクタのインストールと構成 の手順を使用して、Kafkaコネクタをインストールします。

ProtobufコンバーターのConfluentバージョンの設定

注釈

ProtobufコンバーターのConfluentバージョンは、Confluentバージョン5.5.0(またはそれ以上)で使用できます。

  1. Kafka構成ファイル(例: <kafkaディレクトリ>/config/connect-distributed.properties)をテキストエディターで開きます。

  2. ファイルでコンバーターのプロパティを構成します。一般的なKafkaコネクタのプロパティについては、 Kafka構成プロパティ をご参照ください。

    {
     "name":"XYZCompanySensorData",
       "config":{
         ..
         "key.converter":"io.confluent.connect.protobuf.ProtobufConverter",
         "key.converter.schema.registry.url":"CONFLUENT_SCHEMA_REGISTRY",
         "value.converter":"io.confluent.connect.protobuf.ProtobufConverter",
         "value.converter.schema.registry.url":"http://localhost:8081"
       }
     }
    
    Copy

    例:

    {
      "name":"XYZCompanySensorData",
      "config":{
        "connector.class":"com.snowflake.kafka.connector.SnowflakeSinkConnector",
        "tasks.max":"8",
        "topics":"topic1,topic2",
        "snowflake.topic2table.map": "topic1:table1,topic2:table2",
        "buffer.count.records":"10000",
        "buffer.flush.time":"60",
        "buffer.size.bytes":"5000000",
        "snowflake.url.name":"myorganization-myaccount.snowflakecomputing.com:443",
        "snowflake.user.name":"jane.smith",
        "snowflake.private.key":"xyz123",
        "snowflake.private.key.passphrase":"jkladu098jfd089adsq4r",
        "snowflake.database.name":"mydb",
        "snowflake.schema.name":"myschema",
        "key.converter":"io.confluent.connect.protobuf.ProtobufConverter",
        "key.converter.schema.registry.url":"CONFLUENT_SCHEMA_REGISTRY",
        "value.converter":"io.confluent.connect.protobuf.ProtobufConverter",
        "value.converter.schema.registry.url":"http://localhost:8081"
      }
    }
    
    Copy
  3. ファイルを保存します。

Confluentコンソールprotobufプロデューサー、ソースprotobufプロデューサー、またはPythonプロデューサーを使用して、Kafkaからprotobufデータを生成します。

GitHub にあるPythonコードの例は、Kafkaからprotobufデータを生成する方法を示しています。

Protobufコンバーターのコミュニティバージョンの設定

このセクションでは、protobufコンバーターのコミュニティバージョンをインストールおよび構成する手順について説明します。

ステップ1: Community Protobufコンバーターをインストールする

  1. ターミナルウィンドウで、protobufコンバーターの GitHub リポジトリのクローンを保存するディレクトリに移動します。

  2. 次のコマンドを実行して、 GitHubリポジトリ のクローンを作成します。

    git clone https://github.com/blueapron/kafka-connect-protobuf-converter
    
    Copy
  3. 次のコマンドを実行して、 Apache Maven を使用してコンバーターの3.1.0バージョンをビルドします。コンバーターのバージョン2.3.0、3.0.0、および3.1.0が、Kafkaコネクタでサポートされています。

    注釈

    Mavenは、ローカルマシンにあらかじめインストールされている必要があります。

    cd kafka-connect-protobuf-converter
    
    git checkout tags/v3.1.0
    
    mvn clean package
    
    Copy

    Mavenは、現在のフォルダーに kafka-connect-protobuf-converter-<バージョン>-jar-with-dependencies.jar という名前のファイルを作成します。これはコンバーターの JAR ファイルです。

  4. コンパイルされた kafka-connect-protobuf-converter-<バージョン>-jar-with-dependencies.jar ファイルをKafkaパッケージバージョンのディレクトリにコピーします。

    Confluent:

    <Confluenctディレクトリ>/share/java/kafka-serde-tools

    Apache Kafka:

    <Apache Kafkaディレクトリ>/libs

ステップ2: .protoファイルをコンパイルする

メッセージを定義するprotobuf .proto ファイルを java ファイルにコンパイルします。

たとえば、メッセージが sensor.proto という名前のファイルで定義されているとします。ターミナルウィンドウから次のコマンドを実行して、プロトコルバッファファイルをコンパイルします。アプリケーションのソースコードのソースディレクトリ、宛先ディレクトリ( .java ファイルの場合)、および .proto ファイルへのパスを指定します。

protoc -I=$SRC_DIR --java_out=$DST_DIR $SRC_DIR/sensor.proto
Copy

サンプルの .proto ファイルは、https://github.com/snowflakedb/snowflake-kafka-connector/blob/master/test/test_data/sensor.protoから入手できます。

このコマンドは、指定された宛先ディレクトリに SensorReadingImpl.java という名前のファイルを生成します。

詳細については、 Google開発者向けドキュメント をご参照ください。

ステップ3: SensorReadingImpl.javaファイルをコンパイルする

ステップ2:.protoファイルをコンパイルする から生成された SensorReadingImpl.java ファイルをprotobufプロジェクト構造のプロジェクトオブジェクトモデルとともにコンパイルします。

  1. ステップ2:.protoファイルをコンパイルする からの .pom ファイルをテキストエディターで開きます。

  2. それ以外の場合は空のディレクトリを次の構造で作成します。

    protobuf_folder
    ├── pom.xml
    └── src
        └── main
            └── java
                └── com
                    └── ..
    
    Copy

    src / main / java の下のディレクトリ構造は、 .proto ファイルにあるパッケージ名を反映しています(行3)。

  3. ステップ2:.protoファイルをコンパイルする から生成された SensorReadingImpl.java ファイルをディレクトリ構造の一番下のフォルダーにコピーします。

  4. protobuf_folder ディレクトリのルートに pom.xml という名前のファイルを作成します。

  5. テキストエディターで、空の pom.xml ファイルを開きます。次のサンプルプロジェクトモデルをファイルにコピーして変更します。

    <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0"
            xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
            xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
       <modelVersion>4.0.0</modelVersion>
    
       <groupId><group_id></groupId>
       <artifactId><artifact_id></artifactId>
       <version><version></version>
    
       <properties>
           <java.version><java_version></java.version>
           <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
       </properties>
    
       <dependencies>
           <dependency>
               <groupId>com.google.protobuf</groupId>
               <artifactId>protobuf-java</artifactId>
               <version>3.11.1</version>
           </dependency>
       </dependencies>
    
       <build>
           <plugins>
               <plugin>
                   <groupId>org.apache.maven.plugins</groupId>
                   <artifactId>maven-compiler-plugin</artifactId>
                   <version>3.3</version>
                   <configuration>
                       <source>${java.version}</source>
                       <target>${java.version}</target>
                   </configuration>
               </plugin>
               <plugin>
                   <artifactId>maven-assembly-plugin</artifactId>
                   <version>3.1.0</version>
                   <configuration>
                       <descriptorRefs>
                           <descriptorRef>jar-with-dependencies</descriptorRef>
                       </descriptorRefs>
                   </configuration>
                   <executions>
                       <execution>
                           <id>make-assembly</id>
                           <phase>package</phase>
                           <goals>
                               <goal>single</goal>
                           </goals>
                       </execution>
                   </executions>
               </plugin>
           </plugins>
       </build>
    </project>
    
    Copy

    条件:

    <グループID>

    .proto ファイルで指定されたパッケージ名のグループ ID セグメント。たとえば、パッケージ名が com.foo.bar.buz の場合、グループ ID は com.foo です。

    <アーティファクトID>

    選択したパッケージのアーティファクト ID。アーティファクト ID はランダムに選択できます。

    <バージョン>

    選択したパッケージのバージョン。バージョンはランダムに選択できます。

    <Javaバージョン>

    ローカルマシンにインストールされているJava Runtime Environment(JRE)のバージョン。

    例:

    <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0"
            xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
            xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
       <modelVersion>4.0.0</modelVersion>
    
       <groupId>com.snowflake</groupId>
       <artifactId>kafka-test-protobuf</artifactId>
       <version>1.0.0</version>
    
       <properties>
           <java.version>1.8</java.version>
           <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
       </properties>
    
       <dependencies>
           <dependency>
               <groupId>com.google.protobuf</groupId>
               <artifactId>protobuf-java</artifactId>
               <version>3.11.1</version>
           </dependency>
       </dependencies>
    
       <build>
           <plugins>
               <plugin>
                   <groupId>org.apache.maven.plugins</groupId>
                   <artifactId>maven-compiler-plugin</artifactId>
                   <version>3.3</version>
                   <configuration>
                       <source>${java.version}</source>
                       <target>${java.version}</target>
                   </configuration>
               </plugin>
               <plugin>
                   <artifactId>maven-assembly-plugin</artifactId>
                   <version>3.1.0</version>
                   <configuration>
                       <descriptorRefs>
                           <descriptorRef>jar-with-dependencies</descriptorRef>
                       </descriptorRefs>
                   </configuration>
                   <executions>
                       <execution>
                           <id>make-assembly</id>
                           <phase>package</phase>
                           <goals>
                               <goal>single</goal>
                           </goals>
                       </execution>
                   </executions>
               </plugin>
           </plugins>
       </build>
    </project>
    
    Copy
  6. ターミナルウィンドウで、 protobuf_folder ディレクトリのルートに移動します。次のコマンドを実行して、ディレクトリ内のファイルからprotobufデータ JAR ファイルをコンパイルします。

    mvn clean package
    
    Copy

    Mavenは、 protobuf_folder/target フォルダー(例: kafka-test-protobuf-1.0.0-jar-with-dependencies.jar)に <アーティファクトID>-<バージョン>-jar-with-dependencies.jar という名前のファイルを生成します。

  7. コンパイルされた kafka-test-protobuf-1.0.0-jar-with-dependencies.jar ファイルをKafkaパッケージバージョンのディレクトリにコピーします。

    Confluent:

    <Confluenctディレクトリ>/share/java/kafka-serde-tools

    Apache Kafka:

    ファイルを $CLASSPATH 環境変数のディレクトリにコピーします。

ステップ4: Kafkaコネクタを構成する

  1. Kafka構成ファイル(例: <kafkaディレクトリ>/config/connect-distributed.properties)をテキストエディターで開きます。

  2. value.converter.protoClassName プロパティをファイルに追加します。このプロパティは、メッセージの逆シリアル化に使用するプロトコルバッファクラスを指定します(例: com.google.protobuf.Int32Value)。

    注釈

    ネストされたクラスは、 $ 表記(例: com.blueapron.connect.protobuf.NestedTestProtoOuterClass$NestedTestProto)を使用して指定する必要があります。

    例:

    {
     "name":"XYZCompanySensorData",
       "config":{
         ..
         "value.converter.protoClassName":"com.snowflake.kafka.test.protobuf.SensorReadingImpl$SensorReading"
       }
     }
    
    Copy

    一般的なKafkaコネクタのプロパティについては、 Kafka構成プロパティ をご参照ください。

    プロトコルバッファクラスの詳細については、このトピックで前述したGoogle開発者向けドキュメントをご参照ください。

  3. ファイルを保存します。