Snowflake High Performance connector for Kafka: Install and configure

This topic describes the steps to install and configure the Snowflake High Performance connector for Kafka.

Kafka 커넥터 설치하기

Kafka 커넥터는 JAR(Java 실행 가능) 파일로 제공됩니다.

Snowflake는 두 가지 커넥터 버전을 제공합니다.

이 항목의 지침에서는 커넥터의 두 버전 중 1개에만 적용되는 단계를 지정합니다.

설치 전제 조건

  • The Kafka connector supports the following package versions:

    패키지

    Snowflake Kafka Connector Version

    패키지 지원(Snowflake에서 테스트됨)

    Apache Kafka

    2.0.0 이상

    Apache Kafka 2.8.2, 3.7.2

    Confluent

    2.0.0 이상

    Confluent 6.2.15, 7.8.2

  • Kafka 커넥터는 Kafka Connect API 3.9.0과 함께 사용하도록 개발되었습니다. 최신 버전의 Kafka Connect API는 테스트되지 않았습니다. 3.9.0 이전 버전은 커넥터와 호환됩니다. 자세한 내용은 Kafka 호환성 을 참조하십시오.

  • 사용자 환경에 Kafka 커넥터와 JDBC 드라이버 jar 파일이 모두 있는 경우 JDBC 버전이 원하는 Kafka 커넥터 버전의 snowflake-jdbc 파일에 지정된 pom.xml 버전과 일치하는지 확인하세요. 원하는 Kafka 커넥터 릴리스 버전(예: `v4.0.0-rc4<https://github.com/snowflakedb/snowflake-kafka-connector/releases/tag/v4.0.0-rc4>`_)으로 이동할 수 있습니다. 그런 다음 pom.xml 파일을 탐색하여 snowflake-jdbc 버전을 확인하십시오.

  • 데이터 수집에 Avro 형식을 사용하는 경우 다음과 같습니다.

    • Avro 파서 버전 1.8.2 이상을 사용해야 하며, 파서는 .https://mvnrepository.com/artifact/org.apache.avro에서 제공됩니다.

    • Avro와 함께 스키마 레지스트리 기능을 사용하는 경우 Kafka Connect Avro Converter 버전 5.0.0 이상을 사용해야 하며, https://mvnrepository.com/artifact/io.confluent에서 제공됩니다.

      OSS Apache Kafka 패키지에서는 스키마 레지스트리 기능을 사용할 수 없습니다.

  • 원하는 데이터 보존 시간 및/또는 저장소 제한을 적용하여 Kafka를 구성합니다.

  • Install and configure the Kafka Connect cluster.

    각 Kafka Connect 클러스터 노드에는 Kafka 커넥터에서 사용할 충분한 RAM이 포함되어야 합니다. 최소 권장 용량은 Kafka 파티션당 5 MB입니다. 이러한 용량은 Kafka Connect가 수행하는 다른 작업에 필요한 RAM에 추가됩니다.

  • Kafka Broker 및 Kafka Connect Runtime에서 동일한 버전을 사용하는 것이 좋습니다.

  • Snowflake는 Snowflake 계정과 동일한 클라우드 공급자 리전 에서 Kafka Connect 인스턴스를 실행하는 것이 적극 권장합니다. 이는 필수가 아니지만, 일반적으로 처리량을 향상할 수 있습니다.

Snowflake 클라이언트에서 지원되는 운영 체제 목록은 운영 체제 지원 을 참조하십시오.

Installing the connector

이 섹션에서는 Confluent용 Kafka 커넥터 설치 및 구성에 대한 지침을 제공합니다. 다음 테이블에서는 지원되는 버전 및 사전 릴리스와 릴리스 후보에 대한 정보를 설명합니다.

릴리스 시리즈

상태

참고

4.x.x

공개 미리 보기

조기 액세스. 지원 Snowpipe Streaming High Performance Architecture https://docs.snowflake.com/en/user-guide/snowpipe-streaming/snowpipe-streaming-high-performance-overview 현재 3.x 및 2.x 버전에서 마이그레이션하려면 수동으로 수행해야 합니다. 이전 버전의 대체 드롭으로 사용할 수 없습니다. 버전 3.x, 2.x, 1.x와는 다른 기능 세트가 있습니다.

3.x.x

공식적으로 지원됨

지원되지 않음 Snowpipe Streaming High Performance Architecture https://docs.snowflake.com/en/user-guide/snowpipe-streaming/snowpipe-streaming-high-performance-overview.

2.x.x

공식적으로 지원됨

업그레이드를 권장합니다. 지원되지 않음 Snowpipe Streaming High Performance Architecture https://docs.snowflake.com/en/user-guide/snowpipe-streaming/snowpipe-streaming-high-performance-overview.

1.x.x

지원되지 않음

이 릴리스 시리즈는 사용하지 마십시오.

Installing the connector for Confluent

Download the Kafka connector files

다음 위치 중 하나에서 Kafka 커넥터 JAR 파일을 다운로드합니다.

Confluent Hub:

https://www.confluent.io/hub/

패키지에는 암호화되거나 암호화되지 않은 개인 키를 키 페어 인증에서 사용하기 위해 필요한 모든 종속성이 포함됩니다. 자세한 내용은 이 항목 뒷부분의 키 페어 인증 및 키 순환 사용하기 섹션을 참조하세요.

Maven Central Repository:

https://mvnrepository.com/artifact/com.snowflake

이 버전을 사용할 때는 `Bouncy Castle<https://www.bouncycastle.org/>`_ 암호화 라이브러리(JAR 파일)를 다운로드해야 합니다.

이러한 파일을 Kafka 커넥터 JAR 파일과 동일한 로컬 폴더에 다운로드합니다.

커넥터용 소스 코드는 https://github.com/snowflakedb/snowflake-kafka-connector에서 제공됩니다.

Install the Kafka connector

다른 커넥터의 설치를 위해 제공되는 지침을 사용하여 Kafka 커넥터를 설치합니다.

오픈 소스 Apache Kafka용 커넥터 설치하기

이 섹션에서는 오픈 소스 Apache Kafka용 Kafka 커넥터를 설치 및 구성하기 위한 지침을 제공합니다.

Install Apache Kafka

  1. `Kafka 공식 웹사이트<https://kafka.apache.org/downloads>`_에서 Kafka 패키지를 다운로드합니다.

  2. 터미널 창에서 패키지 파일을 다운로드한 디렉터리를 변경합니다.

  3. 다음 명령을 실행하여 kafka_<scala_버전>-<kafka_버전>.tgz 파일의 압축을 풉니다.

    tar xzvf kafka_<scala_version>-<kafka_version>.tgz
    
    Copy

JDK 설치

Java Development Kit(JDK) 버전 11 이상을 설치하고 구성합니다. Snowflake는 SE의 Standard Edition(JDK)에 대한 테스트를 완료했습니다. Enterprise Edition(EE)은 호환될 것으로 예상되지만, 테스트가 수행되지 않았습니다.

이전에 JDK를 설치한 경우에는 이 섹션을 건너뛸 수 있습니다.

  1. `Oracle JDK 웹사이트<https://www.oracle.com/technetwork/java/javase/downloads/index.html>`_에서 JDK를 다운로드합니다.

  2. JDK를 설치하거나 압축을 풉니다.

  3. 사용 중인 운영 체제의 지침에 따라 JDK가 포함된 디렉터리를 가리키도록 환경 변수 JAVA_HOME을 설정합니다.

Download the Kafka connector JAR files

  1. Maven Central Repository에서 Kafka 커넥터 JAR 파일 다운로드:

    https://mvnrepository.com/artifact/com.snowflake

  2. `Bouncy Castle<https://www.bouncycastle.org/>`_ 암호화 라이브러리 jar 파일을 다운로드합니다.

  3. Kafka 데이터가 `Apache Avro<https://avro.apache.org/>`_ 형식으로 스트리밍되는 경우에는 Avro JAR 파일(1.11.4)을 다운로드합니다.

커넥터용 소스 코드는 https://github.com/snowflakedb/snowflake-kafka-connector에서 제공됩니다.

Install the Kafka connector

label-install_kafkahp_oss`에서 다운로드한 JAR 파일을 :code:`<kafka_dir>/libs 폴더에 복사합니다.

Configuring the Kafka connector

독립 실행형 모드로 배포된 경우 커넥터는 Snowflake 로그인 자격 증명, 항목 이름, Snowflake 테이블 이름 등의 매개 변수를 지정하는 파일을 생성하여 구성됩니다. 분산 모드로 배포된 경우 커넥터는 kafka Connect 클러스터의 REST API 엔드포인트를 호출하여 구성됩니다.

중요

The Kafka Connect framework broadcasts the configuration settings for the Kafka connector from the master node to worker nodes. Configuration settings include sensitive information, specifically, the Snowflake username and private key. Make sure to secure the communication channel between Kafka Connect nodes. For more information, see the documentation for your Apache Kafka software.

Each configuration specifies the topics and corresponding tables for one database and one schema in that database. Note that a connector can ingest messages from any number of topics, but the corresponding tables must all be stored in a single database and schema.

이 섹션에서는 분산형 및 독립 실행형 모드에 대한 지침을 제공합니다.

구성 필드에 대한 설명은 커넥터 구성 속성 을 참조하십시오.

중요

일반적으로 구성 파일에는 개인 키와 같은 보안 관련 정보가 포함되어 있으므로 파일에 대한 읽기/쓰기 권한을 적절하게 설정하여 액세스를 제한해야 합니다.

In addition, consider storing the configuration file in a secure external location or a key management service. For more information, see Externalizing Secrets (in this topic).

분산형 모드

Kafka 구성 파일(예: <path>/<config_file>.json)을 생성합니다. 모든 커넥터 구성 정보로 파일을 채웁니다. 파일은 JSON 형식이어야 합니다.

Sample configuration file

{
  "name":"XYZCompanySensorData",
  "config":{
      "connector.class": "com.snowflake.kafka.connector.SnowflakeStreamingSinkConnector",
      "tasks.max": "1",
      "snowflake.topic2table.map": "topic1:table_1,topic2:table_2",
      "snowflake.url.name": "myorganization-myaccount.snowflakecomputing.com:443",
      "snowflake.warehouse.name": "WH",
      "snowflake.private.key": "-----BEGIN PRIVATE KEY-----\n .... \n-----END PRIVATE KEY-----\n",
      "snowflake.schema.name": "MY_SCHEMA",
      "snowflake.database.name": "MY_DATABASE",
      "snowflake.role.name": "MY_ROLE",
      "snowflake.user.name": "MY_USER",
      "value.converter": "org.apache.kafka.connect.json.JsonConverter",
      "key.converter": "org.apache.kafka.connect.storage.StringConverter",
      "errors.log.enable": "true",
      "topics": "topic1,topic2",
      "value.converter.schemas.enable": "false",
      "errors.tolerance": "none"
      }
}
Copy

독립 실행형 모드

구성 파일(예 <kafka_dir>/config/SF_connect.properties)을 생성합니다. 모든 커넥터 구성 정보로 파일을 채웁니다.

Sample configuration file

connector.class=com.snowflake.kafka.connector.SnowflakeStreamingSinkConnector
tasks.max=1
snowflake.topic2table.map=topic1:table_1,topic2:table_2
snowflake.url.name=myorganization-myaccount.snowflakecomputing.com:443
snowflake.warehouse.name=WH
snowflake.private.key=-----BEGIN PRIVATE KEY-----\n .... \n-----END PRIVATE KEY-----\n
snowflake.schema.name=MY_SCHEMA
snowflake.database.name=MY_DATABASE
snowflake.role.name=MY_ROLE
snowflake.user.name=MY_USER
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter=org.apache.kafka.connect.storage.StringConverter
errors.log.enable=true
topics=topic1,topic2
name=XYZCompanySensorData
value.converter.schemas.enable=false
errors.tolerance=none
Copy

테스트 및 프로토타이핑을 위한 캐시 고려 사항

커넥터는 파티션 재조정 중에 성능을 개선하기 위해 테이블 및 파이프 조회 검사를 캐시합니다. 그러나 테스트 및 프로토타이핑 중에 이 캐싱 동작으로 인해 커넥터가 수동으로 생성된 테이블 또는 파이프를 즉시 감지하지 못할 수 있습니다.

문제: 커넥터가 실행되는 동안 테이블이나 파이프를 수동으로 생성하면 커넥터가 기본적으로 최대 5분 동안 캐시된 존재 확인 결과(오브젝트가 존재하지 않음을 나타낼 수 있음)를 계속 사용할 수 있습니다. 이로 인해 테스트 중에 예기치 않은 오류나 동작이 발생할 수 있습니다.

테스트 권장 사항: 테스트 및 프로토타이핑 중에 캐시 관련 문제를 방지하려면 두 캐시 만료 매개 변수를 모두 최소값인 ``1``밀리초로 구성하거나 캐싱을 비활성화합니다.

snowflake.cache.table.exists.expire.ms=1
snowflake.cache.pipe.exists.expire.ms=1
Copy

이 구성을 사용하면 커넥터가 모든 파티션 재조정에 대해 새로운 존재 확인을 수행하여 수동으로 생성된 테이블과 파이프의 효과를 즉시 확인할 수 있습니다.

중요

이러한 최소 캐시 설정은 테스트 및 프로토타이핑용으로만 권장됩니다. 프로덕션 환경에서는 기본 캐시 만료 값(5분 이상)을 사용하여 Snowflake에 대한 메타데이터 쿼리를 최소화하고 특히 많은 파티션을 처리할 때 재조정 성능을 개선합니다.

커넥터 구성 속성

필수 속성

name

애플리케이션 이름입니다. 이 이름은 고객이 사용하는 모든 Kafka 커넥터에서 고유해야 합니다. 이 이름은 따옴표로 묶이지 않은 유효한 Snowflake 식별자여야 합니다. 유효한 식별자에 대한 내용은 식별자 요구 사항 을 참조하십시오.

connector.class

com.snowflake.kafka.connector.SnowflakeStreamingSinkConnector

topics

항목의 쉼표로 구분된 목록입니다. 기본적으로 Snowflake는 테이블 이름과 동일한 항목 이름을 가정합니다. 테이블 이름이 항목 이름과 동일하지 않은 경우 선택적 topic2table.map 매개 변수(아래)를 사용하여 항목 이름에서 테이블 이름으로 매핑을 지정합니다. 이 테이블 이름은 따옴표로 묶이지 않은 유효한 Snowflake 식별자여야 합니다. 유효한 테이블 이름에 대한 내용은 식별자 요구 사항 을 참조하십시오.

참고

둘 다가 아닌 topics 또는 topics.regex1개 가 필수입니다.

topics.regex

Snowflake 테이블에 로드할 메시지가 포함된 항목을 지정하는 정규식(“regex”)입니다. 커넥터는 정규식과 일치하는 모든 항목 이름에서 데이터를 로드합니다. 정규식은 반드시 Java 정규식의 규칙을 따라야 합니다(즉, java.util.regex.Pattern과 호환되어야 함). 구성 파일에는 topics 또는 topics.regex1개 가 포함되어야 하며 둘 다 포함되지 않아야 합니다.

snowflake.url.name

Snowflake 계정에 액세스하기 위한 URL입니다. 이 URL에는 반드시 계정 식별자 가 포함되어야 합니다. 프로토콜(https://) 및 포트 번호는 선택 사항입니다.

snowflake.user.name

Snowflake 계정의 사용자 로그인 이름입니다.

snowflake.role.name

커넥터가 테이블에 데이터를 삽입하는 데 사용할 역할의 이름입니다.

snowflake.private.key

사용자 인증에서 사용되는 개인 키입니다. 키만 포함되어야 하며 헤더 또는 푸터는 포함되지 않아야 합니다. 키가 여러 라인으로 분할되는 경우에는 줄 바꿈을 제거합니다. 암호화되지 않은 키를 제공하거나 암호화된 키를 제공하고 snowflake.private.key.passphrase 매개 변수를 제공하여 Snowflake가 키의 암호를 해독할 수 있도록 할 수 있습니다. :emph:` 매개 변수 값이 암호화된 경우에만``snowflake.private.key` 이 매개 변수를 사용하십시오. :doc:`/user-guide/key-pair-auth`의 지침에 따라 암호화된 개인 키의 암호를 해독합니다.

참고

label-optional_properties`의 :samp:`snowflake.private.key.passphrase 섹션을 참조하세요.

snowflake.database.name

행을 삽입할 테이블이 포함된 데이터베이스의 이름입니다.

snowflake.schema.name

행을 삽입할 테이블이 포함된 스키마의 이름입니다.

header.converter

레코드가 Avro로 형식이 지정되고 헤더가 포함된 경우에만 필요합니다. 값은 "org.apache.kafka.connect.storage.StringConverter" 입니다.

key.converter

Kafka 레코드의 키 변환기(예: "org.apache.kafka.connect.storage.StringConverter")입니다. 이 속성은 Kafka 커넥터에서 사용되지 않지만 Kafka Connect 플랫폼에서는 필요합니다.

현재 제한 사항은 Kafka 커넥터 제한 사항 을 참조하십시오.

value.converter

커넥터는 표준 Kafka 커뮤니티 변환기를 지원합니다. 데이터 형식에 따라 적절한 변환기를 선택합니다.

  • JSON레코드의 경우: "org.apache.kafka.connect.json.JsonConverter"

  • Schema Registry가 있는 Avro 레코드의 경우: "io.confluent.connect.avro.AvroConverter"

현재 제한 사항은 Kafka 커넥터 제한 사항 을 참조하십시오.

선택적 속성

snowflake.private.key.passphrase

이 매개 변수의 값이 비어 있지 않은 경우 커넥터는 이 구문을 사용하여 개인 키의 암호를 해독하려고 시도합니다.

tasks.max

작업 수이며, 일반적으로 Kafka Connect 클러스터의 작업자 노드에 있는 CPU 코어 수와 동일합니다. 최상의 성능을 얻으려면 작업 수를 Kafka 파티션의 총 수와 동일하게 설정하되 CPU 코어 수를 초과하지 않도록 설정하는 것이 좋습니다. 작업 수가 많으면 메모리 사용량이 증가하고 재밸런싱이 자주 발생할 수 있습니다.

snowflake.topic2table.map

이 선택적 매개 변수를 사용하면 사용자가 테이블에 매핑할 항목을 지정할 수 있습니다. 각 항목과 테이블 이름은 콜론으로 구분해야 합니다(아래 예 참조). 이 테이블 이름은 따옴표로 묶이지 않은 유효한 Snowflake 식별자여야 합니다. 유효한 테이블 이름에 대한 내용은 식별자 요구 사항 을 참조하십시오. 항목 구성에서는 topics.regex 를 사용할 때와 마찬가지로 정규식을 사용하여 항목을 정의할 수 있습니다. 정규 식은 모호할 수 없습니다. 일치하는 항목은 단 하나의 대상 테이블과만 일치해야 합니다.

예:

topics="topic1,topic2,topic5,topic6"
snowflake.topic2table.map="topic1:low_range,topic2:low_range,topic5:high_range,topic6:high_range"
Copy

다음과 같이 작성할 수 있습니다:

topics.regex="topic[0-9]"
snowflake.topic2table.map="topic[0-4]:low_range,topic[5-9]:high_range"
Copy
value.converter.schema.registry.url

형식이 Avro이고 Schema Registry Service를 사용하는 경우 이 속성은 Schema Registry Service의 URL이어야 합니다. 그렇지 않으면 이 필드를 비워둬야 합니다.

value.converter.break.on.schema.registry.error

스키마 레지스트리 서비스에서 Avro 데이터를 로드하는 경우 이 속성은 스키마 ID를 가져오는 동안 오류가 발생하면 Kafka 커넥터가 레코드 사용을 중지해야 하는지 여부를 결정합니다. 기본값은 false 입니다. 이 동작을 활성화하려면 값을 true 로 설정합니다.

jvm.proxy.host

Snowflake Kafka Connector가 프록시 서버를 통해 Snowflake에 액세스할 수 있도록 하려면 이 매개 변수를 설정하여 해당 프록시 서버의 호스트를 지정합니다.

jvm.proxy.port

Snowflake Kafka Connector가 프록시 서버를 통해 Snowflake에 액세스할 수 있도록 하려면 이 매개 변수를 설정하여 해당 프록시 서버의 포트를 지정합니다.

snowflake.streaming.max.client.lag

Specifies how often the connector flushes the data to Snowflake, in seconds.

:
  • 최소: 1

  • 최대: 600

기본값:

:code:`1`초

jvm.proxy.username

프록시 서버로 인증하는 사용자 이름입니다.

jvm.proxy.password

프록시 서버로 인증하는 사용자 이름의 비밀번호입니다.

snowflake.jdbc.map

예: "snowflake.jdbc.map": "networkTimeout:20,tracing:WARNING"

추가 JDBC 속성(JDBC 드라이버 연결 매개 변수 참조 참조)은 유효성이 검사되지 않습니다. 이러한 추가 속성은 유효성이 검사되지 않으며 필수 속성(예: jvm.proxy.xxx, snowflake.user.name, snowflake.private.key, snowflake.schema.name) 대신 재정의하거나 사용해서는 안 됩니다.

다음 조합 중 하나를 지정합니다.
  • JDBC_TRACE 환경 변수와 함께 tracing 속성

  • snowflake.database.name 와 함께 database 속성

모호한 동작이 발생하며 동작은 JDBC 드라이버에 의해 결정됩니다.

value.converter.basic.auth.credentials.source

Avro 데이터 타입을 사용 중이고 Kafka 스키마 레지스트리에 대한 보안 액세스가 필요한 경우 이 매개 변수를 “USER_INFO” 문자열로 설정하고 아래 설명된 value.converter.basic.auth.user.info 매개 변수를 설정합니다. 그렇지 않으면, 이 매개 변수를 생략합니다.

value.converter.basic.auth.user.info

Avro 데이터 타입을 사용 중이고 Kafka 스키마 레지스트리에 대한 보안 액세스가 필요한 경우 위의 설명과 같이 이 매개 변수를 “<사용자_ID>:<비밀번호>” 문자열로 설정하고 value.converter.basic.auth.credentials.source 매개 변수를 설정합니다. 그렇지 않으면, 이 매개 변수를 생략합니다.

snowflake.metadata.createtime

값이 FALSE로 설정되면 RECORD_METADATA 열의 메타데이터에서 CreateTime 속성 값이 생략됩니다. 기본값은 TRUE입니다.

snowflake.metadata.topic

값이 FALSE로 설정되면 RECORD_METADATA 열의 메타데이터에서 topic 속성 값이 생략됩니다. 기본값은 TRUE입니다.

snowflake.metadata.offset.and.partition

값이 FALSE로 설정되면 RECORD_METADATA 열의 메타데이터에서 OffsetPartition 속성 값이 생략됩니다. 기본값은 TRUE입니다.

snowflake.metadata.all

값이 FALSE로 설정되면 RECORD_METADATA 열의 메타데이터가 완전히 비어 있습니다. 기본값은 TRUE입니다.

transforms

Kafka 커넥터가 발견한 삭제 표시 레코드를 건너뛰고 대상 테이블에 로드하지 않도록 지정합니다. 삭제 표시 레코드는 전체 값 필드가 null인 레코드로 정의됩니다.

속성 값을 "tombstoneHandlerExample" 로 설정합니다.

참고

Kafka 커뮤니티 변환기(즉, value.converter 속성 값)에만 이 속성(예: org.apache.kafka.connect.json.JsonConverter 또는 org.apache.kafka.connect.json.AvroConverter)을 사용합니다. Snowflake 변환기로 삭제 표시 레코드 처리를 관리하려면 behavior.on.null.values 속성을 대신 사용합니다.

transforms.tombstoneHandlerExample.type

transforms 속성을 설정할 때 필수입니다.

속성 값을 "io.confluent.connect.transforms.TombstoneHandler" 로 설정합니다.

behavior.on.null.values

Kafka 커넥터가 삭제 표시 레코드를 처리하는 방법을 지정합니다. 삭제 표시 레코드는 전체 값 필드가 null인 레코드로 정의됩니다. Snowpipe 의 경우 이 속성은 Kafka 커넥터 버전 1.5.5 이상에서 지원됩니다. Snowpipe Streaming 의 경우 이 속성은 Kafka 커넥터 버전 2.1.0 이상에서 지원됩니다.

이 속성에서 지원되는 값은 다음과 같습니다.

DEFAULT

Kafka 커넥터에서 삭제 표시 레코드가 발생하면 내용 열에 빈 JSON 문자열이 삽입됩니다.

IGNORE

Kafka 커넥터가 삭제 표시 레코드를 건너뛰고 해당 레코드에 대한 행을 삽입하지 않습니다.

기본값은 DEFAULT 입니다.

참고

삭제 표시 레코드 수집은 수집 방법에 따라 다릅니다.

  • Snowpipe의 경우 Kafka 커넥터는 Snowflake 변환기만 사용합니다. Kafka 커뮤니티 변환기를 사용하여 삭제 표시 레코드 처리를 관리하려면 transformtransforms.tombstoneHandlerExample.type 속성을 대신 사용해야 합니다.

  • Snowpipe Streaming의 경우 Kafka 커넥터는 커뮤니티 변환기만 사용합니다.

Kafka 브로커에 전송된 레코드는 Kafka 커넥터에 의해 삭제되어 오프셋이 누락되므로 NULL이면 안 됩니다. 오프셋이 누락되면 특정 사용 사례에서 Kafka 커넥터가 손상됩니다. NULL 레코드 대신 삭제 표식 레코드를 사용하는 것이 좋습니다.

키 페어 인증 및 키 순환 사용하기

Kafka 커넥터는 사용자 이름 및 비밀번호 인증 대신 키 페어 인증을 사용합니다. 이 인증 메서드에는 2048비트(최소) RSA 키 페어가 필요합니다. OpenSSL을 사용하여 공개-개인 키 페어를 생성합니다. 공개 키는 구성 파일에 정의된 Snowflake 사용자에게 할당됩니다.

이 페이지의 키 페어 인증 작업과 키 페어 순환 작업을 완료한 후, 이 항목의 뒷부분에 있는 :ref:`label-kafkahp_externalize_secrets`에 대한 권장 사항을 평가합니다.

공개/개인 키 페어를 구성하려면:

  1. 터미널 창의 명령줄에서 개인 키를 생성합니다.

    개인 키의 암호화된 버전 또는 암호화되지 않은 버전을 생성할 수 있습니다.

    참고

    Kafka 커넥터는 연방 정보 처리 표준(140-2)(즉, FIPS 140-2) 요구 사항을 충족하는 것으로 검증된 암호화 알고리즘을 지원합니다. 자세한 내용은 FIPS 140-2 를 참조하십시오.

    암호화되지 않은 버전을 생성하려면 다음 명령을 사용합니다.

    $ openssl genrsa -out rsa_key.pem 2048
    
    Copy

    암호화된 버전을 생성하려면 다음 명령을 사용합니다.

    $ openssl genrsa 2048 | openssl pkcs8 -topk8 -v2 <algorithm> -inform PEM -out rsa_key.p8
    
    Copy

    여기서 <알고리즘> 은 FIPS 140-2 호환 암호화 알고리즘입니다.

    예를 들어, AES 256을 암호화 알고리즘으로 지정하려면:

    $ openssl genrsa 2048 | openssl pkcs8 -topk8 -v2 aes256 -inform PEM -out rsa_key.p8
    
    Copy

    암호화된 버전의 개인 키를 생성하는 경우에는 암호 구문을 기록해 두십시오. 나중에 Kafka 구성 파일의 snowflake.private.key.passphrase 속성에 암호 구문을 지정합니다.

    샘플 PEM 개인 키

    -----BEGIN ENCRYPTED PRIVATE KEY-----
    MIIE6TAbBgkqhkiG9w0BBQMwDgQILYPyCppzOwECAggABIIEyLiGSpeeGSe3xHP1
    wHLjfCYycUPennlX2bd8yX8xOxGSGfvB+99+PmSlex0FmY9ov1J8H1H9Y3lMWXbL
    ...
    -----END ENCRYPTED PRIVATE KEY-----
    
    Copy
  2. 명령줄에서 개인 키를 참조하여 공개 키를 생성합니다.

    개인 키가 암호화되고 이름이 rsa_key.p8 인 파일에 포함된 것으로 가정하고 다음 명령을 사용합니다.

    $ openssl rsa -in rsa_key.p8 -pubout -out rsa_key.pub
    
    Copy

    샘플 PEM 공개 키

    -----BEGIN PUBLIC KEY-----
    MIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEAy+Fw2qv4Roud3l6tjPH4
    zxybHjmZ5rhtCz9jppCV8UTWvEXxa88IGRIHbJ/PwKW/mR8LXdfI7l/9vCMXX4mk
    ...
    -----END PUBLIC KEY-----
    
    Copy
  3. 공개 및 개인 키 파일을 로컬 디렉터리로 복사하여 저장합니다. 파일 경로를 기록해 둡니다. 개인 키는 PKCS#8(공개 키 암호화 표준) 형식을 사용하여 저장되며 이전 단계에서 지정한 암호 구문을 사용하여 암호화되지만, 파일은 운영 체제에서 제공하는 파일 메커니즘을 사용하여 무단 액세스로부터 보호되어야 합니다. 파일을 사용하지 않을 때 파일을 보호하는 것은 사용자의 책임입니다.

  4. Snowflake에 로그인합니다. ALTER USER 를 사용하여 Snowflake 사용자에게 공개 키를 할당합니다.

    For example:

    ALTER USER jsmith SET RSA_PUBLIC_KEY='MIIBIjANBgkqh...';
    
    Copy

    참고

    • 보안 관리자(즉, SECURITYADMIN 역할의 사용자) 이상만 사용자를 변경할 수 있습니다.

    • SQL 문에서 공개 키 헤더와 푸터를 제외합니다.

    DESCRIBE USER 를 사용하여 사용자의 공개 키 지문을 확인합니다.

    DESC USER jsmith;
    +-------------------------------+-----------------------------------------------------+---------+-------------------------------------------------------------------------------+
    | property                      | value                                               | default | description                                                                   |
    |-------------------------------+-----------------------------------------------------+---------+-------------------------------------------------------------------------------|
    | NAME                          | JSMITH                                              | null    | Name                                                                          |
    ...
    ...
    | RSA_PUBLIC_KEY_FP             | SHA256:nvnONUsfiuycCLMXIEWG4eTp4FjhVUZQUQbNpbSHXiA= | null    | Fingerprint of user's RSA public key.                                         |
    | RSA_PUBLIC_KEY_2_FP           | null                                                | null    | Fingerprint of user's second RSA public key.                                  |
    +-------------------------------+-----------------------------------------------------+---------+-------------------------------------------------------------------------------+
    
    Copy

    참고

    RSA_PUBLIC_KEY_2_FP 속성에 대한 설명은 키-페어 순환 구성하기 에서 제공됩니다.

  5. 전체 개인 키를 복사하여 구성 파일의 snowflake.private.key 필드에 붙여넣습니다. 파일을 저장합니다.

시크릿 외부화하기

Snowflake는 개인 키와 같은 시크릿을 외부화하고 암호화된 형태로 저장하거나 AWS Key Management Service(KMS), Microsoft Azure Key Vault 또는 HashiCorp Vault와 같은 키 관리 서비스에 저장할 것을 적극 권장합니다. 이러한 작업은 Kafka Connect 클러스터에서 ConfigProvider 구현을 사용하여 수행할 수 있습니다.

자세한 내용은 이 서비스 에 대한 Confluent 설명을 참조하십시오.

Starting the connector

서드 파티 Confluent 또는 Apache Kafka 설명서에서 제공되는 지침을 사용하여 Kafka를 시작합니다. Kafka 커넥터는 분산형 모드 또는 독립 실행형 모드로 시작할 수 있습니다. 각 모드에 대한 지침은 아래와 같습니다.

분산형 모드

터미널 창에서 다음 명령을 실행합니다.

curl -X POST -H "Content-Type: application/json" --data @<path>/<config_file>.json http://localhost:8083/connectors
Copy

독립 실행형 모드

터미널 창에서 다음 명령을 실행합니다.

<kafka_dir>/bin/connect-standalone.sh <kafka_dir>/<path>/connect-standalone.properties <kafka_dir>/config/SF_connect.properties
Copy

참고

(Apache Kafka 또는 Confluent Kafka의 기본 설치에는 이미 connect-standalone.properties 파일이 포함되어 있어야 합니다.)

다음 단계

커넥터 테스트.