Snowpipe Streaming 마이그레이션 가이드¶
이 가이드에서는 클래식 Snowpipe Java SDK에서 고성능 Snowpipe Streaming SDK로 마이그레이션하는 방법을 설명합니다. 고성능 아키텍처가 두 언어로 모두 제공되므로, 여기서 설명하는 아키텍처 변경 사항 및 API 업데이트는 Python SDK로의 마이그레이션에도 적용됩니다. 이 문서의 코드 예제는 Java로 되어 있지만, 핵심 마이그레이션 원칙은 여러 언어에서 일관되게 유지됩니다.
주요 아키텍처 변경 사항¶
다음 표에는 고성능 Snowpipe Streaming SDK에서 가장 중요한 아키텍처 변경 사항이 요약되어 있습니다. SDKs를 자세히 비교하려면 고성능 Snowpipe Streaming과 클래식 SDKs의 비교 섹션을 참조하세요.
리전 |
클래식(snowflake-ingest-java) |
고성능(snowpipe-streaming SDK) |
|---|---|---|
진입점 |
데이터는 테이블에 직접 수집됩니다. |
데이터는 변환 및 스키마 적용을 지원하는 PIPE 오브젝트를 통해 수집됩니다. |
SDK 및 코어 |
Java SDK만 지원합니다. |
공유된 Rust 코어를 사용하여 여러 언어(Java 및 Python)의 SDK를 지원합니다. |
API 이름 |
|
|
오류 처리 |
클라이언트 측 유효성 검사가 수행됩니다. |
더 풍부한 오류 피드백과 함께 서버 측 유효성 검사가 제공됩니다. |
역압 처리 |
스레드를 절전 모드로 전환하여 ‘차단’됨 또는 ‘응답 없음’ 상태로 만듭니다. |
호출자가 백오프 및 재시도 전략을 구현할 수 있도록 오류를 반환합니다. |
클라이언트-테이블 매핑 |
단일 클라이언트 오브젝트는 모든 테이블에 대한 채널을 열 수 있습니다. |
이제 단일 클라이언트 오브젝트가 1개의 파이프 오브젝트에만 독점적으로 연결됩니다. |
청구 |
컴퓨팅 및 클라이언트 수를 기준으로 합니다. |
플랫, 수집된 GB당 |
스키마 및 변환 |
클라이언트 측에서 관리됩니다. |
PIPE 정의를 통해 서버 측에서 관리됩니다. |
마이그레이션 프로세스¶
애플리케이션을 고성능 SDK로 마이그레이션하려면 다음 개략적인 단계를 수행합니다.
각 대상 테이블에 대해 PIPE를 만듭니다.
CREATE PIPE my_pipe AS COPY INTO my_table FROM TABLE (DATA_SOURCE(TYPE => 'STREAMING')) MATCH_BY_COLUMN_NAME = CASE_INSENSITIVE [CLUSTER_AT_INGEST_TIME = TRUE];
모든 클래식 클라이언트에서 수집을 중지합니다.
클래식 클라이언트의 각 채널에 대해 마지막으로 커밋된 오프셋을 확인합니다. 이러한 오프셋을 검색하려면 클래식 SDK의
getLatestCommittedOffsetTokens()메서드를 사용합니다. 이러한 오프셋이 클라이언트 측 레코드와 일치하는지 확인합니다.애플리케이션 코드를 업데이트합니다.
프로젝트 종속성을 고성능 SDK(Java 또는 Python)로 전환합니다.
다음 API 및 구성 변경 사항 섹션에 자세히 설명된 대로 API 호출을 업데이트합니다.
Snowflake에서 마지막으로 커밋된 오프셋을 사용하여 테이블 또는 PIPE당 하나의 클라이언트를 초기화합니다.
새 클라이언트가 구성되고 안정화되면 수집을 재개합니다.
API 및 구성 변경 사항¶
마이그레이션 중에 API 호출 및 구성 설정을 다음과 같이 변경해야 합니다.
클라이언트 초기화¶
클래식:
builder(name)고성능:
builder(name, db, schema, pipeName)
채널¶
클래식:
openChannel(OpenChannelRequest)고성능: ``openChannel(channelName, offsetToken)``은 채널과 상태를 모두 반환
수집 방법¶
클래식:
insertRow/insertRows(...)고성능:
appendRow/appendRows(...)
오프셋 추적¶
클래식 SDK의
getLatestCommittedOffsetTokens(channels)메서드는 가시성이 제한되고 오류 컨텍스트가 부족합니다.고성능 SDK는 여전히 ``getLatestCommittedOffsetTokens(…)``를 지원하지만, 강력한 모니터링을 위해서는 ``getChannelStatuses(…)``를 사용하는 것이 좋습니다. 이 메서드는 다음 작업을 수행합니다.
오프셋이 예상대로 진행되고 있는지 확인합니다.
채널당 오류 수와 자세한 오류 정보를 반환합니다.
데이터 파이프라인을 사전에 모니터링하고 문제를 해결할 수 있습니다.
반정형 데이터 처리하기¶
고성능 SDK로 마이그레이션할 때, 데이터가 리터럴 문자열로 저장되지 않도록 애플리케이션이 ARRAY 및 VARIANT에 대한 데이터를 제공하는 방법을 검토합니다.
동작 변경¶
직렬화된 문자열 리터럴(예: “[1, 2, 3]”)을 v2의 ARRAY 열에 전달하면 해당 문자열 리터럴을 포함하는 단일 요소 배열이 생성됩니다. 클래식 아키텍처 동작을 유지하려면 다음 옵션 중 하나를 선택합니다.
옵션 1: 네이티브 오브젝트 전달(권장)¶
`appendRow`를 호출하기 전에 클라이언트 애플리케이션을 업데이트하여 JSON 문자열을 네이티브 오브젝트로 역직렬화합니다.
Java: 배열에는 ``java.util.List``를 사용하고 오브젝트에는 ``java.util.Map``을 사용합니다.
Python: 네이티브
list및dict유형을 사용합니다.
이점: 기본 파이프 및 자동 스키마 진화와 호환됩니다.
옵션 2: 파이프 측 변환¶
PARSE_JSON 함수를 사용하여 변환 논리가 있는 Pipe 오브젝트 함수를 명시적으로 정의합니다.
SQL 예제
CREATE PIPE my_pipe AS
COPY INTO my_table (my_array_col)
FROM (SELECT PARSE_JSON($1:my_array_col) FROM TABLE(DATA_SOURCE(TYPE => 'STREAMING')));
참고
이 메서드는 기본 파이프 및 자동 스키마 진화 기능과 호환되지 않습니다.