Snowpipe Streaming

Snowpipe Streaming API(《API》)를 호출하면 Snowflake Ingest SDK와 사용자가 자체적으로 관리하는 애플리케이션 코드를 사용하여 짧은 지연 시간으로 스트리밍 데이터 행을 로딩할 수 있습니다. 스테이징된 파일에서 데이터를 쓰는 Snowpipe 또는 대량 데이터 로드와는 달리, 스트리밍 수집 API는 Snowflake 테이블에 데이터 행을 씁니다. 이 아키텍처에서는 비슷한 양의 데이터를 로딩하는 데 드는 비용이 낮아지는 것과 함께, 로드 지연 시간이 짧아지므로, 실시간 데이터 스트림을 처리하는 강력한 도구가 됩니다.

이 항목에서는 API를 호출하는 사용자 지정 클라이언트 애플리케이션의 개념을 설명합니다. 관련 Kafka용 Snowflake Connector(《Kafka 커넥터》) 지침은 Snowpipe Streaming과 함께 Kafka용 Snowflake Connector 사용하기 섹션을 참조하십시오.

이 항목의 내용:

Snowpipe Streaming API 대 Snowpipe

API는 Snowpipe를 대체하는 것이 아니라 보완하기 위한 것입니다. 데이터가 파일에 기록되지 않고 행(예: Apache Kafka 주제)을 통해 스트리밍되는 스트리밍 시나리오에서는 Snowpipe Streaming API를 사용하십시오. 이 API는 레코드를 생성하거나 수신하는 기존 사용자 지정 Java 애플리케이션을 포함하는 수집 워크플로에 적합합니다. 이 API를 사용하면 Snowflake 테이블에 데이터를 로드하려고 파일을 생성할 필요가 없으며 데이터를 사용할 수 있게 될 때 데이터 스트림을 Snowflake에 자동으로 연속 로드할 수 있습니다.

Snowpipe Streaming

다음 표에 Snowpipe Streaming과 Snowpipe의 차이점이 설명되어 있습니다.

카테고리

Snowpipe Streaming

Snowpipe

로드할 데이터의 형식

파일. 기존 데이터 파이프라인이 Blob 저장소에서 파일을 생성하는 경우 API 대신 Snowpipe를 사용하는 것이 좋습니다.

타사 소프트웨어 요구 사항

Snowflake Ingest SDK용 사용자 지정 Java 애플리케이션 코드 래퍼

없음

데이터 순서 지정

각 채널 내에서 순서가 지정된 삽입

지원 안 됨. Snowpipe는 클라우드 저장소의 파일 생성 타임스탬프와 다른 순서로 파일에서 데이터를 로드할 수 있습니다.

로드 내역

SNOWPIPE_STREAMING_FILE_MIGRATION_HISTORY 뷰 (Account Usage)에 기록된 로드 내역

LOAD_HISTORY 뷰 (Account Usage) 및 COPY_HISTORY 함수 (Information Schema)에 기록된 로드 내역.

파이프 오브젝트

파이프 오브젝트가 필요하지 않습니다. 이 API는 대상 테이블에 직접 레코드를 씁니다.

큐에 넣은 스테이징된 파일 데이터를 대상 테이블로 로드하는 파이프 오브젝트가 필요합니다.

소프트웨어 요구 사항

Java SDK

Snowpipe Streaming 서비스는 현재 Snowflake Ingest SDK에 대한 일련의 API로 구현됩니다. SDK는 Maven Central Repository 에서 다운로드할 수 있습니다. Snowflake Ingest SDK 버전 2.0.2 이상을 사용하는 것이 좋습니다.

SDK는 Java 버전 8 이상을 지원하며 JCE(Java Cryptography Extension) Unlimited Strength Jurisdiction Policy File 이 필요합니다.

중요

SDK는 Snowflake와 상호 작용하기 위해 REST API 호출을 수행합니다. 연결을 허용하려면 네트워크 방화벽 규칙을 조정해야 할 수 있습니다.

사용자 지정 클라이언트 애플리케이션

이 API에는 데이터 행을 펌핑하고 발생한 오류를 처리할 수 있는 사용자 지정 Java 애플리케이션 인터페이스가 필요합니다. 애플리케이션이 지속적으로 실행되고 오류로부터 복구할 수 있도록 보장할 책임은 사용자 자신에게 있습니다. 주어진 행 배치의 경우 API는 ON_ERROR = CONTINUE | SKIP_BATCH | ABORT 에 해당하는 기능을 지원합니다.

  • CONTINUE: 허용 가능한 데이터 행을 계속 로드하고 모든 오류를 반환합니다.

  • SKIP_BATCH: 전체 행 배치에서 오류가 발생하면 로딩을 건너뛰고 모든 오류를 반환합니다.

  • ABORT (기본 설정): 전체 행 배치를 중단하고 첫 번째 오류가 발생하면 예외를 발생시킵니다.

애플리케이션은 insertRow (단일 행) 또는 insertRows (행 세트) 메서드의 응답을 사용하여 오류를 캡처해야 합니다.

채널

API는 하나 이상의 채널 을 통해 행을 수집합니다. 채널은 테이블에 데이터를 로드하기 위한 Snowflake에 대한 논리적이고 명명된 스트리밍 연결을 나타냅니다. 단일 채널이 Snowflake에서 정확히 한 테이블에 매핑되지만, 여러 채널이 같은 테이블을 가리킬 수 있습니다. Client SDK는 여러 테이블에 대해 여러 채널을 열 수 있지만, 이 SDK는 여러 계정에서 채널을 열 수는 없습니다. 행의 순서와 해당 오프셋 토큰은 채널 내에서 유지되지만, 동일한 테이블을 가리키는 채널 간에는 유지되지 않습니다.

채널은 클라이언트가 데이터를 능동적으로 삽입할 때 오랫동안 유지되어야 하며 오프셋 토큰 정보가 유지되므로 재사용되어야 합니다. 채널 내부의 데이터는 기본적으로 1초마다 자동으로 플러시되므로 닫을 필요가 없습니다. 자세한 내용은 대기 시간 섹션을 참조하십시오.

Snowpipe streaming client channel table mapping

SHOW CHANNELS 명령을 실행하여 액세스 권한이 있는 채널을 나열할 수 있습니다. 자세한 내용은 SHOW CHANNELS 섹션을 참조하십시오.

참고

오프셋 토큰과 함께 비활성 채널은 30일 후에 자동으로 삭제됩니다.

오프셋 토큰

오프셋 토큰 은 채널별로 수집 진행 상황을 추적하기 위해 클라이언트가 insertRow (단일 행) 또는 insertRows (행 세트) 메서드 요청에 포함할 수 있는 문자열입니다. 토큰은 채널 생성 시 NULL로 초기화되며 제공된 오프셋 토큰이 있는 행이 비동기 프로세스를 통해 Snowflake에 커밋될 때 업데이트됩니다. 클라이언트는 주기적으로 getLatestCommittedOffsetToken 메서드 요청을 만들어 특정 채널에 대한 커밋된 최신 오프셋 토큰을 가져오고 이를 사용하여 수집 진행 상황을 추론할 수 있습니다. 이 토큰은 Snowflake에서 중복 제거를 수행하는 데 사용되지 않지만, 클라이언트는 이 토큰을 사용하여 사용자 지정 코드로 중복 제거를 수행할 수 있습니다.

클라이언트가 채널을 다시 열면 최신 지속형 오프셋 토큰이 반환됩니다. 클라이언트는 동일한 데이터를 두 번 전송하지 않도록 토큰을 사용하여 데이터 원본에서 위치를 재설정할 수 있습니다. 채널 다시 열기 이벤트가 발생할 때 이를 커밋하지 않도록 Snowflake에 버퍼링된 데이터가 전부 삭제됩니다.

가장 최근에 커밋된 오프셋 토큰을 사용하여 다음과 같은 일반적인 사용 사례를 수행할 수 있습니다.

  • 수집 진행 상황 추적

  • 가장 최근에 커밋된 오프셋 토큰과 비교하여 특정 오프셋이 커밋되었는지 확인

  • 원본 오프셋 진행 및 이미 커밋된 데이터 제거

  • 중복 제거 활성화 및 정확히 한 번만 데이터 전달

예를 들어 Kafka 커넥터는 파티션이 채널 이름으로 인코딩된 경우 <partition>:<offset> 또는 단순히 <offset> 과 같은 주제에서 오프셋 토큰을 읽을 수 있습니다. 다음 시나리오를 생각해보십시오.

  1. Kafka 커넥터가 온라인 상태가 되어 채널 이름이 T:P1 인 Kafka 주제 TPartition 1 에 해당하는 채널을 엽니다.

  2. 커넥터가 Kafka 파티션에서 레코드를 읽기 시작합니다.

  3. 커넥터는 API를 호출하여 레코드와 연결된 오프셋을 오프셋 토큰으로 사용하여 insertRows 메서드 요청을 만듭니다.

    예를 들어 오프셋 토큰은 Kafka 파티션의 10번째 레코드를 참조하는 10 일 수 있습니다.

  4. 커넥터는 수집 진행률을 확인하기 위해 주기적으로 getLatestCommittedOffsetToken 메서드 요청을 만듭니다.

Kafka 커넥터의 작동이 중단되는 경우 다음 절차를 완료하여 Kafka 파티션의 올바른 오프셋에서 레코드 읽기를 재개할 수 있습니다.

  1. Kafka 커넥터가 다시 온라인 상태가 되어 이전과 같은 이름을 사용하여 채널을 다시 엽니다.

  2. 커넥터가 API를 호출해 getLatestCommittedOffsetToken 메서드 요청을 만들어 파티션에 대해 커밋된 최신 오프셋을 가져옵니다.

    예를 들어 지속된 최신 오프셋 토큰이 20 이라고 가정합니다.

  3. 커넥터는 Kafka 읽기 API를 사용하여 오프셋에 1을 더한 값에 해당하는 커서를 재설정하는데, 이 예에서는 21 입니다.

  4. 커넥터가 레코드 읽기를 재개합니다. 읽기 커서의 위치가 올바로 변경된 후에는 중복 데이터가 검색되지 않습니다.

또 다른 예에서는 애플리케이션이 디렉터리에서 로그를 읽고 Snowpipe Streaming Client SDK를 사용하여 해당 로그를 Snowflake로 내보냅니다. 다음을 수행하는 로그 내보내기 애플리케이션을 만들 수 있습니다.

  1. 로그 디렉터리의 파일을 나열합니다.

    로깅 프레임워크가 사전순으로 정렬할 수 있는 로그 파일을 생성하고 새 로그 파일이 이 정렬의 끝에 배치된다고 가정합니다.

  2. 로그 파일을 한 줄씩 읽고 API를 호출하여 로그 파일 이름과 줄 수 또는 바이트 위치에 해당하는 오프셋 토큰으로 insertRows 메서드 요청을 만듭니다.

    예를 들어 오프셋 토큰은 messages_1.log:20 일 수 있는데, 여기서 messages_1.log 는 로그 파일의 이름이고 20 은 줄 번호입니다.

애플리케이션은 비정상 종료되거나 다시 시작해야 하는 경우 API를 호출하여 마지막으로 내보낸 로그 파일과 줄에 해당하는 오프셋 토큰을 불러오는 getLatestCommittedOffsetToken 메서드 요청을 만듭니다. 예제를 계속 진행하면 오프셋 토큰이 messages_1.log:20 일 수 있습니다. 그런 다음 애플리케이션은 messages_1.log 를 열고 21 행을 검색하여 같은 로그 행이 두 번 수집되지 않도록 합니다.

참고

오프셋 토큰 정보가 손실될 수 있습니다. 오프셋 토큰은 채널 오브젝트에 연결되며 30일 동안 채널을 사용하여 새로운 수집이 수행되지 않을 경우에는 채널이 자동으로 지워집니다. 오프셋 토큰을 잃어버리지 않도록 별개의 오프셋을 유지 관리하고 필요한 경우 채널의 오프셋 토큰을 재설정하십시오.

정확히 한 번 전달하는 모범 사례

정확히 한 번 전달하기란 어려운 일이 될 수 있으며, 사용자 지정 코드에서 다음 원칙을 준수하는 것이 매우 중요합니다.

  • 예외, 실패 또는 비정상 종료 발생 시 적절한 복구를 보장하려면 항상 채널을 다시 열고 가장 최근에 커밋된 오프셋 토큰을 사용하여 다시 수집을 시작해야 합니다.

  • 애플리케이션이 자체 오프셋을 유지할 수 있겠지만, Snowflake에서 제공하는 가장 최근에 커밋된 오프셋 토큰을 정보 소스로 사용하고 그에 따라 적절히 자체 오프셋을 재설정하는 것이 매우 중요합니다.

  • 자체 오프셋을 정보 소스로 취급해야 하는 유일한 경우는 Snowflake의 오프셋 토큰이 NULL로 설정되거나 재설정될 때입니다. NULL 오프셋 토큰은 보통 다음 중 하나를 의미합니다.

    • 이것은 새 채널이므로 설정된 오프셋 토큰이 없습니다.

    • 대상 테이블이 삭제되고 다시 생성되었으므로 채널이 새 채널로 간주됩니다.

    • 30일 동안 채널을 통한 수집 활동이 없어 채널이 자동으로 정리되었으며 오프셋 토큰 정보가 손실되었습니다.

  • 필요한 경우 가장 최근에 커밋된 오프셋 토큰을 기반으로 이미 커밋된 원본 데이터를 주기적으로 제거하고 자체 오프셋을 진행할 수 있습니다.

Snowpipe Streaming을 사용하는 Kafka 커넥터가 정확히 한 번 전달하는 자세한 방법은 정확히 한 번 의미 체계 섹션을 참조하십시오.

대기 시간

Snowpipe Streaming은 1초마다 채널 내의 데이터를 자동으로 플러시합니다. 데이터를 플러시하려고 채널을 닫을 필요는 없습니다.

Snowflake Ingest SDK 버전 2.0.4 이상에서는 max_client_lag 옵션을 사용하여 대기 시간을 구성할 수 있습니다. 기본 옵션은 1초입니다. 최대 대기 시간은 최대 10분까지 설정할 수 있습니다. 자세한 내용은 MAX_CLIENT_LAG 섹션을 참조하십시오.

Snowpipe Streaming용 Kafka 커넥터에는 자체 버퍼가 있습니다. Kafka 버퍼 플러시 시간에 도달하면 Snowpipe Streaming을 통해 1초의 대기 시간으로 데이터가 Snowflake로 전송됩니다. 자세한 내용은 buffer.flush.time 을 참조하십시오.

최적화된 파일로 마이그레이션

API는 채널의 행을 클라우드 저장소의 Blob에 쓴 다음 대상 테이블에 커밋합니다. 처음에 대상 테이블에 쓴 스트리밍 데이터는 임시 중간 파일 형식으로 저장됩니다. 이 스테이지에서 테이블은 《혼합 테이블》로 간주되는데, 분할된 데이터가 기본 파일과 중간 파일이 혼합된 형식으로 저장되기 때문입니다. 자동화된 백그라운드 프로세스에서는 필요에 따라 활성 중간 파일에서 쿼리 및 DML 작업에 최적화된 기본 파일로 데이터를 마이그레이션합니다.

복제

Snowpipe Streaming는 Snowpipe Streaming과 이와 연결된 채널 오프셋으로 채워진 Snowflake 테이블의 복제와 장애 조치 를 지원하는데, 원본 계정에서 다른 리전 에 있는 대상 계정으로, 그리고 여러 클라우드 플랫폼 에 걸쳐 지원합니다.

자세한 내용은 복제 및 Snowpipe Streaming 섹션을 참조하십시오.

Insert-only 작업

API 는 현재 행 삽입으로 제한됩니다. 데이터를 수정, 삭제 또는 결합하려면 하나 이상의 스테이징 테이블에 《원시》 레코드를 쓰십시오. 연속 데이터 파이프라인 을 사용하여 데이터를 병합, 조인 또는 변환하여 수정된 데이터를 대상 보고 테이블에 삽입합니다.

클래스 및 인터페이스

클래스 및 인터페이스에 대한 설명서는 Snowflake Ingest SDK API 를 참조하십시오.

지원되는 Java 데이터 타입

다음 표에는 Snowflake 열로 수집하기 위해 지원되는 Java 데이터 타입이 요약되어 있습니다.

Snowflake 열 유형

허용되는 Java 데이터 타입

  • CHAR

  • VARCHAR

  • 문자열

  • 기본 데이터 타입(int, boolean, char, …)

  • BigInteger, BigDecimal

  • BINARY

  • byte[]

  • 문자열(16진수 인코딩)

  • NUMBER

  • 숫자 타입(BigInteger, BigDecimal, byte, int, double, …)

  • 문자열

  • FLOAT

  • 숫자 타입(BigInteger, BigDecimal, byte, int, double, …)

  • 문자열

  • BOOLEAN

  • 부울

  • 숫자 타입(BigInteger, BigDecimal, byte, int, double, …)

  • 문자열

부울 변환 세부 정보 를 참조하십시오.

  • TIME

  • java.time.LocalTime

  • java.time.OffsetTime

  • 문자열

    • 정수 저장 시간

    • HH24:MI:SS.FFTZH:TZM (예: 20:57:01.123456789+07:00)

    • HH24:MI:SS.FF (예: 20:57:01.123456789)

    • HH24:MI:SS (예: 20:57:01)

    • HH24:MI (예: 20:57)

  • DATE

  • java.time.LocalDate

  • java.time.LocalDateTime

  • java.time.OffsetDateTime

  • java.time.ZonedDateTime

  • java.time.Instant

  • 문자열

    • 정수 저장 날짜

    • YYYY-MM-DD (예: 2013-04-28)

    • YYYY-MM-DDTHH24:MI:SS.FFTZH:TZM (예: 2013-04-28T20:57:01.123456789+07:00)

    • YYYY-MM-DDTHH24:MI:SS.FF (예: 2013-04-28T20:57:01.123456)

    • YYYY-MM-DDTHH24:MI:SS (예: 2013-04-28T20:57:01)

    • YYYY-MM-DDTHH24:MI (예: 2013-04-28T20:57)

    • YYYY-MM-DDTHH24:MI:SSTZH:TZM (예: 2013-04-28T20:57:01-07:00)

    • YYYY-MM-DDTHH24:MITZH:TZM (예: 2013-04-28T20:57-07:00)

  • TIMESTAMP_NTZ

  • TIMESTAMP_LTZ

  • TIMESTAMP_TZ

  • java.time.LocalDate

  • java.time.LocalDateTime

  • java.time.OffsetDateTime

  • java.time.ZonedDateTime

  • java.time.Instant

  • 문자열

    • 정수 저장 타임스탬프

    • YYYY-MM-DD (예: 2013-04-28)

    • YYYY-MM-DDTHH24:MI:SS.FFTZH:TZM (예: 2013-04-28T20:57:01.123456789+07:00)

    • YYYY-MM-DDTHH24:MI:SS.FF (예: 2013-04-28T20:57:01.123456)

    • YYYY-MM-DDTHH24:MI:SS (예: 2013-04-28T20:57:01)

    • YYYY-MM-DDTHH24:MI (예: 2013-04-28T20:57)

    • YYYY-MM-DDTHH24:MI:SSTZH:TZM (예: 2013-04-28T20:57:01-07:00)

    • YYYY-MM-DDTHH24:MITZH:TZM (예: 2013-04-28T20:57-07:00)

  • VARIANT

  • ARRAY

  • 문자열(유효한 JSON이어야 함)

  • 기본 데이터 타입 및 해당 배열

  • BigInteger, BigDecimal

  • java.time.LocalTime

  • java.time.OffsetTime

  • java.time.LocalDate

  • java.time.LocalDateTime

  • java.time.OffsetDateTime

  • java.time.ZonedDateTime

  • java.util.Map<String, T>. 여기서 T는 유효한 VARIANT 타입임

  • T[]. 여기서 T는 유효한 VARIANT 타입임

  • List<T>. 여기서 T는 유효한 VARIANT 타입임

  • OBJECT

  • 문자열(유효한 JSON 오브젝트여야 함)

  • Map<String, T>. 여기서 T는 유효한 베리언트 타입임

  • GEOGRAPHY

  • 지원되지 않음

  • GEOMETRY

  • 지원되지 않음

필수 액세스 권한

Snowpipe Streaming API를 호출하려면 다음 권한이 있는 역할이 필요합니다.

오브젝트

권한

테이블

OWNERSHIP 또는 최소 INSERT 및 EVOLVE SCHEMA(Snowpipe Streaming과 함께 Kafka 커넥터에 대한 스키마 진화를 사용할 때만 필요함)

데이터베이스

USAGE

스키마

USAGE

제한 사항

Snowpipe Streaming은 데이터 암호화에 256비트 AES 키 사용만 지원합니다.

다음 오브젝트 또는 유형은 지원되지 않습니다.

  • GEOGRAPHY 및 GEOMETRY 데이터 타입

  • 데이터 정렬이 포함된 열

  • TRANSIENT 또는 TEMPORARY 테이블

  • 다음 열 설정이 포함된 테이블:

    • AUTOINCREMENT 또는 IDENTITY

    • NULL이 아닌 기본 열 값