DLQ 및 메타데이터가 포함된 Apache Kafka

참고

커넥터에는 커넥터 약관 이 적용됩니다.

이 항목에서는 DLQ 및 메타데이터 커넥터가 포함된 Apache Kafka에 대해 설명합니다. 이 커넥터는 모든 기능을 갖춘 커넥터로, 기존 Kafka용 Snowflake 커넥터와 기능 동등성을 제공하며 프로덕션 사용 사례를 위한 고급 기능을 포함하고 있습니다.

주요 기능

DLQ 및 메타데이터 커넥터가 포함된 Apache Kafka는 포괄적인 기능을 제공합니다.

  • 데드 레터 큐(DLQ) 메시지 처리 실패 지원

  • RECORD_METADATA 열에 Kafka 메시지 메타데이터를 추가합니다

  • 구성 가능한 스키마화 - 스키마 탐지 활성화 또는 비활성화

  • 스키마 진화를 통한 Iceberg 테이블 지원

  • 여러 메시지 형식 - JSON 및 AVRO 지원

  • AVRO 메시지에 대한 Schema Registry 통합

  • 고급 패턴을 사용한 항목-테이블 매핑

  • SASL 인증 지원

특정 매개 변수

Kafka용 Openflow Connector 설정하기 에 설명된 일반적인 매개 변수 외에도 이 커넥터에는 고급 기능을 위한 추가 매개 변수 컨텍스트가 포함되어 있습니다.

메시지 형식 및 스키마 매개 변수

매개 변수

설명

필수

메시지 형식

Kafka의 메시지 형식입니다. JSON / AVRO 중 하나. 기본값: JSON

AVRO 스키마

AVRO 메시지 형식의 AVRO 스키마 액세스 전략에서 schema-text-property를 사용하는 경우의 Avro 스키마. 참고: 구성된 Kafka 항목에서 소비되는 모든 메시지가 동일한 스키마를 공유하는 경우에만 이 옵션을 사용해야 합니다.

아니요

AVRO Schema Access Strategy

메시지의 AVRO 스키마에 액세스하는 방법입니다. AVRO 에 필수. 다음 중 하나: embedded-avro-schema / schema-reference-reader / schema-text-property. 기본값: embedded-avro-schema

아니요

Schema Registry 매개 변수

매개 변수

설명

필수

Schema Registry 인증 유형

Schema Registry를 사용하는 경우 Schema Registry에 인증하는 방법입니다. 그렇지 않으면 NONE 을 사용하십시오. NONE / BASIC 중 하나. 기본값: NONE

Schema Registry URL

Schema Registry의 URL 입니다. AVRO 메시지 형식에 필수.

아니요

Schema Registry 사용자 이름

Schema Registry의 사용자명입니다. AVRO 메시지 형식에 필수.

아니요

Schema Registry 비밀번호

Schema Registry의 비밀번호입니다. AVRO 메시지 형식에 필수.

아니요

DLQ 및 고급 기능 매개 변수

매개 변수

설명

필수

Kafka DLQ 항목

구문 분석 오류가 있는 메시지를 보낼 DLQ 항목

도식화 사용

데이터를 개별 열에 삽입할지, 아니면 단일 RECORD_CONTENT 필드에 삽입할지 결정합니다. 다음 중 하나: true / false. 기본값: true

Iceberg 활성화됨

프로세서가 데이터를 Iceberg 테이블로 수집할지 여부를 지정합니다. 이 속성이 실제 테이블 유형과 일치하지 않으면 프로세서가 실패합니다. 기본값: false

도식화 동작

커넥터의 동작은 Schematization Enabled 매개 변수에 따라 변경됩니다.

도식화 사용

도식화가 활성화되면 커넥터는:

  • 메시지의 각 필드에 대해 개별 열을 생성합니다

  • Kafka 메타데이터가 포함된 RECORD_METADATA 열 포함

  • 새 필드가 감지되면 자동으로 테이블 스키마 진화

  • 중첩된 JSON/AVRO 구조를 별도의 열로 데이터 스큐합니다

테이블 구조 예시:

RECORD_METADATA

ACCOUNT

SYMBOL

SIDE

QUANTITY

1

{“timestamp”:1669074170090, “headers”: {“current.iter…

ABC123

ZTEST

BUY

3572

2

{“timestamp”:1669074170400, “headers”: {“current.iter…

XYZ789

ZABX

SELL

3024

도식화 비활성화

도식화가 비활성화되면 커넥터는:

  • RECORD_CONTENTRECORD_METADATA 의 2개 열만 생성합니다.

  • 전체 메시지 내용을 RECORD_CONTENT 의 OBJECT 로 저장합니다

  • 자동 스키마 진화를 수행하지 않습니다

  • 다운스트림 처리를 위한 최대의 유연성 제공

테이블 구조 예시:

RECORD_METADATA

RECORD_CONTENT

1

{“timestamp”:1669074170090, “headers”: {“current.iter…

{“account”: “ABC123”, “symbol”: “ZTEST”, “side”:…

2

{“timestamp”:1669074170400, “headers”: {“current.iter…

{“account”: “XYZ789”, “symbol”: “ZABX”, “side”:…

커넥터 구성 속성에서 Schematization Enabled 속성을 사용하여 스키마 감지를 사용하거나 사용하지 않도록 설정합니다.

스키마 탐지 및 진화

커넥터는 스키마 감지 및 진화를 지원합니다. 커넥터가 로딩하는 새로운 데이터의 구조를 지원하도록 Snowflake의 테이블 구조를 자동으로 정의하고 발전시킬 수 있습니다.

스키마 감지 및 진화 기능이 없으면 커넥터가 로딩한 Snowflake 테이블은 RECORD_CONTENTRECORD_METADATA 의 2개 OBJECT 열로만 구성됩니다.

스키마 감지 및 진화가 활성화되면 Snowflake가 스트리밍 데이터의 스키마를 감지하고 사용자 정의 스키마와 자동으로 일치하는 테이블에 데이터를 로드할 수 있습니다. 또한 Snowflake를 사용해 새로운 열을 추가하거나 새 데이터 파일에 누락된 열에서 NOT NULL 제약 조건을 삭제할 수도 있습니다.

커넥터를 통한 스키마 감지는 제공된 Schema Registry의 유무에 관계없이 지원됩니다. Schema Registry(Avro)를 사용하는 경우 제공된 Schema Registry에 정의된 데이터 타입으로 열이 생성됩니다. 스키마 레지스트리(JSON)가 없으면 제공된 데이터를 기반으로 데이터 타입이 유추됩니다.

추가적인 스키마화에는 JSON ARRAY가 지원되지 않습니다.

스키마 진화 활성화

커넥터가 대상 테이블을 생성하면 스키마 진화는 기본적으로 활성화됩니다.

기존 테이블에서 스키마 진화를 활성화 또는 비활성화하려면 ALTER TABLE 명령을 사용하여 ENABLE_SCHEMA_EVOLUTION 매개 변수를 설정합니다. 또한 테이블에 대한 OWNERSHIP 권한이 있는 역할을 사용해야 합니다. 자세한 내용은 테이블 스키마 진화 섹션을 참조하십시오.

그러나 기존 테이블에 대해 스키마 진화가 비활성화되어 있으면 커넥터는 스키마가 일치하지 않는 행을 구성된 데드 레터 큐(DLQ)로 보내려고 시도합니다.

RECORD_METADATA 구조

RECORD_METADATA 열에는 중요한 Kafka 메시지 메타데이터가 포함되어 있습니다.

필드

설명

오프셋

Kafka 파티션 내의 메시지 오프셋

항목

Kafka 항목 이름

파티션

Kafka 파티션 번호

메시지 키(있는 경우)

타임스탬프

메시지 타임스탬프

SnowflakeConnectorPushTime

커넥터가 Kafka에서 메시지를 가져온 타임스탬프

헤더

메시지 헤더 매핑(있는 경우)

데드 레터 큐(DLQ)

DLQ 기능은 성공적으로 처리할 수 없는 메시지를 처리합니다.

DLQ 동작

  • 구문 분석 실패 - 잘못된 JSON/AVRO 형식의 메시지가 DLQ 로 전송됩니다

  • 스키마 불일치 - 스키마 진화가 비활성화되었을 때 예상 스키마와 일치하지 않는 메시지

  • 처리 오류 - 수집 중 기타 처리 실패

Iceberg 테이블 지원

Iceberg 활성화됨true 로 설정된 경우, Kafka용 Openflow Connector는 Snowflake 관리 Apache Iceberg™ 테이블 로 데이터를 수집할 수 있습니다.

요구 사항 및 제한 사항

Iceberg 테이블 수집을 위한 Openflow Kafka 커넥터를 구성하기 전에 다음 요구 사항과 제한 사항에 유의하십시오.

  • 커넥터를 실행하기 전에 Iceberg 테이블을 생성해야 합니다.

  • 사용자가 생성된 테이블에 데이터를 삽입할 수 있는 액세스 권한이 있는지 확인합니다.

구성 및 설정

Iceberg 테이블 수집을 위한 Kafka용 Openflow Connector를 구성하려면 Kafka용 Openflow Connector 설정하기 의 단계를 따르되, 다음 섹션에 언급된 몇 가지 차이점을 참고하십시오.

Iceberg 테이블로 수집 활성화

Iceberg 테이블로 수집을 사용하려면 Iceberg Enabled 매개 변수를 true 로 설정해야 합니다.

수집을 위한 Iceberg 테이블 만들기

커넥터를 실행하기 전에 Iceberg 테이블을 생성해야 합니다. 초기 테이블 스키마는 커넥터 Schematization Enabled 속성 설정에 따라 달라집니다.

스키마화를 활성화하는 경우 열 이름이 record_metadata 인 테이블을 생성해야 합니다.

CREATE OR REPLACE ICEBERG TABLE my_iceberg_table (
    record_metadata OBJECT()
  )
  EXTERNAL_VOLUME = 'my_volume'
  CATALOG = 'SNOWFLAKE'
  BASE_LOCATION = 'my_location/my_iceberg_table';
Copy

커넥터는 메시지 필드에 대한 열을 자동으로 생성하고 record_metadata 열 스키마를 변경합니다.

스키마화를 활성화하지 않은 경우 실제 Kafka 메시지 내용과 일치하는 유형의 record_content 열이 있는 테이블을 생성해야 합니다. 커넥터는 record_metadata 열을 자동으로 생성합니다.

Iceberg 테이블을 생성할 때 Iceberg 데이터 타입 또는 호환되는 Snowflake 타입 을 사용할 수 있습니다. 반정형 VARIANT 유형은 지원되지 않습니다. 대신, 정형 OBJECT 또는 MAP 을 사용하십시오.

예를 들어, 다음 메시지를 생각해 보십시오.

{
    "id": 1,
    "name": "Steve",
    "body_temperature": 36.6,
    "approved_coffee_types": ["Espresso", "Doppio", "Ristretto", "Lungo"],
    "animals_possessed":
    {
        "dogs": true,
        "cats": false
    },
    "date_added": "2024-10-15"
}
Copy

Iceberg 테이블 생성 예제

도식화가 활성화된 경우:

CREATE OR REPLACE ICEBERG TABLE my_iceberg_table (
    RECORD_METADATA OBJECT(
        offset INTEGER,
        topic STRING,
        partition INTEGER,
        key STRING,
        timestamp TIMESTAMP,
        SnowflakeConnectorPushTime BIGINT,
        headers MAP(VARCHAR, VARCHAR)
    ),
    id INT,
    body_temperature FLOAT,
    name STRING,
    approved_coffee_types ARRAY(STRING),
    animals_possessed OBJECT(dogs BOOLEAN, cats BOOLEAN),
    date_added DATE
  )
  EXTERNAL_VOLUME = 'my_volume'
  CATALOG = 'SNOWFLAKE'
  BASE_LOCATION = 'my_location/my_iceberg_table';
Copy

도식화가 비활성화된 경우:

CREATE OR REPLACE ICEBERG TABLE my_iceberg_table (
    RECORD_METADATA OBJECT(
        offset INTEGER,
        topic STRING,
        partition INTEGER,
        key STRING,
        timestamp TIMESTAMP,
        SnowflakeConnectorPushTime BIGINT,
        headers MAP(VARCHAR, VARCHAR)
    ),
    RECORD_CONTENT OBJECT(
        id INT,
        body_temperature FLOAT,
        name STRING,
        approved_coffee_types ARRAY(STRING),
        animals_possessed OBJECT(dogs BOOLEAN, cats BOOLEAN),
        date_added DATE
    )
  )
  EXTERNAL_VOLUME = 'my_volume'
  CATALOG = 'SNOWFLAKE'
  BASE_LOCATION = 'my_location/my_iceberg_table';
Copy

참고

RECORD_METADATA 를 항상 생성해야 합니다. dogs 또는 cats 같이 중첩된 구조 내의 필드 이름은 대소문자를 구분합니다.

사용 사례

이 커넥터는 다음에 이상적입니다.

  • 프로덕션 환경 에 DLQ 가 필요한 경우

  • Kafka 메타데이터가 중요한 데이터 계보 및 감사 의 경우

  • 스키마 진화 요구 사항이 있는 복잡한 메시지 처리 의 경우

  • Iceberg 테이블 통합

메타데이터나 DLQ 기능 없이 더 간단한 수집이 필요한 경우, JSON/AVRO 데이터 형식을 위한 Apache Kafka 커넥터를 사용하는 것이 좋습니다.