Kafka용 Snowflake 커넥터를 사용하여 프로토콜 버퍼 데이터 로드하기¶
이 항목에서는 Kafka용 Snowflake 커넥터(“Kafka 커넥터”)에서 프로토콜 버퍼(protobuf) 지원을 설치 및 구성하기 위한 지침을 제공합니다. protobuf를 지원하려면 Kafka 버전 1.5.0 이상이 필요합니다.
Kafka 커넥터가 지원하는 protobuf 변환기 버전은 다음과 같습니다.
- Confluent 버전:
이 버전은 Kafka의 Confluent 패키지 버전에서만 지원됩니다.
- Community 버전:
이 버전은 오픈 소스 소프트웨어(OSS) Apache Kafka 패키지에서 지원됩니다. 이 버전은 Kafka의 Confluent 패키지 버전에서도 지원되지만, 편리하게 사용하려면 Confluent 버전을 대신 사용하는 것이 좋습니다.
이러한 protobuf 변환기 중 1개만 설치해야 합니다.
이 항목의 내용:
전제 조건: Kafka용 Snowflake 커넥터 설치하기¶
Kafka 커넥터 설치 및 구성하기 의 지침에 따라 Kafka 커넥터를 설치합니다.
프로토콜 버퍼 변환기의 Confluent 버전 구성하기¶
참고
Protobuf 변환기의 Confluent 버전은 Confluent 버전 5.5.0 이상에서 사용할 수 있습니다.
Kafka 구성 파일(예:
<kafka_dir>/config/connect-distributed.properties
)을 텍스트 편집기에서 엽니다.파일에서 변환기 속성을 구성합니다. 일반적인 Kafka 커넥터 속성 관련 정보는 Kafka 구성 속성 을 참조하십시오.
{ "name":"XYZCompanySensorData", "config":{ .. "key.converter":"io.confluent.connect.protobuf.ProtobufConverter", "key.converter.schema.registry.url":"CONFLUENT_SCHEMA_REGISTRY", "value.converter":"io.confluent.connect.protobuf.ProtobufConverter", "value.converter.schema.registry.url":"http://localhost:8081" } }
예:
{ "name":"XYZCompanySensorData", "config":{ "connector.class":"com.snowflake.kafka.connector.SnowflakeSinkConnector", "tasks.max":"8", "topics":"topic1,topic2", "snowflake.topic2table.map": "topic1:table1,topic2:table2", "buffer.count.records":"10000", "buffer.flush.time":"60", "buffer.size.bytes":"5000000", "snowflake.url.name":"myorganization-myaccount.snowflakecomputing.com:443", "snowflake.user.name":"jane.smith", "snowflake.private.key":"xyz123", "snowflake.private.key.passphrase":"jkladu098jfd089adsq4r", "snowflake.database.name":"mydb", "snowflake.schema.name":"myschema", "key.converter":"io.confluent.connect.protobuf.ProtobufConverter", "key.converter.schema.registry.url":"CONFLUENT_SCHEMA_REGISTRY", "value.converter":"io.confluent.connect.protobuf.ProtobufConverter", "value.converter.schema.registry.url":"http://localhost:8081" } }
파일을 저장합니다.
Confluent 콘솔 protobuf 프로듀서, 소스 protobuf 프로듀서 또는 Python 프로듀서를 사용하여 Kafka에서 protobuf 데이터를 생성합니다.
GitHub 의 예시 Python 코드는 Kafka에서 protobuf 데이터를 생성하는 방법을 설명합니다.
프로토콜 버퍼 변환기의 커뮤니티 버전 구성하기¶
이 섹션에서는 protobuf 변환기 중 Community 버전의 설치 및 구성과 관련한 지침을 제공합니다.
1단계: 커뮤니티 프로토콜 버퍼 변환기 설치하기¶
터미널 창에서 protobuf 변환기용 GitHub 리포지토리의 복제본을 저장할 디렉터리로 변경합니다.
다음 명령을 실행하여 GitHub 리포지토리 를 복제합니다.
git clone https://github.com/blueapron/kafka-connect-protobuf-converter
다음 명령을 실행하여 Apache Maven 을 사용해 변환기의 3.1.0 버전을 빌드합니다. Kafka 커넥터에는 변환기 버전 2.3.0, 3.0.0 및 3.1.0이 지원됨에 유의하십시오.
참고
사용자의 로컬 컴퓨터에 Maven이 이미 설치되어 있어야 합니다.
cd kafka-connect-protobuf-converter git checkout tags/v3.1.0 mvn clean package
Maven은 현재 폴더에
kafka-connect-protobuf-converter-<버전>-jar-with-dependencies.jar
이라는 이름의 파일을 작성합니다. 이 파일이 변환기 JAR 파일입니다.컴파일된
kafka-connect-protobuf-converter-<버전>-jar-with-dependencies.jar
파일을 Kafka 패키지 버전의 디렉터리에 복사합니다.- Confluent:
<confluenct_dir>/share/java/kafka-serde-tools
- Apache Kafka:
<apache_kafka_dir>/libs
2단계: .proto 파일 컴파일하기¶
메시지를 정의하는 protobuf .proto
파일을 java
파일로 컴파일합니다.
예를 들어, 이름이 sensor.proto
인 파일에 메시지가 정의되어 있다고 가정해 보겠습니다. 터미널 창에서 다음 명령을 실행하여 프로토콜 버퍼 파일을 컴파일합니다. 애플리케이션 소스 코드의 소스 디렉터리, 대상 디렉터리(.java
파일의 경우) 및 .proto
파일의 경로를 지정합니다.
protoc -I=$SRC_DIR --java_out=$DST_DIR $SRC_DIR/sensor.proto
샘플 .proto
파일은 https://github.com/snowflakedb/snowflake-kafka-connector/blob/master/test/test_data/sensor.proto에서 제공됩니다.
이 명령을 통해 지정된 대상 디렉터리에 이름이 SensorReadingImpl.java
인 파일이 생성됩니다.
자세한 내용은 Google 개발자 설명서 를 참조하십시오.
3단계: SensorReadingImpl Java 파일 컴파일하기¶
2단계: .proto 파일 컴파일하기 에서 생성한 SensorReadingImpl.java
파일을 protobuf 프로젝트 구조의 프로젝트 오브젝트 모델과 함께 컴파일합니다.
2단계: .proto 파일 컴파일하기 의
.pom
파일을 텍스트 편집기에서 엽니다.그렇지 않으면 다음과 같은 구조를 갖는 빈 디렉터리를 생성합니다.
protobuf_folder ├── pom.xml └── src └── main └── java └── com └── ..
여기서
src
/main
/java
아래의 디렉터리 구조는.proto
파일(3라인)의 패키지 이름을 나타냅니다.2단계: .proto 파일 컴파일하기 에서 생성된
SensorReadingImpl.java
파일을 디렉터리 구조의 맨 아래 폴더에 복사합니다.이름이
pom.xml
인 파일을protobuf_folder
디렉터리의 루트에 생성합니다.빈
pom.xml
파일을 텍스트 편집기에서 엽니다. 다음 예시 프로젝트 모델을 파일에 복사한 후 수정합니다.<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId><group_id></groupId> <artifactId><artifact_id></artifactId> <version><version></version> <properties> <java.version><java_version></java.version> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> </properties> <dependencies> <dependency> <groupId>com.google.protobuf</groupId> <artifactId>protobuf-java</artifactId> <version>3.11.1</version> </dependency> </dependencies> <build> <plugins> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-compiler-plugin</artifactId> <version>3.3</version> <configuration> <source>${java.version}</source> <target>${java.version}</target> </configuration> </plugin> <plugin> <artifactId>maven-assembly-plugin</artifactId> <version>3.1.0</version> <configuration> <descriptorRefs> <descriptorRef>jar-with-dependencies</descriptorRef> </descriptorRefs> </configuration> <executions> <execution> <id>make-assembly</id> <phase>package</phase> <goals> <goal>single</goal> </goals> </execution> </executions> </plugin> </plugins> </build> </project>
여기서:
<그룹_id>
.proto
파일에 지정된 패키지 이름의 ID 세그먼트를 그룹화합니다. 예를 들어, 패키지 이름이com.foo.bar.buz
이면 그룹 ID는com.foo
입니다.<아티팩트_id>
선택한 패키지의 아티팩트 ID입니다. 아티팩트 ID는 임의로 선택될 수 있습니다.
<버전>
선택한 패키지의 버전입니다. 버전은 임의로 선택될 수 있습니다.
<java_버전>
사용자의 로컬 컴퓨터에 설치된 Java Runtime Environment(JRE)의 버전입니다.
예:
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>com.snowflake</groupId> <artifactId>kafka-test-protobuf</artifactId> <version>1.0.0</version> <properties> <java.version>1.8</java.version> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> </properties> <dependencies> <dependency> <groupId>com.google.protobuf</groupId> <artifactId>protobuf-java</artifactId> <version>3.11.1</version> </dependency> </dependencies> <build> <plugins> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-compiler-plugin</artifactId> <version>3.3</version> <configuration> <source>${java.version}</source> <target>${java.version}</target> </configuration> </plugin> <plugin> <artifactId>maven-assembly-plugin</artifactId> <version>3.1.0</version> <configuration> <descriptorRefs> <descriptorRef>jar-with-dependencies</descriptorRef> </descriptorRefs> </configuration> <executions> <execution> <id>make-assembly</id> <phase>package</phase> <goals> <goal>single</goal> </goals> </execution> </executions> </plugin> </plugins> </build> </project>
터미널 창에서
protobuf_folder
디렉터리의 루트로 변경합니다. 다음 명령을 실행하여 디렉터리에서 파일의 protobuf 데이터 JAR 파일을 컴파일합니다.mvn clean package
Maven은 이름이
<아티팩트_id>-<버전>-jar-with-dependencies.jar
인 파일을protobuf_folder/target
폴더(예:kafka-test-protobuf-1.0.0-jar-with-dependencies.jar
)에 생성합니다.컴파일된
kafka-test-protobuf-1.0.0-jar-with-dependencies.jar
파일을 Kafka 패키지 버전의 디렉터리에 복사합니다.- Confluent:
<confluenct_dir>/share/java/kafka-serde-tools
- Apache Kafka:
파일을
$CLASSPATH
환경 변수의 디렉터리에 복사합니다.
4단계: Kafka 커넥터 구성하기¶
Kafka 구성 파일(예:
<kafka_dir>/config/connect-distributed.properties
)을 텍스트 편집기에서 엽니다.value.converter.protoClassName
속성을 파일에 추가합니다. 이 속성은 메시지(예:com.google.protobuf.Int32Value
)를 역직렬화하기 위해 사용할 프로토콜 버퍼 클래스를 지정합니다.참고
중첩 클래스는
$
표기법(예:com.blueapron.connect.protobuf.NestedTestProtoOuterClass$NestedTestProto
)을 사용하여 지정해야 합니다.예:
{ "name":"XYZCompanySensorData", "config":{ .. "value.converter.protoClassName":"com.snowflake.kafka.test.protobuf.SensorReadingImpl$SensorReading" } }
일반적인 Kafka 커넥터 속성 관련 정보는 Kafka 구성 속성 을 참조하십시오.
프로토콜 버퍼 클래스에 대한 자세한 내용은 이 항목의 앞부분에서 참조한 Google 개발자 설명서를 참조하십시오.
파일을 저장합니다.