Laden von Protobuf-Daten mit dem Snowflake-Konnektor für Kafka

Unter diesem Thema werden die Anweisungen für die Installation und Konfiguration der Unterstützung von Protokollpuffern (protobuf) im Snowflake-Konnektor für Kafka („Kafka-Konnektor“) bereitgestellt. Unterstützung von Protobuf erfordert Kafka-Konnektor 1.5.0 (oder höher).

Der Kafka-Konnektor unterstützt die folgenden Versionen des Protobuf-Konverters:

Confluent-Version

Diese Version wird nur von der Confluent-Paketversion von Kafka unterstützt.

Community-Version

Diese Version wird vom Open-Source-Softwarepaket (OSS) Apache Kafka unterstützt. Diese Version wird auch von der Confluent-Paketversion von Kafka unterstützt. Aus Gründen der Benutzerfreundlichkeit empfehlen wir jedoch, stattdessen die Confluent-Version zu verwenden.

Installieren Sie nur einen dieser Protobuf-Konverter.

Unter diesem Thema:

Voraussetzungen: Snowflake-Konnektor für Kafka installieren

Installieren Sie den Kafka-Konnektor gemäß den Anweisungen unter Installieren und Konfigurieren des Kafka-Konnektors.

Konfigurieren der Confluent-Version des Protobuf-Konverters

Bemerkung

Die Confluent-Version des Protobuf-Konverters ist mit der Confluent-Version 5.5.0 verfügbar (oder höher).

  1. Öffnen Sie eine Kafka-Konfigurationsdatei, z. B. <<Kafka-Verzeichnis>>/config/connect-distributed.properties, in einem Texteditor.

  2. Konfigurieren Sie die Konvertereigenschaften in der Datei. Weitere Informationen zu den Eigenschaften des Kafka-Konnektors im Allgemeinen finden Sie unter Kafka-Konfigurationseigenschaften.

    {
     "name":"XYZCompanySensorData",
       "config":{
         ..
         "key.converter":"io.confluent.connect.protobuf.ProtobufConverter",
         "key.converter.schema.registry.url":"CONFLUENT_SCHEMA_REGISTRY",
         "value.converter":"io.confluent.connect.protobuf.ProtobufConverter",
         "value.converter.schema.registry.url":"http://localhost:8081"
       }
     }
    

    Beispiel:

    {
      "name":"XYZCompanySensorData",
      "config":{
        "connector.class":"com.snowflake.kafka.connector.SnowflakeSinkConnector",
        "tasks.max":"8",
        "topics":"topic1,topic2",
        "snowflake.topic2table.map": "topic1:table1,topic2:table2",
        "buffer.count.records":"10000",
        "buffer.flush.time":"60",
        "buffer.size.bytes":"5000000",
        "snowflake.url.name":"myorganization-myaccount.snowflakecomputing.com:443",
        "snowflake.user.name":"jane.smith",
        "snowflake.private.key":"xyz123",
        "snowflake.private.key.passphrase":"jkladu098jfd089adsq4r",
        "snowflake.database.name":"mydb",
        "snowflake.schema.name":"myschema",
        "key.converter":"io.confluent.connect.protobuf.ProtobufConverter",
        "key.converter.schema.registry.url":"CONFLUENT_SCHEMA_REGISTRY",
        "value.converter":"io.confluent.connect.protobuf.ProtobufConverter",
        "value.converter.schema.registry.url":"http://localhost:8081"
      }
    }
    
  3. Speichern Sie die Datei.

Produzieren Sie Protobuf-Daten von Kafka mit dem Confluent-Konsolen-Protobuf-Produzenten, dem Quell-Protobuf-Produzenten oder dem Python-Produzenten.

Der Python-Beispielcode in GitHub zeigt, wie Protobuf-Daten von Kafka erzeugt werden.

Konfigurieren der Community-Version des Protobuf-Konverters

Dieser Abschnitt enthält Anweisungen zur Installation und Konfiguration der Community-Version des protobuf-Konverters.

Schritt 1: Installieren des Community-Protobuf-Konverters

  1. Wechseln Sie in einem Terminalfenster in das Verzeichnis, in dem Sie einen Klon des GitHub-Repositorys für den Protobuf-Konverter speichern möchten.

  2. Führen Sie den folgenden Befehl aus, um das GitHub-Repository zu klonen:

    git clone https://github.com/blueapron/kafka-connect-protobuf-converter
    
  3. Führen Sie die folgenden Befehle aus, um die Version 3.1.0 des Konverters mit Apache Maven zu erstellen. Beachten Sie, dass die Versionen 2.3.0, 3.0.0 und 3.1.0 des Konverters durch den Kafka-Konnektor unterstützt werden:

    Bemerkung

    Maven muss bereits auf Ihrem lokalen Rechner installiert sein.

    cd kafka-connect-protobuf-converter
    
    git checkout tags/v3.1.0
    
    mvn clean package
    

    Maven erstellt eine Datei mit dem Namen kafka-connect-protobuf-converter-<Version>-jar-with-dependencies.jar im aktuellen Ordner. Dies ist die JAR-Konverterdatei.

  4. Kopieren Sie die kompilierte Datei kafka-connect-protobuf-converter-<Version>-jar-with-dependencies.jar in das Verzeichnis Ihrer Kafka-Paketversion:

    Confluent

    <Confluent-Verzeichnis>/share/java/kafka-serde-tools

    Apache Kafka

    <Apache_Kafka-Verzeichnis>/libs

Schritt 2: Kompilieren Ihrer .proto-Datei

Kompilieren Sie die protobuf-.proto-Datei, die Ihre Meldungen definiert, in eine java-Datei.

Angenommen, Ihre Meldungen sind in einer Datei mit dem Namen sensor.proto definiert. Führen Sie beispielsweise in einem Terminalfenster den folgenden Befehl aus, um die Protokollpufferdatei zu kompilieren: Geben Sie das Quellverzeichnis für den Quellcode der Anwendung, das Zielverzeichnis (für die .java-Datei) und den Pfad zu Ihrer .proto-Datei an:

protoc -I=$SRC_DIR --java_out=$DST_DIR $SRC_DIR/sensor.proto

Eine Beispieldatei .proto ist hier verfügbar: https://github.com/snowflakedb/snowflake-kafka-connector/blob/master/test/test_data/sensor.proto.

Der Befehl generiert eine Datei mit dem Namen SensorReadingImpl.java im angegebenen Zielverzeichnis.

Weitere Informationen dazu finden Sie in der Google-Entwickler-Dokumentation

Schritt 3: Kompilieren der Datei SensorReadingImpl.java

Kompilieren Sie die generierte SensorReadingImpl.java-Datei aus Schritt 2: Kompilieren Ihrer .proto-Datei zusammen mit dem Projektobjektmodell der protobuf-Projektstruktur.

  1. Öffnen Sie Ihre .pom-Datei aus Schritt 2: Kompilieren Ihrer .proto-Datei in einem Texteditor.

  2. Erstellen Sie ein ansonsten leeres Verzeichnis mit einer Struktur:

    protobuf_folder
    ├── pom.xml
    └── src
        └── main
            └── java
                └── com
                    └── ..
    

    Wo die Verzeichnisstruktur unter src / main / java den Paketnamen in Ihrer .proto-Datei spiegelt (Zeile 3).

  3. Kopieren Sie die generierte Datei SensorReadingImpl.java aus Schritt 2: Kompilieren Ihrer .proto-Datei in den untersten Ordner der Verzeichnisstruktur.

  4. Erstellen Sie eine Datei mit dem Namen pom.xml im Stammverzeichnis des Verzeichnisses protobuf_folder.

  5. Öffnen Sie die leere Datei pom.xml in einem Texteditor. Kopieren Sie das folgende Beispielprojektmodell in die Datei, und ändern Sie es:

    <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0"
            xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
            xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
       <modelVersion>4.0.0</modelVersion>
    
       <groupId><group_id></groupId>
       <artifactId><artifact_id></artifactId>
       <version><version></version>
    
       <properties>
           <java.version><java_version></java.version>
           <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
       </properties>
    
       <dependencies>
           <dependency>
               <groupId>com.google.protobuf</groupId>
               <artifactId>protobuf-java</artifactId>
               <version>3.11.1</version>
           </dependency>
       </dependencies>
    
       <build>
           <plugins>
               <plugin>
                   <groupId>org.apache.maven.plugins</groupId>
                   <artifactId>maven-compiler-plugin</artifactId>
                   <version>3.3</version>
                   <configuration>
                       <source>${java.version}</source>
                       <target>${java.version}</target>
                   </configuration>
               </plugin>
               <plugin>
                   <artifactId>maven-assembly-plugin</artifactId>
                   <version>3.1.0</version>
                   <configuration>
                       <descriptorRefs>
                           <descriptorRef>jar-with-dependencies</descriptorRef>
                       </descriptorRefs>
                   </configuration>
                   <executions>
                       <execution>
                           <id>make-assembly</id>
                           <phase>package</phase>
                           <goals>
                               <goal>single</goal>
                           </goals>
                       </execution>
                   </executions>
               </plugin>
           </plugins>
       </build>
    </project>
    

    Wobei:

    <Gruppen-ID>

    Gruppen-ID-Segmente des in Ihrer .proto-Datei angegebenen Paketnamens. Wenn z. B. der Paketname com.foo.bar.buz ist, dann ist die Gruppen-ID com.foo.

    <Artefakt-ID>

    Artefakt-ID des von Ihnen gewählten Pakets. Die Artefakt-ID kann nach dem Zufallsprinzip ausgewählt werden.

    <Version>

    Version des von Ihnen gewählten Pakets. Die Version kann nach dem Zufallsprinzip ausgewählt werden.

    <Java-Version>

    Version der Java-Laufzeitumgebung (JRE), die auf Ihrem lokalen Rechner installiert ist.

    Beispiel:

    <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0"
            xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
            xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
       <modelVersion>4.0.0</modelVersion>
    
       <groupId>com.snowflake</groupId>
       <artifactId>kafka-test-protobuf</artifactId>
       <version>1.0.0</version>
    
       <properties>
           <java.version>1.8</java.version>
           <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
       </properties>
    
       <dependencies>
           <dependency>
               <groupId>com.google.protobuf</groupId>
               <artifactId>protobuf-java</artifactId>
               <version>3.11.1</version>
           </dependency>
       </dependencies>
    
       <build>
           <plugins>
               <plugin>
                   <groupId>org.apache.maven.plugins</groupId>
                   <artifactId>maven-compiler-plugin</artifactId>
                   <version>3.3</version>
                   <configuration>
                       <source>${java.version}</source>
                       <target>${java.version}</target>
                   </configuration>
               </plugin>
               <plugin>
                   <artifactId>maven-assembly-plugin</artifactId>
                   <version>3.1.0</version>
                   <configuration>
                       <descriptorRefs>
                           <descriptorRef>jar-with-dependencies</descriptorRef>
                       </descriptorRefs>
                   </configuration>
                   <executions>
                       <execution>
                           <id>make-assembly</id>
                           <phase>package</phase>
                           <goals>
                               <goal>single</goal>
                           </goals>
                       </execution>
                   </executions>
               </plugin>
           </plugins>
       </build>
    </project>
    
  6. Wechseln Sie in einem Terminalfenster in das Stammverzeichnis von protobuf_folder. Führen Sie den folgenden Befehl aus, um die Protobuf-JAR-Datendatei aus den Dateien im Verzeichnis zu kompilieren:

    mvn clean package
    

    Maven generiert eine Datei namens <Artefakt-ID>-<Version>-jar-with-dependencies.jar im Ordner protobuf_folder/target (z. B. kafka-test-protobuf-1.0.0-jar-with-dependencies.jar).

  7. Kopieren Sie die kompilierte kafka-test-protobuf-1.0.0-jar-with-dependencies.jar-Datei in das Verzeichnis für Ihre Kafka-Paketversion:

    Confluent

    <Confluent-Verzeichnis>/share/java/kafka-serde-tools

    Apache Kafka

    Kopieren Sie die Datei in das in der Umgebungsvariable $CLASSPATH angegebene Verzeichnis.

Schritt 4: Konfigurieren des Kafka-Konnektors

  1. Öffnen Sie eine Kafka-Konfigurationsdatei, z. B. <<Kafka-Verzeichnis>>/config/connect-distributed.properties, in einem Texteditor.

  2. Fügen Sie der Datei die Eigenschaft value.converter.protoClassName hinzu. Diese Eigenschaft gibt die Protokollpufferklasse an, die zur Deserialisierung von Meldungen verwendet werden soll (z. B. com.google.protobuf.Int32Value).

    Bemerkung

    Verschachtelte Klassen müssen in der $-Notation angegeben werden (z. B. com.blueapron.connect.protobuf.NestedTestProtoOuterClass$NestedTestProto).

    Beispiel:

    {
     "name":"XYZCompanySensorData",
       "config":{
         ..
         "value.converter.protoClassName":"com.snowflake.kafka.test.protobuf.SensorReadingImpl$SensorReading"
       }
     }
    

    Weitere Informationen zu den Eigenschaften des Kafka-Konnektors im Allgemeinen finden Sie unter Kafka-Konfigurationseigenschaften.

    Weitere Informationen zu Protokollpufferklassen finden Sie in der Google-Entwicklerdokumentation, auf die weiter oben in diesem Thema verwiesen wird.

  3. Speichern Sie die Datei.