Snowpipe Streaming을 사용하는 Kafka 커넥터의 스키마 감지 및 진화

Snowpipe Streaming을 사용하는 Kafka 커넥터는 스키마 감지 및 진화를 지원합니다. Snowflake의 테이블 구조는 Kafka 커넥터가 로드하는 새로운 Snowpipe Streaming 데이터의 구조를 지원하도록 자동으로 정의되고 진화할 수 있습니다.

스키마 감지 및 진화가 없으면 Kafka 커넥터로 로드한 Snowflake 테이블은 RECORD_CONTENT 및 RECORD_METADATA의 두 VARIANT 열로만 구성됩니다. 스키마 감지 및 진화가 활성화되면 Snowflake가 스트리밍 데이터의 스키마를 감지하고 사용자 정의 스키마와 자동으로 일치하는 테이블에 데이터를 로드할 수 있습니다. 또한 Snowflake를 사용해 새 열을 추가하거나 새 데이터 파일에 누락된 열에서 NOT NULL 제약 조건을 삭제할 수도 있습니다.

참고

이 미리 보기 기능은 Snowpipe Streaming을 사용하는 Kafka 커넥터에서만 작동합니다. 현재는 파일 기반 Snowpipe를 사용하는 Kafka 커넥터를 지원하지 않습니다.

이 항목의 내용:

전제 조건

이 기능을 활성화하기 전에 다음 필수 구성 요소를 설정해야 합니다.

  • Kafka 커넥터 버전 2.0.0 이상을 다운로드합니다. 자세한 내용은 Kafka 커넥터 설치 및 구성하기 섹션을 참조하십시오.

  • ALTER TABLE 명령을 사용하여 테이블에 대한 ENABLE_SCHEMA_EVOLUTION 매개 변수를 TRUE로 설정합니다. 또한 테이블에 대한 EVOLVE SCHEMA 권한이 있는 역할을 사용해야 합니다. 자세한 내용은 테이블 스키마 진화 섹션을 참조하십시오.

필수 Kafka 속성 구성하기

Kafka 커넥터 속성 파일에서 다음 필수 속성을 구성합니다.

snowflake.ingestion.method

Kafka 항목 데이터를 로드하려면 SNOWPIPE_STREAMING 을 사용하도록 지정하십시오. 이 미리 보기 기능은 현재 SNOWPIPE 를 지원하지 않습니다.

snowflake.enable.schematization

Snowpipe Streaming을 사용하여 Kafka 커넥터의 스키마 감지 및 진화를 활성화하려면 TRUE 로 지정하십시오. 기본값은 FALSE 입니다.

schema.registry.url

스키마 레지스트리 서비스의 URL로 지정합니다. 기본값은 빈 값입니다.

파일 형식에 따라 schema.registry.url 은 필수 사항 또는 선택 사항입니다. Kafka 커넥터를 사용한 스키마 감지는 아래 시나리오 중 하나에서 지원됩니다.

  • Avro 및 Protobuf에는 스키마 레지스트리가 필요합니다. 이 열은 제공된 스키마 레지스트리에 정의된 데이터 타입으로 생성됩니다.

  • 스키마 레지스트리는 JSON에 대해 선택 사항입니다. 스키마 레지스트리가 없으면 제공된 데이터를 기반으로 데이터 타입이 유추됩니다.

평소와 같이 Kafka 커넥터 속성 파일에서 추가 속성을 구성하면 됩니다. 자세한 내용은 Kafka 커넥터 구성하기 섹션을 참조하십시오.

변환기

Json, Avro, Protobuf와 같은 모든 정형 데이터 변환기가 지원됩니다. 다음과 같은 정형 데이터 변환기만 테스트했습니다.

  • io.confluent.connect.avro.AvroConverter

  • io.confluent.connect.protobuf.ProtobufConverter

  • org.apache.kafka.connect.json.JsonConverter

  • io.confluent.connect.json.JsonSchemaConverter

어떤 비정형 데이터 변환기도 지원되지 않습니다. 예를 들어,

  • org.apache.kafka.connect.converters.ByteArrayConverter

  • org.apache.kafka.connect.storage.StringConverter

일부 사용자 지정 데이터 변환기는 지원되지 않을 수도 있습니다. 도움이 필요하면 Snowflake 지원에 문의하십시오.

사용법 노트

  • Kafka 커넥터를 사용한 스키마 감지는 제공된 스키마 레지스트리 유무에 관계없이 지원됩니다. 스키마 레지스트리(Avro 및 Protobuf)를 사용하는 경우 제공된 스키마 레지스트리에 정의된 데이터 타입으로 열이 생성됩니다. 스키마 레지스트리(JSON)가 없으면 제공된 데이터를 기반으로 데이터 타입이 유추됩니다.

  • Kafka 커넥터를 사용한 스키마 진화는 다음 테이블 열 수정을 지원합니다.

    • 새 열 추가

    • 원본 데이터 열이 누락된 경우 NOT NULL 제약 조건 삭제

  • Kafka 커넥터가 대상 테이블을 생성하는 경우 기본적으로 스키마 진화가 활성화됩니다. 하지만 기존 테이블에 대해 스키마 진화가 비활성화된 경우 Kafka 커넥터는 일치하지 않는 스키마가 있는 행을 구성된 배달 못 한 편지 큐(DLQ)로 보내려고 시도합니다.

다음 예에서는 Snowpipe Streaming을 사용하는 Kafka 커넥터에 대해 스키마 감지 및 진화가 활성화되기 전과 후에 생성되는 테이블을 보여줍니다.

-- Before schema detection and evolution is enabled, the table only consists of two VARIANT columns, RECORD_CONTENT and RECORD_METADATA, as the following example demonstrates.
+------+---------------------------------------------------------+---------------------------------------------------+
| Row  | RECORD_METADATA                                         | RECORD_CONTENT                                    |
|------+---------------------------------------------------------+---------------------------------------------------|
| 1    |{"CreateTime":1669074170090, "headers": {"current.iter...| "account": "ABC123", "symbol": "ZTEST", "side":...|
| 2    |{"CreateTime":1669074170400, "headers": {"current.iter...| "account": "XYZ789", "symbol": "ZABZX", "side":...|
| 3    |{"CreateTime":1669074170659, "headers": {"current.iter...| "account": "XYZ789", "symbol": "ZTEST", "side":...|
| 4    |{"CreateTime":1669074170904, "headers": {"current.iter...| "account": "ABC123", "symbol": "ZABZX", "side":...|
| 5    |{"CreateTime":1669074171063, "headers": {"current.iter...| "account": "ABC123", "symbol": "ZTEST", "side":...|
+------+---------------------------------------------------------+---------------------------------------------------|

-- After schema detection and evolution is enabled, the table contains the columns that match the user-defined schema. The table can also automatically evolve to support the structure of new Snowpipe streaming data loaded by the Kafka connector.
+------+---------------------------------------------------------+---------+--------+-------+----------+
| Row  | RECORD_METADATA                                         | ACCOUNT | SYMBOL | SIDE  | QUANTITY |
|------+---------------------------------------------------------+---------+--------+-------+----------|
| 1    |{"CreateTime":1669074170090, "headers": {"current.iter...| ABC123  | ZTEST  | BUY   | 3572     |
| 2    |{"CreateTime":1669074170400, "headers": {"current.iter...| XYZ789  | ZABZX  | SELL  | 3024     |
| 3    |{"CreateTime":1669074170659, "headers": {"current.iter...| XYZ789  | ZTEST  | SELL  | 799      |
| 4    |{"CreateTime":1669074170904, "headers": {"current.iter...| ABC123  | ZABZX  | BUY   | 2033     |
| 5    |{"CreateTime":1669074171063, "headers": {"current.iter...| ABC123  | ZTEST  | BUY   | 1558     |
+------+---------------------------------------------------------+---------+--------+-------+----------|
Copy