Snowpipe Streaming과 함께 Kafka용 Snowflake Connector 사용하기

Kafka의 데이터 로딩 체인에서 Snowpipe를 Snowpipe Streaming 으로 선택적으로 바꿀 수 있습니다. 지정된 플러시 버퍼 임계값(시간 또는 메모리 또는 메시지 수)에 도달할 때 임시 스테이징된 파일에서 데이터를 쓰는 Snowpipe와는 달리, 커넥터가 Snowpipe Streaming API(《API》)를 호출하여 데이터 행을 Snowflake 테이블에 씁니다. 이 아키텍처에서는 비슷한 양의 데이터를 로딩하는 데 드는 비용이 낮아지는 것과 함께, 로드 지연 시간이 짧아집니다.

Snowpipe Streaming과 함께 사용하려면 Kafka 커넥터 버전 1.9.1(또는 그 이상)이 필요합니다. Snowpipe Streaming을 사용하는 Kafka 커넥터는 Snowflake Ingest SDK를 포함하며 Apache Kafka 항목에서 대상 테이블로 직접 행 스트리밍을 지원합니다.

Snowpipe Streaming with Kafka connector

참고

Snowpipe Streaming을 사용하는 Kafka 커넥터는 현재 스키마 감지 또는 스키마 진화를 지원하지 않습니다. 이 커넥터는 Snowpipe와 함께 사용되는 것과 동일한 테이블 스키마 를 사용합니다.

이 항목의 내용:

최소 필수 버전

Kafka 커넥터 버전 1.9.1 이상에서는 Snowpipe Streaming을 지원합니다.

Kafka 구성 속성

Kafka 커넥터 속성 파일에 연결 설정을 저장하십시오. 자세한 내용은 Kafka Connector 구성하기 섹션을 참조하십시오.

필수 속성

Kafka 커넥터 속성 파일에서 연결 설정을 추가하거나 편집합니다. 자세한 내용은 Kafka Connector 구성하기 섹션을 참조하십시오.

snowflake.ingestion.method

Kafka 커넥터를 스트리밍 수집 클라이언트로 사용할 경우에만 필요합니다. Snowpipe Streaming 또는 표준 Snowpipe를 사용하여 Kafka 주제 데이터를 로드할지 여부를 지정합니다. 지원되는 값은 다음과 같습니다.

  • SNOWPIPE_STREAMING

  • SNOWPIPE (기본값)

주제 데이터를 큐에 넣고 로드할 백엔드 서비스를 선택하는 데 추가로 설정할 필요가 없습니다. 평소와 같이 Kafka 커넥터 속성 파일에서 추가 속성을 구성하면 됩니다.

snowflake.role.name

테이블에 행을 삽입할 때 사용할 액세스 제어 역할입니다.

버퍼 및 폴링 속성

buffer.flush.time

버퍼 플러시 사이의 시간(초)입니다. 플러시할 때마다 버퍼링된 레코드에 대한 삽입 작업이 발생합니다. Kafka 커넥터는 매번 플러시 후 한 번씩 Snowpipe Streaming API(《API》)를 호출합니다.

buffer.flush.time 속성에 지원되는 최소값은 1 (초)입니다. 평균 데이터 흐름 속도를 더 높이려면 기본값을 줄여 지연 시간을 개선하는 것이 좋습니다. 지연 시간보다 비용이 더 큰 문제인 경우에는 버퍼 플러시 시간을 늘릴 수 있습니다. 메모리 부족 예외를 방지하려면 Kafka 메모리 버퍼가 가득 차기 전에 플러시하도록 주의하십시오.

1 - 상한 없음.

기본값

10

buffer.count.records

Snowflake로 수집하기 전에 Kafka 파티션당 메모리에 버퍼링된 레코드 수입니다.

1 - 상한 없음.

기본값

10000

buffer.size.bytes

Snowflake에서 데이터 파일로 수집되기 전에 Kafka 파티션별로 메모리에 버퍼링된 레코드의 누적 크기(바이트)입니다.

레코드는 데이터 파일에 기록될 때 압축됩니다. 그 결과, 버퍼에 있는 레코드의 크기가 레코드에서 생성된 데이터 파일의 크기보다 클 수 있습니다.

1 - 상한 없음.

기본값

20000000 (20MB)

Kafka 커넥터 속성 외에도, 단일 폴링에서 Kafka가 Kafka Connect에 반환하는 최대 레코드 수를 제어하는 Kafka 컨슈머 max.poll.records 속성에 유의하십시오. 500 의 기본값을 늘릴 수 있지만 메모리 제약 조건에 유의하십시오. 이 속성에 대한 자세한 내용은 Kafka 패키지 설명서를 참조하십시오.

오류 처리 및 DLQ 속성

errors.tolerance

Kafka 커넥터에서 발생한 오류를 처리하는 방법을 지정합니다.

이 속성에서 지원되는 값은 다음과 같습니다.

NONE

첫 번째 오류가 발생하면 데이터 로딩을 중지합니다.

ALL

모든 오류를 무시하고 데이터 로딩을 계속합니다.

기본값

NONE

errors.log.enable

Kafka Connect 로그 파일에 오류 메시지를 기록할지 여부를 지정합니다.

이 속성에서 지원되는 값은 다음과 같습니다.

TRUE

오류 메시지를 작성합니다.

FALSE

오류 메시지를 작성하지 마십시오.

기본값

FALSE

errors.deadletterqueue.topic.name

Snowflake 테이블에 수집할 수 없는 메시지를 Kafka로 전달하기 위해 Kafka에서 DLQ(배달 못한 편지 큐) 주제 이름을 지정합니다. 자세한 내용은 이 항목의 배달 못한 편지 큐 를 참조하십시오.

사용자 지정 텍스트 문자열

기본값

없습니다.

정확히 한 번 의미 체계

정확히 한 번 의미 체계는 중복이나 데이터 손실 없이 Kafka 메시지가 전달되도록 보장합니다. 이 전달 보장은 Snowpipe Streaming을 사용하는 Kafka 커넥터에 기본적으로 설정됩니다.

Kafka Connector는 파티션과 채널 사이에 일대일 매핑을 채택하고 다음 두 가지 별개의 오프셋을 사용합니다.

  • 컨슈머 오프셋: Kafka에서 관리하며 컨슈머가 소비한 가장 최근 오프셋을 추적합니다.

  • 오프셋 토큰: Snowflake에서 관리하며 Snowflake에서 가장 최근에 커밋된 오프셋을 추적합니다.

Kafka 커넥터는 다음 모범 사례를 구현하여 정확히 한 번 전달하는 결과를 달성합니다.

채널 열기/다시 열기:

  • 주어진 파티션의 채널을 열거나 다시 열 때 Kafka 커넥터는 getLatestCommittedOffsetToken API를 통해 Snowflake에서 검색된 최신 커밋 오프셋 토큰을 정보 소스로 사용하고 그에 따라 적절히 Kafka의 컨슈머 오프셋을 재설정합니다.

  • 컨슈머 오프셋이 데이터 보존 기간 내에 더 이상 들지 않을 경우 예외가 발생하며 적절히 취할 조치 사항을 결정할 수 있습니다.

  • Kafka Connector가 Kafka에서 컨슈머 오프셋을 재설정하지 않고 정보 소스로 사용하는 유일한 상황은 Snowflake의 오프셋 토큰이 NULL인 경우입니다. 이 경우, 커넥터는 Kafka에서 보낸 오프셋을 수락하고 오프셋 토큰은 이후에 업데이트됩니다.

레코드 처리:

  • Kafka의 잠재적인 버그로 인해 발생할 수 있는 비연속적 오프셋에 대한 추가 안전 계층을 보장하고자, Snowflake는 가장 최근에 처리된 오프셋을 추적하는 메모리 내 변수를 유지 관리합니다. Snowflake는 현재 행의 오프셋이 최근에 처리된 오프셋에 1을 더한 값과 같을 경우에만 행을 수락하므로, 수집 프로세스가 지속적이고 정확하도록 여분의 보호 계층을 추가합니다.

예외, 실패, 충돌 복구 처리:

  • 복구 프로세스의 일부로, Snowflake는 채널을 다시 열고 가장 최근에 커밋된 오프셋 토큰으로 컨슈머 오프셋을 재설정하여 앞에서 설명한 채널 열기/다시 열기 논리를 일관되게 준수합니다. 그렇게 함으로써, Kafka는 가장 최근에 커밋된 오프셋 토큰보다 1이 더 큰 오프셋 값에서 데이터를 보내도록 신호를 받아 데이터 손실 최소화 또는 제로화로 장애 지점에서 수집을 재개할 수 있도록 합니다.

재시도 메커니즘 구현:

  • 발생 가능한 일시적인 문제를 설명하기 위해, Snowflake는 API 호출에 재시도 메커니즘을 포함합니다. Snowflake는 이러한 API 호출을 여러 번 재시도하여 성공 가능성을 높이고 수집 프로세스에 영향을 미치는 간헐적인 실패가 발생할 위험을 완화합니다.

컨슈머 오프셋 진행:

  • Snowflake는 정기적으로 가장 최근에 커밋된 오프셋 토큰을 사용하여 컨슈머 오프셋을 진행해 수집 프로세스가 계속 Snowflake의 최신 데이터 상태에 맞춰 조정되도록 보장합니다.

변환기

Snowpipe Streaming을 사용하는 Kafka 커넥터는 다음 key.converter 또는 value.converter 값을 지원하지 않습니다.

  • com.snowflake.kafka.connector.records.SnowflakeJsonConverter

  • com.snowflake.kafka.connector.records.SnowflakeAvroConverter

  • com.snowflake.kafka.connector.records.SnowflakeAvroConverterWithoutSchemaRegistry

사용자 지정 Snowflake 변환기는 파일을 테이블 스테이지로 이동하여 데이터 로딩을 방해하는 오류를 처리합니다. 이 워크플로는 Snowpipe Streaming 배달 못한 편지 큐 와 충돌합니다.

배달 못한 편지 큐

Snowpipe Streaming을 사용하는 Kafka 커넥터는 손상된 레코드 또는 오류로 인해 성공적으로 처리할 수 없는 레코드에 대해 배달 못한 편지 큐(DLQ)를 지원합니다.

모니터링에 대한 자세한 내용은 Apache Kafka 설명서 를 참조하십시오.

청구 및 사용량

Snowpipe Streaming 청구 정보는 Snowpipe Streaming 비용 섹션을 참조하십시오.