Chargement de données Protobuf à l’aide du connecteur Snowflake pour Kafka¶
Ce chapitre fournit des instructions pour l’installation et la configuration de la prise en charge de Protocol Buffers (protobuf) dans le connecteur Snowflake pour Kafka (« connecteur Snowflake »). La prise en charge de protobuf nécessite le connecteur Kafka 1.5.0 (ou supérieur).
Le connecteur Kafka prend en charge les versions suivantes du convertisseur protobuf :
- Version Confluent:
Cette version est prise en charge par la version du paquet Confluent de Kafka uniquement.
- Version communautaire:
Cette version est prise en charge par le paquet logiciel open source (OSS) Apache Kafka. Cette version est également prise en charge par la version du paquet Confluent de Kafka. Cependant, pour une plus grande facilité d’utilisation, nous suggérons d’utiliser plutôt la version Confluent.
N’installez qu’un seul de ces convertisseurs protobuf.
Dans ce chapitre :
Condition préalable : installation du connecteur Snowflake pour Kafka¶
Installez le connecteur Kafka en suivant les instructions dans Installation et configuration du connecteur Kafka.
Configuration de la version Confluent du convertisseur Protobuf¶
Note
La version Confluent du convertisseur Protobuf est disponible avec la version Confluent 5.5.0 (ou supérieur).
Ouvrez votre fichier de configuration Kafka (par exemple
<kafka_dir>/config/connect-distributed.properties
) dans un éditeur de texte.Configurez les propriétés du convertisseur dans le fichier. Pour des informations sur les propriétés du connecteur Kafka en général, voir Propriétés de configuration de Kafka.
{ "name":"XYZCompanySensorData", "config":{ .. "key.converter":"io.confluent.connect.protobuf.ProtobufConverter", "key.converter.schema.registry.url":"CONFLUENT_SCHEMA_REGISTRY", "value.converter":"io.confluent.connect.protobuf.ProtobufConverter", "value.converter.schema.registry.url":"http://localhost:8081" } }
Par exemple :
{ "name":"XYZCompanySensorData", "config":{ "connector.class":"com.snowflake.kafka.connector.SnowflakeSinkConnector", "tasks.max":"8", "topics":"topic1,topic2", "snowflake.topic2table.map": "topic1:table1,topic2:table2", "buffer.count.records":"10000", "buffer.flush.time":"60", "buffer.size.bytes":"5000000", "snowflake.url.name":"myorganization-myaccount.snowflakecomputing.com:443", "snowflake.user.name":"jane.smith", "snowflake.private.key":"xyz123", "snowflake.private.key.passphrase":"jkladu098jfd089adsq4r", "snowflake.database.name":"mydb", "snowflake.schema.name":"myschema", "key.converter":"io.confluent.connect.protobuf.ProtobufConverter", "key.converter.schema.registry.url":"CONFLUENT_SCHEMA_REGISTRY", "value.converter":"io.confluent.connect.protobuf.ProtobufConverter", "value.converter.schema.registry.url":"http://localhost:8081" } }
Enregistrez le fichier.
Produisez des données protobuf à partir de Kafka en utilisant le producteur protobuf de la console Confluent, le producteur protobuf source ou le producteur Python.
L’exemple de code Python situé dans GitHub montre comment produire des données protobuf à partir de Kafka.
Configuration de la version communautaire du convertisseur Protobuf¶
Cette section fournit des instructions pour l’installation et la configuration de la version communautaire du convertisseur protobuf.
Étape 1 : Installation du convertisseur Protobuf communautaire¶
Dans une fenêtre de terminal, accédez au répertoire où vous voulez stocker un clone du référentiel GitHub du convertisseur protobuf.
Exécutez la commande suivante pour cloner le référentiel GitHub :
git clone https://github.com/blueapron/kafka-connect-protobuf-converter
Exécutez les commandes suivantes pour construire la version 3.1.0 du convertisseur en utilisant Apache Maven. Notez que les versions 2.3.0, 3.0.0 et 3.1.0 du convertisseur sont prises en charge par le connecteur Kafka :
Note
Maven doit déjà être installé sur votre machine locale.
cd kafka-connect-protobuf-converter git checkout tags/v3.1.0 mvn clean package
Maven construit un fichier nommé
kafka-connect-protobuf-converter-<version>-jar-with-dependencies.jar
dans le dossier courant. Ceci est le fichier JAR du convertisseur.Copiez le fichier
kafka-connect-protobuf-converter-<version>-jar-with-dependencies.jar
compilé dans le répertoire de la version de votre paquet Kafka :- Confluent:
<confluenct_dir>/share/java/kafka-serde-tools
- Apache Kafka:
<apache_kafka_dir>/libs
Étape 2 : Compilation de votre fichier .proto¶
Compilez le fichier protobuf .proto
qui définit vos messages dans un fichier java
.
Par exemple, supposons que vos messages soient définis dans un fichier nommé sensor.proto
. À partir d’une fenêtre de terminal, exécutez la commande suivante pour compiler le fichier protocol buffers : Précisez le répertoire source du code source de l’application, le répertoire de destination (pour le fichier .java
) et le chemin d’accès à votre fichier .proto
:
protoc -I=$SRC_DIR --java_out=$DST_DIR $SRC_DIR/sensor.proto
Un exemple de fichier .proto
est disponible ici : https://github.com/snowflakedb/snowflake-kafka-connector/blob/master/test/test_data/sensor.proto.
La commande génère un fichier nommé SensorReadingImpl.java
dans le répertoire de destination spécifié.
Pour plus d’informations, voir la documentation Google pour les développeurs
Étape 3 : Compilation du fichier SensorReadingImpl.java¶
Compilez le fichier SensorReadingImpl.java
généré à l’étape 2 : Compilation de votre fichier .proto avec le Project Object Model de la structure de projet protobuf.
Ouvrez votre fichier
.pom
issu de l’étape 2 : Compilation de votre fichier .proto dans un éditeur de texte.Créez un autre répertoire vide avec une structure :
protobuf_folder ├── pom.xml └── src └── main └── java └── com └── ..
Où la structure du répertoire sous
src
/main
/java
reflète le nom du paquet dans votre fichier.proto
(ligne 3).Copiez le fichier
SensorReadingImpl.java
généré à l’étape 2 : Compilation de votre fichier .proto dans le dossier inférieur de la structure des répertoires.Créez un fichier nommé
pom.xml
à la racine du répertoireprotobuf_folder
.Ouvrez le fichier
pom.xml
vide dans un éditeur de texte. Copiez l’exemple de modèle de projet suivant dans le fichier et modifiez-le :<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId><group_id></groupId> <artifactId><artifact_id></artifactId> <version><version></version> <properties> <java.version><java_version></java.version> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> </properties> <dependencies> <dependency> <groupId>com.google.protobuf</groupId> <artifactId>protobuf-java</artifactId> <version>3.11.1</version> </dependency> </dependencies> <build> <plugins> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-compiler-plugin</artifactId> <version>3.3</version> <configuration> <source>${java.version}</source> <target>${java.version}</target> </configuration> </plugin> <plugin> <artifactId>maven-assembly-plugin</artifactId> <version>3.1.0</version> <configuration> <descriptorRefs> <descriptorRef>jar-with-dependencies</descriptorRef> </descriptorRefs> </configuration> <executions> <execution> <id>make-assembly</id> <phase>package</phase> <goals> <goal>single</goal> </goals> </execution> </executions> </plugin> </plugins> </build> </project>
Où :
<id_groupe>
Segments d’ID de groupe du nom du paquet spécifié dans votre fichier
.proto
. Par exemple, si le nom du paquet estcom.foo.bar.buz
, alors l’ID de groupe estcom.foo
.<id_artefact>
ID d’artefact du paquet que vous choisissez. L’ID d’artefact peut être choisi au hasard.
<version>
Version du paquet que vous choisissez. La version peut être choisie au hasard.
<version_java>
Version de l’environnement d’exécution Java (JRE) installée sur votre machine locale.
Par exemple :
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>com.snowflake</groupId> <artifactId>kafka-test-protobuf</artifactId> <version>1.0.0</version> <properties> <java.version>1.8</java.version> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> </properties> <dependencies> <dependency> <groupId>com.google.protobuf</groupId> <artifactId>protobuf-java</artifactId> <version>3.11.1</version> </dependency> </dependencies> <build> <plugins> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-compiler-plugin</artifactId> <version>3.3</version> <configuration> <source>${java.version}</source> <target>${java.version}</target> </configuration> </plugin> <plugin> <artifactId>maven-assembly-plugin</artifactId> <version>3.1.0</version> <configuration> <descriptorRefs> <descriptorRef>jar-with-dependencies</descriptorRef> </descriptorRefs> </configuration> <executions> <execution> <id>make-assembly</id> <phase>package</phase> <goals> <goal>single</goal> </goals> </execution> </executions> </plugin> </plugins> </build> </project>
Dans une fenêtre de terminal, accédez à la racine du répertoire
protobuf_folder
. Exécutez la commande suivante pour compiler le fichier JAR de données protobuf à partir des fichiers du répertoire :mvn clean package
Maven génère un fichier nommé
<artifact_id>-<version>-jar-with-dependencies.jar
dans le dossierprotobuf_folder/target
(par exemplekafka-test-protobuf-1.0.0-jar-with-dependencies.jar
).Copiez le fichier
kafka-test-protobuf-1.0.0-jar-with-dependencies.jar
compilé dans le répertoire de la version de votre paquet Kafka :- Confluent:
<confluenct_dir>/share/java/kafka-serde-tools
- Apache Kafka:
Copiez le fichier dans le répertoire de votre variable d’environnement
$CLASSPATH
.
Étape 4 : Configuration du connecteur Kafka¶
Ouvrez votre fichier de configuration Kafka (par exemple
<kafka_dir>/config/connect-distributed.properties
) dans un éditeur de texte.Ajoutez la propriété
value.converter.protoClassName
au fichier. Cette propriété spécifie la classe de protocol buffer à utiliser pour désérialiser les messages (par exemplecom.google.protobuf.Int32Value
).Note
Les classes imbriquées doivent être spécifiées en utilisant la notation
$
(par exemplecom.blueapron.connect.protobuf.NestedTestProtoOuterClass$NestedTestProto
).Par exemple :
{ "name":"XYZCompanySensorData", "config":{ .. "value.converter.protoClassName":"com.snowflake.kafka.test.protobuf.SensorReadingImpl$SensorReading" } }
Pour des informations sur les propriétés du connecteur Kafka en général, voir Propriétés de configuration de Kafka.
Pour plus d’informations sur les classes protocol buffer, voir la documentation Google pour les développeurs mentionnée plus haut dans ce chapitre.
Enregistrez le fichier.