Carregamento de dados protobuf usando o conector Snowflake para Kafka¶
Este tópico fornece instruções para a instalação e configuração do suporte a buffers de protocolo (Protobuf) no conector Snowflake para Kafka (“conector Kafka”). O suporte ao protobuf exige o conector Kafka 1.5.0 (ou superior).
O conector Kafka oferece suporte às seguintes versões do conversor de Protobuf:
- Versão Confluent:
Essa versão é compatível apenas com a versão do pacote Confluent do Kafka.
- Versão da comunidade:
Essa versão é compatível com o pacote do software de código aberto (OSS) Apache Kafka. Essa versão também é compatível com a versão do pacote Confluent do Kafka. No entanto, para facilitar o uso, sugerimos usar a versão Confluent.
Instale apenas um desses conversores de Protobuf.
Neste tópico:
Pré-requisito: instalação do conector Snowflake para Kafka¶
Instale o conector Kafka usando as instruções em Instalação e configuração do conector Kafka.
Configuração da versão Confluent do conversor de Protobuf¶
Nota
A versão Confluent do conversor de Protobuf está disponível na versão Confluent 5.5.0 (ou superior).
Abra seu arquivo de configuração do Kafka (por exemplo,
<kafka_dir>/config/connect-distributed.properties
) em um editor de texto.Configure as propriedades do conversor no arquivo. Para obter mais informações sobre as propriedades dos conectores Kafka em geral, consulte Propriedades de configuração do Kafka.
{ "name":"XYZCompanySensorData", "config":{ .. "key.converter":"io.confluent.connect.protobuf.ProtobufConverter", "key.converter.schema.registry.url":"CONFLUENT_SCHEMA_REGISTRY", "value.converter":"io.confluent.connect.protobuf.ProtobufConverter", "value.converter.schema.registry.url":"http://localhost:8081" } }
Por exemplo:
{ "name":"XYZCompanySensorData", "config":{ "connector.class":"com.snowflake.kafka.connector.SnowflakeSinkConnector", "tasks.max":"8", "topics":"topic1,topic2", "snowflake.topic2table.map": "topic1:table1,topic2:table2", "buffer.count.records":"10000", "buffer.flush.time":"60", "buffer.size.bytes":"5000000", "snowflake.url.name":"myorganization-myaccount.snowflakecomputing.com:443", "snowflake.user.name":"jane.smith", "snowflake.private.key":"xyz123", "snowflake.private.key.passphrase":"jkladu098jfd089adsq4r", "snowflake.database.name":"mydb", "snowflake.schema.name":"myschema", "key.converter":"io.confluent.connect.protobuf.ProtobufConverter", "key.converter.schema.registry.url":"CONFLUENT_SCHEMA_REGISTRY", "value.converter":"io.confluent.connect.protobuf.ProtobufConverter", "value.converter.schema.registry.url":"http://localhost:8081" } }
Salve o arquivo.
Crie dados de protobuf do Kafka usando o produtor de Protobuf do console Confluent, o produtor de Protobuf fonte ou o produtor de Python.
Exemplo de código Python localizado em GitHub demonstra como produzir dados de Protobuf a partir do Kafka.
Configuração da versão comunitária do conversor de Protobuf¶
Esta seção fornece instruções para instalar e configurar a versão comunitária do conversor de Protobuf.
Etapa 1: instalação do conversor comunitário de Protobuf¶
Em uma janela do terminal, mude para o diretório onde você quer armazenar um clone do repositório GitHub para o conversor de Protobuf.
Execute o seguinte comando para clonar o repositório GitHub:
git clone https://github.com/blueapron/kafka-connect-protobuf-converter
Execute os seguintes comandos para compilar a versão 3.1.0 do conversor usando o Apache Maven. Observe que as versões 2.3.0, 3.0.0 e 3.1.0 do conversor são compatíveis com o conector Kafka:
Nota
O Maven já deve estar instalado em sua máquina local.
cd kafka-connect-protobuf-converter git checkout tags/v3.1.0 mvn clean package
O Maven compila um arquivo chamado
kafka-connect-protobuf-converter-<version>-jar-with-dependencies.jar
na pasta atual. Esse é o arquivo JAR do conversor.Copie o arquivo
kafka-connect-protobuf-converter-<version>-jar-with-dependencies.jar
compilado para o diretório da versão de seu pacote do Kafka:- Confluent:
<confluenct_dir>/share/java/kafka-serde-tools
- Apache Kafka:
<apache_kafka_dir>/libs
Etapa 2: compilação do arquivo .proto¶
Compile o arquivo de Protobuf .proto
que define suas mensagens em um arquivo java
.
Por exemplo, suponha que suas mensagens estejam definidas em um arquivo chamado sensor.proto
. Em uma janela do terminal, execute o seguinte comando para compilar o arquivo de buffers de protocolo. Especifique o diretório de origem do código-fonte do aplicativo, o diretório de destino (do arquivo .java
) e o caminho do seu arquivo .proto
:
protoc -I=$SRC_DIR --java_out=$DST_DIR $SRC_DIR/sensor.proto
Um exemplo de arquivo .proto
está disponível aqui: https://github.com/snowflakedb/snowflake-kafka-connector/blob/master/test/test_data/sensor.proto.
O comando gera um arquivo com o nome SensorReadingImpl.java
no diretório de destino especificado.
Para obter mais informações, consulte a documentação para desenvolvedores do Google
Etapa 3: compilação do arquivo SensorReadingImpl.java¶
Compile o arquivo SensorReadingImpl.java
gerado na Etapa 2: Compilação do arquivo .proto junto com o modelo de objeto de projeto da estrutura do projeto de Protobuf.
Abra o arquivo
.pom
da Etapa 2: Compilação do arquivo .proto em um editor de texto.Crie um diretório vazio com uma estrutura:
protobuf_folder ├── pom.xml └── src └── main └── java └── com └── ..
Onde a estrutura do diretório sob
src
/main
/java
espelha o nome do pacote no seu arquivo.proto
(linha 3).Copie o arquivo
SensorReadingImpl.java
gerado na Etapa 2: Compilação do arquivo .proto para a pasta inferior na estrutura do diretório.Crie um arquivo chamado
pom.xml
na raiz do diretórioprotobuf_folder
.Abra o arquivo
pom.xml
vazio em um editor de texto. Copie o seguinte exemplo de modelo de projeto para o arquivo e o modifique:<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId><group_id></groupId> <artifactId><artifact_id></artifactId> <version><version></version> <properties> <java.version><java_version></java.version> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> </properties> <dependencies> <dependency> <groupId>com.google.protobuf</groupId> <artifactId>protobuf-java</artifactId> <version>3.11.1</version> </dependency> </dependencies> <build> <plugins> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-compiler-plugin</artifactId> <version>3.3</version> <configuration> <source>${java.version}</source> <target>${java.version}</target> </configuration> </plugin> <plugin> <artifactId>maven-assembly-plugin</artifactId> <version>3.1.0</version> <configuration> <descriptorRefs> <descriptorRef>jar-with-dependencies</descriptorRef> </descriptorRefs> </configuration> <executions> <execution> <id>make-assembly</id> <phase>package</phase> <goals> <goal>single</goal> </goals> </execution> </executions> </plugin> </plugins> </build> </project>
Onde:
<id_grupo>
Segmentos de ID de grupo do nome do pacote especificado no seu arquivo
.proto
. Por exemplo, se o nome do pacote forcom.foo.bar.buz
, então a ID de grupo écom.foo
.<id_artefato>
ID de artefato do pacote que você escolher. A ID de artefato pode ser escolhida aleatoriamente.
<versão>
Versão do pacote que você escolher. A versão pode ser escolhida aleatoriamente.
<versão_java>
Versão do Java Runtime Environment (JRE) instalado na sua máquina local.
Por exemplo:
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>com.snowflake</groupId> <artifactId>kafka-test-protobuf</artifactId> <version>1.0.0</version> <properties> <java.version>1.8</java.version> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> </properties> <dependencies> <dependency> <groupId>com.google.protobuf</groupId> <artifactId>protobuf-java</artifactId> <version>3.11.1</version> </dependency> </dependencies> <build> <plugins> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-compiler-plugin</artifactId> <version>3.3</version> <configuration> <source>${java.version}</source> <target>${java.version}</target> </configuration> </plugin> <plugin> <artifactId>maven-assembly-plugin</artifactId> <version>3.1.0</version> <configuration> <descriptorRefs> <descriptorRef>jar-with-dependencies</descriptorRef> </descriptorRefs> </configuration> <executions> <execution> <id>make-assembly</id> <phase>package</phase> <goals> <goal>single</goal> </goals> </execution> </executions> </plugin> </plugins> </build> </project>
Em uma janela do terminal, acesse a raiz do diretório
protobuf_folder
. Execute o seguinte comando para compilar o arquivo JAR de dados do Protobuf a partir dos arquivos do diretório:mvn clean package
O Maven gera um arquivo chamado
<id_artefato>-<versão>-jar-with-dependencies.jar
na pastaprotobuf_folder/target
(por exemplo,kafka-test-protobuf-1.0.0-jar-with-dependencies.jar
).Copie o arquivo
kafka-test-protobuf-1.0.0-jar-with-dependencies.jar
compilado para o diretório da versão do seu pacote do Kafka:- Confluent:
<confluenct_dir>/share/java/kafka-serde-tools
- Apache Kafka:
Copie o arquivo para o diretório na sua variável de ambiente
$CLASSPATH
.
Etapa 4: configuração do conector Kafka¶
Abra seu arquivo de configuração do Kafka (por exemplo,
<kafka_dir>/config/connect-distributed.properties
) em um editor de texto.Adicione a propriedade
value.converter.protoClassName
ao arquivo. Essa propriedade especifica a classe de buffer de protocolo a ser usada para desfazer a serialização de mensagens (por exemplo,com.google.protobuf.Int32Value
).Nota
As classes aninhadas devem ser especificadas usando a notação
$
(por exemplo,com.blueapron.connect.protobuf.NestedTestProtoOuterClass$NestedTestProto
).Por exemplo:
{ "name":"XYZCompanySensorData", "config":{ .. "value.converter.protoClassName":"com.snowflake.kafka.test.protobuf.SensorReadingImpl$SensorReading" } }
Para obter mais informações sobre as propriedades dos conectores Kafka em geral, consulte Propriedades de configuração do Kafka.
Para obter mais informações sobre as classes de buffer de protocolo, consulte a documentação para desenvolvedores do Google referida anteriormente neste tópico.
Salve o arquivo.