고성능 아키텍처를 갖춘 Snowpipe Streaming의 오류 로깅

Snowpipe Streaming의 오류 로깅은 Snowflake의 :ref:`DML 오류 로깅 <label-data_load_overview_dml_error_logging>`기능을 기반으로 데이터 수집 오류를 관리하고 복구하는 강력한 방법을 제공합니다. 이 기능은 자동 데이터 손실을 방지하고 결함이 있는 데이터 행에 대한 가시성을 높입니다. 오류 로깅이 켜져 있으면 오류가 없는 데이터는 대상 테이블에 계속 로드되지만, 처리에 실패한 행은 검토 및 복구를 위해 전용 오류 테이블로 자동 라우팅됩니다.

중요

오류 테이블에 저장된 데이터는 파이프 변환이 적용되기 전 API 또는 SDK로 전송된 원래 페이로드입니다. 파이프가 필드를 삭제하거나 변환하더라도 전체 원본 페이로드가 오류 테이블에 유지됩니다.

개요

Snowpipe Streaming 고성능 아키텍처를 사용할 때 데이터 처리는 Snowflake의 서버 측에서 발생합니다. 고성능 아키텍처는 암시적으로 ON_ERROR = CONTINUE 모드에서 작동합니다. 즉, 유효한 행은 수집되고 문제가 있는 행은 건너뜁니다.

오류 처리 옵션

다음과 같은 방법으로 수집 오류를 모니터링하고 처리할 수 있습니다.

오류 테이블 없음:

이러한 메서드는 발생한 오류개수 는 알려주지만 실패한 행 이나 페이로드는 알려주지 않습니다.

오류 테이블 사용:

  • 처리에 실패한 행은 전용 오류 테이블에 자동으로 캡처됩니다.

  • 각 오류 행에는 전체 원본 페이로드와 자세한 오류 메타데이터가 포함됩니다.

  • 표준 SQL을 사용하여 실패한 행을 쿼리, 분석, 재처리할 수 있습니다.

오류 테이블은 실패한 행과 그 이유를 정확히 보여주어 전체적인 상황을 파악하고 전체 디버깅 및 복구를 수행하도록 지원합니다.

오류 로깅 켜기

Snowpipe Streaming의 오류 로깅을 켜려면 대상 테이블에 ERROR_LOGGING 속성을 설정합니다. 오류 로깅 켜기 및 구성에 대한 자세한 내용은 테이블에 대한 DML 오류 로깅 구성 섹션을 참조하세요.

-- For a new table:
CREATE TABLE my_streaming_table (...) ERROR_LOGGING = TRUE;

-- For an existing table:
ALTER TABLE my_streaming_table SET ERROR_LOGGING = TRUE;

오류 로깅이 켜져 있으면 동일한 오류 테이블이 DML 문 및 Snowpipe Streaming 수집 워크로드 모두에서 발생하는 오류를 캡처합니다.

오류 테이블 쿼리

기본 테이블에 대한 오류 테이블을 쿼리하려면 ERROR_TABLE 테이블 함수를 사용합니다. 오류 테이블 스키마, 액세스 제어 및 지원되는 작업에 대한 자세한 내용은 오류 로깅 및 오류 테이블 섹션을 참조하세요.

SELECT * FROM ERROR_TABLE(my_streaming_table) ORDER BY timestamp;

결과에는 수집 스트림에서 오류 행마다 하나의 행이 포함됩니다.

Snowpipe Streaming 오류 필드

Snowpipe Streaming 오류는 DML 오류와 동일한 오류 테이블 열 (timestamp, query_id, error_code, error_metadata, error_data)에 저장됩니다. error_metadataerror_data 오브젝트에는 다음 섹션에서 설명하는 Snowpipe Streaming의 추가 필드가 포함됩니다.

Snowpipe Streaming 오류 식별

Snowpipe Streaming에서 오류가 발생하면 error_metadata:service 필드에 snowpipe_streaming 값이 채워집니다. 이 필드를 사용하여 소스별로 오류를 필터링합니다.

SELECT * FROM ERROR_TABLE(my_streaming_table)
WHERE error_metadata:service = 'snowpipe_streaming';

오류 메타데이터 세부 정보

Snowpipe Streaming 오류의 경우 error_metadata:details 오브젝트에 다음과 같은 추가 필드가 포함됩니다.

필드

설명

pipe_name

오류 행을 수집하는 데 사용되는 파이프의 이름입니다.

channel_name

오류 행을 수집하는 데 사용되는 채널의 이름입니다.

offset_token_upper_bound

오류 행을 포함하는 상한 오프셋 토큰입니다. 이 행은 페이로드의 이 오프셋 토큰 위치 또는 그 이전 위치에 나타납니다.

error_data_truncated

원시 페이로드가 오류 테이블에 맞게 잘렸는지 여부를 나타냅니다(최대 128MB).

error_data_content_type

error_data 열에 저장된 콘텐츠 유형을 나타냅니다. 오류 데이터 콘텐츠 유형 섹션을 참조하십시오.

오류 데이터 형식

Snowpipe Streaming 오류의 경우 error_data:$1 필드에는 오류 행을 나타내는 원시 페이로드가 포함됩니다.

페이로드에 유효하지 않은 UTF-8 문자가 포함된 경우, 원시 페이로드는 base64로 인코딩된 이진 문자열로 저장됩니다.

오류 데이터 콘텐츠 유형

error_data_content_type 필드는 발생한 오류의 유형을 나타내고 해결 단계를 제안합니다.

json

오류 행은 구문적으로 유효한 JSON 문자열이지만, 데이터를 대상 테이블로 수집하는 동안 논리적 오류가 발생했습니다.

일반적인 논리적 오류는 다음과 같습니다.

  • Null을 허용하지 않는 열 누락: 페이로드에 NOT NULL 제약 조건이 있는 필수 열이 포함되지 않았습니다.

  • 형식 변환 오류: JSON 데이터 타입은 대상 열 타입으로 캐스트할 수 없습니다. 예를 들어, 문자열 값 "abc" 는 NUMBER 열로 변환할 수 없습니다.

  • 변환 오류: 0으로 나누기와 같은 파이프 변환 식을 평가하는 동안 오류가 발생했습니다.

이 문제를 해결하려면 error_metadata:error_message 의 오류 메시지 및 수집 오류가 발생한 error_metadata:error_source 의 열 이름을 검사합니다. PARSE_JSON(error_data:$1) 으로 페이로드를 구문 분석하고 데이터를 수정한 후 대상 테이블에 다시 삽입합니다.

json-invalid

구문적으로 유효하지 않은 JSON 오브젝트가 수집되었습니다.

이 문제를 해결하려면 구문 오류에 대한 세부 정보가 포함된 error_metadata:error_message 의 오류 메시지를 검사합니다. error_data:$1 에 저장된 페이로드 수정하고 대상 테이블에 다시 삽입합니다.

binary-base64

유효하지 않은 UTF-8 데이터가 수집되었습니다. 오류 페이로드는 오류 테이블에 base64로 인코딩된 이진 문자열로 저장됩니다.

이 오류 유형은 일반적으로 업스트림 데이터 소스의 형식 불일치 또는 인코딩 오류를 나타냅니다.

이 문제를 해결하려면 데이터 소스와 데이터 소스가 생성하는 데이터 형식 및 인코딩을 검사합니다. error_data:$1 에 저장된 페이로드를 BASE64_DECODE_STRING 함수로 디코딩하여 원시 바이트를 검사하고 잘못된 UTF-8 시퀀스를 식별합니다.

오류 복구 워크플로

다음 예제에서는 오류를 쿼리하고 분석하고 수정된 데이터를 다시 삽입하는 방법을 보여줍니다.

최근 오류 쿼리

SELECT
    timestamp,
    error_code,
    error_metadata:error_message::STRING AS error_message,
    error_metadata:details:channel_name::STRING AS channel,
    error_metadata:details:pipe_name::STRING AS pipe,
    error_metadata:details:error_data_content_type::STRING AS content_type,
    error_data:"$1"::STRING AS raw_payload
FROM ERROR_TABLE(my_streaming_table)
WHERE error_metadata:service = 'snowpipe_streaming'
  AND timestamp >= DATEADD(hour, -1, CURRENT_TIMESTAMP())
ORDER BY timestamp DESC;

오류 분포 분석

SELECT
    error_code,
    error_metadata:error_message::STRING AS error_message,
    COUNT(*) AS error_count
FROM ERROR_TABLE(my_streaming_table)
WHERE error_metadata:service = 'snowpipe_streaming'
  AND timestamp >= DATEADD(hour, -24, CURRENT_TIMESTAMP())
GROUP BY 1, 2
ORDER BY error_count DESC;

복구 가능한 오류 수정 및 다시 삽입

유효한 JSON 페이로드에 오류가 있는 경우 데이터를 구문 분석하고 수정하고 다시 삽입할 수 있습니다.

INSERT INTO my_streaming_table (col1, col2, col3)
SELECT
    TRY_CAST(PARSE_JSON(error_data:"$1"):col1 AS NUMBER),
    PARSE_JSON(error_data:"$1"):col2::STRING,
    TRY_CAST(PARSE_JSON(error_data:"$1"):col3 AS TIMESTAMP)
FROM ERROR_TABLE(my_streaming_table)
WHERE error_metadata:service = 'snowpipe_streaming'
  AND error_metadata:details:error_data_content_type = 'json'
  AND timestamp >= DATEADD(hour, -24, CURRENT_TIMESTAMP());

오류를 성공적으로 재처리한 후 오류 테이블을 자를 수 있습니다.

TRUNCATE ERROR_TABLE(my_streaming_table);

청구

Snowpipe Streaming 수집 비용은 표준 Snowpipe Streaming 요율에 따라 청구됩니다. 오류 로깅을 켜도 수집 비용은 변경되지 않습니다. 실패한 행을 오류 테이블로 라우팅할 때 추가적인 수집 비용은 없습니다.

Snowflake는 오류 테이블에 저장된 데이터에 대해 다른 테이블과 동일한 표준 저장소 요율로 비용을 청구합니다. 오류 테이블은 실패한 행마다 원시 페이로드 및 오류 메타데이터를 저장합니다.

Snowpipe Streaming 비용에 대한 자세한 내용은 Snowpipe Streaming high-performance architecture: Understand your costs 섹션을 참조하세요.

제한 사항

  • 오류 테이블은 서버 측 데이터 처리(구문 분석 및 변환) 중에 발생하는 오류를 캡처합니다. 다른 스테이지의 오류(SDK 유효성 검사, API 오류 및 기타 서버 측 비동기 오류)는 오류 테이블에 캡처되지 않습니다. getChannelStatus() 를 사용하여 서버 측 비동기 오류를 모니터링합니다.

  • 수신 행의 실패율이 높으면 오류 정보를 저장하는 데 따른 오버헤드로 인해 처리 대기 시간이 증가할 수 있습니다.

  • 128MB보다 큰 페이로드는 잘립니다. error_data_truncated 필드는 잘림이 발생한 시점을 나타냅니다.

  • 오류 테이블은 Snowpipe Streaming 고성능 아키텍처에만 사용할 수 있습니다. 클래식 아키텍처의 경우 오류 처리는 SDK를 통해 클라이언트 측에서 관리됩니다.