채널 및 정확히 한 번 전달

이 항목에서는 Snowpipe Streaming이 순서가 보장되는 채널을 통해 데이터를 수집하는 방법과 오프셋 토큰이 정확히 한 번 전달을 지원하는 방법에 대해 설명합니다.

스트리밍 수집 기본 사항

Snowpipe Streaming은 몇 가지 핵심 스트리밍 수집 원칙을 기반으로 구축되었습니다.

  • 지속적인 수집: 데이터는 일괄 수집되어 주기적으로 로드되는 것이 아니라 생성될 때 Snowflake로 이동합니다. 애플리케이션은 수명이 긴 연결을 통해 지속적으로 행을 제출하고 Snowflake는 데이터를 자동으로 커밋합니다.

  • 정확히 한 번 전달: 각 레코드는 클라이언트 장애 또는 네트워크 중단이 발생하는 경우에도 정확히 한 번 수집됩니다. Snowpipe Streaming은 오프셋 토큰 추적을 통해 이를 달성하며, 이를 통해 클라이언트는 데이터를 복제하지 않고 마지막으로 커밋된 위치에서 재개할 수 있습니다.

  • 순차적 수집: 행은 채널 내에서 제출된 순서대로 커밋됩니다. 이를 통해 시계열 데이터, CDC 파이프라인, 감사 추적에 중요한 소스 시스템의 이벤트 시퀀스가 보존됩니다.

  • 낮은 대기 시간: 데이터는 수집 후 5초 이내에 쿼리에 사용할 수 있게 됩니다. 이를 통해 기존 일괄 로딩의 지연 없이 거의 실시간으로 분석할 수 있습니다.

  • 서버리스: Snowflake는 수집을 위한 모든 컴퓨팅 리소스를 관리합니다. 리소스는 클라이언트가 프로비저닝하거나 관리할 인프라 없이 처리량에 따라 자동으로 확장됩니다.

데이터 흐름 방식

클라이언트 애플리케이션은 Snowpipe Streaming SDK(Java 또는 Python) 또는REST API를 사용하여 Snowflake에 연결합니다. 클라이언트는 파이프에 대해 하나 이상의 채널을 연 다음 해당 채널을 통해 행을 제출합니다. Snowflake는 데이터를 버퍼링하고 대상 테이블에 커밋하여 몇 초 이내에 쿼리에 사용할 수 있도록 합니다.

엔드투엔드 흐름:

  1. **클라이언트 애플리케이션**은 SDK(appendRows) 또는 REST API(Append Rows 엔드포인트)를 사용하여 행을 제출합니다.

  2. **채널**은 행을 순서대로 수신하고 진행 상황을 추적하기 위해 각 배치를 오프셋 토큰과 연결합니다.

  3. **파이프**는 서버 측에서 데이터를 처리합니다. 즉, 스키마의 유효성을 검사하고, 구성된 변환 또는 사전 클러스터링을 적용한 후 대상 테이블에 커밋합니다.

  4. **대상 테이블**은 커밋된 데이터를 수신하여 즉시 쿼리할 수 있게 됩니다.

Snowpipe Streaming 클라이언트, 채널, 테이블 매핑

채널

채널은 테이블에 데이터를 로드하기 위한, Snowflake에 대한 논리적이고 명명된 스트리밍 연결입니다. 채널은 다음 두 가지 보장을 제공합니다.

  • 순차적 수집: 행의 순서와 해당 오프셋 토큰은 채널 내에서 유지됩니다.

  • 정확히 한 번 전달: 클라이언트는 오프셋 토큰을 사용하여 커밋된 진행 상황을 추적하고 복구 시 마지막으로 커밋된 위치부터 재생할 수 있습니다.

순서는 채널 내에서 유지되지만 동일한 테이블을 가리키는 채널 전체에서는 유지되지 않습니다.

채널은 파이프에 대해 열립니다. 클라이언트 SDK는 여러 파이프에 대해 여러 채널을 열 수 있지만, 이 SDK는 여러 계정에서 채널을 열 수는 없습니다. 채널은 클라이언트가 데이터를 능동적으로 삽입할 때 장기간 유지되도록 설계되어 있으며, 오프셋 토큰 정보가 보존되므로 클라이언트 프로세스가 재시작될 때마다 재사용되어야 합니다.

채널 및 관련 오프셋 메타데이터가 더 이상 필요하지 않은 경우 DropChannelRequest API를 사용하여 채널을 영구적으로 삭제할 수 있습니다. 다음 두 가지 방법으로 채널을 삭제할 수 있습니다.

  • 종료할 때 채널 삭제. 채널 내부의 데이터는 채널이 삭제되기 전에 자동으로 플러시됩니다.

  • 무작정 채널 삭제. 이 접근 방식은 보류 중인 데이터를 모두 삭제하므로 권장되지 않습니다.

SHOW CHANNELS 명령을 실행하여 액세스 권한이 있는 채널을 나열할 수 있습니다. 자세한 내용은 SHOW CHANNELS 섹션을 참조하십시오.

참고

오프셋 토큰과 함께 비활성 채널은 30일 동안 활동이 없을 경우 자동으로 삭제됩니다.

오프셋 토큰 및 정확히 한 번 전달

Snowpipe Streaming에서 정확히 한 번 작동하는 방식: 애플리케이션이 오프셋 토큰(예: Kafka 파티션 오프셋)이 있는 행을 제출합니다. Snowflake는 데이터가 커밋될 때 토큰을 유지합니다. 복구 시 애플리케이션은 getLatestCommittedOffsetToken 을 호출하여 중단된 위치를 찾은 후 해당 위치에서 재생합니다. 중복 데이터가 수집되지 않으며 데이터가 손실되지 않습니다.

*오프셋 토큰*은 클라이언트가 채널별로 수집 진행 상황을 추적하기 위해 행 제출 요청에 포함하는 문자열입니다. 사용되는 구체적인 방법은 REST API의 SDK 및 Append Rows 에 대한 appendRow 또는 appendRows 입니다.

토큰은 채널 생성 시 NULL로 초기화되며 제공된 오프셋 토큰이 있는 행이 Snowflake에 커밋될 때 업데이트됩니다. 클라이언트는 주기적으로 getLatestCommittedOffsetToken 을 호출하여 채널에 대해 커밋된 최신 오프셋 토큰을 가져오고 이를 사용하여 수집 진행 상황을 추론할 수 있습니다.

클라이언트가 채널을 다시 열면 최신 지속형 오프셋 토큰이 반환됩니다. 클라이언트는 동일한 데이터를 두 번 전송하지 않도록 토큰을 사용하여 데이터 원본에서 위치를 재설정할 수 있습니다. 채널 다시 열기 이벤트가 발생할 때 이를 커밋하지 않도록 Snowflake에서 버퍼링된 커밋되지 않은 데이터가 전부 삭제됩니다.

가장 최근에 커밋된 오프셋 토큰을 사용하여 다음을 수행할 수 있습니다.

  • 수집 진행 상황 추적

  • 가장 최근에 커밋된 오프셋 토큰과 비교하여 특정 오프셋이 커밋되었는지 확인

  • 소스 오프셋 진행 및 이미 커밋된 데이터 제거

  • 중복 제거를 활성화하고 데이터가 정확히 한 번 전달되도록 보장

예: Kafka 커넥터 충돌 복구

Kafka 커넥터는 <partition>:<offset> 과 같은 항목에서 오프셋 토큰을 읽습니다. 다음 시나리오를 생각해보십시오.

  1. Kafka 커넥터가 온라인 상태가 되어 채널 이름이 T:P1 인 Kafka 주제 TPartition 1 에 해당하는 채널을 엽니다.

  2. 커넥터가 Kafka 파티션에서 레코드를 읽기 시작합니다.

  3. 커넥터는 API를 호출하여 레코드와 연결된 오프셋을 오프셋 토큰으로 사용하여 appendRows 메서드 요청을 만듭니다.

    예를 들어 오프셋 토큰은 Kafka 파티션의 10번째 레코드를 참조하는 10 일 수 있습니다.

  4. 커넥터는 수집 진행률을 확인하기 위해 주기적으로 getLatestCommittedOffsetToken 메서드 요청을 만듭니다.

Kafka 커넥터가 충돌하는 경우 다음 프로시저는 올바른 오프셋에서 레코드 읽기를 재개합니다.

  1. Kafka 커넥터가 다시 온라인 상태가 되어 이전과 같은 이름을 사용하여 채널을 다시 엽니다.

  2. 커넥터는 getLatestCommittedOffsetToken 을 호출하여 파티션에 대해 가장 최근에 커밋된 오프셋을 가져옵니다.

    예를 들어 지속된 최신 오프셋 토큰이 20 이라고 가정합니다.

  3. 커넥터는 Kafka 읽기 API를 사용하여 오프셋에 1을 더한 값에 해당하는 커서를 재설정하는데, 이 예에서는 21 입니다.

  4. 커넥터가 레코드 읽기를 재개합니다. 읽기 커서의 위치가 올바로 변경된 후에는 중복 데이터가 검색되지 않습니다.

예: 충돌 복구를 사용한 로그 파일 수집

애플리케이션이 디렉터리에서 로그를 읽고 Snowpipe Streaming SDK를 사용하여 해당 로그를 Snowflake로 내보냅니다. 애플리케이션은 다음을 수행합니다.

  1. 로그 디렉터리의 파일을 나열합니다.

    로깅 프레임워크가 사전순으로 정렬할 수 있는 로그 파일을 생성하고 새 로그 파일이 이 정렬의 끝에 배치된다고 가정합니다.

  2. 로그 파일을 한 줄씩 읽고 API를 호출하여 로그 파일 이름과 줄 수 또는 바이트 위치에 해당하는 오프셋 토큰으로 appendRows 메서드 요청을 만듭니다.

    예를 들어 오프셋 토큰은 messages_1.log:20 일 수 있는데, 여기서 messages_1.log 는 로그 파일의 이름이고 20 은 줄 번호입니다.

애플리케이션이 충돌하거나 다시 시작해야 하는 경우 getLatestCommittedOffsetToken 을 호출하여 마지막으로 내보낸 로그 파일과 줄에 해당하는 오프셋 토큰을 검색합니다. 예제를 계속 진행하면 오프셋 토큰이 messages_1.log:20 일 수 있습니다. 그런 다음 애플리케이션은 messages_1.log 를 열고 줄 21 을 검색하여 동일한 로그 줄이 두 번 수집되지 않도록 합니다.

참고

오프셋 토큰 정보가 손실될 수 있습니다. 오프셋 토큰은 채널 오브젝트에 연결되며 30일 동안 채널을 사용하여 새로운 수집이 수행되지 않을 경우에는 채널이 자동으로 지워집니다. 오프셋 토큰을 잃어버리지 않도록 별개의 오프셋을 유지 관리하고 필요한 경우 채널의 오프셋 토큰을 재설정하십시오.

offsetToken 및 :code:`continuationToken`의 역할

offsetToken 및 :code:`continuationToken`은 모두 정확히 한 번의 데이터 전달을 보장하기 위해 사용되지만, 목적이 다르며 서로 다른 하위 시스템에서 관리됩니다. 주요 차이점은 토큰 값을 제어하는 주체와 사용 범위입니다.

  • continuationToken (직접 REST API 사용자에 의해서만 사용됨):

    이 토큰은 Snowflake에서 관리하며 단일 연속 스트리밍 세션의 상태를 유지하는 데 필수적입니다. 클라이언트가 를 사용하여 데이터를 보낼 때, Snowflake는 을 반환합니다. 클라이언트는 데이터가 Snowflake에서 올바른 순서로 간격 없이 수신되도록 다음 요청에서 이 토큰을 다시 반환해야 합니다. 클라이언트가 Append Rows API를 사용하여 데이터를 보낼 때, Snowflake는 :code:`continuationToken`을 반환합니다. 클라이언트는 데이터가 Snowflake에서 올바른 순서로 간격 없이 수신되도록 다음 AppendRows 요청에서 이 토큰을 다시 전달해야 합니다. Snowflake는 이 토큰을 사용하여 SDK 재시도 이벤트 발생 시 중복 데이터나 누락된 데이터를 감지하고 방지합니다.

  • offsetToken:

    이 토큰은 외부 소스에서 정확히 한 번만 전달할 수 있는 사용자 정의 식별자입니다. Snowflake는 이 값을 저장하지만 자체 내부 작업이나 재수집을 방지하기 위해 이 값을 사용하지 않습니다. 외부 시스템(예: Kafka 커넥터)은 Snowflake에서 오프셋 토큰을 읽고 이를 사용하여 자체 수집 진행 상황을 추적하고 외부 스트림을 재생해야 하는 경우 중복 데이터 전송을 방지할 책임이 있습니다.