Laden von Protobuf-Daten mit dem Snowflake-Konnektor für Kafka¶
Unter diesem Thema werden die Anweisungen für die Installation und Konfiguration der Unterstützung von Protokollpuffern (protobuf) im Snowflake-Konnektor für Kafka („Kafka-Konnektor“) bereitgestellt. Unterstützung von Protobuf erfordert Kafka-Konnektor 1.5.0 (oder höher).
Der Kafka-Konnektor unterstützt die folgenden Versionen des Protobuf-Konverters:
- Confluent-Version:
Diese Version wird nur von der Confluent-Paketversion von Kafka unterstützt.
- Community-Version:
Diese Version wird vom Open-Source-Softwarepaket (OSS) Apache Kafka unterstützt. Diese Version wird auch von der Confluent-Paketversion von Kafka unterstützt. Aus Gründen der Benutzerfreundlichkeit empfehlen wir jedoch, stattdessen die Confluent-Version zu verwenden.
Installieren Sie nur einen dieser Protobuf-Konverter.
Unter diesem Thema:
Voraussetzungen: Snowflake-Konnektor für Kafka installieren¶
Installieren Sie den Kafka-Konnektor gemäß den Anweisungen unter Installieren und Konfigurieren des Kafka-Konnektors.
Konfigurieren der Confluent-Version des Protobuf-Konverters¶
Bemerkung
Die Confluent-Version des Protobuf-Konverters ist mit der Confluent-Version 5.5.0 verfügbar (oder höher).
Öffnen Sie eine Kafka-Konfigurationsdatei, z. B.
<<Kafka-Verzeichnis>>/config/connect-distributed.properties
, in einem Texteditor.Konfigurieren Sie die Konvertereigenschaften in der Datei. Weitere Informationen zu den Eigenschaften des Kafka-Konnektors im Allgemeinen finden Sie unter Kafka-Konfigurationseigenschaften.
{ "name":"XYZCompanySensorData", "config":{ .. "key.converter":"io.confluent.connect.protobuf.ProtobufConverter", "key.converter.schema.registry.url":"CONFLUENT_SCHEMA_REGISTRY", "value.converter":"io.confluent.connect.protobuf.ProtobufConverter", "value.converter.schema.registry.url":"http://localhost:8081" } }
Beispiel:
{ "name":"XYZCompanySensorData", "config":{ "connector.class":"com.snowflake.kafka.connector.SnowflakeSinkConnector", "tasks.max":"8", "topics":"topic1,topic2", "snowflake.topic2table.map": "topic1:table1,topic2:table2", "buffer.count.records":"10000", "buffer.flush.time":"60", "buffer.size.bytes":"5000000", "snowflake.url.name":"myorganization-myaccount.snowflakecomputing.com:443", "snowflake.user.name":"jane.smith", "snowflake.private.key":"xyz123", "snowflake.private.key.passphrase":"jkladu098jfd089adsq4r", "snowflake.database.name":"mydb", "snowflake.schema.name":"myschema", "key.converter":"io.confluent.connect.protobuf.ProtobufConverter", "key.converter.schema.registry.url":"CONFLUENT_SCHEMA_REGISTRY", "value.converter":"io.confluent.connect.protobuf.ProtobufConverter", "value.converter.schema.registry.url":"http://localhost:8081" } }
Speichern Sie die Datei.
Produzieren Sie Protobuf-Daten von Kafka mit dem Confluent-Konsolen-Protobuf-Produzenten, dem Quell-Protobuf-Produzenten oder dem Python-Produzenten.
Der Python-Beispielcode in GitHub zeigt, wie Protobuf-Daten von Kafka erzeugt werden.
Konfigurieren der Community-Version des Protobuf-Konverters¶
Dieser Abschnitt enthält Anweisungen zur Installation und Konfiguration der Community-Version des protobuf-Konverters.
Schritt 1: Installieren des Community-Protobuf-Konverters¶
Wechseln Sie in einem Terminalfenster in das Verzeichnis, in dem Sie einen Klon des GitHub-Repositorys für den Protobuf-Konverter speichern möchten.
Führen Sie den folgenden Befehl aus, um das GitHub-Repository zu klonen:
git clone https://github.com/blueapron/kafka-connect-protobuf-converter
Führen Sie die folgenden Befehle aus, um die Version 3.1.0 des Konverters mit Apache Maven zu erstellen. Beachten Sie, dass die Versionen 2.3.0, 3.0.0 und 3.1.0 des Konverters durch den Kafka-Konnektor unterstützt werden:
Bemerkung
Maven muss bereits auf Ihrem lokalen Rechner installiert sein.
cd kafka-connect-protobuf-converter git checkout tags/v3.1.0 mvn clean package
Maven erstellt eine Datei mit dem Namen
kafka-connect-protobuf-converter-<Version>-jar-with-dependencies.jar
im aktuellen Ordner. Dies ist die JAR-Konverterdatei.Kopieren Sie die kompilierte Datei
kafka-connect-protobuf-converter-<Version>-jar-with-dependencies.jar
in das Verzeichnis Ihrer Kafka-Paketversion:- Confluent:
<Confluent-Verzeichnis>/share/java/kafka-serde-tools
- Apache Kafka:
<Apache_Kafka-Verzeichnis>/libs
Schritt 2: Kompilieren Ihrer .proto-Datei¶
Kompilieren Sie die protobuf-.proto
-Datei, die Ihre Meldungen definiert, in eine java
-Datei.
Angenommen, Ihre Meldungen sind in einer Datei mit dem Namen sensor.proto
definiert. Führen Sie beispielsweise in einem Terminalfenster den folgenden Befehl aus, um die Protokollpufferdatei zu kompilieren: Geben Sie das Quellverzeichnis für den Quellcode der Anwendung, das Zielverzeichnis (für die .java
-Datei) und den Pfad zu Ihrer .proto
-Datei an:
protoc -I=$SRC_DIR --java_out=$DST_DIR $SRC_DIR/sensor.proto
Eine Beispieldatei .proto
ist hier verfügbar: https://github.com/snowflakedb/snowflake-kafka-connector/blob/master/test/test_data/sensor.proto.
Der Befehl generiert eine Datei mit dem Namen SensorReadingImpl.java
im angegebenen Zielverzeichnis.
Weitere Informationen dazu finden Sie in der Google-Entwickler-Dokumentation
Schritt 3: Kompilieren der Datei SensorReadingImpl.java¶
Kompilieren Sie die generierte SensorReadingImpl.java
-Datei aus Schritt 2: Kompilieren Ihrer .proto-Datei zusammen mit dem Projektobjektmodell der protobuf-Projektstruktur.
Öffnen Sie Ihre
.pom
-Datei aus Schritt 2: Kompilieren Ihrer .proto-Datei in einem Texteditor.Erstellen Sie ein ansonsten leeres Verzeichnis mit einer Struktur:
protobuf_folder ├── pom.xml └── src └── main └── java └── com └── ..
Wo die Verzeichnisstruktur unter
src
/main
/java
den Paketnamen in Ihrer.proto
-Datei spiegelt (Zeile 3).Kopieren Sie die generierte Datei
SensorReadingImpl.java
aus Schritt 2: Kompilieren Ihrer .proto-Datei in den untersten Ordner der Verzeichnisstruktur.Erstellen Sie eine Datei mit dem Namen
pom.xml
im Stammverzeichnis des Verzeichnissesprotobuf_folder
.Öffnen Sie die leere Datei
pom.xml
in einem Texteditor. Kopieren Sie das folgende Beispielprojektmodell in die Datei, und ändern Sie es:<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId><group_id></groupId> <artifactId><artifact_id></artifactId> <version><version></version> <properties> <java.version><java_version></java.version> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> </properties> <dependencies> <dependency> <groupId>com.google.protobuf</groupId> <artifactId>protobuf-java</artifactId> <version>3.11.1</version> </dependency> </dependencies> <build> <plugins> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-compiler-plugin</artifactId> <version>3.3</version> <configuration> <source>${java.version}</source> <target>${java.version}</target> </configuration> </plugin> <plugin> <artifactId>maven-assembly-plugin</artifactId> <version>3.1.0</version> <configuration> <descriptorRefs> <descriptorRef>jar-with-dependencies</descriptorRef> </descriptorRefs> </configuration> <executions> <execution> <id>make-assembly</id> <phase>package</phase> <goals> <goal>single</goal> </goals> </execution> </executions> </plugin> </plugins> </build> </project>
Wobei:
<Gruppen-ID>
Gruppen-ID-Segmente des in Ihrer
.proto
-Datei angegebenen Paketnamens. Wenn z. B. der Paketnamecom.foo.bar.buz
ist, dann ist die Gruppen-IDcom.foo
.<Artefakt-ID>
Artefakt-ID des von Ihnen gewählten Pakets. Die Artefakt-ID kann nach dem Zufallsprinzip ausgewählt werden.
<Version>
Version des von Ihnen gewählten Pakets. Die Version kann nach dem Zufallsprinzip ausgewählt werden.
<Java-Version>
Version der Java-Laufzeitumgebung (JRE), die auf Ihrem lokalen Rechner installiert ist.
Beispiel:
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>com.snowflake</groupId> <artifactId>kafka-test-protobuf</artifactId> <version>1.0.0</version> <properties> <java.version>1.8</java.version> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> </properties> <dependencies> <dependency> <groupId>com.google.protobuf</groupId> <artifactId>protobuf-java</artifactId> <version>3.11.1</version> </dependency> </dependencies> <build> <plugins> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-compiler-plugin</artifactId> <version>3.3</version> <configuration> <source>${java.version}</source> <target>${java.version}</target> </configuration> </plugin> <plugin> <artifactId>maven-assembly-plugin</artifactId> <version>3.1.0</version> <configuration> <descriptorRefs> <descriptorRef>jar-with-dependencies</descriptorRef> </descriptorRefs> </configuration> <executions> <execution> <id>make-assembly</id> <phase>package</phase> <goals> <goal>single</goal> </goals> </execution> </executions> </plugin> </plugins> </build> </project>
Wechseln Sie in einem Terminalfenster in das Stammverzeichnis von
protobuf_folder
. Führen Sie den folgenden Befehl aus, um die Protobuf-JAR-Datendatei aus den Dateien im Verzeichnis zu kompilieren:mvn clean package
Maven generiert eine Datei namens
<Artefakt-ID>-<Version>-jar-with-dependencies.jar
im Ordnerprotobuf_folder/target
(z. B.kafka-test-protobuf-1.0.0-jar-with-dependencies.jar
).Kopieren Sie die kompilierte
kafka-test-protobuf-1.0.0-jar-with-dependencies.jar
-Datei in das Verzeichnis für Ihre Kafka-Paketversion:- Confluent:
<Confluent-Verzeichnis>/share/java/kafka-serde-tools
- Apache Kafka:
Kopieren Sie die Datei in das in der Umgebungsvariable
$CLASSPATH
angegebene Verzeichnis.
Schritt 4: Konfigurieren des Kafka-Konnektors¶
Öffnen Sie eine Kafka-Konfigurationsdatei, z. B.
<<Kafka-Verzeichnis>>/config/connect-distributed.properties
, in einem Texteditor.Fügen Sie der Datei die Eigenschaft
value.converter.protoClassName
hinzu. Diese Eigenschaft gibt die Protokollpufferklasse an, die zur Deserialisierung von Meldungen verwendet werden soll (z. B.com.google.protobuf.Int32Value
).Bemerkung
Verschachtelte Klassen müssen in der
$
-Notation angegeben werden (z. B.com.blueapron.connect.protobuf.NestedTestProtoOuterClass$NestedTestProto
).Beispiel:
{ "name":"XYZCompanySensorData", "config":{ .. "value.converter.protoClassName":"com.snowflake.kafka.test.protobuf.SensorReadingImpl$SensorReading" } }
Weitere Informationen zu den Eigenschaften des Kafka-Konnektors im Allgemeinen finden Sie unter Kafka-Konfigurationseigenschaften.
Weitere Informationen zu Protokollpufferklassen finden Sie in der Google-Entwicklerdokumentation, auf die weiter oben unter diesem Thema verwiesen wird.
Speichern Sie die Datei.