Kafka용 Snowflake 커넥터를 사용하여 Protobuf 데이터 로드하기

이 항목에서는 Kafka용 Snowflake 커넥터(《Kafka 커넥터》)에서 프로토콜 버퍼(protobuf) 지원을 설치 및 구성하기 위한 지침을 제공합니다. protobuf를 지원하려면 Kafka 버전 1.5.0 이상이 필요합니다.

Kafka 커넥터가 지원하는 protobuf 변환기 버전은 다음과 같습니다.

Confluent 버전

이 버전은 Kafka의 Confluent 패키지 버전에서만 지원됩니다.

Community 버전

이 버전은 오픈 소스 소프트웨어(OSS) Apache Kafka 패키지에서 지원됩니다. 이 버전은 Kafka의 Confluent 패키지 버전에서도 지원되지만, 편리하게 사용하려면 Confluent 버전을 대신 사용하는 것이 좋습니다.

이러한 protobuf 변환기 중 1개만 설치해야 합니다.

이 항목의 내용:

전제 조건: Kafka용 Snowflake 커넥터 설치하기

Kafka Connector 설치 및 구성하기 의 지침에 따라 Kafka 커넥터를 설치합니다.

Protobuf 변환기의 Confluent 버전 구성하기

참고

Protobuf 변환기의 Confluent 버전은 Confluent 버전 5.5.0 이상에서 사용할 수 있습니다.

  1. Kafka 구성 파일(예: <kafka_dir>/config/connect-distributed.properties)을 텍스트 편집기에서 엽니다.

  2. 파일에서 변환기 속성을 구성합니다. 일반적인 Kafka 커넥터 속성 관련 정보는 Kafka 구성 속성 을 참조하십시오.

    {
     "name":"XYZCompanySensorData",
       "config":{
         ..
         "key.converter":"io.confluent.connect.protobuf.ProtobufConverter",
         "key.converter.schema.registry.url":"CONFLUENT_SCHEMA_REGISTRY",
         "value.converter":"io.confluent.connect.protobuf.ProtobufConverter",
         "value.converter.schema.registry.url":"http://localhost:8081"
       }
     }
    
    Copy

    예:

    {
      "name":"XYZCompanySensorData",
      "config":{
        "connector.class":"com.snowflake.kafka.connector.SnowflakeSinkConnector",
        "tasks.max":"8",
        "topics":"topic1,topic2",
        "snowflake.topic2table.map": "topic1:table1,topic2:table2",
        "buffer.count.records":"10000",
        "buffer.flush.time":"60",
        "buffer.size.bytes":"5000000",
        "snowflake.url.name":"myorganization-myaccount.snowflakecomputing.com:443",
        "snowflake.user.name":"jane.smith",
        "snowflake.private.key":"xyz123",
        "snowflake.private.key.passphrase":"jkladu098jfd089adsq4r",
        "snowflake.database.name":"mydb",
        "snowflake.schema.name":"myschema",
        "key.converter":"io.confluent.connect.protobuf.ProtobufConverter",
        "key.converter.schema.registry.url":"CONFLUENT_SCHEMA_REGISTRY",
        "value.converter":"io.confluent.connect.protobuf.ProtobufConverter",
        "value.converter.schema.registry.url":"http://localhost:8081"
      }
    }
    
    Copy
  3. 파일을 저장합니다.

Confluent 콘솔 protobuf 프로듀서, 소스 protobuf 프로듀서 또는 Python 프로듀서를 사용하여 Kafka에서 protobuf 데이터를 생성합니다.

GitHub 의 예시 Python 코드는 Kafka에서 protobuf 데이터를 생성하는 방법을 설명합니다.

Protobuf 변환기의 Community 버전 구성하기

이 섹션에서는 protobuf 변환기 중 Community 버전의 설치 및 구성과 관련한 지침을 제공합니다.

1단계: Community Protobuf 변환기 설치하기

  1. 터미널 창에서 protobuf 변환기용 GitHub 리포지토리의 복제본을 저장할 디렉터리로 변경합니다.

  2. 다음 명령을 실행하여 GitHub 리포지토리 를 복제합니다.

    git clone https://github.com/blueapron/kafka-connect-protobuf-converter
    
    Copy
  3. 다음 명령을 실행하여 Apache Maven 을 사용해 변환기의 3.1.0 버전을 빌드합니다. Kafka 커넥터에는 변환기 버전 2.3.0, 3.0.0 및 3.1.0이 지원됨에 유의하십시오.

    참고

    사용자의 로컬 컴퓨터에 Maven이 이미 설치되어 있어야 합니다.

    cd kafka-connect-protobuf-converter
    
    git checkout tags/v3.1.0
    
    mvn clean package
    
    Copy

    Maven은 현재 폴더에 kafka-connect-protobuf-converter-<버전>-jar-with-dependencies.jar 이라는 이름의 파일을 작성합니다. 이 파일이 변환기 JAR 파일입니다.

  4. 컴파일된 kafka-connect-protobuf-converter-<버전>-jar-with-dependencies.jar 파일을 Kafka 패키지 버전의 디렉터리에 복사합니다.

    Confluent

    <confluenct_dir>/share/java/kafka-serde-tools

    Apache Kafka

    <apache_kafka_dir>/libs

2단계: .proto 파일 컴파일하기

메시지를 정의하는 protobuf .proto 파일을 java 파일로 컴파일합니다.

예를 들어, 이름이 sensor.proto 인 파일에 메시지가 정의되어 있다고 가정해 보겠습니다. 터미널 창에서 다음 명령을 실행하여 프로토콜 버퍼 파일을 컴파일합니다. 애플리케이션 소스 코드의 소스 디렉터리, 대상 디렉터리(.java 파일의 경우) 및 .proto 파일의 경로를 지정합니다.

protoc -I=$SRC_DIR --java_out=$DST_DIR $SRC_DIR/sensor.proto
Copy

샘플 .proto 파일은 https://github.com/snowflakedb/snowflake-kafka-connector/blob/master/test/test_data/sensor.proto에서 제공됩니다.

이 명령을 통해 지정된 대상 디렉터리에 이름이 SensorReadingImpl.java 인 파일이 생성됩니다.

자세한 내용은 Google 개발자 설명서 를 참조하십시오.

3단계: SensorReadingImpl.java 파일 컴파일하기

2단계: .proto 파일 컴파일하기 에서 생성한 SensorReadingImpl.java 파일을 protobuf 프로젝트 구조의 프로젝트 오브젝트 모델과 함께 컴파일합니다.

  1. 2단계: .proto 파일 컴파일하기.pom 파일을 텍스트 편집기에서 엽니다.

  2. 그렇지 않으면 다음과 같은 구조를 갖는 빈 디렉터리를 생성합니다.

    protobuf_folder
    ├── pom.xml
    └── src
        └── main
            └── java
                └── com
                    └── ..
    
    Copy

    여기서 src / main / java 아래의 디렉터리 구조는 .proto 파일(3라인)의 패키지 이름을 나타냅니다.

  3. 2단계: .proto 파일 컴파일하기 에서 생성된 SensorReadingImpl.java 파일을 디렉터리 구조의 맨 아래 폴더에 복사합니다.

  4. 이름이 pom.xml 인 파일을 protobuf_folder 디렉터리의 루트에 생성합니다.

  5. pom.xml 파일을 텍스트 편집기에서 엽니다. 다음 예시 프로젝트 모델을 파일에 복사한 후 수정합니다.

    <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0"
            xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
            xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
       <modelVersion>4.0.0</modelVersion>
    
       <groupId><group_id></groupId>
       <artifactId><artifact_id></artifactId>
       <version><version></version>
    
       <properties>
           <java.version><java_version></java.version>
           <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
       </properties>
    
       <dependencies>
           <dependency>
               <groupId>com.google.protobuf</groupId>
               <artifactId>protobuf-java</artifactId>
               <version>3.11.1</version>
           </dependency>
       </dependencies>
    
       <build>
           <plugins>
               <plugin>
                   <groupId>org.apache.maven.plugins</groupId>
                   <artifactId>maven-compiler-plugin</artifactId>
                   <version>3.3</version>
                   <configuration>
                       <source>${java.version}</source>
                       <target>${java.version}</target>
                   </configuration>
               </plugin>
               <plugin>
                   <artifactId>maven-assembly-plugin</artifactId>
                   <version>3.1.0</version>
                   <configuration>
                       <descriptorRefs>
                           <descriptorRef>jar-with-dependencies</descriptorRef>
                       </descriptorRefs>
                   </configuration>
                   <executions>
                       <execution>
                           <id>make-assembly</id>
                           <phase>package</phase>
                           <goals>
                               <goal>single</goal>
                           </goals>
                       </execution>
                   </executions>
               </plugin>
           </plugins>
       </build>
    </project>
    
    Copy

    여기서:

    <그룹_id>

    .proto 파일에 지정된 패키지 이름의 ID 세그먼트를 그룹화합니다. 예를 들어, 패키지 이름이 com.foo.bar.buz 이면 그룹 ID는 com.foo 입니다.

    <아티팩트_id>

    선택한 패키지의 아티팩트 ID입니다. 아티팩트 ID는 임의로 선택될 수 있습니다.

    <버전>

    선택한 패키지의 버전입니다. 버전은 임의로 선택될 수 있습니다.

    <java_버전>

    사용자의 로컬 컴퓨터에 설치된 Java Runtime Environment(JRE)의 버전입니다.

    예:

    <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0"
            xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
            xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
       <modelVersion>4.0.0</modelVersion>
    
       <groupId>com.snowflake</groupId>
       <artifactId>kafka-test-protobuf</artifactId>
       <version>1.0.0</version>
    
       <properties>
           <java.version>1.8</java.version>
           <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
       </properties>
    
       <dependencies>
           <dependency>
               <groupId>com.google.protobuf</groupId>
               <artifactId>protobuf-java</artifactId>
               <version>3.11.1</version>
           </dependency>
       </dependencies>
    
       <build>
           <plugins>
               <plugin>
                   <groupId>org.apache.maven.plugins</groupId>
                   <artifactId>maven-compiler-plugin</artifactId>
                   <version>3.3</version>
                   <configuration>
                       <source>${java.version}</source>
                       <target>${java.version}</target>
                   </configuration>
               </plugin>
               <plugin>
                   <artifactId>maven-assembly-plugin</artifactId>
                   <version>3.1.0</version>
                   <configuration>
                       <descriptorRefs>
                           <descriptorRef>jar-with-dependencies</descriptorRef>
                       </descriptorRefs>
                   </configuration>
                   <executions>
                       <execution>
                           <id>make-assembly</id>
                           <phase>package</phase>
                           <goals>
                               <goal>single</goal>
                           </goals>
                       </execution>
                   </executions>
               </plugin>
           </plugins>
       </build>
    </project>
    
    Copy
  6. 터미널 창에서 protobuf_folder 디렉터리의 루트로 변경합니다. 다음 명령을 실행하여 디렉터리에서 파일의 protobuf 데이터 JAR 파일을 컴파일합니다.

    mvn clean package
    
    Copy

    Maven은 이름이 <아티팩트_id>-<버전>-jar-with-dependencies.jar 인 파일을 protobuf_folder/target 폴더(예: kafka-test-protobuf-1.0.0-jar-with-dependencies.jar)에 생성합니다.

  7. 컴파일된 kafka-test-protobuf-1.0.0-jar-with-dependencies.jar 파일을 Kafka 패키지 버전의 디렉터리에 복사합니다.

    Confluent

    <confluenct_dir>/share/java/kafka-serde-tools

    Apache Kafka

    파일을 $CLASSPATH 환경 변수의 디렉터리에 복사합니다.

4단계: Kafka Connector 구성하기

  1. Kafka 구성 파일(예: <kafka_dir>/config/connect-distributed.properties)을 텍스트 편집기에서 엽니다.

  2. value.converter.protoClassName 속성을 파일에 추가합니다. 이 속성은 메시지(예: com.google.protobuf.Int32Value)를 역직렬화하기 위해 사용할 프로토콜 버퍼 클래스를 지정합니다.

    참고

    중첩 클래스는 $ 표기법(예: com.blueapron.connect.protobuf.NestedTestProtoOuterClass$NestedTestProto)을 사용하여 지정해야 합니다.

    예:

    {
     "name":"XYZCompanySensorData",
       "config":{
         ..
         "value.converter.protoClassName":"com.snowflake.kafka.test.protobuf.SensorReadingImpl$SensorReading"
       }
     }
    
    Copy

    일반적인 Kafka 커넥터 속성 관련 정보는 Kafka 구성 속성 을 참조하십시오.

    프로토콜 버퍼 클래스에 대한 자세한 내용은 이 항목의 앞부분에서 참조한 Google 개발자 설명서를 참조하십시오.

  3. 파일을 저장합니다.