Working with the Snowflake High Performance connector for Kafka¶
This topic describes how the connector works with tables and pipes, and how to configure the connector with these elements.
커넥터가 테이블 및 파이프에서 작동하는 방식¶
The connector treats each Kafka record as a row to be inserted into a Snowflake table. For example, if you have a Kafka topic with the content of the message structured like the following JSON:
{
"order_id": 12345,
"customer_name": "John",
"order_total": 100.00,
"isPaid": true
}
기본적으로 수집이 시작되기 전에 테이블이나 파이프를 만들 필요가 없습니다. 커넥터는 JSON 키와 일치하는 열이 있는 테이블을 생성하고, 이름이 ``{tableName}-STREAMING``인 기본 파이프를 사용하여 레코드 내용의 첫 번째 수준 키를 이름별(대/소문자 구분 안 함)로 일치하는 테이블 열에 자동으로 매핑합니다. JSON 키와 일치하는 열이 있는 자체 테이블을 만들 수도 있습니다. 커넥터는 레코드 내용의 첫 번째 수준 키를 이름별로 테이블 열과 일치시키려고 시도합니다. JSON의 키가 테이블 열과 일치하지 않는 경우 커넥터는 키를 무시합니다.
CREATE TABLE ORDERS (
record_metadata VARIANT,
order_id NUMBER,
customer_name VARCHAR,
order_total NUMBER,
ispaid BOOLEAN
);
자체 파이프를 생성하기로 선택한 경우 파이프의 COPY INTO 문에서 데이터 변환 논리를 정의할 수 있습니다. 필요에 따라 열의 이름을 변경하고 데이터 타입을 캐스팅할 수 있습니다. 예:
CREATE TABLE ORDERS (
order_id VARCHAR,
customer_name VARCHAR,
order_total VARCHAR,
ispaid VARCHAR
);
CREATE PIPE ORDERS AS
COPY INTO ORDERS
FROM (
SELECT
$1:order_id::STRING,
$1:customer_name,
$1:order_total::STRING,
$1:isPaid::STRING
FROM TABLE(DATA_SOURCE(TYPE => 'STREAMING'))
);
또는
CREATE TABLE ORDERS (
topic VARCHAR,
partition VARCHAR,
order_id VARCHAR,
customer_name VARCHAR,
order_total VARCHAR,
ispaid VARCHAR
);
CREATE PIPE ORDERS AS
COPY INTO ORDERS
FROM (
SELECT
$1:RECORD_METADATA.topic::STRING AS topic,
$1:RECORD_METADATA.partition::STRING AS partition,
$1['order_id']::STRING AS order_id,
$1['customer_name']::STRING as customer_name,
CONCAT($1['order_total']::STRING, ' USD') AS order_total,
$1['isPaid']::STRING AS ispaid
FROM TABLE(DATA_SOURCE(TYPE => 'STREAMING'))
);
When you define your own pipe your destination table columns need not need match the JSON keys. You can rename the columns to your desired names and cast the data types if required.
항목 이름, 테이블 이름, 파이프 이름¶
구성 설정에 따라 커넥터는 대상 테이블에 대해 다른 이름을 사용합니다. 대상 테이블 이름은 항상 항목 이름에서 파생됩니다.
커넥터가 항목 이름을 대상 테이블에 매핑하는 방법¶
Kafka 커넥터는 Kafka 항목 이름을 Snowflake 테이블 이름에 매핑하기 위한 두 가지 모드를 제공합니다.
정적 매핑: 커넥터가 Kafka 항목 이름만 사용하여 대상 테이블 이름을 파생합니다.
명시적 항목-테이블 매핑 모드:
snowflake.topic2table.map구성 매개 변수를 사용하여 항목과 테이블 간의 사용자 지정 매핑을 지정합니다.
정적 매핑¶
snowflake.topic2table.map 매개 변수를 구성하지 않은 경우 커넥터는 항상 항목 이름에서 테이블 이름을 파생합니다.
테이블 이름 생성:
커넥터는 다음 규칙을 사용하여 항목 이름에서 대상 테이블 이름을 파생합니다.
If the topic name is a valid Snowflake identifier the connector uses the topic name as the destination table name, converted to uppercase).
항목 이름에 유효하지 않은 문자가 포함된 경우 커넥터는 다음을 수행합니다.
유효하지 않은 문자를 밑줄로 대체
고유성을 보장하기 위해 밑줄 뒤에 해시 코드 추가
예를 들어,
my-topic.data항목은 ``MY_TOPIC_DATA_<hash>``가 됩니다.
파이프 이름 결정:
커넥터는 다음 논리에 따라 사용할 파이프를 결정합니다.
커넥터는 대상 테이블 이름과 동일한 이름의 파이프가 있는지 확인합니다.
해당 이름의 사용자 생성 파이프가 있는 경우 커넥터는 해당 파이프를 사용합니다(사용자 정의 파이프 모드).
그렇지 않은 경우 커넥터는 이름이 ``{tableName}-STREAMING``인 기본 파이프를 사용합니다.
참고
Snowflake는 예측 가능한 테이블 이름을 보장하기 위해 Snowflake 식별자 이름에 대한 규칙을 따르는 항목 이름을 선택할 것을 권장합니다.
RECORD_METADATA 이해하기¶
커넥터는 Kafka 레코드에 대한 메타데이터가 있는 RECORD_METADATA 구조를 채웁니다. 이 메타데이터는 Snowpipe Streaming 데이터 소스를 통해 Snowflake로 전송되며, 여기서 $1:RECORD_METADATA 접근자를 사용하는 파이프 변환에 제공됩니다. RECORD_METADATA 구조는 사용자 정의 파이프 모드와 기본 파이프 모드에서 모두 사용할 수 있습니다. 해당 내용은 VARIANT 유형의 열에 저장하거나 개별 필드를 추출하여 별도의 열에 저장할 수 있습니다.
변환 및 메타데이터가 있는 예제 파이프:
CREATE PIPE ORDERS AS
COPY INTO ORDERS_TABLE
FROM (
SELECT
$1:order_id::NUMBER,
$1:customer_name,
$1:order_total,
$1:RECORD_METADATA.topic AS source_topic,
$1:RECORD_METADATA.offset::NUMBER AS kafka_offset,
$1:RECORD_METADATA.SnowflakeConnectorPushTime::BIGINT AS ingestion_time
FROM TABLE(DATA_SOURCE(TYPE => 'STREAMING'))
);
이 예제에서:
파이프는 Kafka 메시지에서 특정 필드(order_id, customer_name, order_total)를 추출합니다.
또한 메타데이터 필드(항목, 오프셋 및 수집 타임스탬프)도 캡처합니다.
값은 필요에 따라 캐스팅 및/또는 변환될 수 있습니다.
메타데이터 필드를 채우는 방법¶
커넥터는 Kafka 레코드 속성 및 커넥터 구성을 기반으로 메타데이터 필드를 자동으로 채웁니다. 다음 구성 매개 변수를 사용하여 포함할 메타데이터 필드를 제어할 수 있습니다.
``snowflake.metadata.topic``(기본값: true) - 항목 이름 포함
``snowflake.metadata.offset.and.partition``(기본값: true) - 오프셋 및 파티션 포함
``snowflake.metadata.createtime``(기본값: true) - Kafka 레코드 타임스탬프 포함
``snowflake.metadata.all``(기본값: true) - 사용 가능한 모든 메타데이터 포함
``snowflake.metadata.all=true``(기본값)인 경우 모든 메타데이터 필드가 채워집니다. 개별 메타데이터 플래그를 ``false``로 설정하면 RECORD_METADATA 구조에서 해당 특정 필드를 제외합니다.
참고
SnowflakeConnectorPushTime 필드는 항상 사용할 수 있으며 커넥터가 레코드를 수집 버퍼로 푸시한 시간을 나타냅니다. 이는 엔드투엔드 수집 대기 시간을 계산하는 데 유용합니다.
RECORD_METADATA 구조에 기본적으로 포함되는 정보는 다음과 같습니다.
필드 |
데이터 타입 |
설명 |
|---|---|---|
항목 |
String |
레코드가 제공된 Kafka 항목의 이름입니다. |
파티션 |
String |
항목 내의 파티션 수입니다. (Snowflake 마이크로 파티션이 아닌 Kafka 파티션이라는 점에 유의.) |
오프셋 |
숫자 |
해당 파티션의 오프셋입니다. |
CreateTime / . LogAppendTime |
숫자 |
This is the timestamp associated with the message in the Kafka topic. The value is milliseconds since midnight January 1, 1970, UTC. For more information, see: Kafka ProducerRecord documentation. |
SnowflakeConnectorPushTime |
숫자 |
레코드가 Ingest SDK 버퍼에 푸시된 타임스탬프. 값은 1970년 1월 1일 자정(UTC) 이후의 밀리초 수입니다. 자세한 내용은 수집 지연 예상하기 섹션을 참조하십시오. |
키 |
String |
If the message is a Kafka KeyedMessage, this is the key for that message.
In order for the connector to store the key in the RECORD_METADATA, the |
헤더 |
오브젝트 |
A header is a user-defined key-value pair associated with the record. Each record can have 0, 1, or multiple headers. |
RECORD_METADATA 열에 기록된 메타데이터의 양은 선택적 Kafka 구성 속성을 사용하여 구성할 수 있습니다.
필드 이름 및 값에서는 대/소문자를 구분합니다.
수집 전에 Kafka 레코드를 변환하는 방법¶
각 행이 Snowpipe Streaming으로 전달되기 전에 커넥터는 Kafka Connect 레코드 값을 ``Map<String, Object>``로 변환합니다. 해당 키는 대상 열 이름과 일치해야 합니다(또는 사용자 정의 파이프 내에서 변환될 수 있음). 커넥터가 정형화 오브젝트를 수신할 수 있도록 기본 문자열, 바이트 배열 또는 숫자는 래핑해야 합니다(예: HoistFieldSMT). 변환기는 다음 규칙을 적용합니다.
Null 값은 삭제 표시로 처리됩니다. ``behavior.on.null.values=IGNORE``인 경우 건너뛰거나 그렇지 않은 경우 빈 JSON 오브젝트로 수집됩니다.
숫자 및 부울 필드는 있는 그대로 전달됩니다. 전체 자릿수가 38보다 큰 10진수 값은 Snowflake의
NUMBER제한 내에서 유지되기 위해 문자열로 직렬화됩니다.byte[]및ByteBuffer페이로드는 Base64로 인코딩된 문자열이므로VARIANT또는VARCHAR열에 저장합니다.배열은 배열로 유지되고 중첩된 오브젝트는 중첩된 맵으로 유지됩니다. 기본 파이프를 사용하여 중첩된 데이터를 있는 그대로 랜딩하는 경우
VARIANT열을 선언합니다.Snowflake 열 이름은 텍스트여야 하므로 문자열이 아닌 키가 있는 맵은
[key, value]페어의 배열로 내보내집니다.관련 메타데이터 플래그가 활성화될 때마다 레코드 헤더와 키가 ``RECORD_METADATA``에 복사됩니다.
전체 메시지 본문을 단일 열로 보존해야 하는 경우 SMTs를 사용하여 새 최상위 필드로 래핑합니다. 변환 패턴에 대해서는 :ref:`레거시 RECORD_CONTENT 열<label-kafkahp_connector_legacy_record_content_column>`을 참조하세요.
사용자 정의 파이프 모드와 기본 파이프 모드¶
커넥터는 데이터 수집 관리를 위한 두 가지 모드를 지원합니다.
사용자 정의 파이프 모드¶
이 모드에서는 데이터 변환 및 열 매핑을 완전히 제어할 수 있습니다.
이 모드를 사용하는 경우:
JSON 필드 이름과 다른 사용자 지정 열 이름이 필요한 경우
데이터 변환(형식 캐스팅, 마스킹, 필터링)을 적용해야 하는 경우
데이터가 열에 매핑되는 방식을 완전히 제어하려는 경우
기본 파이프 모드¶
이 모드에서 커넥터는 이름이 ``{tableName}-STREAMING``인 기본 파이프를 사용하며, kafka 레코드 필드를 이름별로 일치하는 테이블 열에 매핑합니다(대/소문자 구분 안 함).
이 모드를 사용하는 경우:
kafka 레코드 키 이름이 원하는 열 이름과 일치하는 경우
사용자 지정 데이터 변환이 필요하지 않은 경우
You want a simple configuration
기본 파이프 모드를 사용하여 kafka 레코드 키를 테이블 열에 매핑하기
기본 파이프 모드를 사용할 때 커넥터는 이름이 ``{tableName}-STREAMING``인 기본 파이프를 사용하며, 대/소문자를 구분하지 않는 일치를 통해 콘텐츠의 첫 번째 수준 키를 테이블 열에 직접 매핑합니다.
기본 파이프 모드 사용하기 - 예제¶
예 1:¶
다음 kafka 레코드 콘텐츠 페이로드를 고려하세요.
{
"city": "New York",
"age": 30,
"married": true,
"has cat": true,
"@&$#* includes special characters": true,
"skills": ["sitting", "standing", "eating"],
"family": {"son": "Jack", "daughter": "Anna"}
}
JSON 키(대/소문자 구분 안 함, 특수 문자 포함)와 일치하는 열이 있는 테이블을 생성합니다.
CREATE TABLE PERSON_DATA (
record_metadata VARIANT,
city VARCHAR,
age NUMBER,
married BOOLEAN,
"has cat" BOOLEAN,
"!@&$#* includes special characters" BOOLEAN,
skills VARIANT,
family VARIANT
);
일치 동작:
사용자 정의 파이프 모드 사용하기 - 예제¶
이 예제에서는 사용자 지정 데이터 변환으로 사용자 정의 파이프를 구성하고 사용하는 방법을 보여줍니다.
예 1:¶
원하는 스키마로 테이블을 생성합니다.
CREATE TABLE ORDERS (
order_id NUMBER,
customer_name VARCHAR,
order_total NUMBER,
order_date TIMESTAMP_NTZ,
source_topic VARCHAR
);
테이블 스키마와 일치하도록 수신 Kafka 레코드를 변환하는 파이프를 생성합니다.
CREATE PIPE ORDERS AS
COPY INTO ORDERS
FROM (
SELECT
$1:order_id::NUMBER,
$1:customer_name,
$1:order_total::NUMBER,
$1:order_date::TIMESTAMP_NTZ,
$1:RECORD_METADATA.topic
FROM TABLE(DATA_SOURCE(TYPE => 'STREAMING'))
);
파이프 이름(ORDERS)은 테이블 이름(ORDERS)과 일치해야 합니다. 파이프 정의는 $1:field_name 구문을 사용하여 JSON 페이로드에서 필드를 추출하고 이를 테이블 열에 매핑합니다.
참고
$1['field name'] 또는 ``$1[‘has cat’]``과 같이 대괄호 표기법을 사용하는 특수 문자가 있는 중첩된 JSON 필드에 액세스할 수 있습니다.
항목과 테이블 매핑을 구성합니다.
snowflake.topic2table.map=kafka-orders-topic:ORDERS
이 구성은 이름이 ``ORDERS``인 기존 테이블 및 파이프에 대한 Kafka 항목 ``kafka-orders-topic``을 매핑합니다.
예 2:¶
기존 이름이 없는 콘텐츠의 키에 액세스해야 하는 경우 다음 구문을 사용합니다.
단순 필드:
$1:field_name공백 또는 특수 문자가 있는 필드:
$1['field name']또는$1['has cat']유니코드 문자가 있는 필드:
$1[' @&$#* has Łułósżź']중첩 필드:
$1:parent.child또는$1:parent['child field']
Kafka의 JSON 페이로드를 고려하세요.
{
"city": "New York",
"age": 30,
"married": true,
"has cat": true,
" @&$#* has Łułósżź": true,
"skills": ["sitting", "standing", "eating"],
"family": {"son": "Jack", "daughter": "Anna"}
}
선택한 열 이름으로 대상 테이블을 생성합니다.
CREATE TABLE PERSON_DATA (
city VARCHAR,
age NUMBER,
married BOOLEAN,
has_cat BOOLEAN,
weird_field_name BOOLEAN,
skills VARIANT,
family VARIANT
);
그런 다음 매핑을 정의하는 동일한 이름의 파이프를 생성합니다.
CREATE PIPE PERSON_DATA AS
COPY INTO PERSON_DATA
FROM (
SELECT
$1:city,
$1:age,
$1:married,
$1['has cat'] AS has_cat,
$1[' @&$#* has Łułósżź'] AS weird_field_name,
$1:skills,
$1:family
FROM TABLE(DATA_SOURCE(TYPE => 'STREAMING'))
);
요점:
예 3: 대화형 테이블 사용¶
대화형 테이블은 대기 시간이 짧고 동시성이 높은 쿼리에 최적화된 특수한 유형의 Snowflake 테이블입니다. 대화형 테이블에 대한 자세한 내용은 :doc:`대화형 테이블 설명서</user-guide/interactive>`에서 확인할 수 있습니다.
대화형 테이블을 생성합니다.
CREATE INTERACTIVE TABLE REALTIME_METRICS ( metric_name VARCHAR, metric_value NUMBER, source_topic VARCHAR, timestamp TIMESTAMP_NTZ ) AS (SELECT $1:M_NAME::VARCHAR, $1:M_VALUE::NUMBER, $1:RECORD_METADATA.topic::VARCHAR, $1:RECORD_METADATA.timestamp::TIMESTAMP_NTZ from TABLE(DATA_SOURCE(TYPE => 'STREAMING')));
항목과 테이블 매핑을 구성합니다.
snowflake.topic2table.map=metrics-topic:REALTIME_METRICS
중요 고려 사항:
대화형 테이블에는 특정 제한 사항과 쿼리 제한 사항이 있습니다. 커넥터와 함께 사용하기 전에 :doc:`대화형 테이블 설명서</user-guide/interactive>`를 검토하세요.
대화형 테이블의 경우 필요한 모든 변환은 테이블 정의에서 처리해야 합니다.
대화형 테이블을 효율적으로 쿼리하려면 대화형 웨어하우스가 필요합니다.
명시적 항목-테이블 매핑¶
snowflake.topic2table.map 매개 변수를 구성할 때 커넥터는 명시적 매핑 모드에서 작동합니다. 이 모드에서는 다음을 수행할 수 있습니다.
여러 Kafka 항목을 단일 Snowflake 테이블에 매핑
항목 이름과 다른 사용자 지정 테이블 이름 사용
여러 항목과 일치하도록 정규식 패턴 적용
구성 형식:
snowflake.topic2table.map 매개 변수는 쉼표로 구분된 항목-테이블 매핑 목록을 다음 형식으로 허용합니다.
topic1:table1,topic2:table2,topic3:table3
구성 예제:
직접 항목 매핑
snowflake.topic2table.map=orders:ORDER_TABLE,customers:CUSTOMER_TABLE
정규식 패턴 일치
snowflake.topic2table.map=.*_cat:CAT_TABLE,.*_dog:DOG_TABLE
이 구성은 _cat``(예: ``orange_cat, calico_cat)으로 끝나는 모든 항목을 CAT_TABLE 테이블에 매핑하고 _dog``로 끝나는 모든 항목을 ``DOG_TABLE 테이블에 매핑합니다.
하나의 테이블에 여러 항목
snowflake.topic2table.map=topic1:shared_table,topic2:shared_table,topic3:other_table
이 구성은 topic1 및 ``topic2``를 모두 ``shared_table``에 매핑하고 ``topic3``은 ``other_table``에 매핑합니다.
중요
매핑의 정규식 패턴은 겹칠 수 없습니다. 각 항목은 최대 하나의 패턴과 일치해야 합니다.
매핑의 테이블 이름은 문자 또는 밑줄로 시작하는 2자 이상의 유효한 Snowflake 식별자여야 합니다.
여러 항목을 단일 테이블에 매핑할 수 있습니다.
레거시 RECORD_CONTENT 열¶
In prior versions of the connector (3.x and earlier), when the schematization feature was disabled, the connector created a destination table with two columns: RECORD_CONTENT and RECORD_METADATA. The RECORD_CONTENT column contained the entire Kafka message content in a column of type VARIANT. The RECORD_METADATA column continues to be supported but the RECORD_CONTENT column is no longer created by the connector. The same functionality can be achieved using SMT transformations (see examples later in this section). The RECORD_CONTENT key is also no longer available in PIPE transformations. For example, this PIPE definition will not work by default:
참고
이 파이프 정의는 추가 SMT 변환 없이 작동하지 않습니다.
CREATE PIPE ORDERS AS
COPY INTO ORDERS
FROM (
SELECT
$1:RECORD_CONTENT
FROM TABLE(DATA_SOURCE(TYPE => 'STREAMING'))
);
단일 열에 전체 Kafka 메시지 내용을 저장해야 하거나 PIPE 변환의 전체 Kafka 메시지 내용을 처리해야 하는 경우, 전체 Kafka 메시지 내용을 원하는 사용자 지정 필드로 래핑하는 다음 SMT 변환을 사용할 수 있습니다.
transforms=wrapKafkaMessageContent
transforms.wrapKafkaMessageContent.type=org.apache.kafka.connect.transforms.HoistField$Value
transforms.wrapKafkaMessageContent.field=your_top_level_field_name
이 변환은 전체 Kafka 메시지 내용을 your_top_level_field_name``이라는 사용자 지정 필드로 래핑합니다. 그런 다음 PIPE 변환의 ``$1:your_top_level_field_name 접근자를 사용하여 전체 Kafka 메시지 내용에 액세스할 수 있습니다.
CREATE PIPE ORDERS AS
COPY INTO ORDERS
FROM (
SELECT
$1:your_top_level_field_name
FROM TABLE(DATA_SOURCE(TYPE => 'STREAMING'))
);
또는 기본 파이프를 사용하여 전체 메타데이터와 콘텐츠를 모두 단일 테이블에 저장하려면 사용자 지정 파이프를 생성하지 마세요. 대신, RECORD_CONTENT 및 ``your_top_level_field_name``이라는 두 열이 있는 테이블만 생성합니다.
CREATE TABLE ORDERS (
record_metadata VARIANT,
your_top_level_field_name VARIANT
);
HoistField$Value 변환에 대해 자세히 알아보려면 `Kafka 설명서<https://kafka.apache.org/39/documentation.html#connect_transforms>`_를 참조하세요.
경고
전체 Kafka 메시지 내용과 메타데이터를 테이블에 저장하면 수집 비용, 파이프라인 속도 및 대기 시간에 부정적인 영향을 미칠 수 있습니다. 최상의 성능이 필요한 경우 Kafka 레코드 내용의 최상위 수준에서 액세스할 수 있을 때 필요한 데이터만 저장하거나 SMT 변환을 사용하여 깊게 중첩된 필드에서 최상위 필드로 데이터를 추출합니다.
스트리밍 채널 오류 및 데드 레터 큐 처리하기¶
The connector inspects the Snowpipe Streaming channel status before committing offsets in Kafka. If the connector detects that the rowsErrorCount property on channel has increased since the connector was started, it raises a fatal error (ERROR_5030) when errors.tolerance=none so that data issues don’t go unnoticed. To allow ingestion to continue while triaging bad rows, set errors.tolerance=all
errors.tolerance=all
스키마 진화¶
:code:`ENABLE_SCHEMA_EVOLUTION=TRUE`인 테이블의 경우 커넥터는 수신되는 Kafka 레코드를 기반으로 스키마를 자동으로 진화시킵니다. 커넥터가 생성하는 모든 테이블의 기본값은 :code:`ENABLE_SCHEMA_EVOLUTION=TRUE`입니다.
스키마 진화는 다음 작업으로 제한됩니다.
새 열을 추가합니다. 커넥터는 수신 Kafka 레코드에 테이블에 없는 새 필드가 포함된 경우 테이블에 새 열을 추가합니다.
삽입된 레코드에 데이터가 누락된 열의 NOT NULL 제약 조건 삭제하기
Apache Iceberg™ 테이블에 커넥터 사용하기¶
커넥터는 Snowflake가 관리하는 Apache Iceberg™ 테이블로 데이터를 수집할 수 있지만 다음 요구 사항을 충족해야 합니다.
Apache Iceberg™ 테이블과 연결된 외부 볼륨에 대한 USAGE 권한이 부여되어야 합니다.
커넥터를 실행하기 전에 Apache Iceberg™ 테이블을 생성해야 합니다.
외부 볼륨에 사용 권한 부여¶
Kafka 커넥터의 역할에 Apache Iceberg™ 테이블과 연결된 외부 볼륨에 대한 USAGE 권한을 부여하려면 다음 문을 실행합니다.
예를 들어, Iceberg 테이블이 kafka_external_volume 외부 볼륨을 사용하고 커넥터가 kafka_connector_role 역할을 사용하는 경우 다음 문을 실행합니다.
USE ROLE ACCOUNTADMIN;
GRANT USAGE ON EXTERNAL VOLUME kafka_external_volume TO ROLE kafka_connector_role;
수집을 위한 Apache Iceberg™ 생성¶
커넥터는 Iceberg 테이블을 자동으로 생성하지 않으며 스키마 진화를 지원하지 않습니다. 커넥터를 실행하기 전에 Iceberg 테이블을 수동으로 생성해야 합니다.
Iceberg 테이블을 생성할 때 Iceberg 데이터 타입(VARIANT 포함) 또는 :doc:`호환되는 Snowflake 타입</user-guide/tables-iceberg-data-types>`을 사용할 수 있습니다.
예를 들어, 다음 메시지를 생각해 보십시오.
{
"id": 1,
"name": "Steve",
"body_temperature": 36.6,
"approved_coffee_types": ["Espresso", "Doppio", "Ristretto", "Lungo"],
"animals_possessed":
{
"dogs": true,
"cats": false
},
"options":
{
"can_walk": true,
"can_talk": false
},
"date_added": "2024-10-15"
}
예제 메시지에 대한 Iceberg 테이블을 생성하려면 다음 문을 사용합니다.
CREATE OR REPLACE ICEBERG TABLE my_iceberg_table ( id number(38,0), name varchar, body_temperature number(4,2), approved_coffee_types array(varchar), animals_possessed variant, options object(can_walk boolean, can_talk boolean), date_added date ) EXTERNAL_VOLUME = 'my_volume' CATALOG = 'SNOWFLAKE' BASE_LOCATION = 'my_location/my_iceberg_table' ICEBERG_VERSION = 3;CREATE OR REPLACE ICEBERG TABLE my_iceberg_table ( id INT, name string, body_temperature float, approved_coffee_types array(string), animals_possessed variant, date_added date, options object(can_walk boolean, can_talk boolean), ) EXTERNAL_VOLUME = 'my_volume' CATALOG = 'SNOWFLAKE' BASE_LOCATION = 'my_location/my_iceberg_table' ICEBERG_VERSION = 3;
참고
dogs 또는 cats 같이 중첩된 구조 내의 필드 이름은 대소문자를 구분합니다.