Carregamento de dados protobuf usando o conector Snowflake para Kafka

Este tópico fornece instruções para a instalação e configuração do suporte a buffers de protocolo (Protobuf) no conector Snowflake para Kafka (“conector Kafka”). O suporte ao protobuf exige o conector Kafka 1.5.0 (ou superior).

O conector Kafka oferece suporte às seguintes versões do conversor de Protobuf:

Versão Confluent:

Essa versão é compatível apenas com a versão do pacote Confluent do Kafka.

Versão da comunidade:

Essa versão é compatível com o pacote do software de código aberto (OSS) Apache Kafka. Essa versão também é compatível com a versão do pacote Confluent do Kafka. No entanto, para facilitar o uso, sugerimos usar a versão Confluent.

Instale apenas um desses conversores de Protobuf.

Neste tópico:

Pré-requisito: instalação do conector Snowflake para Kafka

Instale o conector Kafka usando as instruções em Instalação e configuração do conector Kafka.

Configuração da versão Confluent do conversor de Protobuf

Nota

A versão Confluent do conversor de Protobuf está disponível na versão Confluent 5.5.0 (ou superior).

  1. Abra seu arquivo de configuração do Kafka (por exemplo, <kafka_dir>/config/connect-distributed.properties) em um editor de texto.

  2. Configure as propriedades do conversor no arquivo. Para obter mais informações sobre as propriedades dos conectores Kafka em geral, consulte Propriedades de configuração do Kafka.

    {
     "name":"XYZCompanySensorData",
       "config":{
         ..
         "key.converter":"io.confluent.connect.protobuf.ProtobufConverter",
         "key.converter.schema.registry.url":"CONFLUENT_SCHEMA_REGISTRY",
         "value.converter":"io.confluent.connect.protobuf.ProtobufConverter",
         "value.converter.schema.registry.url":"http://localhost:8081"
       }
     }
    
    Copy

    Por exemplo:

    {
      "name":"XYZCompanySensorData",
      "config":{
        "connector.class":"com.snowflake.kafka.connector.SnowflakeSinkConnector",
        "tasks.max":"8",
        "topics":"topic1,topic2",
        "snowflake.topic2table.map": "topic1:table1,topic2:table2",
        "buffer.count.records":"10000",
        "buffer.flush.time":"60",
        "buffer.size.bytes":"5000000",
        "snowflake.url.name":"myorganization-myaccount.snowflakecomputing.com:443",
        "snowflake.user.name":"jane.smith",
        "snowflake.private.key":"xyz123",
        "snowflake.private.key.passphrase":"jkladu098jfd089adsq4r",
        "snowflake.database.name":"mydb",
        "snowflake.schema.name":"myschema",
        "key.converter":"io.confluent.connect.protobuf.ProtobufConverter",
        "key.converter.schema.registry.url":"CONFLUENT_SCHEMA_REGISTRY",
        "value.converter":"io.confluent.connect.protobuf.ProtobufConverter",
        "value.converter.schema.registry.url":"http://localhost:8081"
      }
    }
    
    Copy
  3. Salve o arquivo.

Crie dados de protobuf do Kafka usando o produtor de Protobuf do console Confluent, o produtor de Protobuf fonte ou o produtor de Python.

Exemplo de código Python localizado em GitHub demonstra como produzir dados de Protobuf a partir do Kafka.

Configuração da versão comunitária do conversor de Protobuf

Esta seção fornece instruções para instalar e configurar a versão comunitária do conversor de Protobuf.

Etapa 1: instalação do conversor comunitário de Protobuf

  1. Em uma janela do terminal, mude para o diretório onde você quer armazenar um clone do repositório GitHub para o conversor de Protobuf.

  2. Execute o seguinte comando para clonar o repositório GitHub:

    git clone https://github.com/blueapron/kafka-connect-protobuf-converter
    
    Copy
  3. Execute os seguintes comandos para compilar a versão 3.1.0 do conversor usando o Apache Maven. Observe que as versões 2.3.0, 3.0.0 e 3.1.0 do conversor são compatíveis com o conector Kafka:

    Nota

    O Maven já deve estar instalado em sua máquina local.

    cd kafka-connect-protobuf-converter
    
    git checkout tags/v3.1.0
    
    mvn clean package
    
    Copy

    O Maven compila um arquivo chamado kafka-connect-protobuf-converter-<version>-jar-with-dependencies.jar na pasta atual. Esse é o arquivo JAR do conversor.

  4. Copie o arquivo kafka-connect-protobuf-converter-<version>-jar-with-dependencies.jar compilado para o diretório da versão de seu pacote do Kafka:

    Confluent:

    <confluenct_dir>/share/java/kafka-serde-tools

    Apache Kafka:

    <apache_kafka_dir>/libs

Etapa 2: compilação do arquivo .proto

Compile o arquivo de Protobuf .proto que define suas mensagens em um arquivo java.

Por exemplo, suponha que suas mensagens estejam definidas em um arquivo chamado sensor.proto. Em uma janela do terminal, execute o seguinte comando para compilar o arquivo de buffers de protocolo. Especifique o diretório de origem do código-fonte do aplicativo, o diretório de destino (do arquivo .java) e o caminho do seu arquivo .proto:

protoc -I=$SRC_DIR --java_out=$DST_DIR $SRC_DIR/sensor.proto
Copy

Um exemplo de arquivo .proto está disponível aqui: https://github.com/snowflakedb/snowflake-kafka-connector/blob/master/test/test_data/sensor.proto.

O comando gera um arquivo com o nome SensorReadingImpl.java no diretório de destino especificado.

Para obter mais informações, consulte a documentação para desenvolvedores do Google

Etapa 3: compilação do arquivo SensorReadingImpl.java

Compile o arquivo SensorReadingImpl.java gerado na Etapa 2: Compilação do arquivo .proto junto com o modelo de objeto de projeto da estrutura do projeto de Protobuf.

  1. Abra o arquivo .pom da Etapa 2: Compilação do arquivo .proto em um editor de texto.

  2. Crie um diretório vazio com uma estrutura:

    protobuf_folder
    ├── pom.xml
    └── src
        └── main
            └── java
                └── com
                    └── ..
    
    Copy

    Onde a estrutura do diretório sob src / main / java espelha o nome do pacote no seu arquivo .proto (linha 3).

  3. Copie o arquivo SensorReadingImpl.java gerado na Etapa 2: Compilação do arquivo .proto para a pasta inferior na estrutura do diretório.

  4. Crie um arquivo chamado pom.xml na raiz do diretório protobuf_folder.

  5. Abra o arquivo pom.xml vazio em um editor de texto. Copie o seguinte exemplo de modelo de projeto para o arquivo e o modifique:

    <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0"
            xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
            xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
       <modelVersion>4.0.0</modelVersion>
    
       <groupId><group_id></groupId>
       <artifactId><artifact_id></artifactId>
       <version><version></version>
    
       <properties>
           <java.version><java_version></java.version>
           <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
       </properties>
    
       <dependencies>
           <dependency>
               <groupId>com.google.protobuf</groupId>
               <artifactId>protobuf-java</artifactId>
               <version>3.11.1</version>
           </dependency>
       </dependencies>
    
       <build>
           <plugins>
               <plugin>
                   <groupId>org.apache.maven.plugins</groupId>
                   <artifactId>maven-compiler-plugin</artifactId>
                   <version>3.3</version>
                   <configuration>
                       <source>${java.version}</source>
                       <target>${java.version}</target>
                   </configuration>
               </plugin>
               <plugin>
                   <artifactId>maven-assembly-plugin</artifactId>
                   <version>3.1.0</version>
                   <configuration>
                       <descriptorRefs>
                           <descriptorRef>jar-with-dependencies</descriptorRef>
                       </descriptorRefs>
                   </configuration>
                   <executions>
                       <execution>
                           <id>make-assembly</id>
                           <phase>package</phase>
                           <goals>
                               <goal>single</goal>
                           </goals>
                       </execution>
                   </executions>
               </plugin>
           </plugins>
       </build>
    </project>
    
    Copy

    Onde:

    <id_grupo>

    Segmentos de ID de grupo do nome do pacote especificado no seu arquivo .proto. Por exemplo, se o nome do pacote for com.foo.bar.buz, então a ID de grupo é com.foo.

    <id_artefato>

    ID de artefato do pacote que você escolher. A ID de artefato pode ser escolhida aleatoriamente.

    <versão>

    Versão do pacote que você escolher. A versão pode ser escolhida aleatoriamente.

    <versão_java>

    Versão do Java Runtime Environment (JRE) instalado na sua máquina local.

    Por exemplo:

    <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0"
            xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
            xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
       <modelVersion>4.0.0</modelVersion>
    
       <groupId>com.snowflake</groupId>
       <artifactId>kafka-test-protobuf</artifactId>
       <version>1.0.0</version>
    
       <properties>
           <java.version>1.8</java.version>
           <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
       </properties>
    
       <dependencies>
           <dependency>
               <groupId>com.google.protobuf</groupId>
               <artifactId>protobuf-java</artifactId>
               <version>3.11.1</version>
           </dependency>
       </dependencies>
    
       <build>
           <plugins>
               <plugin>
                   <groupId>org.apache.maven.plugins</groupId>
                   <artifactId>maven-compiler-plugin</artifactId>
                   <version>3.3</version>
                   <configuration>
                       <source>${java.version}</source>
                       <target>${java.version}</target>
                   </configuration>
               </plugin>
               <plugin>
                   <artifactId>maven-assembly-plugin</artifactId>
                   <version>3.1.0</version>
                   <configuration>
                       <descriptorRefs>
                           <descriptorRef>jar-with-dependencies</descriptorRef>
                       </descriptorRefs>
                   </configuration>
                   <executions>
                       <execution>
                           <id>make-assembly</id>
                           <phase>package</phase>
                           <goals>
                               <goal>single</goal>
                           </goals>
                       </execution>
                   </executions>
               </plugin>
           </plugins>
       </build>
    </project>
    
    Copy
  6. Em uma janela do terminal, acesse a raiz do diretório protobuf_folder. Execute o seguinte comando para compilar o arquivo JAR de dados do Protobuf a partir dos arquivos do diretório:

    mvn clean package
    
    Copy

    O Maven gera um arquivo chamado <id_artefato>-<versão>-jar-with-dependencies.jar na pasta protobuf_folder/target (por exemplo, kafka-test-protobuf-1.0.0-jar-with-dependencies.jar).

  7. Copie o arquivo kafka-test-protobuf-1.0.0-jar-with-dependencies.jar compilado para o diretório da versão do seu pacote do Kafka:

    Confluent:

    <confluenct_dir>/share/java/kafka-serde-tools

    Apache Kafka:

    Copie o arquivo para o diretório na sua variável de ambiente $CLASSPATH.

Etapa 4: configuração do conector Kafka

  1. Abra seu arquivo de configuração do Kafka (por exemplo, <kafka_dir>/config/connect-distributed.properties) em um editor de texto.

  2. Adicione a propriedade value.converter.protoClassName ao arquivo. Essa propriedade especifica a classe de buffer de protocolo a ser usada para desfazer a serialização de mensagens (por exemplo, com.google.protobuf.Int32Value).

    Nota

    As classes aninhadas devem ser especificadas usando a notação $ (por exemplo, com.blueapron.connect.protobuf.NestedTestProtoOuterClass$NestedTestProto).

    Por exemplo:

    {
     "name":"XYZCompanySensorData",
       "config":{
         ..
         "value.converter.protoClassName":"com.snowflake.kafka.test.protobuf.SensorReadingImpl$SensorReading"
       }
     }
    
    Copy

    Para obter mais informações sobre as propriedades dos conectores Kafka em geral, consulte Propriedades de configuração do Kafka.

    Para obter mais informações sobre as classes de buffer de protocolo, consulte a documentação para desenvolvedores do Google referida anteriormente neste tópico.

  3. Salve o arquivo.