Chargement de données Protobuf à l’aide du connecteur Snowflake pour Kafka

Ce chapitre fournit des instructions pour l’installation et la configuration de la prise en charge de Protocol Buffers (protobuf) dans le connecteur Snowflake pour Kafka (« connecteur Snowflake »). La prise en charge de protobuf nécessite le connecteur Kafka 1.5.0 (ou supérieur).

Le connecteur Kafka prend en charge les versions suivantes du convertisseur protobuf :

Version Confluent

Cette version est prise en charge par la version du paquet Confluent de Kafka uniquement.

Version communautaire

Cette version est prise en charge par le paquet logiciel open source (OSS) Apache Kafka. Cette version est également prise en charge par la version du paquet Confluent de Kafka. Cependant, pour une plus grande facilité d’utilisation, nous suggérons d’utiliser plutôt la version Confluent.

N’installez qu’un seul de ces convertisseurs protobuf.

Dans ce chapitre :

Condition préalable : installation du connecteur Snowflake pour Kafka

Installez le connecteur Kafka en suivant les instructions dans Installation et configuration du connecteur Kafka.

Configuration de la version Confluent du convertisseur Protobuf

Note

La version Confluent du convertisseur Protobuf est disponible avec la version Confluent 5.5.0 (ou supérieur).

  1. Ouvrez votre fichier de configuration Kafka (par exemple <kafka_dir>/config/connect-distributed.properties) dans un éditeur de texte.

  2. Configurez les propriétés du convertisseur dans le fichier. Pour des informations sur les propriétés du connecteur Kafka en général, voir Propriétés de configuration de Kafka.

    {
     "name":"XYZCompanySensorData",
       "config":{
         ..
         "key.converter":"io.confluent.connect.protobuf.ProtobufConverter",
         "key.converter.schema.registry.url":"CONFLUENT_SCHEMA_REGISTRY",
         "value.converter":"io.confluent.connect.protobuf.ProtobufConverter",
         "value.converter.schema.registry.url":"http://localhost:8081"
       }
     }
    
    Copy

    Par exemple :

    {
      "name":"XYZCompanySensorData",
      "config":{
        "connector.class":"com.snowflake.kafka.connector.SnowflakeSinkConnector",
        "tasks.max":"8",
        "topics":"topic1,topic2",
        "snowflake.topic2table.map": "topic1:table1,topic2:table2",
        "buffer.count.records":"10000",
        "buffer.flush.time":"60",
        "buffer.size.bytes":"5000000",
        "snowflake.url.name":"myorganization-myaccount.snowflakecomputing.com:443",
        "snowflake.user.name":"jane.smith",
        "snowflake.private.key":"xyz123",
        "snowflake.private.key.passphrase":"jkladu098jfd089adsq4r",
        "snowflake.database.name":"mydb",
        "snowflake.schema.name":"myschema",
        "key.converter":"io.confluent.connect.protobuf.ProtobufConverter",
        "key.converter.schema.registry.url":"CONFLUENT_SCHEMA_REGISTRY",
        "value.converter":"io.confluent.connect.protobuf.ProtobufConverter",
        "value.converter.schema.registry.url":"http://localhost:8081"
      }
    }
    
    Copy
  3. Enregistrez le fichier.

Produisez des données protobuf à partir de Kafka en utilisant le producteur protobuf de la console Confluent, le producteur protobuf source ou le producteur Python.

L’exemple de code Python situé dans GitHub montre comment produire des données protobuf à partir de Kafka.

Configuration de la version communautaire du convertisseur Protobuf

Cette section fournit des instructions pour l’installation et la configuration de la version communautaire du convertisseur protobuf.

Étape 1 : Installation du convertisseur communautaire Protobuf

  1. Dans une fenêtre de terminal, accédez au répertoire où vous voulez stocker un clone du référentiel GitHub du convertisseur protobuf.

  2. Exécutez la commande suivante pour cloner le référentiel GitHub :

    git clone https://github.com/blueapron/kafka-connect-protobuf-converter
    
    Copy
  3. Exécutez les commandes suivantes pour construire la version 3.1.0 du convertisseur en utilisant Apache Maven. Notez que les versions 2.3.0, 3.0.0 et 3.1.0 du convertisseur sont prises en charge par le connecteur Kafka :

    Note

    Maven doit déjà être installé sur votre machine locale.

    cd kafka-connect-protobuf-converter
    
    git checkout tags/v3.1.0
    
    mvn clean package
    
    Copy

    Maven construit un fichier nommé kafka-connect-protobuf-converter-<version>-jar-with-dependencies.jar dans le dossier courant. Ceci est le fichier JAR du convertisseur.

  4. Copiez le fichier kafka-connect-protobuf-converter-<version>-jar-with-dependencies.jar compilé dans le répertoire de la version de votre paquet Kafka :

    Confluent

    <confluenct_dir>/share/java/kafka-serde-tools

    Apache Kafka

    <apache_kafka_dir>/libs

Étape 2 : Compilation de votre fichier .proto

Compilez le fichier protobuf .proto qui définit vos messages dans un fichier java.

Par exemple, supposons que vos messages soient définis dans un fichier nommé sensor.proto. À partir d’une fenêtre de terminal, exécutez la commande suivante pour compiler le fichier protocol buffers : Précisez le répertoire source du code source de l’application, le répertoire de destination (pour le fichier .java ) et le chemin d’accès à votre fichier .proto :

protoc -I=$SRC_DIR --java_out=$DST_DIR $SRC_DIR/sensor.proto
Copy

Un exemple de fichier .proto est disponible ici : https://github.com/snowflakedb/snowflake-kafka-connector/blob/master/test/test_data/sensor.proto.

La commande génère un fichier nommé SensorReadingImpl.java dans le répertoire de destination spécifié.

Pour plus d’informations, voir la documentation Google pour les développeurs

Étape 3 : Compilation du fichier SensorReadingImpl.java

Compilez le fichier SensorReadingImpl.java généré à l’étape 2 : Compilation de votre fichier .proto avec le Project Object Model de la structure de projet protobuf.

  1. Ouvrez votre fichier .pom issu de l’étape 2 : Compilation de votre fichier .proto dans un éditeur de texte.

  2. Créez un autre répertoire vide avec une structure :

    protobuf_folder
    ├── pom.xml
    └── src
        └── main
            └── java
                └── com
                    └── ..
    
    Copy

    Où la structure du répertoire sous src / main / java reflète le nom du paquet dans votre fichier .proto (ligne 3).

  3. Copiez le fichier SensorReadingImpl.java généré à l’étape 2 : Compilation de votre fichier .proto dans le dossier inférieur de la structure des répertoires.

  4. Créez un fichier nommé pom.xml à la racine du répertoire protobuf_folder.

  5. Ouvrez le fichier pom.xml vide dans un éditeur de texte. Copiez l’exemple de modèle de projet suivant dans le fichier et modifiez-le :

    <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0"
            xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
            xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
       <modelVersion>4.0.0</modelVersion>
    
       <groupId><group_id></groupId>
       <artifactId><artifact_id></artifactId>
       <version><version></version>
    
       <properties>
           <java.version><java_version></java.version>
           <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
       </properties>
    
       <dependencies>
           <dependency>
               <groupId>com.google.protobuf</groupId>
               <artifactId>protobuf-java</artifactId>
               <version>3.11.1</version>
           </dependency>
       </dependencies>
    
       <build>
           <plugins>
               <plugin>
                   <groupId>org.apache.maven.plugins</groupId>
                   <artifactId>maven-compiler-plugin</artifactId>
                   <version>3.3</version>
                   <configuration>
                       <source>${java.version}</source>
                       <target>${java.version}</target>
                   </configuration>
               </plugin>
               <plugin>
                   <artifactId>maven-assembly-plugin</artifactId>
                   <version>3.1.0</version>
                   <configuration>
                       <descriptorRefs>
                           <descriptorRef>jar-with-dependencies</descriptorRef>
                       </descriptorRefs>
                   </configuration>
                   <executions>
                       <execution>
                           <id>make-assembly</id>
                           <phase>package</phase>
                           <goals>
                               <goal>single</goal>
                           </goals>
                       </execution>
                   </executions>
               </plugin>
           </plugins>
       </build>
    </project>
    
    Copy

    Où :

    <id_groupe>

    Segments d’ID de groupe du nom du paquet spécifié dans votre fichier .proto . Par exemple, si le nom du paquet est com.foo.bar.buz, alors l’ID de groupe est com.foo.

    <id_artefact>

    ID d’artefact du paquet que vous choisissez. L’ID d’artefact peut être choisi au hasard.

    <version>

    Version du paquet que vous choisissez. La version peut être choisie au hasard.

    <version_java>

    Version de l’environnement d’exécution Java (JRE) installée sur votre machine locale.

    Par exemple :

    <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0"
            xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
            xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
       <modelVersion>4.0.0</modelVersion>
    
       <groupId>com.snowflake</groupId>
       <artifactId>kafka-test-protobuf</artifactId>
       <version>1.0.0</version>
    
       <properties>
           <java.version>1.8</java.version>
           <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
       </properties>
    
       <dependencies>
           <dependency>
               <groupId>com.google.protobuf</groupId>
               <artifactId>protobuf-java</artifactId>
               <version>3.11.1</version>
           </dependency>
       </dependencies>
    
       <build>
           <plugins>
               <plugin>
                   <groupId>org.apache.maven.plugins</groupId>
                   <artifactId>maven-compiler-plugin</artifactId>
                   <version>3.3</version>
                   <configuration>
                       <source>${java.version}</source>
                       <target>${java.version}</target>
                   </configuration>
               </plugin>
               <plugin>
                   <artifactId>maven-assembly-plugin</artifactId>
                   <version>3.1.0</version>
                   <configuration>
                       <descriptorRefs>
                           <descriptorRef>jar-with-dependencies</descriptorRef>
                       </descriptorRefs>
                   </configuration>
                   <executions>
                       <execution>
                           <id>make-assembly</id>
                           <phase>package</phase>
                           <goals>
                               <goal>single</goal>
                           </goals>
                       </execution>
                   </executions>
               </plugin>
           </plugins>
       </build>
    </project>
    
    Copy
  6. Dans une fenêtre de terminal, accédez à la racine du répertoire protobuf_folder. Exécutez la commande suivante pour compiler le fichier JAR de données protobuf à partir des fichiers du répertoire :

    mvn clean package
    
    Copy

    Maven génère un fichier nommé <artifact_id>-<version>-jar-with-dependencies.jar dans le dossier protobuf_folder/target (par exemple kafka-test-protobuf-1.0.0-jar-with-dependencies.jar).

  7. Copiez le fichier kafka-test-protobuf-1.0.0-jar-with-dependencies.jar compilé dans le répertoire de la version de votre paquet Kafka :

    Confluent

    <confluenct_dir>/share/java/kafka-serde-tools

    Apache Kafka

    Copiez le fichier dans le répertoire de votre variable d’environnement $CLASSPATH.

Étape 4 : Configuration du connecteur Kafka

  1. Ouvrez votre fichier de configuration Kafka (par exemple <kafka_dir>/config/connect-distributed.properties) dans un éditeur de texte.

  2. Ajoutez la propriété value.converter.protoClassName au fichier. Cette propriété spécifie la classe de protocol buffer à utiliser pour désérialiser les messages (par exemple com.google.protobuf.Int32Value).

    Note

    Les classes imbriquées doivent être spécifiées en utilisant la notation $ (par exemple com.blueapron.connect.protobuf.NestedTestProtoOuterClass$NestedTestProto).

    Par exemple :

    {
     "name":"XYZCompanySensorData",
       "config":{
         ..
         "value.converter.protoClassName":"com.snowflake.kafka.test.protobuf.SensorReadingImpl$SensorReading"
       }
     }
    
    Copy

    Pour des informations sur les propriétés du connecteur Kafka en général, voir Propriétés de configuration de Kafka.

    Pour plus d’informations sur les classes protocol buffer, voir la documentation Google pour les développeurs mentionnée plus haut dans ce chapitre.

  3. Enregistrez le fichier.