Carregamento de dados protobuf usando o conector Snowflake para Kafka¶
Este tópico fornece instruções para a instalação e configuração do suporte a buffers de protocolo (Protobuf) no conector Snowflake para Kafka (“conector Kafka”). O suporte ao protobuf exige o conector Kafka 1.5.0 (ou superior).
O conector Kafka oferece suporte às seguintes versões do conversor de Protobuf:
- Versão Confluent:
- Essa versão é compatível apenas com a versão do pacote Confluent do Kafka. 
- Versão da comunidade:
- Essa versão é compatível com o pacote do software de código aberto (OSS) Apache Kafka. Essa versão também é compatível com a versão do pacote Confluent do Kafka. No entanto, para facilitar o uso, sugerimos usar a versão Confluent. 
Instale apenas um desses conversores de Protobuf.
Neste tópico:
Pré-requisito: instalação do conector Snowflake para Kafka¶
Instale o conector Kafka usando as instruções em Instalação e configuração do conector Kafka.
Configuração da versão Confluent do conversor de Protobuf¶
Nota
A versão Confluent do conversor de Protobuf está disponível na versão Confluent 5.5.0 (ou superior).
- Abra seu arquivo de configuração do Kafka (por exemplo, - <kafka_dir>/config/connect-distributed.properties) em um editor de texto.
- Configure as propriedades do conversor no arquivo. Para obter mais informações sobre as propriedades dos conectores Kafka em geral, consulte Propriedades de configuração do Kafka. - { "name":"XYZCompanySensorData", "config":{ .. "key.converter":"io.confluent.connect.protobuf.ProtobufConverter", "key.converter.schema.registry.url":"CONFLUENT_SCHEMA_REGISTRY", "value.converter":"io.confluent.connect.protobuf.ProtobufConverter", "value.converter.schema.registry.url":"http://localhost:8081" } } - Por exemplo: - { "name":"XYZCompanySensorData", "config":{ "connector.class":"com.snowflake.kafka.connector.SnowflakeSinkConnector", "tasks.max":"8", "topics":"topic1,topic2", "snowflake.topic2table.map": "topic1:table1,topic2:table2", "buffer.count.records":"10000", "buffer.flush.time":"60", "buffer.size.bytes":"5000000", "snowflake.url.name":"myorganization-myaccount.snowflakecomputing.com:443", "snowflake.user.name":"jane.smith", "snowflake.private.key":"xyz123", "snowflake.private.key.passphrase":"jkladu098jfd089adsq4r", "snowflake.database.name":"mydb", "snowflake.schema.name":"myschema", "key.converter":"io.confluent.connect.protobuf.ProtobufConverter", "key.converter.schema.registry.url":"CONFLUENT_SCHEMA_REGISTRY", "value.converter":"io.confluent.connect.protobuf.ProtobufConverter", "value.converter.schema.registry.url":"http://localhost:8081" } } 
- Salve o arquivo. 
Crie dados de protobuf do Kafka usando o produtor de Protobuf do console Confluent, o produtor de Protobuf fonte ou o produtor de Python.
Exemplo de código Python localizado em GitHub demonstra como produzir dados de Protobuf a partir do Kafka.
Configuração da versão comunitária do conversor de Protobuf¶
Esta seção fornece instruções para instalar e configurar a versão comunitária do conversor de Protobuf.
Etapa 1: instalação do conversor comunitário de Protobuf¶
- Em uma janela do terminal, mude para o diretório onde você quer armazenar um clone do repositório GitHub para o conversor de Protobuf. 
- Execute o seguinte comando para clonar o repositório GitHub: - git clone https://github.com/blueapron/kafka-connect-protobuf-converter 
- Execute os seguintes comandos para compilar a versão 3.1.0 do conversor usando o Apache Maven. Observe que as versões 2.3.0, 3.0.0 e 3.1.0 do conversor são compatíveis com o conector Kafka: - Nota - O Maven já deve estar instalado em sua máquina local. - cd kafka-connect-protobuf-converter git checkout tags/v3.1.0 mvn clean package - O Maven compila um arquivo chamado - kafka-connect-protobuf-converter-<version>-jar-with-dependencies.jarna pasta atual. Esse é o arquivo JAR do conversor.
- Copie o arquivo - kafka-connect-protobuf-converter-<version>-jar-with-dependencies.jarcompilado para o diretório da versão de seu pacote do Kafka:- Confluent:
- <confluenct_dir>/share/java/kafka-serde-tools
- Apache Kafka:
- <apache_kafka_dir>/libs
 
Etapa 2: compilação do arquivo .proto¶
Compile o arquivo de Protobuf .proto que define suas mensagens em um arquivo java.
Por exemplo, suponha que suas mensagens estejam definidas em um arquivo chamado sensor.proto. Em uma janela do terminal, execute o seguinte comando para compilar o arquivo de buffers de protocolo. Especifique o diretório de origem do código-fonte do aplicativo, o diretório de destino (do arquivo .java) e o caminho do seu arquivo .proto:
protoc -I=$SRC_DIR --java_out=$DST_DIR $SRC_DIR/sensor.proto
Um exemplo de arquivo .proto está disponível aqui: https://github.com/snowflakedb/snowflake-kafka-connector/blob/master/test/test_data/sensor.proto.
O comando gera um arquivo com o nome SensorReadingImpl.java no diretório de destino especificado.
Para obter mais informações, consulte a documentação para desenvolvedores do Google
Etapa 3: compilação do arquivo SensorReadingImpl.java¶
Compile o arquivo SensorReadingImpl.java gerado na Etapa 2: Compilação do arquivo .proto junto com o modelo de objeto de projeto da estrutura do projeto de Protobuf.
- Abra o arquivo - .pomda Etapa 2: Compilação do arquivo .proto em um editor de texto.
- Crie um diretório vazio com uma estrutura: - protobuf_folder ├── pom.xml └── src └── main └── java └── com └── .. - Onde a estrutura do diretório sob - src/- main/- javaespelha o nome do pacote no seu arquivo- .proto(linha 3).
- Copie o arquivo - SensorReadingImpl.javagerado na Etapa 2: Compilação do arquivo .proto para a pasta inferior na estrutura do diretório.
- Crie um arquivo chamado - pom.xmlna raiz do diretório- protobuf_folder.
- Abra o arquivo - pom.xmlvazio em um editor de texto. Copie o seguinte exemplo de modelo de projeto para o arquivo e o modifique:- <?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId><group_id></groupId> <artifactId><artifact_id></artifactId> <version><version></version> <properties> <java.version><java_version></java.version> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> </properties> <dependencies> <dependency> <groupId>com.google.protobuf</groupId> <artifactId>protobuf-java</artifactId> <version>3.11.1</version> </dependency> </dependencies> <build> <plugins> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-compiler-plugin</artifactId> <version>3.3</version> <configuration> <source>${java.version}</source> <target>${java.version}</target> </configuration> </plugin> <plugin> <artifactId>maven-assembly-plugin</artifactId> <version>3.1.0</version> <configuration> <descriptorRefs> <descriptorRef>jar-with-dependencies</descriptorRef> </descriptorRefs> </configuration> <executions> <execution> <id>make-assembly</id> <phase>package</phase> <goals> <goal>single</goal> </goals> </execution> </executions> </plugin> </plugins> </build> </project> - Onde: - <id_grupo>
- Segmentos de ID de grupo do nome do pacote especificado no seu arquivo - .proto. Por exemplo, se o nome do pacote for- com.foo.bar.buz, então a ID de grupo é- com.foo.
- <id_artefato>
- ID de artefato do pacote que você escolher. A ID de artefato pode ser escolhida aleatoriamente. 
- <versão>
- Versão do pacote que você escolher. A versão pode ser escolhida aleatoriamente. 
- <versão_java>
- Versão do Java Runtime Environment (JRE) instalado na sua máquina local. 
 - Por exemplo: - <?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>com.snowflake</groupId> <artifactId>kafka-test-protobuf</artifactId> <version>1.0.0</version> <properties> <java.version>1.8</java.version> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> </properties> <dependencies> <dependency> <groupId>com.google.protobuf</groupId> <artifactId>protobuf-java</artifactId> <version>3.11.1</version> </dependency> </dependencies> <build> <plugins> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-compiler-plugin</artifactId> <version>3.3</version> <configuration> <source>${java.version}</source> <target>${java.version}</target> </configuration> </plugin> <plugin> <artifactId>maven-assembly-plugin</artifactId> <version>3.1.0</version> <configuration> <descriptorRefs> <descriptorRef>jar-with-dependencies</descriptorRef> </descriptorRefs> </configuration> <executions> <execution> <id>make-assembly</id> <phase>package</phase> <goals> <goal>single</goal> </goals> </execution> </executions> </plugin> </plugins> </build> </project> 
- Em uma janela do terminal, acesse a raiz do diretório - protobuf_folder. Execute o seguinte comando para compilar o arquivo JAR de dados do Protobuf a partir dos arquivos do diretório:- mvn clean package - O Maven gera um arquivo chamado - <id_artefato>-<versão>-jar-with-dependencies.jarna pasta- protobuf_folder/target(por exemplo,- kafka-test-protobuf-1.0.0-jar-with-dependencies.jar).
- Copie o arquivo - kafka-test-protobuf-1.0.0-jar-with-dependencies.jarcompilado para o diretório da versão do seu pacote do Kafka:- Confluent:
- <confluenct_dir>/share/java/kafka-serde-tools
- Apache Kafka:
- Copie o arquivo para o diretório na sua variável de ambiente - $CLASSPATH.
 
Etapa 4: configuração do conector Kafka¶
- Abra seu arquivo de configuração do Kafka (por exemplo, - <kafka_dir>/config/connect-distributed.properties) em um editor de texto.
- Adicione a propriedade - value.converter.protoClassNameao arquivo. Essa propriedade especifica a classe de buffer de protocolo a ser usada para desfazer a serialização de mensagens (por exemplo,- com.google.protobuf.Int32Value).- Nota - As classes aninhadas devem ser especificadas usando a notação - $(por exemplo,- com.blueapron.connect.protobuf.NestedTestProtoOuterClass$NestedTestProto).- Por exemplo: - { "name":"XYZCompanySensorData", "config":{ .. "value.converter.protoClassName":"com.snowflake.kafka.test.protobuf.SensorReadingImpl$SensorReading" } } - Para obter mais informações sobre as propriedades dos conectores Kafka em geral, consulte Propriedades de configuração do Kafka. - Para obter mais informações sobre as classes de buffer de protocolo, consulte a documentação para desenvolvedores do Google referida anteriormente neste tópico. 
- Salve o arquivo.