Kafka 커넥터 설치 및 구성하기

Kafka 커넥터는 JAR(Java 실행 가능) 파일로 제공됩니다.

Snowflake는 두 가지 커넥터 버전을 제공합니다.

참고

Kafka 커넥터에는 커넥터 약관 이 적용됩니다.

이 항목의 지침에서는 커넥터의 두 버전 중 1개에만 적용되는 단계를 지정합니다.

이 항목의 내용:

Snowflake 오브젝트용 액세스 제어 구성하기

필수 권한

Kafka 커넥터에서 사용되는 Snowflake 오브젝트를 생성 및 관리하기 위해서는 다음 필수 권한이 있는 역할이 필요합니다.

오브젝트

권한

참고

데이터베이스

USAGE

스키마

USAGE . CREATE TABLE . CREATE STAGE . CREATE PIPE

스키마 수준 오브젝트가 생성된 후 CREATE object 권한이 취소될 수 있습니다.

테이블

OWNERSHIP

Kafka 커넥터를 사용하여 기존 테이블에 데이터를 수집할 때만 필요합니다. . 커넥터가 Kafka 항목의 레코드에 대해 새 대상 테이블을 생성하는 경우 Kafka 구성 파일에 지정된 사용자의 기본 역할이 테이블 소유자(즉, 테이블에 대한 OWNERSHIP 권한을 가짐)가 됩니다.

스테이지

READ . WRITE

Kafka 커넥터를 사용하여 Kafka에서 기존 내부 스테이지로 데이터 파일을 스테이징할 때만 필요합니다(권장하지 않음). . 커넥터가 Kafka 항목에서 사용된 데이터 파일을 임시로 저장하기 위해 새 스테이지를 생성하는 경우 Kafka 구성 파일에 지정된 사용자의 기본 역할이 스테이지 소유자(즉, 스테이지에 대한 OWNERSHIP 권한을 가짐)가 됩니다.

Snowflake는 각 Kafka 인스턴스에 대해 별도의 사용자(CREATE USER 사용) 및 역할(CREATE ROLE 사용)을 생성하여 필요한 경우 액세스 권한을 개별적으로 취소할 수 있도록 하는 것을 권장합니다. 역할은 사용자에 대한 기본 역할로 할당되어야 합니다.

Kafka 커넥터를 사용하기 위한 역할 생성하기

Kafka 커넥터(예: KAFKA_CONNECTOR_ROLE_1)에서 사용하기 위한 사용자 지정 역할을 생성하는 스크립트는 다음과 같습니다. 권한을 부여할 수 있는 모든 역할(예: SECURITYADMIN 또는 MANAGE GRANTS 권한이 있는 모든 역할)은 이러한 사용자 지정 역할을 모든 사용자에게 부여하여 Kafka 커넥터가 필요한 Snowflake 오브젝트를 생성하고 테이블에 데이터를 삽입할 수 있도록 할 수 있습니다. 이 스크립트에서는 특정한 기존 데이터베이스 및 스키마(kafka_db.kafka_schema) 및 사용자(kafka_connector_user_1)를 참조합니다.

-- Use a role that can create and manage roles and privileges.
USE ROLE securityadmin;

-- Create a Snowflake role with the privileges to work with the connector.
CREATE ROLE kafka_connector_role_1;

-- Grant privileges on the database.
GRANT USAGE ON DATABASE kafka_db TO ROLE kafka_connector_role_1;

-- Grant privileges on the schema.
GRANT USAGE ON SCHEMA kafka_schema TO ROLE kafka_connector_role_1;
GRANT CREATE TABLE ON SCHEMA kafka_schema TO ROLE kafka_connector_role_1;
GRANT CREATE STAGE ON SCHEMA kafka_schema TO ROLE kafka_connector_role_1;
GRANT CREATE PIPE ON SCHEMA kafka_schema TO ROLE kafka_connector_role_1;

-- Only required if the Kafka connector will load data into an existing table.
GRANT OWNERSHIP ON TABLE existing_table1 TO ROLE kafka_connector_role_1;

-- Only required if the Kafka connector will stage data files in an existing internal stage: (not recommended).
GRANT READ, WRITE ON STAGE existing_stage1 TO ROLE kafka_connector_role_1;

-- Grant the custom role to an existing user.
GRANT ROLE kafka_connector_role_1 TO USER kafka_connector_user_1;

-- Set the custom role as the default role for the user.
-- If you encounter an 'Insufficient privileges' error, verify the role that has the OWNERSHIP privilege on the user.
ALTER USER kafka_connector_user_1 SET DEFAULT_ROLE = kafka_connector_role_1;
Copy

사용자 지정 역할 및 역할 계층 구조 생성과 관련한 자세한 내용은 액세스 제어 구성하기 를 참조하십시오.

설치 전제 조건

  • Kafka 커넥터에서 지원되는 패키지 버전은 다음과 같습니다.

    패키지

    Snowflake Kafka Connector 버전

    패키지 지원(Snowflake에서 테스트됨)

    Apache Kafka

    2.0.0 이상

    Apache Kafka 2.5.1, 2.8.1, 3.2.1

    Confluent

    2.0.0 이상

    Confluent 6.2.6, 7.2.1

  • Kafka 커넥터는 Kafka Connect API 3.2.3과 함께 사용하도록 개발되었습니다. Kafka Connect API의 최신 버전은 하나도 테스트되지 않았습니다. 3.2.3보다 오래된 모든 버전은 커넥터와 호환됩니다. 자세한 내용은 Kafka 호환성 을 참조하십시오.

  • 사용자 환경에 Kafka 커넥터와 JDBC 드라이버 jar 파일이 모두 있는 경우 JDBC 버전이 원하는 Kafka 커넥터 버전의 pom.xml 파일에 지정된 snowflake-jdbc 버전과 일치하는지 확인하십시오. 원하는 Kafka 커넥터 출시 버전(예: v2.0.1)으로 이동할 수 있습니다. 그런 다음 pom.xml 파일을 탐색하여 snowflake-jdbc 버전을 확인하십시오.

  • Avro 형식을 사용하여 데이터를 수집하는 경우:

    • Avro 파서 버전 1.8.2 이상을 사용해야 하며, 파서는 .https://mvnrepository.com/artifact/org.apache.avro에서 제공됩니다.

    • Avro와 함께 스키마 레지스트리 기능을 사용하는 경우 Kafka Connect Avro Converter 버전 5.0.0 이상을 사용해야 하며, https://mvnrepository.com/artifact/io.confluent에서 제공됩니다.

      OSS Apache Kafka 패키지에서는 스키마 레지스트리 기능을 사용할 수 없습니다.

  • 원하는 데이터 보존 시간 및/또는 저장소 제한을 적용하여 Kafka를 구성합니다.

  • Kafka Connect 클러스터를 설치 및 구성합니다.

    각 Kafka Connect 클러스터 노드에는 Kafka 커넥터에서 사용할 충분한 RAM이 포함되어야 합니다. 최소 권장 용량은 Kafka 파티션당 5 MB입니다. 이러한 용량은 Kafka Connect가 수행하는 다른 작업에 필요한 RAM에 추가됩니다.

  • Kafka Broker 및 Kafka Connect Runtime에서 동일한 버전을 사용하는 것이 좋습니다.

  • Snowflake는 Snowflake 계정과 동일한 클라우드 공급자 리전 에서 Kafka Connect 인스턴스를 실행하는 것이 적극 권장합니다. 이는 필수가 아니지만, 일반적으로 처리량을 향상할 수 있습니다.

Snowflake 클라이언트에서 지원되는 운영 체제 목록은 운영 체제 지원 을 참조하십시오.

커넥터 설치하기

이 섹션에서는 Confluent용 Kafka 커넥터 설치 및 구성에 대한 지침을 제공합니다. Kafka 커넥터 버전은 다음 표를 참조하십시오.

릴리스 시리즈

상태

참고

2.x.x

공식적으로 지원됨

최신 버전이며 적극 권장합니다.

1.9.x

공식적으로 지원됨

업그레이드를 권장합니다.

1.8.x

지원되지 않음

이 릴리스 시리즈는 사용하지 마십시오.

1.7.x

지원되지 않음

이 릴리스 시리즈는 사용하지 마십시오.

Confluent용 커넥터 설치하기

Kafka 커넥터 파일 다운로드하기

다음 위치 중 하나에서 Kafka 커넥터 JAR 파일을 다운로드합니다.

Confluent Hub:

https://www.confluent.io/hub/

패키지에는 암호화되거나 암호화되지 않은 개인 키를 키 페어 인증에서 사용하기 위해 필요한 모든 종속성이 포함됩니다. 자세한 내용은 이 항목의 키 페어 인증 & 키 순환 사용하기 섹션을 참조하십시오.

Maven Central Repository:

https://mvnrepository.com/artifact/com.snowflake

키 페어 인증을 위해 암호화되지 않은 개인 키를 사용하기 위해서는 JAR 파일에 추가 종속성이 필요하지 않습니다. 암호화된 개인 키를 사용하려면 Bouncy Castle 암호화 라이브러리(JAR 파일)를 다운로드하십시오. Snowflake는 Bouncy Castle을 사용하여 로그인에 사용되는 암호화된 RSA 개인 키의 암호를 해독합니다.

이러한 파일을 Kafka 커넥터 JAR 파일과 동일한 로컬 폴더에 다운로드합니다.

커넥터용 소스 코드는 https://github.com/snowflakedb/snowflake-kafka-connector에서 제공됩니다.

Kafka 커넥터 설치하기

다른 커넥터의 설치를 위해 제공되는 지침을 사용하여 Kafka 커넥터를 설치합니다.

오픈 소스 Apache Kafka용 커넥터 설치하기

이 섹션에서는 오픈 소스 Apache Kafka용 Kafka 커넥터를 설치 및 구성하기 위한 지침을 제공합니다.

Apache Kafka 설치

  1. Kafka 패키지를 공식 웹 사이트(https://kafka.apache.org/downloads)에서 다운로드합니다.

  2. 터미널 창에서 패키지 파일을 다운로드한 디렉터리를 변경합니다.

  3. 다음 명령을 실행하여 kafka_<scala_버전>-<kafka_버전>.tgz 파일의 압축을 풉니다.

    tar xzvf kafka_<scala_version>-<kafka_version>.tgz
    
    Copy

JDK 설치

Java 개발 키트(JDK)를 설치 및 구성합니다. Snowflake는 JDK의 Standard Edition(SE)에 대한 테스트를 완료했습니다. Enterprise Edition(EE)은 호환될 것으로 예상되지만, 테스트가 수행되지 않았습니다.

이 단계를 이미 완료한 경우에는 이 섹션을 건너뛸 수 있습니다.

  1. https://www.oracle.com/technetwork/java/javase/downloads/index.html에서 JDK를 다운로드합니다.

  2. JDK를 설치하거나 압축을 풉니다.

  3. 사용 중인 운영 체제의 지침에 따라 JDK가 포함된 디렉터리를 가리키도록 환경 변수 JAVA_HOME을 설정합니다.

Kafka 커넥터 JAR 파일 다운로드하기

  1. Maven Central Repository에서 Kafka 커넥터 JAR 파일 다운로드:

    https://mvnrepository.com/artifact/com.snowflake

  2. 키 페어 인증을 위해 암호화되지 않은 개인 키를 사용하기 위해서는 JAR 파일에 추가 종속성이 필요하지 않습니다. 암호화된 개인 키를 사용하려면 Bouncy Castle 암호화 라이브러리(JAR 파일)를 다운로드하십시오. Snowflake는 Bouncy Castle을 사용하여 로그인에 사용되는 암호화된 RSA 개인 키의 암호를 해독합니다.

  3. Kafka 데이터가 Apache Avro 형식으로 스트리밍되는 경우에는 Avro JAR 파일 다운로드:

    https://mvnrepository.com/artifact/org.apache.avro/avro

커넥터용 소스 코드는 https://github.com/snowflakedb/snowflake-kafka-connector에서 제공됩니다.

Kafka 커넥터 설치하기

Kafka Connector JAR 파일 다운로드 에서 다운로드한 JAR 파일을 <kafka_dir>/libs 폴더로 복사합니다.

Kafka 커넥터 구성하기

커넥터는 Snowflake 로그인 자격 증명, 항목 이름, Snowflake 테이블 이름 등의 매개 변수를 지정하는 파일을 생성하여 구성됩니다.

중요

Kafka Connect 프레임워크는 Kafka 커넥터에 대한 구성 설정을 마스터 노드에서 작업자 노드로 브로드캐스트합니다. 구성 설정에는 민감한 정보(특히 Snowflake 사용자 이름 및 개인 키)가 포함됩니다. Kafka Connect 노드 사이의 통신 채널이 보호되어야 함에 유의하십시오. 이와 관련한 지침은 Apache Kafka 소프트웨어 설명서를 참조하십시오.

각 구성 파일은 1개의 데이터베이스와 해당 데이터베이스에서 1개의 스키마에 대한 항목 및 해당 테이블을 지정합니다. 커넥터는 여러 항목에서 메시지를 수집할 수 있지만, 해당 테이블은 모두 단일 데이터베이스 및 스키마에 저장되어야 합니다.

이 섹션에서는 분산형 및 독립 실행형 모드에 대한 지침을 제공합니다.

구성 필드에 대한 설명은 Kafka 구성 속성 을 참조하십시오.

중요

일반적으로 구성 파일에는 개인 키와 같은 보안 관련 정보가 포함되어 있으므로 파일에 대한 읽기/쓰기 권한을 적절하게 설정하여 액세스를 제한해야 합니다.

또한, 구성 파일을 안전한 외부 위치 또는 키 관리 서비스에 저장하는 것을 고려하십시오. 자세한 내용은 이 항목의 시크릿 외부화하기 섹션을 참조하십시오.

분산형 모드

Kafka 구성 파일(예: <경로>/<구성_파일>.json)을 생성합니다. 모든 커넥터 구성 정보로 파일을 채웁니다. 파일은 JSON 형식이어야 합니다.

샘플 구성 파일

{
  "name":"XYZCompanySensorData",
  "config":{
    "connector.class":"com.snowflake.kafka.connector.SnowflakeSinkConnector",
    "tasks.max":"8",
    "topics":"topic1,topic2",
    "snowflake.topic2table.map": "topic1:table1,topic2:table2",
    "buffer.count.records":"10000",
    "buffer.flush.time":"60",
    "buffer.size.bytes":"5000000",
    "snowflake.url.name":"myorganization-myaccount.snowflakecomputing.com:443",
    "snowflake.user.name":"jane.smith",
    "snowflake.private.key":"xyz123",
    "snowflake.private.key.passphrase":"jkladu098jfd089adsq4r",
    "snowflake.database.name":"mydb",
    "snowflake.schema.name":"myschema",
    "key.converter":"org.apache.kafka.connect.storage.StringConverter",
    "value.converter":"com.snowflake.kafka.connector.records.SnowflakeAvroConverter",
    "value.converter.schema.registry.url":"http://localhost:8081",
    "value.converter.basic.auth.credentials.source":"USER_INFO",
    "value.converter.basic.auth.user.info":"jane.smith:MyStrongPassword"
  }
}
Copy

독립 실행형 모드

구성 파일(예: <kafka_dir>/config/SF_connect.properties)을 생성합니다. 모든 커넥터 구성 정보로 파일을 채웁니다.

샘플 구성 파일

connector.class=com.snowflake.kafka.connector.SnowflakeSinkConnector
tasks.max=8
topics=topic1,topic2
snowflake.topic2table.map= topic1:table1,topic2:table2
buffer.count.records=10000
buffer.flush.time=60
buffer.size.bytes=5000000
snowflake.url.name=myorganization-myaccount.snowflakecomputing.com:443
snowflake.user.name=jane.smith
snowflake.private.key=xyz123
snowflake.private.key.passphrase=jkladu098jfd089adsq4r
snowflake.database.name=mydb
snowflake.schema.name=myschema
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=com.snowflake.kafka.connector.records.SnowflakeAvroConverter
value.converter.schema.registry.url=http://localhost:8081
value.converter.basic.auth.credentials.source=USER_INFO
value.converter.basic.auth.user.info=jane.smith:MyStrongPassword
Copy

Kafka 구성 속성

Kafka 구성 파일에서 분산형 모드 또는 독립 실행형 모드에 대해 설정할 수 있는 속성은 다음과 같습니다.

필수 속성

name

애플리케이션 이름입니다. 이 이름은 고객이 사용하는 모든 Kafka 커넥터에서 고유해야 합니다. 이 이름은 따옴표로 묶이지 않은 유효한 Snowflake 식별자여야 합니다. 유효한 식별자에 대한 내용은 식별자 요구 사항 을 참조하십시오.

connector.class

com.snowflake.kafka.connector.SnowflakeSinkConnector

topics

항목의 쉼표로 구분된 목록입니다. 기본적으로 Snowflake는 테이블 이름과 동일한 항목 이름을 가정합니다. 테이블 이름이 항목 이름과 동일하지 않은 경우 선택적 topic2table.map 매개 변수(아래)를 사용하여 항목 이름에서 테이블 이름으로 매핑을 지정합니다. 이 테이블 이름은 따옴표로 묶이지 않은 유효한 Snowflake 식별자여야 합니다. 유효한 테이블 이름에 대한 내용은 식별자 요구 사항 을 참조하십시오.

참고

둘 다가 아닌 topics 또는 topics.regex1개 가 필수입니다.

topics.regex

Snowflake 테이블에 로드할 메시지가 포함된 항목을 지정하는 정규식(“regex”)입니다. 커넥터는 정규식과 일치하는 모든 항목 이름에서 데이터를 로드합니다. 정규식은 반드시 Java 정규식의 규칙을 따라야 합니다(즉, java.util.regex.Pattern과 호환되어야 함). 구성 파일에는 topics 또는 topics.regex1개 가 포함되어야 하며 둘 다 포함되지 않아야 합니다.

snowflake.url.name

Snowflake 계정에 액세스하기 위한 URL입니다. 이 URL에는 반드시 계정 식별자 가 포함되어야 합니다. 프로토콜(https://) 및 포트 번호는 선택 사항입니다.

snowflake.user.name

Snowflake 계정의 사용자 로그인 이름입니다.

snowflake.private.key

사용자 인증에서 사용되는 개인 키입니다. 키만 포함되어야 하며 헤더 또는 푸터는 포함되지 않아야 합니다. 키가 여러 라인으로 분할되는 경우에는 줄 바꿈을 제거합니다. 암호화되지 않은 키를 제공하거나 암호화된 키를 제공하고 snowflake.private.key.passphrase 매개 변수를 제공하여 Snowflake가 키의 암호를 해독할 수 있도록 할 수 있습니다. snowflake.private.key 매개 변수 값이 암호화된 경우에만 이 매개 변수를 사용하십시오. 이를 통해 이 항목에서 제공되는 키 페어 인증 및 키 순환 사용하기 의 지침에 따라 암호화된 개인 키의 암호화를 해제할 수 있습니다.

참고

이 항목의 선택 사항 속성 에서 제공되는 snowflake.private.key.passphrase 도 참조하십시오.

snowflake.database.name

행을 삽입할 테이블이 포함된 데이터베이스의 이름입니다.

snowflake.schema.name

행을 삽입할 테이블이 포함된 스키마의 이름입니다.

header.converter

레코드가 Avro로 형식이 지정되고 헤더가 포함된 경우에만 필요합니다. 값은 "org.apache.kafka.connect.storage.StringConverter" 입니다.

key.converter

Kafka 레코드의 키 변환기(예: "org.apache.kafka.connect.storage.StringConverter")입니다. 이 속성은 Kafka 커넥터에서 사용되지 않지만 Kafka Connect 플랫폼에서는 필요합니다.

현재 제한 사항은 Kafka 커넥터 제한 사항 을 참조하십시오.

value.converter

레코드의 형식이 JSON인 경우, 이 속성은 "com.snowflake.kafka.connector.records.SnowflakeJsonConverter" 여야 합니다.

레코드의 형식이 Avro이고 Kafka의 Schema Registry Service를 사용하는 경우에는 "com.snowflake.kafka.connector.records.SnowflakeAvroConverter" 여야 합니다.

레코드의 형식이 Avro이고 스키마를 포함(그러므로 Kafka의 Schema Registry Service가 필요하지 않음)하는 경우에는 "com.snowflake.kafka.connector.records.SnowflakeAvroConverterWithoutSchemaRegistry" 여야 합니다.

레코드의 형식이 일반 텍스트인 경우에는 "org.apache.kafka.connect.storage.StringConverter" 여야 합니다.

현재 제한 사항은 Kafka 커넥터 제한 사항 을 참조하십시오.

선택적 속성

snowflake.private.key.passphrase

이 매개 변수의 값이 비어 있지 않으면 Kafka는 이 구문을 사용하여 개인 키의 암호를 해독하려고 시도합니다.

tasks.max

작업 수이며, 일반적으로 Kafka Connect 클러스터의 작업자 노드에 있는 CPU 코어 수와 동일합니다. 이 숫자를 증가하거나 줄일 수 있지만, Snowflake는 더 높게 설정하지 않는 것을 권장합니다.

snowflake.topic2table.map

이 선택적 매개 변수를 사용하면 사용자가 테이블에 매핑할 항목을 지정할 수 있습니다. 각 항목과 테이블 이름은 콜론으로 구분해야 합니다(아래 예 참조). 이 테이블 이름은 따옴표로 묶이지 않은 유효한 Snowflake 식별자여야 합니다. 유효한 테이블 이름에 대한 내용은 식별자 요구 사항 을 참조하십시오.

buffer.count.records

Snowflake로 수집하기 전에 Kafka 파티션당 메모리에 버퍼링된 레코드 수입니다. 기본값은 10000 개 레코드입니다.

buffer.flush.time

Kafka의 메모리 캐시에서 내부 스테이지로 플러시되는 버퍼 플러시 사이의 시간(초)입니다. 기본값은 120 초입니다.

buffer.size.bytes

Snowflake에서 데이터 파일로 수집되기 전에 Kafka 파티션별로 메모리에 버퍼링된 레코드의 누적 크기(바이트)입니다. 기본값은 5000000 (5 MB)입니다.

레코드는 데이터 파일에 기록될 때 압축됩니다. 그 결과, 버퍼에 있는 레코드의 크기가 레코드에서 생성된 데이터 파일의 크기보다 클 수 있습니다.

value.converter.schema.registry.url

형식이 Avro이고 Schema Registry Service를 사용하는 경우 이 속성은 Schema Registry Service의 URL이어야 합니다. 그렇지 않으면 이 필드를 비워둬야 합니다.

value.converter.break.on.schema.registry.error

스키마 레지스트리 서비스에서 Avro 데이터를 로드하는 경우 이 속성은 스키마 ID를 가져오는 동안 오류가 발생하면 Kafka 커넥터가 레코드 사용을 중지해야 하는지 여부를 결정합니다. 기본값은 false 입니다. 이 동작을 활성화하려면 값을 true 로 설정합니다.

Kafka 커넥터 버전 1.4.2 이상에서 지원됩니다.

jvm.proxy.host

Snowflake Kafka Connector가 프록시 서버를 통해 Snowflake에 액세스할 수 있도록 하려면 이 매개 변수를 설정하여 해당 프록시 서버의 호스트를 지정합니다.

jvm.proxy.port

Snowflake Kafka Connector가 프록시 서버를 통해 Snowflake에 액세스할 수 있도록 하려면 이 매개 변수를 설정하여 해당 프록시 서버의 포트를 지정합니다.

jvm.proxy.username

프록시 서버로 인증하는 사용자 이름입니다.

Kafka 커넥터 버전 1.4.4 이상에서 지원됩니다.

jvm.proxy.password

프록시 서버로 인증하는 사용자 이름의 비밀번호입니다.

Kafka 커넥터 버전 1.4.4 이상에서 지원됩니다.

value.converter.basic.auth.credentials.source

Avro 데이터 타입을 사용 중이고 Kafka 스키마 레지스트리에 대한 보안 액세스가 필요한 경우 이 매개 변수를 “USER_INFO” 문자열로 설정하고 아래 설명된 value.converter.basic.auth.user.info 매개 변수를 설정합니다. 그렇지 않으면, 이 매개 변수를 생략합니다.

value.converter.basic.auth.user.info

Avro 데이터 타입을 사용 중이고 Kafka 스키마 레지스트리에 대한 보안 액세스가 필요한 경우 위의 설명과 같이 이 매개 변수를 “<사용자_ID>:<비밀번호>” 문자열로 설정하고 value.converter.basic.auth.credentials.source 매개 변수를 설정합니다. 그렇지 않으면, 이 매개 변수를 생략합니다.

snowflake.metadata.createtime

값이 FALSE로 설정되면 RECORD_METADATA 열의 메타데이터에서 CreateTime 속성 값이 생략됩니다. 기본값은 TRUE입니다.

Kafka 커넥터 1.2.0 이상에서 지원됩니다.

snowflake.metadata.topic

값이 FALSE로 설정되면 RECORD_METADATA 열의 메타데이터에서 topic 속성 값이 생략됩니다. 기본값은 TRUE입니다.

Kafka 커넥터 1.2.0 이상에서 지원됩니다.

snowflake.metadata.offset.and.partition

값이 FALSE로 설정되면 RECORD_METADATA 열의 메타데이터에서 OffsetPartition 속성 값이 생략됩니다. 기본값은 TRUE입니다.

Kafka 커넥터 1.2.0 이상에서 지원됩니다.

snowflake.metadata.all

값이 FALSE로 설정되면 RECORD_METADATA 열의 메타데이터가 완전히 비어 있습니다. 기본값은 TRUE입니다.

Kafka 커넥터 1.2.0 이상에서 지원됩니다.

transforms

Kafka 커넥터가 발견한 삭제 표시 레코드를 건너뛰고 대상 테이블에 로드하지 않도록 지정합니다. 삭제 표시 레코드는 전체 값 필드가 null인 레코드로 정의됩니다.

속성 값을 "tombstoneHandlerExample" 로 설정합니다.

참고

Kafka 커뮤니티 변환기(즉, value.converter 속성 값)에만 이 속성(예: org.apache.kafka.connect.json.JsonConverter 또는 org.apache.kafka.connect.json.AvroConverter)을 사용합니다. Snowflake 변환기로 삭제 표시 레코드 처리를 관리하려면 behavior.on.null.values 속성을 대신 사용합니다.

transforms.tombstoneHandlerExample.type

transforms 속성을 설정할 때 필수입니다.

속성 값을 "io.confluent.connect.transforms.TombstoneHandler" 로 설정합니다.

behavior.on.null.values

Kafka 커넥터가 삭제 표시 레코드를 처리하는 방법을 지정합니다. 삭제 표시 레코드는 전체 값 필드가 null인 레코드로 정의됩니다. Snowpipe 의 경우 이 속성은 Kafka 커넥터 버전 1.5.5 이상에서 지원됩니다. Snowpipe Streaming 의 경우 이 속성은 Kafka 커넥터 버전 2.1.0 이상에서 지원됩니다.

이 속성에서 지원되는 값은 다음과 같습니다.

DEFAULT

Kafka 커넥터에서 삭제 표시 레코드가 발생하면 내용 열에 빈 JSON 문자열이 삽입됩니다.

IGNORE

Kafka 커넥터가 삭제 표시 레코드를 건너뛰고 해당 레코드에 대한 행을 삽입하지 않습니다.

기본값은 DEFAULT 입니다.

참고

삭제 표시 레코드 수집은 수집 방법에 따라 다릅니다.

  • Snowpipe의 경우 Kafka 커넥터는 Snowflake 변환기만 사용합니다. Kafka 커뮤니티 변환기를 사용하여 삭제 표시 레코드 처리를 관리하려면 transformtransforms.tombstoneHandlerExample.type 속성을 대신 사용해야 합니다.

  • Snowpipe Streaming의 경우 Kafka 커넥터는 커뮤니티 변환기만 사용합니다.

Kafka 브로커에 전송된 레코드는 Kafka 커넥터에 의해 삭제되어 오프셋이 누락되므로 NULL이면 안 됩니다. 오프셋이 누락되면 특정 사용 사례에서 Kafka 커넥터가 손상됩니다. NULL 레코드 대신 삭제 표식 레코드를 사용하는 것이 좋습니다.

키 페어 인증 및 키 순환 사용하기

Kafka 커넥터에서는 기본 인증(즉, 사용자 이름 및 비밀번호)이 아닌 키 페어 인증을 사용합니다. 이 인증 메서드에는 2048비트(최소) RSA 키 페어가 필요합니다. OpenSSL을 사용하여 공개-개인 키 페어를 생성합니다. 공개 키는 구성 파일에 정의된 Snowflake 사용자에게 할당됩니다.

이 페이지의 키 페어 인증 지침과 키 페어 순환 지침을 완료한 후, 이 항목의 시크릿 외부화하기 에 대한 권장 사항을 평가합니다.

공개/개인 키 페어를 구성하려면:

  1. 터미널 창의 명령줄에서 개인 키를 생성합니다.

    개인 키의 암호화된 버전 또는 암호화되지 않은 버전을 생성할 수 있습니다.

    참고

    Kafka 커넥터는 연방 정보 처리 표준(140-2)(즉, FIPS 140-2) 요구 사항을 충족하는 것으로 검증된 암호화 알고리즘을 지원합니다. 자세한 내용은 FIPS 140-2 를 참조하십시오.

    암호화되지 않은 버전을 생성하려면 다음 명령을 사용합니다.

    $ openssl genrsa -out rsa_key.pem 2048
    
    Copy

    암호화된 버전을 생성하려면 다음 명령을 사용합니다.

    $ openssl genrsa 2048 | openssl pkcs8 -topk8 -v2 <algorithm> -inform PEM -out rsa_key.p8
    
    Copy

    여기서 <알고리즘> 은 FIPS 140-2 호환 암호화 알고리즘입니다.

    예를 들어, AES 256을 암호화 알고리즘으로 지정하려면:

    $ openssl genrsa 2048 | openssl pkcs8 -topk8 -v2 aes256 -inform PEM -out rsa_key.p8
    
    Copy

    암호화된 버전의 개인 키를 생성하는 경우에는 암호 구문을 기록해 두십시오. 나중에 Kafka 구성 파일의 snowflake.private.key.passphrase 속성에 암호 구문을 지정합니다.

    샘플 PEM 개인 키

    -----BEGIN ENCRYPTED PRIVATE KEY-----
    MIIE6TAbBgkqhkiG9w0BBQMwDgQILYPyCppzOwECAggABIIEyLiGSpeeGSe3xHP1
    wHLjfCYycUPennlX2bd8yX8xOxGSGfvB+99+PmSlex0FmY9ov1J8H1H9Y3lMWXbL
    ...
    -----END ENCRYPTED PRIVATE KEY-----
    
    Copy
  2. 명령줄에서 개인 키를 참조하여 공개 키를 생성합니다.

    개인 키가 암호화되고 이름이 rsa_key.p8 인 파일에 포함된 것으로 가정하고 다음 명령을 사용합니다.

    $ openssl rsa -in rsa_key.p8 -pubout -out rsa_key.pub
    
    Copy

    샘플 PEM 공개 키

    -----BEGIN PUBLIC KEY-----
    MIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEAy+Fw2qv4Roud3l6tjPH4
    zxybHjmZ5rhtCz9jppCV8UTWvEXxa88IGRIHbJ/PwKW/mR8LXdfI7l/9vCMXX4mk
    ...
    -----END PUBLIC KEY-----
    
    Copy
  3. 공개 및 개인 키 파일을 로컬 디렉터리로 복사하여 저장합니다. 파일 경로를 기록해 둡니다. 개인 키는 PKCS#8(공개 키 암호화 표준) 형식을 사용하여 저장되며 이전 스테이지에서 지정한 암호 구문을 사용하여 암호화되지만, 파일은 운영 체제에서 제공하는 파일 권한 방식을 사용하여 무단 액세스로부터 보호되어야 한다는 점에 유의하십시오. 파일을 사용하지 않을 때 파일을 보호하는 것은 사용자의 책임입니다.

  4. Snowflake에 로그인합니다. ALTER USER 를 사용하여 Snowflake 사용자에게 공개 키를 할당합니다. 예:

    ALTER USER jsmith SET RSA_PUBLIC_KEY='MIIBIjANBgkqh...';
    
    Copy

    참고

    • 보안 관리자(즉, SECURITYADMIN 역할의 사용자) 이상만 사용자를 변경할 수 있습니다.

    • SQL 문에서 공개 키 헤더와 푸터를 제외합니다.

    DESCRIBE USER 를 사용하여 사용자의 공개 키 지문을 확인합니다.

    DESC USER jsmith;
    +-------------------------------+-----------------------------------------------------+---------+-------------------------------------------------------------------------------+
    | property                      | value                                               | default | description                                                                   |
    |-------------------------------+-----------------------------------------------------+---------+-------------------------------------------------------------------------------|
    | NAME                          | JSMITH                                              | null    | Name                                                                          |
    ...
    ...
    | RSA_PUBLIC_KEY_FP             | SHA256:nvnONUsfiuycCLMXIEWG4eTp4FjhVUZQUQbNpbSHXiA= | null    | Fingerprint of user's RSA public key.                                         |
    | RSA_PUBLIC_KEY_2_FP           | null                                                | null    | Fingerprint of user's second RSA public key.                                  |
    +-------------------------------+-----------------------------------------------------+---------+-------------------------------------------------------------------------------+
    
    Copy

    참고

    RSA_PUBLIC_KEY_2_FP 속성에 대한 설명은 키-페어 순환 구성하기 에서 제공됩니다.

  5. 전체 개인 키를 복사하여 구성 파일의 snowflake.private.key 필드에 붙여넣습니다. 파일을 저장합니다.

시크릿 외부화하기

Snowflake는 개인 키와 같은 시크릿을 외부화하고 암호화된 형태로 저장하거나 AWS Key Management Service(KMS), Microsoft Azure Key Vault 또는 HashiCorp Vault와 같은 키 관리 서비스에 저장할 것을 적극 권장합니다. 이러한 작업은 Kafka Connect 클러스터에서 ConfigProvider 구현을 사용하여 수행할 수 있습니다.

자세한 내용은 이 서비스 에 대한 Confluent 설명을 참조하십시오.

Kafka 시작하기

서드 파티 Confluent 또는 Apache Kafka 설명서에서 제공되는 지침을 사용하여 Kafka를 시작합니다.

Kafka 커넥터 시작하기

Kafka 커넥터는 분산형 모드 또는 독립 실행형 모드로 시작할 수 있습니다. 각 모드에 대한 지침은 아래와 같습니다.

분산형 모드

터미널 창에서 다음 명령을 실행합니다.

curl -X POST -H "Content-Type: application/json" --data @<path>/<config_file>.json http://localhost:8083/connectors
Copy

독립 실행형 모드

터미널 창에서 다음 명령을 실행합니다.

<kafka_dir>/bin/connect-standalone.sh <kafka_dir>/<path>/connect-standalone.properties <kafka_dir>/config/SF_connect.properties
Copy

(Apache Kafka 또는 Confluent Kafka의 기본 설치에는 이미 connect-standalone.properties 파일이 포함되어 있어야 합니다.)

Kafka 커넥터 테스트 및 사용하기

소량의 데이터로 Kafka 커넥터를 테스트한 후 프로덕션 시스템에서 커넥터를 사용하는 것이 좋습니다. 테스트 과정은 커넥터를 정상적으로 사용하는 과정과 동일합니다.

  1. Kafka 및 Kafka Connect가 실행 중인지 확인합니다.

  2. 적절한 Kafka 항목을 생성했는지 확인합니다.

  3. 메시지 게시자를 생성하거나 기존 메시지 게시자를 사용합니다. 항목에 게시된 메시지의 형식(JSON, Avro 또는 일반 텍스트)이 올바른지 확인합니다.

  4. 구독할 항목 및 작성할 Snowflake 테이블을 지정하는 구성 파일을 생성합니다. 지침은 이 항목의 Kafka Connector 구성하기 를 참조하십시오.

  5. (선택 사항) 데이터를 쓸 테이블을 생성합니다. 이 단계는 선택 사항이며, 사용자가 테이블을 생성하지 않으면 Kafka 커넥터가 테이블을 생성합니다. 커넥터를 사용하여 비어 있지 않은 기존 테이블에 데이터를 추가하지 않으려면, 스키마가 불일치하게 될 가능성을 최소화하기 위해 커넥터가 테이블을 생성하도록 하는 것이 좋습니다.

  6. Snowflake 오브젝트(데이터베이스, 스키마, 대상 테이블 등)에 필요한 최소 권한을 데이터 수집에 사용할 역할에 부여합니다.

  7. 구성된 Kafka 항목에 샘플 데이터 세트를 게시합니다.

  8. 데이터가 시스템을 통해 전파될 때까지 몇 분을 기다린 후 Snowflake 테이블에 레코드가 삽입되었는지 확인합니다.

SnowCD 를 사용하여 Snowflake에 대한 네트워크 연결을 확인한 후 테스트 및 프로덕션 환경에서 Snowflake에 데이터를 로드하는 것이 좋습니다.