Snowflake High Performance connector for Kafka 작동 방식

이 항목에서는 커넥터의 다양한 측면, 테이블 및 파이프에서 작동하는 방식, 커넥터를 구성하는 방법에 대해 설명합니다.

커넥터가 테이블 및 파이프에서 작동하는 방식

Kafka용 고성능 Snowflake Connector를 사용하려면 대상 테이블을 수동으로 생성해야 합니다. 커넥터는 각 Kafka 레코드를 Snowflake 테이블에 삽입할 행으로 처리합니다. 예를 들어, 다음과 같이 구조화된 메시지 내용이 있는 Kafka 항목이 있는 경우를 가정해 보겠습니다.

{
  "order_id": 12345,
  "customer_name": "John",
  "order_total": 100.00,
  "isPaid": true
}
Copy

JSON 키와 일치하는 열이 있는 테이블을 생성하고, 이름이 ``{tableName}-STREAMING``인 기본 파이프를 사용하여 레코드 내용의 첫 번째 수준 키를 이름별로 일치하는 테이블 열에 자동으로 매핑할 수 있습니다(대/소문자 구분 안 함).

CREATE TABLE ORDERS (
  record_metadata VARIANT,
  order_id NUMBER,
  customer_name VARCHAR,
  order_total NUMBER,
  ispaid BOOLEAN
);
Copy

자체 파이프를 생성하기로 선택한 경우 파이프의 COPY INTO 문에서 데이터 변환 논리를 정의할 수 있습니다. 필요에 따라 열의 이름을 변경하고 데이터 타입을 캐스팅할 수 있습니다. 예:

CREATE TABLE ORDERS (
  order_id VARCHAR,
  customer_name VARCHAR,
  order_total VARCHAR,
  ispaid VARCHAR
);
Copy
CREATE PIPE ORDERS AS
COPY INTO ORDERS
FROM (
  SELECT
  $1:order_id::STRING,
  $1:customer_name,
  $1:order_total::STRING,
  $1:isPaid::STRING
FROM TABLE(DATA_SOURCE(TYPE => 'STREAMING'))
);
Copy

또는

CREATE TABLE ORDERS (
 topic VARCHAR,
 partition VARCHAR,
 order_id VARCHAR,
 customer_name VARCHAR,
 order_total VARCHAR,
 ispaid VARCHAR
);
Copy
CREATE PIPE ORDERS AS
COPY INTO ORDERS
FROM (
  SELECT
  $1:RECORD_METADATA.topic::STRING AS topic,
  $1:RECORD_METADATA.partition::STRING AS partition,
  $1['order_id']::STRING AS order_id,
  $1['customer_name']::STRING as customer_name,
  CONCAT($1['order_total']::STRING, ' USD') AS order_total,
  $1['isPaid']::STRING AS ispaid
FROM TABLE(DATA_SOURCE(TYPE => 'STREAMING'))
);
Copy

자체 파이프를 정의할 때 대상 테이블 열은 JSON 키와 일치할 필요가 없습니다. 열 이름을 원하는 이름으로 변경하고 필요에 따라 데이터 타입을 캐스팅할 수 있습니다.

항목 이름, 테이블 이름, 파이프 이름

구성 설정에 따라 커넥터는 대상 테이블에 대해 다른 이름을 사용합니다. 대상 테이블 이름은 항상 항목 이름에서 파생됩니다.

커넥터가 항목 이름을 대상 테이블에 매핑하는 방법

Kafka 커넥터는 Kafka 항목 이름을 Snowflake 테이블 이름에 매핑하기 위한 두 가지 모드를 제공합니다.

  • 정적 매핑: 커넥터가 Kafka 항목 이름만 사용하여 대상 테이블 이름을 파생합니다.

  • 명시적 항목-테이블 매핑 모드: snowflake.topic2table.map 구성 매개 변수를 사용하여 항목과 테이블 간의 사용자 지정 매핑을 지정합니다.

정적 매핑

snowflake.topic2table.map 매개 변수를 구성하지 않은 경우 커넥터는 항상 항목 이름에서 테이블 이름을 파생합니다.

테이블 이름 생성:

커넥터는 다음 규칙을 사용하여 항목 이름에서 대상 테이블 이름을 파생합니다.

  1. 항목 이름이 유효한 Snowflake 식별자(문자 또는 밑줄로 시작하고 문자, 숫자, 밑줄 또는 달러 기호만 포함)인 경우, 커넥터는 항목 이름을 테이블 이름으로 사용합니다(대문자로 변환됨).

  2. 항목 이름에 유효하지 않은 문자가 포함된 경우 커넥터는 다음을 수행합니다.

    • 유효하지 않은 문자를 밑줄로 대체

    • 고유성을 보장하기 위해 밑줄 뒤에 해시 코드 추가

    • 예를 들어, my-topic.data 항목은 ``MY_TOPIC_DATA_<hash>``가 됩니다.

파이프 이름 결정:

커넥터는 다음 논리에 따라 사용할 파이프를 결정합니다.

  1. 커넥터는 대상 테이블 이름과 동일한 이름의 파이프가 있는지 확인합니다.

  2. 해당 이름의 사용자 생성 파이프가 있는 경우 커넥터는 해당 파이프를 사용합니다(사용자 정의 파이프 모드).

  3. 그렇지 않은 경우 커넥터는 이름이 ``{tableName}-STREAMING``인 기본 파이프를 사용합니다.

참고

Snowflake는 예측 가능한 테이블 이름을 보장하기 위해 Snowflake 식별자 이름에 대한 규칙을 따르는 항목 이름을 선택할 것을 권장합니다.

RECORD_METADATA 이해하기

커넥터는 Kafka 레코드에 대한 메타데이터가 있는 RECORD_METADATA 구조를 채웁니다. 이 메타데이터는 Snowpipe Streaming 데이터 소스를 통해 Snowflake로 전송되며, 여기서 $1:RECORD_METADATA 접근자를 사용하는 파이프 변환에 제공됩니다. RECORD_METADATA 구조는 사용자 정의 파이프 모드와 기본 파이프 모드에서 모두 사용할 수 있습니다. 해당 내용은 VARIANT 유형의 열에 저장하거나 개별 필드를 추출하여 별도의 열에 저장할 수 있습니다.

변환 및 메타데이터가 있는 예제 파이프:

CREATE PIPE ORDERS AS
COPY INTO ORDERS_TABLE
FROM (
  SELECT
    $1:order_id::NUMBER,
    $1:customer_name,
    $1:order_total,
    $1:RECORD_METADATA.topic AS source_topic,
    $1:RECORD_METADATA.offset::NUMBER AS kafka_offset,
    $1:RECORD_METADATA.SnowflakeConnectorPushTime::BIGINT AS ingestion_time
  FROM TABLE(DATA_SOURCE(TYPE => 'STREAMING'))
);
Copy

이 예제에서:

  • 파이프는 Kafka 메시지에서 특정 필드(order_id, customer_name, order_total)를 추출합니다.

  • 또한 메타데이터 필드(항목, 오프셋 및 수집 타임스탬프)도 캡처합니다.

  • 값은 필요에 따라 캐스팅 및/또는 변환될 수 있습니다.

메타데이터 필드를 채우는 방법

커넥터는 Kafka 레코드 속성 및 커넥터 구성을 기반으로 메타데이터 필드를 자동으로 채웁니다. 다음 구성 매개 변수를 사용하여 포함할 메타데이터 필드를 제어할 수 있습니다.

  • ``snowflake.metadata.topic``(기본값: true) - 항목 이름 포함

  • ``snowflake.metadata.offset.and.partition``(기본값: true) - 오프셋 및 파티션 포함

  • ``snowflake.metadata.createtime``(기본값: true) - Kafka 레코드 타임스탬프 포함

  • ``snowflake.metadata.all``(기본값: true) - 사용 가능한 모든 메타데이터 포함

``snowflake.metadata.all=true``(기본값)인 경우 모든 메타데이터 필드가 채워집니다. 개별 메타데이터 플래그를 ``false``로 설정하면 RECORD_METADATA 구조에서 해당 특정 필드를 제외합니다.

참고

SnowflakeConnectorPushTime 필드는 항상 사용할 수 있으며 커넥터가 레코드를 수집 버퍼로 푸시한 시간을 나타냅니다. 이는 엔드투엔드 수집 대기 시간을 계산하는 데 유용합니다.

RECORD_METADATA 구조에 기본적으로 포함되는 정보는 다음과 같습니다.

필드

데이터 타입

설명

항목

String

레코드가 제공된 Kafka 항목의 이름입니다.

파티션

String

항목 내의 파티션 수입니다. (Snowflake 마이크로 파티션이 아닌 Kafka 파티션이라는 점에 유의.)

오프셋

숫자

해당 파티션의 오프셋입니다.

CreateTime / . LogAppendTime

숫자

Kafka 항목에서 메시지와 연결된 타임스탬프입니다. 값은 UTC 기준 1970년 1월 1일 자정으로부터의 밀리초입니다. 자세한 내용은 https://kafka.apache.org/0100/javadoc/org/apache/kafka/clients/producer/ProducerRecord.html을 참조하십시오.

SnowflakeConnectorPushTime

숫자

레코드가 Ingest SDK 버퍼에 푸시된 타임스탬프. 값은 1970년 1월 1일 자정(UTC) 이후의 밀리초 수입니다. 자세한 내용은 수집 지연 예상하기 섹션을 참조하십시오.

String

메시지가 Kafka KeyedMessage인 경우 해당 메시지의 키입니다. 커넥터가 RECORD_METADATA에 키를 저장하려면 Kafka 구성 속성 의 key.converter 매개 변수를 “org.apache.kafka.connect.storage.StringConverter”로 설정해야 하며, 그렇지 않으면 커넥터가 키를 무시합니다.

헤더

오브젝트

헤더는 레코드와 연결된 사용자 정의 키-값 페어입니다. 각 레코드에는 0개, 1개 또는 여러 개의 헤더가 있을 수 있습니다.

RECORD_METADATA 열에 기록된 메타데이터의 양은 선택적 Kafka 구성 속성을 사용하여 구성할 수 있습니다.

필드 이름 및 값에서는 대/소문자를 구분합니다.

수집 전에 Kafka 레코드를 변환하는 방법

각 행이 Snowpipe Streaming으로 전달되기 전에 커넥터는 Kafka Connect 레코드 값을 ``Map<String, Object>``로 변환합니다. 해당 키는 대상 열 이름과 일치해야 합니다(또는 사용자 정의 파이프 내에서 변환될 수 있음). 커넥터가 정형화 오브젝트를 수신할 수 있도록 기본 문자열, 바이트 배열 또는 숫자는 래핑해야 합니다(예: HoistFieldSMT). 변환기는 다음 규칙을 적용합니다.

  • Null 값은 삭제 표시로 처리됩니다. ``behavior.on.null.values=IGNORE``인 경우 건너뛰거나 그렇지 않은 경우 빈 JSON 오브젝트로 수집됩니다.

  • 숫자 및 부울 필드는 있는 그대로 전달됩니다. 전체 자릿수가 38보다 큰 10진수 값은 Snowflake의 NUMBER 제한 내에서 유지되기 위해 문자열로 직렬화됩니다.

  • byte[]ByteBuffer 페이로드는 Base64로 인코딩된 문자열이므로 VARIANT 또는 VARCHAR 열에 저장합니다.

  • 배열은 배열로 유지되고 중첩된 오브젝트는 중첩된 맵으로 유지됩니다. 기본 파이프를 사용하여 중첩된 데이터를 있는 그대로 랜딩하는 경우 VARIANT 열을 선언합니다.

  • Snowflake 열 이름은 텍스트여야 하므로 문자열이 아닌 키가 있는 맵은 [key, value] 페어의 배열로 내보내집니다.

  • 관련 메타데이터 플래그가 활성화될 때마다 레코드 헤더와 키가 ``RECORD_METADATA``에 복사됩니다.

전체 메시지 본문을 단일 열로 보존해야 하는 경우 SMTs를 사용하여 새 최상위 필드로 래핑합니다. 변환 패턴에 대해서는 :ref:`레거시 RECORD_CONTENT 열<label-kafkahp_connector_legacy_record_content_column>`을 참조하세요.

사용자 정의 파이프 모드와 기본 파이프 모드

커넥터는 데이터 수집 관리를 위한 두 가지 모드를 지원합니다.

사용자 정의 파이프 모드

이 모드에서는 데이터 변환 및 열 매핑을 완전히 제어할 수 있습니다.

이 모드를 사용하는 경우:

  • JSON 필드 이름과 다른 사용자 지정 열 이름이 필요한 경우

  • 데이터 변환(형식 캐스팅, 마스킹, 필터링)을 적용해야 하는 경우

  • 데이터가 열에 매핑되는 방식을 완전히 제어하려는 경우

기본 파이프 모드

이 모드에서 커넥터는 이름이 ``{tableName}-STREAMING``인 기본 파이프를 사용하며, kafka 레코드 필드를 이름별로 일치하는 테이블 열에 매핑합니다(대/소문자 구분 안 함).

이 모드를 사용하는 경우:

  • kafka 레코드 키 이름이 원하는 열 이름과 일치하는 경우

  • 사용자 지정 데이터 변환이 필요하지 않은 경우

  • 간소화된 구성을 원하는 경우

기본 파이프 모드를 사용하여 kafka 레코드 키를 테이블 열에 매핑하기

기본 파이프 모드를 사용할 때 커넥터는 이름이 ``{tableName}-STREAMING``인 기본 파이프를 사용하며, 대/소문자를 구분하지 않는 일치를 통해 콘텐츠의 첫 번째 수준 키를 테이블 열에 직접 매핑합니다.

기본 파이프 모드 사용하기 - 예제

예 1:

다음 kafka 레코드 콘텐츠 페이로드를 고려하세요.

{
  "city": "New York",
  "age": 30,
  "married": true,
  "has cat": true,
  "@&$#* includes special characters": true,
  "skills": ["sitting", "standing", "eating"],
  "family": {"son": "Jack", "daughter": "Anna"}
}
Copy

JSON 키(대/소문자 구분 안 함, 특수 문자 포함)와 일치하는 열이 있는 테이블을 생성합니다.

CREATE TABLE PERSON_DATA (
  record_metadata VARIANT,
  city VARCHAR,
  age NUMBER,
  married BOOLEAN,
  "has cat" BOOLEAN,
  "!@&$#* includes special characters" BOOLEAN,
  skills VARIANT,
  family VARIANT
);
Copy

일치 동작:

  • "city"``(kafka) ``city 또는 CITY 또는 ``City``(열) - 대/소문자 구분 안 함

  • ``”has cat”``(kafka) → ``”has cat”``(열) - 공백으로 인해 따옴표로 묶어야 함

  • ``!@&$#* includes special characters”``(kafka) → ``!@&$#* includes special characters”``(열) - 특수 문자가 유지됨

  • skills``family``와 같은 중첩 오브젝트를 VARIANT 열에 자동으로 매핑

사용자 정의 파이프 모드 사용하기 - 예제

이 예제에서는 사용자 지정 데이터 변환으로 사용자 정의 파이프를 구성하고 사용하는 방법을 보여줍니다.

예 1:

원하는 스키마로 테이블을 생성합니다.

CREATE TABLE ORDERS (
  order_id NUMBER,
  customer_name VARCHAR,
  order_total NUMBER,
  order_date TIMESTAMP_NTZ,
  source_topic VARCHAR
);
Copy

테이블 스키마와 일치하도록 수신 Kafka 레코드를 변환하는 파이프를 생성합니다.

CREATE PIPE ORDERS AS
COPY INTO ORDERS
FROM (
  SELECT
    $1:order_id::NUMBER,
    $1:customer_name,
    $1:order_total::NUMBER,
    $1:order_date::TIMESTAMP_NTZ,
    $1:RECORD_METADATA.topic
  FROM TABLE(DATA_SOURCE(TYPE => 'STREAMING'))
);
Copy

파이프 이름(ORDERS)은 테이블 이름(ORDERS)과 일치해야 합니다. 파이프 정의는 $1:field_name 구문을 사용하여 JSON 페이로드에서 필드를 추출하고 이를 테이블 열에 매핑합니다.

참고

$1['field name'] 또는 ``$1[‘has cat’]``과 같이 대괄호 표기법을 사용하는 특수 문자가 있는 중첩된 JSON 필드에 액세스할 수 있습니다.

항목과 테이블 매핑을 구성합니다.

snowflake.topic2table.map=kafka-orders-topic:ORDERS
Copy

이 구성은 이름이 ``ORDERS``인 기존 테이블 및 파이프에 대한 Kafka 항목 ``kafka-orders-topic``을 매핑합니다.

예 2:

기존 이름이 없는 콘텐츠의 키에 액세스해야 하는 경우 다음 구문을 사용합니다.

  • 단순 필드: $1:field_name

  • 공백 또는 특수 문자가 있는 필드: $1['field name'] 또는 $1['has cat']

  • 유니코드 문자가 있는 필드: $1[' @&$#* has Łułósżź']

  • 중첩 필드: $1:parent.child 또는 $1:parent['child field']

Kafka의 JSON 페이로드를 고려하세요.

{
  "city": "New York",
  "age": 30,
  "married": true,
  "has cat": true,
  " @&$#* has Łułósżź": true,
  "skills": ["sitting", "standing", "eating"],
  "family": {"son": "Jack", "daughter": "Anna"}
}
Copy

선택한 열 이름으로 대상 테이블을 생성합니다.

CREATE TABLE PERSON_DATA (
  city VARCHAR,
  age NUMBER,
  married BOOLEAN,
  has_cat BOOLEAN,
  weird_field_name BOOLEAN,
  skills VARIANT,
  family VARIANT
);
Copy

그런 다음 매핑을 정의하는 동일한 이름의 파이프를 생성합니다.

CREATE PIPE PERSON_DATA AS
COPY INTO PERSON_DATA
FROM (
  SELECT
    $1:city,
    $1:age,
    $1:married,
    $1['has cat'] AS has_cat,
    $1[' @&$#* has Łułósżź'] AS weird_field_name,
    $1:skills,
    $1:family
  FROM TABLE(DATA_SOURCE(TYPE => 'STREAMING'))
);
Copy

요점:

  • 열 이름 제어(예: ``”has cat”``의 이름을 ``has_cat``으로 변경)

  • 필요에 따라 데이터 타입을 캐스팅할 수 있음(예: $1:age::NUMBER)

  • 원하는 대로 필드를 포함 또는 제외할 수 있음

  • 메타데이터 필드를 추가할 수 있음(예: $1:RECORD_METADATA.topic)

  • VARIANT 열은 중첩된 JSON 구조체를 자동으로 처리함

예 3: 대화형 테이블 사용

대화형 테이블은 대기 시간이 짧고 동시성이 높은 쿼리에 최적화된 특수한 유형의 Snowflake 테이블입니다. 대화형 테이블에 대한 자세한 내용은 :doc:`대화형 테이블 설명서</user-guide/interactive>`에서 확인할 수 있습니다.

참고

대화형 테이블은 현재 선택한 계정에서만 미리 보기 기능으로 제공됩니다.

  1. 대화형 테이블을 생성합니다.

    CREATE INTERACTIVE TABLE REALTIME_METRICS (
      metric_name VARCHAR,
      metric_value NUMBER,
      source_topic VARCHAR,
      timestamp TIMESTAMP_NTZ
    ) AS (SELECT
          $1:M_NAME::VARCHAR,
          $1:M_VALUE::NUMBER,
          $1:RECORD_METADATA.topic::VARCHAR,
          $1:RECORD_METADATA.timestamp::TIMESTAMP_NTZ
          from TABLE(DATA_SOURCE(TYPE => 'STREAMING')));
    
    Copy
  2. 항목과 테이블 매핑을 구성합니다.

    snowflake.topic2table.map=metrics-topic:REALTIME_METRICS
    
    Copy

중요 고려 사항:

  • 대화형 테이블에는 특정 제한 사항과 쿼리 제한 사항이 있습니다. 커넥터와 함께 사용하기 전에 :doc:`대화형 테이블 설명서</user-guide/interactive>`를 검토하세요.

  • 대화형 테이블의 경우 필요한 모든 변환은 테이블 정의에서 처리해야 합니다.

  • 대화형 테이블을 효율적으로 쿼리하려면 대화형 웨어하우스가 필요합니다.

명시적 항목-테이블 매핑

snowflake.topic2table.map 매개 변수를 구성할 때 커넥터는 명시적 매핑 모드에서 작동합니다. 이 모드에서는 다음을 수행할 수 있습니다.

  • 여러 Kafka 항목을 단일 Snowflake 테이블에 매핑

  • 항목 이름과 다른 사용자 지정 테이블 이름 사용

  • 여러 항목과 일치하도록 정규식 패턴 적용

구성 형식:

snowflake.topic2table.map 매개 변수는 쉼표로 구분된 항목-테이블 매핑 목록을 다음 형식으로 허용합니다.

topic1:table1,topic2:table2,topic3:table3
Copy

구성 예제:

직접 항목 매핑

snowflake.topic2table.map=orders:ORDER_TABLE,customers:CUSTOMER_TABLE
Copy

정규식 패턴 일치

snowflake.topic2table.map=.*_cat:CAT_TABLE,.*_dog:DOG_TABLE
Copy

이 구성은 _cat``(예: ``orange_cat, calico_cat)으로 끝나는 모든 항목을 CAT_TABLE 테이블에 매핑하고 _dog``로 끝나는 모든 항목을 ``DOG_TABLE 테이블에 매핑합니다.

하나의 테이블에 여러 항목

snowflake.topic2table.map=topic1:shared_table,topic2:shared_table,topic3:other_table
Copy

이 구성은 topic1``topic2``를 모두 ``shared_table``에 매핑하고 ``topic3``은 ``other_table``에 매핑합니다.

중요

  • 매핑의 정규식 패턴은 겹칠 수 없습니다. 각 항목은 최대 하나의 패턴과 일치해야 합니다.

  • 매핑의 테이블 이름은 문자 또는 밑줄로 시작하는 2자 이상의 유효한 Snowflake 식별자여야 합니다.

  • 여러 항목을 단일 테이블에 매핑할 수 있습니다.

레거시 RECORD_CONTENT 열

이전 버전의 커넥터에서는 스키마화 기능이 비활성화되었을 때 커넥터가 RECORD_CONTENT 및 RECORD_METADATA의 두 열이 있는 대상 테이블을 생성했습니다. RECORD_CONTENT 열에는 유형 VARIANT의 열에 전체 Kafka 메시지 내용이 포함되어 있습니다. RECORD_METADATA 열은 계속 지원되지만 RECORD_CONTENT 열은 커넥터에서 더 이상 생성되지 않습니다. SMT 변환을 사용하여 동일한 기능을 구현할 수 있습니다(이 섹션의 뒷부분에 있는 예제 참조). 또한 RECORD_CONTENT 키는 PIPE 변환에서 더 이상 사용할 수 없습니다. 예를 들어, 이 PIPE 정의는 기본적으로 작동하지 않습니다.

참고

이 파이프 정의는 추가 SMT 변환 없이 작동하지 않습니다.

CREATE PIPE ORDERS AS
COPY INTO ORDERS
FROM (
  SELECT
    $1:RECORD_CONTENT
FROM TABLE(DATA_SOURCE(TYPE => 'STREAMING'))
);
Copy

단일 열에 전체 Kafka 메시지 내용을 저장해야 하거나 PIPE 변환의 전체 Kafka 메시지 내용을 처리해야 하는 경우, 전체 Kafka 메시지 내용을 원하는 사용자 지정 필드로 래핑하는 다음 SMT 변환을 사용할 수 있습니다.

transforms=wrapKafkaMessageContent
transforms.wrapKafkaMessageContent.type=org.apache.kafka.connect.transforms.HoistField$Value
transforms.wrapKafkaMessageContent.field=your_top_level_field_name
Copy

이 변환은 전체 Kafka 메시지 내용을 your_top_level_field_name``이라는 사용자 지정 필드로 래핑합니다. 그런 다음 PIPE 변환의 ``$1:your_top_level_field_name 접근자를 사용하여 전체 Kafka 메시지 내용에 액세스할 수 있습니다.

CREATE PIPE ORDERS AS
COPY INTO ORDERS
FROM (
  SELECT
    $1:your_top_level_field_name
FROM TABLE(DATA_SOURCE(TYPE => 'STREAMING'))
);
Copy

또는 기본 파이프를 사용하여 전체 메타데이터와 콘텐츠를 모두 단일 테이블에 저장하려면 사용자 지정 파이프를 생성하지 마세요. 대신, RECORD_CONTENT``your_top_level_field_name``이라는 두 열이 있는 테이블만 생성합니다.

CREATE TABLE ORDERS (
  record_metadata VARIANT,
  your_top_level_field_name VARIANT
);
Copy

HoistField$Value 변환에 대해 자세히 알아보려면 `Kafka 설명서<https://kafka.apache.org/39/documentation.html#connect_transforms>`_를 참조하세요.

경고

전체 Kafka 메시지 내용과 메타데이터를 테이블에 저장하면 수집 비용, 파이프라인 속도 및 대기 시간에 부정적인 영향을 미칠 수 있습니다. 최상의 성능이 필요한 경우 Kafka 레코드 내용의 최상위 수준에서 액세스할 수 있을 때 필요한 데이터만 저장하거나 SMT 변환을 사용하여 깊게 중첩된 필드에서 최상위 필드로 데이터를 추출합니다.

스트리밍 채널 오류 및 데드 레터 큐 처리하기

4.0.0-rc4 버전에서 커넥터는 오프셋을 커밋하기 전에 Snowpipe Streaming 채널 상태를 검사합니다. Snowflake가 거부된 행을 보고하는 경우(rowsErrorCount > 0), the connector now raises a fatal error (ERROR_5030) when errors.tolerance=none so that data issues cannot go unnoticed. To allow ingestion to continue while you triage bad rows, set errors.tolerance=all

errors.tolerance=all
Copy

스키마 진화

중요

Kafka용 고성능 Snowflake Connector에서는 스키마 진화가 지원되지 않습니다. 대상 테이블에 대한 스키마 변경 사항은 수동으로 관리해야 합니다.

커넥터는 수신되는 Kafka 레코드를 기반으로 스키마 변경 사항을 자동으로 감지하거나 테이블 스키마를 진화시키지 않습니다. 열을 추가하거나 데이터 타입을 수정하거나 다른 스키마를 변경해야 하는 경우 다음을 수행해야 합니다.

  1. **커넥터를 일시 중지**하여 데이터 수집 중지

  2. :doc:`/sql-reference/sql/alter-table`을 사용하여 테이블 스키마를 수동으로 변경 또는 테이블 다시 생성

  3. 사용자 정의 파이프를 사용하고 변환 논리를 변경해야 하는 경우 파이프 정의 업데이트

  4. **커넥터를 다시 시작**하여 데이터 수집 재개

참고

스키마 진화 지원은 향후 릴리스에 추가될 예정입니다.

내결함성

커넥터의 내결함성 제한 사항

Kafka 항목은 저장 공간 또는 보존 시간에 대한 제한으로 구성될 수 있습니다.

  • 시스템이 보존 시간 이상 동안 오프라인 상태로 유지되면 만료된 레코드는 로드되지 않습니다. 유사하게 Kafka의 저장소 공간 제한이 초과되면 일부 메시지가 전달되지 않습니다.

  • Kafka 항목의 메시지가 삭제 또는 업데이트되는 경우 해당 변경 사항이 Snowflake 테이블에 반영되지 않을 수 있습니다.

다음 단계

작업 설정.