고성능 아키텍처를 갖춘 Snowpipe Streaming 모범 사례:

이 가이드에서는 고성능 아키텍처를 갖춘 Snowpipe Streaming을 사용하여 강력한 데이터 수집 파이프라인을 설계하고 구현하기 위한 주요 모범 사례를 간략하게 설명합니다. 이러한 모범 사례를 따르면 파이프라인의 내구성, 신뢰성, 오류 처리 효율성을 보장할 수 있습니다.

전략적으로 채널 관리

성능과 장기적인 안정성을 위해 다음 채널 관리 전략을 적용합니다.

  • 장기 채널 사용: 오버헤드를 최소화하려면 채널을 한 번 연 다음, 수집 작업 기간 동안 활성 상태로 유지합니다. 채널을 반복적으로 열고 닫지 마세요.

  • 결정적 채널 이름 사용: 문제 해결을 간소화하고 자동화된 복구 프로세스를 촉진하기 위해 일관되고 예측 가능한 명명 규칙을 적용합니다(예: source-env-region-client-id).

  • ** 여러 채널로 확장**: 처리량을 늘리려면 여러 채널을 엽니다. 이러한 채널은 서비스 제한과 처리량 요구 사항에 따라 단일 대상 파이프 또는 여러 파이프를 가리킬 수 있습니다.

  • 채널 상태 모니터링: 주기적으로 getChannelStatus 메서드를 사용하여 수집 채널의 상태를 모니터링합니다.

    • ``last_committed_offset_token``을 추적하여 데이터가 성공적으로 수집되고 파이프라인이 진행 중인지 확인합니다.

    • ``row_error_count``를 모니터링하여 불량 레코드 또는 기타 수집 문제를 조기에 감지합니다.

일관되게 스키마 유효성 검사

수집 실패를 방지하고 데이터 무결성을 유지하기 위해 수신 데이터가 예상되는 테이블 스키마를 준수하는지 확인합니다.

  • 클라이언트 측 유효성 검사: 클라이언트 측에서 스키마 유효성 검사를 구현하여 즉각적인 피드백을 제공하고 서버 측 오류를 줄입니다. 전체 행별 유효성 검사가 최대한의 안전성을 제공하지만, 더 나은 성능을 위해서는 배치 경계 또는 행 샘플링 등을 활용하여 선택적 유효성 검사를 수행하는 것이 좋습니다.

  • 서버 측 유효성 검사: 고성능 아키텍처는 스키마 유효성 검사를 서버로 오프로드할 수 있습니다. 대상 파이프와 테이블로 수집하는 동안 스키마 불일치가 발생하는 경우 오류 및 해당 개수는 :code:`getChannelStatus`를 통해 보고됩니다.

클라이언트 측 메타데이터 열 추가

강력한 오류 감지 및 복구를 활성화하려면 수집 메타데이터를 행 페이로드의 일부로 전달해야 합니다. 이를 위해서는 데이터 형태 및 PIPE 정의를 사전에 계획합니다.

수집하기 전에 행 페이로드에 다음 열을 추가합니다.

  • :code:`CHANNEL_ID`(예: 압축된 INTEGER)

  • :code:`STREAM_OFFSET`(채널마다 단조적으로 증가하는 Kafka 파티션 오프셋과 같은 BIGINT)

이러한 열을 통해 채널당 레코드를 고유하게 식별하고 데이터의 원본을 추적할 수 있습니다.

선택 사항: 여러 파이프가 동일한 대상 테이블로 수집되는 경우 PIPE_ID 열을 추가합니다. 이 열을 사용하면 수집 파이프라인까지 행을 다시 쉽게 추적할 수 있습니다. 설명이 포함된 파이프 이름을 별도의 조회 테이블에 저장하고 컴팩트 정수로 매핑하여 저장소 비용을 줄일 수 있습니다.

메타데이터 오프셋을 사용하여 오류 감지 및 복구

채널 모니터링을 메타데이터 열과 결합하여 문제를 감지하고 복구합니다.

  • 모니터 상태: :code:`getChannelStatus`를 정기적으로 확인합니다. :code:`row_error_count`가 증가하면 잠재적인 문제가 있을 가능성이 높습니다.

  • 누락된 레코드 감지: 오류가 감지되면 SQL 쿼리를 통해 STREAM_OFFSET 시퀀스에서 공백을 확인하여 누락되거나 순서가 잘못된 레코드를 식별합니다.

SELECT
  PIPE_ID,
  CHANNEL_ID,
  STREAM_OFFSET,
  LAG(STREAM_OFFSET) OVER (
    PARTITION BY PIPE_ID, CHANNEL_ID
    ORDER BY STREAM_OFFSET
  ) AS previous_offset,
  (LAG(STREAM_OFFSET) OVER (
    PARTITION BY PIPE_ID, CHANNEL_ID
    ORDER BY STREAM_OFFSET
  ) + 1) AS expected_next
FROM my_table
QUALIFY STREAM_OFFSET != previous_offset + 1;
Copy

REST API 요청에 압축 사용

Snowpipe Streaming REST API를 사용하는 경우 압축을 통해 요청당 더 많은 데이터를 전송하고 네트워크 오버헤드를 줄입니다.

REST API의 요청당 물리적 제한은 4MB이지만, 이러한 제한은 관찰된 전송 크기에 적용됩니다. 압축을 사용하면 각 요청에 더 큰 비압축 데이터 볼륨을 맞출 수 있으므로 처리량을 높이고 API 호출 수를 줄일 수 있습니다.

Snowflake는 Gzip도 지원되지만, 고성능 압축 알고리즘으로 ZSTD를 사용하는 것을 권장합니다.

MATCH_BY_COLUMN_NAME을 사용한 수집 성능 및 비용 최적화

모든 데이터를 단일 VARIANT 열로 수집하는 대신, 소스 데이터에서 필요한 열을 매핑하도록 파이프를 구성합니다. 이렇게 하려면 :code:`MATCH_BY_COLUMN_NAME = CASE_SENSITIVE`를 사용하거나 파이프 정의에 변환을 적용합니다. 이 모범 사례는 수집 비용을 최적화할 뿐만 아니라 스트리밍 데이터 파이프라인의 전반적인 성능도 향상시킵니다.

이 모범 사례에는 다음과 같은 중요한 이점이 있습니다.

  • :code:`MATCH_BY_COLUMN_NAME = CASE_SENSITIVE`를 사용하여 대상 테이블에 수집된 데이터 값에 대해서만 요금이 청구됩니다. 반대로 데이터를 단일 VARIANT 열로 수집하면 키와 값을 모두 포함한 모든 JSON 바이트에 대해 요금을 청구합니다. 자세하거나 많은 JSON 키를 사용하는 데이터의 경우 이 경우 수집 비용이 불필요하게 크게 증가할 수 있습니다.

  • Snowflake의 처리 엔진은 계산 효율성이 더 높습니다. 전체 JSON 오브젝트를 VARIANT로 구문 분석한 다음, 필요한 열을 추출하는 대신, 이 메서드는 필요한 값을 직접 추출합니다.

반정형 데이터에 네이티브 데이터 타입 사용

최적의 성능과 데이터 무결성을 위해 직렬화된 문자열이 아닌 네이티브 언어 오브젝트를 사용하여 반정형 데이터를 제공합니다.

  • 성능: 네이티브 오브젝트를 통해 SDK는 Snowflake 서버에서 추가적인 구문 분석 단계 없이 데이터를 더 효율적으로 처리할 수 있습니다.

  • 안전한 유형: 고성능 아키텍처는 문자열 리터럴을 리터럴 텍스트로 처리합니다. 네이티브 오브젝트를 사용하면 데이터가 이스케이프된 문자열 값이 아닌 정형 JSON으로 저장됩니다.

Java 예제:

// Preferred: SDK converts the List to a structured ARRAY
row.put("tags", Arrays.asList("electronics", "sale"));
Copy

Python 예제:

# Preferred: SDK converts the dict to a structured VARIANT
row["payload"] = {"event_id": 101, "status": "active"}
Copy

Prometheus 메트릭 가져오기

Snowpipe Streaming 고성능 클라이언트에서 성능 메트릭을 가져오려면 기본 제공 Prometheus 메트릭 서버를 활성화하고 엔드포인트를 스크래핑하도록 Prometheus 서비스를 구성해야 합니다.

애플리케이션을 실행하기 전에 환경 변수 :code:`SS_ENABLE_METRICS`를 :code:`true`로 설정하여 메트릭 서버를 활성화합니다.

Snowpipe Streaming 수집 프로세스를 실행 중인 호스트에서 메트릭 엔드포인트를 스크래핑합니다. 기본 경로는 SS_METRICS_IPSS_METRICS_PORT`에 의해 정의된 호스트와 포트의 :code:/metrics`입니다.

예: 메트릭 엔드포인트 확인하기(로컬 프로세스/개발 상자)

# Enable Prometheus metrics
export SS_ENABLE_METRICS=true
# Run your application (the server starts on 127.0.0.1:50000 by default)

# Curl the endpoint to verify the metrics are exposed
curl http://127.0.0.1:50000/metrics
Copy

예: Prometheus 스크래핑 구성

Snowpipe Streaming 클라이언트를 실행하는 호스트에서 Prometheus 서비스를 가리킵니다.

scrape_configs:
  - job_name: snowpipe_streaming_hp
    metrics_path: /metrics   # default is /metrics
    static_configs:
      - targets: ['127.0.0.1:50000']
Copy

복원력을 위한 설계

try-catch 블록에서 수집 래핑

:code:`insertRows`가 항상 성공한다고 가정하지 마세요. 수집 루프가 :code:`SFException`을 캐치하고 HTTP 상태 코드(특히 무효화의 경우 409 및 제한의 경우 429)를 해석할 수 있는지 확인합니다.

지수 백오프 구현

재시도 가능한 오류(429, 500, 503)의 경우 즉시 재시도하지 마세요. 지수 백오프 전략(각 재시도 사이의 대기 시간 증가)을 사용하여 시스템이 복구할 수 있도록 합니다.

오프셋 토큰으로 진행 상황 확인

주기적으로 :code:`getLatestCommittedOffsetToken`을 호출하여 성공적으로 지속된 데이터를 추적합니다. 409 오류가 발생하면 이 토큰을 사용하여 채널을 다시 연 후 데이터를 재생할 정확한 지점을 식별합니다.

채널 상태 모니터링

getChannelStatus()`를 정기적으로 확인합니다. 상태 코드가 :code:`SUCCESS 이외의 코드인 경우 오류 처리 논리를 트리거하여 채널 또는 클라이언트 연결을 재설정합니다.