Kafka Connector의 개요

이 항목에서는 Apache Kafka 및 Kafka용 Snowflake 커넥터에 대한 개요를 제공합니다.

참고

Kafka 커넥터에는 서드 파티 약관 이 적용됩니다.

이 항목의 내용:

Apache Kafka 소개

Apache Kafka 소프트웨어는 게시 및 구독 모델을 사용하여 메시지 큐 또는 엔터프라이즈 메시징 시스템과 유사한 레코드 스트림을 쓰고 읽습니다. Kafka를 사용하면 프로세스가 비동기적으로 메시지를 읽고 쓸 수 있습니다. 구독자는 게시자와 직접 연결할 필요가 없으며, 게시자는 구독자가 나중에 수신할 수 있도록 Kafka의 메시지를 큐에 추가할 수 있습니다.

애플리케이션은 항목 에 메시지를 게시하며 애플리케이션은 해당 항목을 구독하여 그러한 메시지를 수신합니다. Kafka는 메시지를 처리하고 전송할 수 있지만, 이와 관련한 설명은 이 문서의 범위를 벗어납니다. 확장성을 향상하기 위해 항목을 파티션 으로 나눌 수 있습니다.

Kafka Connect는 데이터베이스 등 외부 시스템과 Kafka를 연결하기 위한 프레임워크입니다. Kafka Connect 클러스터는 Kafka 클러스터와 별도인 클러스터입니다. Kafka Connect 클러스터는 커넥터(외부 시스템 사이에서 읽기 및/또는 쓰기를 지원하는 구성 요소)의 실행 및 확장을 지원합니다.

Kafka 커넥터는 Kafka Connect 클러스터에서 실행되어 Kafka 항목에서 데이터를 읽고 Snowflake 테이블에 쓰도록 설계되었습니다.

Snowflake는 두 가지 커넥터 버전을 제공합니다.

Snowflake의 관점에서 Kafka 항목은 Snowflake 테이블에 삽입할 행 스트림을 생성합니다. 일반적으로 1개의 Kafka 메시지에는 행이 1개 포함됩니다.

여러 메시지 게시/구독 플랫폼과 마찬가지로 Kafka는 게시자와 구독자 사이에서 다대다 관계를 허용합니다. 단일 애플리케이션은 여러 항목에 게시할 수 있으며 단일 애플리케이션은 여러 항목을 구독할 수 있습니다. Snowflake에서 일반적인 패턴은 1개의 항목이 1개의 Snowflake 테이블에 대한 메시지(행)를 제공하는 것입니다.

Kafka 커넥터의 현재 버전은 Snowflake에 데이터를 로드하는 것으로 제한됩니다. Kafka 커넥터는 다음 두 가지 데이터 로딩 방법을 지원합니다.

자세한 내용은 Snowflake에 데이터 로드하기Snowpipe Streaming과 함께 Kafka용 Snowflake 커넥터 사용하기 를 참조하십시오.

Kafka 항목을 위한 대상 테이블

Kafka 항목은 Kafka 구성의 기존 Snowflake 테이블에 매핑할 수 있습니다. 항목이 매핑되지 않은 경우 Kafka 커넥터는 항목의 이름을 사용하여 각 항목에 대한 새 테이블을 생성합니다.

커넥터는 다음 규칙을 사용하여 항목의 이름을 유효한 Snowflake 테이블 이름으로 변환합니다.

  • 소문자 항목 이름은 대문자 테이블 이름으로 변환됩니다.

  • 항목 이름의 첫 번째 문자가 문자(a-z 또는 A-Z) 또는 밑줄 문자(_)가 아닌 경우 커넥터는 테이블 이름 앞에 밑줄을 추가합니다.

  • 항목 이름의 문자가 Snowflake 테이블 이름에 적합한 문자가 아닌 경우 해당 문자는 밑줄 문자로 대체됩니다. 테이블 이름에 유효한 문자에 대한 자세한 내용은 식별자 요구 사항 을 참조하십시오.

Kafka 커넥터가 Kakfa 항목에 대해 생성된 테이블의 이름을 조정해야 하는 경우 동일한 스키마에 있는 두 테이블의 이름이 동일할 수 있음에 유의하십시오. 예를 들어, numbers+xnumbers-x 항목에서 데이터를 읽는 경우 이러한 항목에 대해 생성된 테이블은 모두 NUMBERS_X 입니다. 테이블 이름이 실수로 중복되는 것을 방지하기 위해 커넥터는 테이블 이름에 접미사를 추가합니다. 밑줄 다음에 생성된 해시 코드가 접미사로 추가됩니다.

Snowflake는 가능하면 Snowflake 식별자 이름 규칙을 따르는 항목의 이름을 선택할 것을 권장합니다.

Kafka 항목을 위한 테이블 스키마

Snowpipe Streaming 을 통해 Kafka 커넥터는 스키마 감지 및 진화 를 선택적으로 지원합니다.

기본적으로, Snowpipe 또는 Snowpipe Streaming 사용 시 Kafka 커넥터에 의해 로드된 모든 Snowflake 테이블에는 다음과 같은 2개의 VARIANT 열로 구성되는 스키마가 있습니다.

  • RECORD_CONTENT. 여기에는 Kafka 메시지가 포함됩니다.

  • RECORD_METADATA. 여기에는 메시지에 대한 메타데이터(예: 메시지를 읽은 항목)가 포함됩니다.

Snowflake가 테이블을 생성하는 경우 테이블에는 이러한 열 2개만 포함됩니다. 사용자가 Kafka Connector가 행을 추가할 테이블을 생성하는 경우 테이블에는 이러한 2개 이상의 열(커넥터의 데이터에는 해당 열에 대한 값이 포함되지 않으므로 추가 열에서는 NULL 값이 허용되어야 함)이 포함될 수 있습니다.

RECORD_CONTENT 열에는 Kafka 메시지가 포함됩니다.

Kafka 메시지에는 전송되는 정보에 따라 다른 내부 구조가 있습니다. 예를 들어, IoT(사물 인터넷) 날씨 센서에서 제공되는 메시지에는 데이터가 기록된 타임스탬프, 센서의 위치, 온도, 습도 등이 포함될 수 있습니다. 재고 시스템의 메시지에는 제품 ID 및 판매된 품목 수가 포함될 수 있으며, 판매 또는 배송된 시간을 나타내는 타임스탬프도 포함될 수 있습니다.

일반적으로 특정 항목의 각 메시지는 기본 구조가 동일합니다. 일반적으로 항목이 다르면 사용되는 구조가 다릅니다.

각 Kafka 메시지는 JSON 형식 또는 Avro 형식으로 Snowflake에 전달됩니다. Kafka 커넥터는 타입이 지정된 정보를 VARIANT 타입의 단일 열에 저장합니다. 데이터의 구문은 분석되지 않으며 데이터는 Snowflake 테이블의 여러 열로 분할되지 않습니다.

RECORD_METADATA 열에 기본적으로 포함되는 정보는 다음과 같습니다.

필드

Java . 데이터 타입

SQL . 데이터 타입

필수

설명

항목

문자열

VARCHAR

레코드가 제공된 Kafka 항목의 이름입니다.

파티션

문자열

VARCHAR

항목 내의 파티션 수입니다. (Snowflake 마이크로 파티션이 아닌 Kafka 파티션이라는 점에 유의.)

오프셋

long

INTEGER

해당 파티션의 오프셋입니다.

CreateTime / . LogAppendTime

long

BIGINT

아니요

Kafka 항목에서 메시지와 연결된 타임스탬프입니다. 값은 UTC 기준 1970년 1월 1일 자정으로부터의 밀리초입니다. 자세한 내용은 https://kafka.apache.org/0100/javadoc/org/apache/kafka/clients/producer/ProducerRecord.html을 참조하십시오.

문자열

VARCHAR

아니요

메시지가 Kafka KeyedMessage인 경우 해당 메시지의 키입니다. 커넥터가 RECORD_METADATA에 키를 저장하려면 Kafka 구성 속성 의 key.converter 매개 변수를 《org.apache.kafka.connect.storage.StringConverter》로 설정해야 하며, 그렇지 않으면 커넥터가 키를 무시합니다.

스키마_id

int

INTEGER

아니요

스키마 레지스트리와 함께 Avro를 사용하여 스키마를 지정하는 경우 해당 레지스트리에 있는 스키마의 ID입니다.

헤더

오브젝트

OBJECT

아니요

헤더는 레코드와 연결된 사용자 정의 키-값 페어입니다. 각 레코드에는 0개, 1개 또는 여러 개의 헤더가 있을 수 있습니다.

RECORD_METADATA 열에 기록된 메타데이터의 양은 선택적 Kafka 구성 속성을 사용하여 구성할 수 있습니다. 자세한 내용은 Kafka Connector 설치 및 구성하기 섹션을 참조하십시오.

필드 이름 및 값에서는 대/소문자를 구분합니다.

JSON 구문으로 표현되는 경우의 샘플 메시지는 다음과 유사할 수 있습니다.

{
    "meta":
    {
        "offset": 1,
        "topic": "PressureOverloadWarning",
        "partition": 12,
        "key": "key name",
        "schema_id": 123,
        "CreateTime": 1234567890,
        "headers":
        {
            "name1": "value1",
            "name2": "value2"
        }
    },
    "content":
    {
        "ID": 62,
        "PSI": 451,
        "etc": "..."
    }
}
Copy

VARIANT 열을 쿼리하기 위한 적정한 구문 쿼리에 적절한 구문 을 사용하여 Snowflake 테이블을 직접 쿼리할 수 있습니다.

RECORD_METADATA에서 항목을 기반으로 데이터를 추출하는 단순한 예시는 다음과 같습니다.

select
       record_metadata:CreateTime,
       record_content:ID
    from table1
    where record_metadata:topic = 'PressureOverloadWarning';
Copy

출력은 다음과 유사합니다.

+------------+-----+
| CREATETIME | ID  |
+------------+-----+
| 1234567890 | 62  |
+------------+-----+
Copy

아니면 이러한 테이블에서 데이터를 추출하고 데이터를 개별 열로 스큐한 후 일반적으로 쿼리하기 더 쉬운 다른 테이블에 데이터를 저장할 수 있습니다.

Kafka Connector용 워크플로

Kafka 커넥터는 다음 프로세스를 완료하여 Kafka 항목을 구독하고 Snowflake 오브젝트를 생성합니다.

  1. Kafka 커넥터는 Kafka 구성 파일 또는 명령줄(또는 Confluent의 경우 Confluent Control Center)을 통해 제공되는 구성 정보에 따라 1개 이상의 Kafka 항목을 구독합니다.

  2. 커넥터는 각 항목과 관련하여 다음 오브젝트를 생성합니다.

    • 각 항목에 대한 데이터 파일을 임시로 저장하는 내부 스테이지 1개.

    • 각 항목 파티션에 대한 데이터 파일을 수집하는 파이프 1개.

    • 각 항목에 대한 테이블 1개. 각 항목에 지정된 테이블이 없는 경우 커넥터가 해당 테이블을 생성합니다. 그렇지 않으면 커넥터가 기존 테이블에 RECORD_CONTENT 및 RECORD_METADATA 열을 생성하고 다른 열에서 null이 허용되는지 확인(그렇지 않으면 오류가 발생함)합니다.

다음 다이어그램은 Kafka 커넥터가 포함된 Kafka의 수집 흐름을 보여줍니다.

Kafka flow using the Kafka connector
  1. 1개 이상의 애플리케이션이 JSON 또는 Avro 레코드를 Kafka 클러스터에 게시합니다. 레코드는 1개 이상의 항목 파티션으로 분할됩니다.

  2. Kafka 커넥터는 Kafka 항목의 메시지를 버퍼에 추가합니다. 임계값(시간 또는 메모리 또는 메시지 수)에 도달하면 커넥터는 내부 스테이지의 임시 파일에 메시지를 씁니다. 커넥터는 Snowpipe 를 트리거하여 임시 파일을 수세트니다. Snowpipe는 데이터 파일에 대한 포인터를 큐에 복사합니다.

  3. Snowflake가 제공하는 가상 웨어하우스는 Kafka 항목 파티션에 대해 생성된 파이프를 통해 스테이징된 파일에서 대상 테이블(즉, 항목에 대한 구성 파일에 지정된 테이블)로 데이터를 로드합니다.

  4. (표시되지 않음) 커넥터는 Snowpipe를 모니터링하고, 파일 데이터가 테이블에 로드되었음을 확인한 후 내부 스테이지에서 각 파일을 삭제합니다.

    오류로 인해 데이터가 로드되지 않으면 커넥터는 파일을 테이블 스테이지로 이동하고 오류 메시지를 생성합니다.

  5. 커넥터에서 2~4단계가 반복됩니다.

주의

Snowflake는 한 시간 동안 insertReport API 를 폴링합니다. 수집된 파일의 상태가 이 시간 내에 성공하지 못하면 수집 중인 파일이 테이블 스테이지로 이동됩니다.

테이블 스테이지에서 이러한 파일을 사용할 수 있으려면 1시간 이상 걸릴 수 있습니다. 파일은 이전 시간 내에 수집 상태를 찾을 수 없을 때만 테이블 스테이지로 이동됩니다.

장애 허용 오차

Kafka 및 Kafka 커넥터 모두에는 내결함성이 있습니다. 메시지는 복제되거나 자동으로 삭제되지 않습니다.

데이터 로딩 체인에서 Snowpipe 워크플로의 데이터 중복 제거 논리는 매우 드문 경우를 제외하고 반복 데이터의 중복 복사본을 제거합니다. Snowpipe가 레코드를 로드하는 동안 오류가 감지되면(예: 잘못된 레코드 형식의 JSON 또는 Avro) 레코드가 로드되지 않으며, 대신 레코드가 테이블 스테이지로 이동됩니다.

Snowpipe Streaming을 사용하는 Kafka 커넥터는 오류 처리를 위해 DLQ(배달 못한 편지 큐)를 지원합니다. 자세한 내용은 Snowpipe Streaming을 사용하는 Kafka 커넥터의 오류 처리 및 DLQ 속성 을 참조하십시오.

커넥터의 내결함성 제한 사항

Kafka 항목은 저장 공간 또는 보존 시간에 대한 제한으로 구성될 수 있습니다.

  • 기본 보존 시간은 7일입니다. 시스템이 보존 시간 이상 동안 오프라인 상태로 유지되면 만료된 레코드는 로드되지 않습니다. 유사하게 Kafka의 저장소 공간 제한이 초과되면 일부 메시지가 전달되지 않습니다.

  • Kafka 항목의 메시지가 삭제 또는 업데이트되는 경우 해당 변경 사항이 Snowflake 테이블에 반영되지 않을 수 있습니다.

주의

Kafka 커넥터의 인스턴스는 서로 통신하지 않습니다. 동일한 항목 또는 파티션에서 커넥터의 여러 인스턴스를 시작하면 동일한 행의 여러 복사본이 테이블에 삽입될 수 있습니다. 이는 권장되지 않으며, 각 항목은 커넥터의 한 인스턴스에서만 처리되어야 합니다.

이론적으로는 Snowflake의 수집 속도보다 더 빠르게 Kafka에서 메시지가 이동할 수 있습니다. 그러나 실제로는 가능하지 않습니다. 이러한 상황이 발생하는 경우에는 Kafka Connect 클러스터의 성능을 조정하여 문제를 해결할 수 있습니다. 예:

  • Connect 클러스터의 노드 수 조정.

  • 커넥터에 할당된 작업 수 조정.

  • 커넥터와 Snowflake 배포 사이에서 네트워크 대역폭의 영향 이해.

중요

행은 원래 게시된 순서대로 삽입되지 않을 수 있습니다.

지원되는 플랫폼

Kafka 커넥터는 모든 Kafka Connect 클러스터에서 실행할 수 있으며 지원되는 클라우드 플랫폼 의 Snowflake 계정으로 데이터를 전송할 수 있습니다.

프로토콜 버퍼 데이터 지원

Kafka 커넥터 1.5.0 이상은 protobuf(프로토콜 버퍼) 변환기를 통해 protobuf를 지원합니다. 자세한 내용은 Kafka용 Snowflake 커넥터를 사용하여 Protobuf 데이터 로드하기 섹션을 참조하십시오.

청구 정보

Kafka 커넥터 사용과 관련한 직접 요금은 없습니다. 그러나 다음과 같은 간접 요금이 있습니다.

  • 커넥터가 Kafka에서 읽는 데이터를 로드하기 위한 용도로 Snowpipe가 사용되며 Snowpipe 처리 시간에 대한 요금이 계정에 청구됩니다.

  • 데이터 저장소 요금은 계정에 청구됩니다.

Kafka Connector 제한 사항

SMTs(Single Message Transformation)는 메시지가 Kafka Connect를 통해 이동할 때 적용됩니다. Kafka 구성 속성 을 구성할 때 key.converter 또는 value.converter하나 를 다음 값 중 하나로 설정하면 해당 키 또는 값에서 SMTs가 지원되지 않습니다.

  • com.snowflake.kafka.connector.records.SnowflakeJsonConverter

  • com.snowflake.kafka.connector.records.SnowflakeAvroConverter

  • com.snowflake.kafka.connector.records.SnowflakeAvroConverterWithoutSchemaRegistry

key.converter 또는 value.converter 가 설정되지 않은 경우에는 현재 regex.router 를 제외한 대부분의 SMTs가 지원됩니다.

Snowflake 변환기는 SMTs를 지원하지 않지만, Kafka 커넥터 버전 1.4.3 이상에서는 다음과 같은 다양한 커뮤니티 기반 변환기를 지원합니다.

  • io.confluent.connect.avro.AvroConverter

  • org.apache.kafka.connect.json.JsonConverter

SMTs에 대한 자세한 내용은 https://docs.confluent.io/current/connect/transforms/index.html을 참조하십시오.