Apache Iceberg™ 테이블과 함께 Kafka용 Snowflake Connector 사용하기

버전 3.0.0부터, Kafka용 Snowflake Connector는 Snowflake가 관리하는 Apache Iceberg™ 테이블 로 데이터를 수집할 수 있습니다.

요구 사항 및 제한 사항

Iceberg 테이블 수집을 위한 Kafka 커넥터를 구성하기 전에 다음 요구 사항과 제한 사항에 유의하십시오.

  • Iceberg 테이블 수집을 사용하려면 Kafka 커넥터 버전 3.0.0 이상이 필요합니다.

  • Iceberg 테이블 수집은 Snowpipe Streaming이 포함된 Kafka 커넥터에서 지원됩니다. Snowpipe가 포함된 Kafka 커넥터에서는 지원되지 않습니다.

  • snowflake.streaming.enable.single.bufferfalse 로 설정된 경우 Iceberg 테이블 수집은 지원되지 않습니다.

  • 커넥터를 실행하기 전에 Iceberg 테이블을 생성해야 합니다. 자세한 내용은 이 항목의 구성 및 설정 섹션을 참조하십시오.

스키마 진화 제한 사항

Iceberg의 스키마 진화는 AVRO 또는 Protobuf와 같은 도식화된 데이터 형식에 대해 완벽하게 지원됩니다.

스키마가 없는 일반 JSON 의 경우 커넥터는 다음 메시지 유형을 유효하지 않은 것으로 간주하여 데드 레터 큐(DLQ)로 보냅니다.

  • 해당 값이 null 또는 [] 인 경우 새 열이 있는 메시지입니다.

  • 해당 값이 null 또는 [] 인 경우 정형 오브젝트에 새 필드가 있는 메시지입니다

커넥터가 이러한 메시지 유형을 수집할 수 있도록 테이블 스키마를 수동으로 변경하려면 ALTER TABLE 문을 사용합니다.

구성 및 설정

Iceberg 테이블 수집을 위한 Kafka 커넥터를 설정하려면, Snowpipe Streaming 기반 커넥터의 일반적인 설정 단계 를 따르되, 다음 섹션에서 설명되는 몇 가지 차이점을 확인해야 합니다.

외부 볼륨에 사용 권한 부여

Iceberg 테이블과 연결된 외부 볼륨에 대한 USAGE 권한을 Kafka 커넥터의 역할에 부여해야 합니다.

예를 들어, Iceberg 테이블이 kafka_external_volume 외부 볼륨을 사용하고 커넥터가 kafka_connector_role 역할을 사용하는 경우 다음 문을 실행합니다.

USE ROLE ACCOUNTADMIN;
GRANT USAGE ON EXTERNAL VOLUME kafka_external_volume TO ROLE kafka_connector_role;
Copy

수집을 위한 Iceberg 테이블 만들기

커넥터를 실행하기 전에 Iceberg 테이블을 생성해야 합니다. 초기 테이블 스키마는 커넥터 snowflake.enable.schematization 설정에 따라 달라집니다.

스키마화를 활성화하면 record_metadata 라는 열이 있는 테이블을 만들 수 있습니다.

CREATE OR REPLACE ICEBERG TABLE my_iceberg_table (
    record_metadata OBJECT()
  )
  EXTERNAL_VOLUME = 'my_volume'
  CATALOG = 'SNOWFLAKE'
  BASE_LOCATION = 'my_location/my_iceberg_table';
Copy

커넥터는 record_content 열을 자동으로 생성하고 record_metadata 열 스키마를 변경합니다.

스키마화를 활성화하지 않은 경우 실제 Kafka 메시지 내용과 일치하는 유형의 record_content 열이 있는 테이블을 만들 수 있습니다. 커넥터는 record_metadata 열을 자동으로 생성합니다.

Iceberg 테이블을 생성할 때 Iceberg 데이터 타입 또는 호환되는 Snowflake 타입 을 사용할 수 있습니다. 반정형 VARIANT 유형은 지원되지 않습니다. 대신, 정형 OBJECT 또는 MAP 을 사용하십시오.

예를 들어, 다음 메시지를 생각해 보십시오.

{
    "id": 1,
    "name": "Steve",
    "body_temperature": 36.6,
    "approved_coffee_types": ["Espresso", "Doppio", "Ristretto", "Lungo"],
    "animals_possessed":
    {
        "dogs": true,
        "cats": false
    },
    "date_added": "2024-10-15"
}
Copy

예제 메시지에 대한 Iceberg 테이블을 생성하려면 다음 문을 사용합니다.

CREATE OR REPLACE ICEBERG TABLE my_iceberg_table (
    record_content OBJECT(
        id INT,
        body_temperature FLOAT,
        name STRING,
        approved_coffee_types ARRAY(STRING),
        animals_possessed OBJECT(dogs BOOLEAN, cats BOOLEAN),
        date_added DATE
    )
  )
  EXTERNAL_VOLUME = 'my_volume'
  CATALOG = 'SNOWFLAKE'
  BASE_LOCATION = 'my_location/my_iceberg_table';
Copy

참고

dogs 또는 cats 같이 중첩된 구조 내의 필드 이름은 대소문자를 구분합니다.

구성 속성

snowflake.streaming.iceberg.enabled

커넥터가 데이터를 Iceberg 테이블로 수집할지 여부를 지정합니다. 이 속성이 실제 테이블 유형과 일치하지 않으면 커넥터가 실패합니다.

:

  • true

  • false

기본값:

false