Snowflake High Performance connector for Kafka: Kafka 구성¶
이 항목에서는 |KAFKAOFHP|용 Kafka를 설치하고 구성하는 단계에 대해 설명합니다.
Kafka 커넥터 설치하기¶
Kafka 커넥터는 JAR(Java 실행 가능) 파일로 제공됩니다.
Snowflake는 두 가지 커넥터 버전을 제공합니다.
Kafka Connect의 Confluent 구현을 위한 버전.
|oss_kafka_link|의 버전.
두 버전의 커넥터 모두 Snowflake Private Preview에서 사용할 수 있으며 Snowflake에서 다운로드해야 합니다. 커넥터 JAR 파일을 다운로드하려면 Snowflake 계정 팀에 문의하세요.
If you are not sure which version to use, see 커넥터 버전 선택하기. Configuring the Kafka connector ==============================================================================
커넥터의 구성은 공급업체에 따라 다릅니다. Amazon MSK Connect와 같은 일부 구현에는 커넥터를 구성하기 위한 UI가 있으며 JSON 형식의 구성 및 속성 파일 형식을 허용합니다.
이 섹션은 커넥터의 매개 변수 이름과 값에 대한 일반 참조입니다. 클라우드 벤더마다 구성 요구 사항이 약간 다를 수 있습니다.
중요
Kafka Connect 프레임워크는 Kafka 커넥터에 대한 구성 설정을 마스터 노드에서 작업자 노드로 브로드캐스트합니다. 구성 설정에는 민감한 정보(특히 Snowflake 사용자 이름 및 개인 키)가 포함됩니다. Kafka Connect 노드 사이의 통신 채널이 보호되어야 함에 유의하십시오. 이와 관련한 지침은 Apache Kafka 소프트웨어 설명서를 참조하십시오.
각 구성 파일은 1개의 데이터베이스와 해당 데이터베이스에서 1개의 스키마에 대한 항목 및 해당 테이블을 지정합니다. 커넥터는 여러 항목에서 메시지를 수집할 수 있지만, 해당 테이블은 모두 단일 데이터베이스 및 스키마에 있어야 합니다.
구성 필드에 대한 설명은 커넥터 구성 속성 을 참조하십시오.
중요
일반적으로 구성 파일에는 개인 키와 같은 보안 관련 정보가 포함되어 있으므로 파일에 대한 읽기/쓰기 권한을 적절하게 설정하여 액세스를 제한해야 합니다.
또한, 구성 파일을 안전한 외부 위치 또는 키 관리 서비스에 저장하는 것을 고려하십시오.
구성 json 파일 예제
{
"name":"XYZCompanySensorData",
"config":{
"connector.class": "com.snowflake.kafka.connector.SnowflakeStreamingSinkConnector",
"tasks.max": "1",
"snowflake.topic2table.map": "topic1:table_1,topic2:table_2",
"snowflake.url.name": "myorganization-myaccount.snowflakecomputing.com:443",
"snowflake.warehouse.name": "WH",
"snowflake.private.key": "-----BEGIN PRIVATE KEY-----\n .... \n-----END PRIVATE KEY-----\n",
"snowflake.schema.name": "MY_SCHEMA",
"snowflake.database.name": "MY_DATABASE",
"snowflake.role.name": "MY_ROLE",
"snowflake.user.name": "MY_USER",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"errors.log.enable": "true",
"topics": "topic1,topic2",
"value.converter.schemas.enable": "false",
"errors.tolerance": "all"
}
}
구성 속성 파일 예제
connector.class=com.snowflake.kafka.connector.SnowflakeStreamingSinkConnector
tasks.max=1
snowflake.topic2table.map=topic1:table_1,topic2:table_2
snowflake.url.name=myorganization-myaccount.snowflakecomputing.com:443
snowflake.warehouse.name=WH
snowflake.private.key=-----BEGIN PRIVATE KEY-----\n .... \n-----END PRIVATE KEY-----\n
snowflake.schema.name=MY_SCHEMA
snowflake.database.name=MY_DATABASE
snowflake.role.name=MY_ROLE
snowflake.user.name=MY_USER
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter=org.apache.kafka.connect.storage.StringConverter
errors.log.enable=true
topics=topic1,topic2
name=XYZCompanySensorData
value.converter.schemas.enable=false
errors.tolerance=all
커넥터 구성 속성¶
필수 속성¶
name애플리케이션 이름입니다. 이 이름은 고객이 사용하는 모든 Kafka 커넥터에서 고유해야 합니다. 이 이름은 따옴표로 묶이지 않은 유효한 Snowflake 식별자여야 합니다. 유효한 식별자에 대한 내용은 식별자 요구 사항 을 참조하십시오.
connector.classcom.snowflake.kafka.connector.SnowflakeStreamingSinkConnectortopics항목의 쉼표로 구분된 목록입니다. 기본적으로 Snowflake는 테이블 이름과 동일한 항목 이름을 가정합니다. 테이블 이름이 항목 이름과 동일하지 않은 경우 선택적
topic2table.map매개 변수(아래)를 사용하여 항목 이름에서 테이블 이름으로 매핑을 지정합니다. 이 테이블 이름은 따옴표로 묶이지 않은 유효한 Snowflake 식별자여야 합니다. 유효한 테이블 이름에 대한 내용은 식별자 요구 사항 을 참조하십시오.참고
둘 다가 아닌
topics또는topics.regex중 1개 가 필수입니다.topics.regexSnowflake 테이블에 로드할 메시지가 포함된 항목을 지정하는 정규식(“regex”)입니다. 커넥터는 정규식과 일치하는 모든 항목 이름에서 데이터를 로드합니다. 정규식은 반드시 Java 정규식의 규칙을 따라야 합니다(즉, java.util.regex.Pattern과 호환되어야 함). 구성 파일에는
topics또는topics.regex중 1개 가 포함되어야 하며 둘 다 포함되지 않아야 합니다.snowflake.url.nameSnowflake 계정에 액세스하기 위한 URL입니다. 이 URL에는 반드시 계정 식별자 가 포함되어야 합니다. 프로토콜(
https://) 및 포트 번호는 선택 사항입니다.snowflake.user.nameSnowflake 계정의 사용자 로그인 이름입니다.
snowflake.role.name커넥터가 테이블에 데이터를 삽입하는 데 사용할 역할의 이름입니다.
snowflake.private.key사용자 인증에서 사용되는 개인 키입니다. 키만 포함되어야 하며 헤더 또는 푸터는 포함되지 않아야 합니다. 키가 여러 라인으로 분할되는 경우에는 줄 바꿈을 제거합니다. 암호화되지 않은 키를 제공하거나 암호화된 키를 제공하고
snowflake.private.key.passphrase매개 변수를 제공하여 Snowflake가 키의 암호를 해독할 수 있도록 할 수 있습니다. :emph:` 매개 변수 값이 암호화된경우에만``snowflake.private.key`이 매개 변수를 사용하십시오. :doc:`/user-guide/key-pair-auth`의 지침에 따라 암호화된 개인 키의 암호를 해독합니다.참고
label-optional_properties`의 :samp:`snowflake.private.key.passphrase 섹션을 참조하세요.
snowflake.database.name행을 삽입할 테이블이 포함된 데이터베이스의 이름입니다.
snowflake.schema.name행을 삽입할 테이블이 포함된 스키마의 이름입니다.
header.converter레코드가 Avro로 형식이 지정되고 헤더가 포함된 경우에만 필요합니다. 값은
"org.apache.kafka.connect.storage.StringConverter"입니다.key.converterKafka 레코드의 키 변환기(예:
"org.apache.kafka.connect.storage.StringConverter")입니다. 이 속성은 Kafka 커넥터에서 사용되지 않지만 Kafka Connect 플랫폼에서는 필요합니다.현재 제한 사항은 Kafka 커넥터 제한 사항 을 참조하십시오.
value.converter커넥터는 표준 Kafka 커뮤니티 변환기를 지원합니다. 데이터 형식에 따라 적절한 변환기를 선택합니다.
JSON레코드의 경우:
"org.apache.kafka.connect.json.JsonConverter"Schema Registry가 있는 Avro 레코드의 경우:
"io.confluent.connect.avro.AvroConverter"
현재 제한 사항은 Kafka 커넥터 제한 사항 을 참조하십시오.
선택적 속성¶
snowflake.private.key.passphrase이 매개 변수의 값이 비어 있지 않은 경우 커넥터는 이 구문을 사용하여 개인 키의 암호를 해독하려고 시도합니다.
tasks.max작업 수이며, 일반적으로 Kafka Connect 클러스터의 작업자 노드에 있는 CPU 코어 수와 동일합니다. 최상의 성능을 얻으려면 작업 수를 Kafka 파티션의 총 수와 동일하게 설정하되 CPU 코어 수를 초과하지 않도록 설정하는 것이 좋습니다. 작업 수가 많으면 메모리 사용량이 증가하고 재밸런싱이 자주 발생할 수 있습니다.
snowflake.topic2table.map이 선택적 매개 변수를 사용하면 사용자가 테이블에 매핑할 항목을 지정할 수 있습니다. 각 항목과 테이블 이름은 콜론으로 구분해야 합니다(아래 예 참조). 이 테이블 이름은 따옴표로 묶이지 않은 유효한 Snowflake 식별자여야 합니다. 유효한 테이블 이름에 대한 내용은 식별자 요구 사항 을 참조하십시오. 항목 구성에서는
topics.regex를 사용할 때와 마찬가지로 정규식을 사용하여 항목을 정의할 수 있습니다. 정규 식은 모호할 수 없습니다. 일치하는 항목은 단 하나의 대상 테이블과만 일치해야 합니다.예:
topics="topic1,topic2,topic5,topic6" snowflake.topic2table.map="topic1:low_range,topic2:low_range,topic5:high_range,topic6:high_range"
다음과 같이 작성할 수 있습니다:
topics.regex="topic[0-9]" snowflake.topic2table.map="topic[0-4]:low_range,topic[5-9]:high_range"
value.converter.schema.registry.url형식이 Avro이고 Schema Registry Service를 사용하는 경우 이 속성은 Schema Registry Service의 URL이어야 합니다. 그렇지 않으면 이 필드를 비워둬야 합니다.
value.converter.break.on.schema.registry.error스키마 레지스트리 서비스에서 Avro 데이터를 로드하는 경우 이 속성은 스키마 ID를 가져오는 동안 오류가 발생하면 Kafka 커넥터가 레코드 사용을 중지해야 하는지 여부를 결정합니다. 기본값은
false입니다. 이 동작을 활성화하려면 값을true로 설정합니다.jvm.proxy.hostSnowflake Kafka Connector가 프록시 서버를 통해 Snowflake에 액세스할 수 있도록 하려면 이 매개 변수를 설정하여 해당 프록시 서버의 호스트를 지정합니다.
jvm.proxy.portSnowflake Kafka Connector가 프록시 서버를 통해 Snowflake에 액세스할 수 있도록 하려면 이 매개 변수를 설정하여 해당 프록시 서버의 포트를 지정합니다.
snowflake.streaming.max.client.lagSnowflake Ingest Java 가 데이터를 Snowflake로 플러시하는 빈도(초)를 지정합니다.
- 값:
최소:
1초최대:
600초
- 기본값:
:code:`1`초
jvm.proxy.username프록시 서버로 인증하는 사용자 이름입니다.
jvm.proxy.password프록시 서버로 인증하는 사용자 이름의 비밀번호입니다.
snowflake.jdbc.map예:
"snowflake.jdbc.map": "networkTimeout:20,tracing:WARNING"추가 JDBC 속성(JDBC 드라이버 연결 매개 변수 참조 참조)은 유효성이 검사되지 않습니다. 이러한 추가 속성은 유효성이 검사되지 않으며 필수 속성(예:
jvm.proxy.xxx,snowflake.user.name,snowflake.private.key,snowflake.schema.name) 대신 재정의하거나 사용해서는 안 됩니다.- 다음 조합 중 하나를 지정합니다.
JDBC_TRACE환경 변수와 함께tracing속성snowflake.database.name와 함께database속성
모호한 동작이 발생하며 동작은 JDBC 드라이버에 의해 결정됩니다.
value.converter.basic.auth.credentials.sourceAvro 데이터 타입을 사용 중이고 Kafka 스키마 레지스트리에 대한 보안 액세스가 필요한 경우 이 매개 변수를 “USER_INFO” 문자열로 설정하고 아래 설명된
value.converter.basic.auth.user.info매개 변수를 설정합니다. 그렇지 않으면, 이 매개 변수를 생략합니다.value.converter.basic.auth.user.infoAvro 데이터 타입을 사용 중이고 Kafka 스키마 레지스트리에 대한 보안 액세스가 필요한 경우 위의 설명과 같이 이 매개 변수를 “<사용자_ID>:<비밀번호>” 문자열로 설정하고 value.converter.basic.auth.credentials.source 매개 변수를 설정합니다. 그렇지 않으면, 이 매개 변수를 생략합니다.
snowflake.metadata.createtime값이 FALSE로 설정되면 RECORD_METADATA 열의 메타데이터에서
CreateTime속성 값이 생략됩니다. 기본값은 TRUE입니다.snowflake.metadata.topic값이 FALSE로 설정되면 RECORD_METADATA 열의 메타데이터에서
topic속성 값이 생략됩니다. 기본값은 TRUE입니다.snowflake.metadata.offset.and.partition값이 FALSE로 설정되면 RECORD_METADATA 열의 메타데이터에서
Offset및Partition속성 값이 생략됩니다. 기본값은 TRUE입니다.snowflake.metadata.all값이 FALSE로 설정되면 RECORD_METADATA 열의 메타데이터가 완전히 비어 있습니다. 기본값은 TRUE입니다.
transformsKafka 커넥터가 발견한 삭제 표시 레코드를 건너뛰고 대상 테이블에 로드하지 않도록 지정합니다. 삭제 표시 레코드는 전체 값 필드가 null인 레코드로 정의됩니다.
속성 값을
"tombstoneHandlerExample"로 설정합니다.참고
Kafka 커뮤니티 변환기(즉,
value.converter속성 값)에만 이 속성(예:org.apache.kafka.connect.json.JsonConverter또는org.apache.kafka.connect.json.AvroConverter)을 사용합니다. Snowflake 변환기로 삭제 표시 레코드 처리를 관리하려면behavior.on.null.values속성을 대신 사용합니다.transforms.tombstoneHandlerExample.typetransforms속성을 설정할 때 필수입니다.속성 값을
"io.confluent.connect.transforms.TombstoneHandler"로 설정합니다.behavior.on.null.valuesKafka 커넥터가 삭제 표시 레코드를 처리하는 방법을 지정합니다. 삭제 표시 레코드는 전체 값 필드가 null인 레코드로 정의됩니다. Snowpipe 의 경우 이 속성은 Kafka 커넥터 버전 1.5.5 이상에서 지원됩니다. Snowpipe Streaming 의 경우 이 속성은 Kafka 커넥터 버전 2.1.0 이상에서 지원됩니다.
이 속성에서 지원되는 값은 다음과 같습니다.
DEFAULTKafka 커넥터에서 삭제 표시 레코드가 발생하면 내용 열에 빈 JSON 문자열이 삽입됩니다.
IGNOREKafka 커넥터가 삭제 표시 레코드를 건너뛰고 해당 레코드에 대한 행을 삽입하지 않습니다.
기본값은
DEFAULT입니다.참고
삭제 표시 레코드 수집은 수집 방법에 따라 다릅니다.
Snowpipe의 경우 Kafka 커넥터는 Snowflake 변환기만 사용합니다. Kafka 커뮤니티 변환기를 사용하여 삭제 표시 레코드 처리를 관리하려면
transform및transforms.tombstoneHandlerExample.type속성을 대신 사용해야 합니다.Snowpipe Streaming의 경우 Kafka 커넥터는 커뮤니티 변환기만 사용합니다.
Kafka 브로커에 전송된 레코드는 Kafka 커넥터에 의해 삭제되어 오프셋이 누락되므로 NULL이면 안 됩니다. 오프셋이 누락되면 특정 사용 사례에서 Kafka 커넥터가 손상됩니다. NULL 레코드 대신 삭제 표식 레코드를 사용하는 것이 좋습니다.