Kafka Connector 문제 해결하기

이 섹션에서는 Kafka 커넥터를 사용하여 데이터를 수집하는 동안 발생하는 문제를 해결하는 방법에 대해 설명합니다.

이 항목의 내용:

오류 알림

Snowpipe에 대한 오류 알림을 구성합니다. Snowpipe에서 로딩 중에 파일 오류가 발생하면 이 기능은 구성된 클라우드 메시징 서비스로 알림을 푸시하여 데이터 파일을 분석할 수 있게 해줍니다. 자세한 내용은 Snowpipe 오류 알림 섹션을 참조하십시오.

일반 문제 해결 단계

Kafka 커넥터를 사용하여 로드 관련 문제를 해결하려면 다음 단계를 완료하십시오.

1단계: 테이블의 COPY 내역 보기

대상 테이블에 대한 로드 활동 내역을 쿼리합니다. 자세한 내용은 COPY_HISTORY 뷰 섹션을 참조하십시오. COPY_HISTORY 출력에 예상되는 파일 세트가 포함되어 있지 않은 경우 이전 기간을 쿼리합니다. 파일이 이전 파일과 중복되는 경우 원본 파일 로드 시 로드 내역에 활동이 기록된 것일 수 있습니다. STATUS 열은 특정 파일 세트의 로드 완료, 부분 로드 또는 로드 실패 여부를 보여줍니다. FIRST_ERROR_MESSAGE 열은 부분 로드 또는 로드 실패 시의 이유를 보여줍니다.

로드할 수 없는 파일의 경우 Kafka 커넥터가 대상 테이블과 연결된 스테이지로 이동시킵니다. 테이블 스테이지를 참조하는 구문은 @[namespace.]%table_name 입니다.

LIST 를 사용하여 테이블 스테이지에 있는 모든 파일을 나열합니다.

예:

LIST @mydb.public.%mytable;
Copy

파일 이름의 형식은 다음 중 하나와 같습니다. 각 형식을 생성하는 조건에 대한 설명은 테이블에서 제공됩니다.

파일 타입

설명

원시 바이트

이러한 파일은 다음 패턴과 일치합니다.

<커넥터_이름>/<테이블_이름>/<파티션>/offset_(<키>/<값>_)<타임스탬프>.gz

이러한 파일의 경우 Kafka 레코드는 원시 바이트에서 원본 파일 형식(Avro, JSON 또는 Protobuf)으로 변환할 수 없습니다.

이러한 문제는 일반적으로 레코드에서 문자가 삭제되는 네트워크 오류로 인해 발생합니다. Kafka 커넥터가 더 이상 원시 바이트의 구문을 분석할 수 없어 레코드가 손상되었습니다.

소스 파일 형식(Avro, JSON 또는 Protobuf)

이러한 파일은 다음 패턴과 일치합니다.

<커넥터_이름>/<테이블_이름>/<파티션>/<시작_오프셋>_<종료_오프셋>_<타임스탬프>.<파일_타입>.gz

이러한 파일의 경우 Kafka 커넥터가 원시 바이트를 원본 파일 형식으로 다시 변환한 후 Snowpipe에서 오류가 발생하여 파일을 로드할 수 없습니다.

다음 섹션에서는 각 파일 타입의 문제를 해결하기 위한 지침을 제공합니다.

원시 바이트

파일 이름 <커넥터_이름>/<테이블_이름>/<파티션>/offset_(<키>/<값>_)<타임스탬프>.gz 에는 원시 바이트에서 소스 파일 형식으로 변환되지 않은 레코드의 정확한 오프셋이 포함됩니다. 문제를 해결하려면 Kafka 커넥터에 레코드를 새 레코드로 다시 전송합니다.

소스 파일 형식(Avro, JSON 또는 Protobuf)

Snowpipe가 Kafka 항목에 대해 생성된 내부 스테이지의 파일에서 데이터를 로드할 수 없는 경우 Kafka 커넥터는 소스 파일 형식의 대상 테이블에 대한 스테이지로 파일을 이동합니다.

파일 세트에서 문제가 여러 개 발생한 경우 COPY_HISTORY 출력의 FIRST_ERROR_MESSAGE 열은 발생한 첫 번째 오류만 나타냅니다. 파일의 모든 오류를 확인하려면 테이블 스테이지에서 파일을 검색하여 명명된 스테이지로 업로드한 후 VALIDATION_MODE 문을 실행하여 COPY INTO <테이블> 복사 옵션을 RETURN_ALL_ERRORS 로 설정합니다. VALIDATION_MODE 복사 옵션은 COPY 문에 로드할 데이터의 유효성을 검사하고 지정된 유효성 검사 옵션에 따라 결과를 반환하도록 지시합니다. 이 복사 옵션이 지정되면 데이터가 로드되지 않습니다. 이 문에서 Kafka 커넥터를 사용하여 로드하려고 시도한 파일 세트를 참조합니다.

데이터 파일의 문제가 해결되면 1개 이상의 COPY 문을 사용하여 데이터를 수동으로 로드할 수 있습니다.

다음 예에서는 mydb.public 데이터베이스 및 스키마의 mytable 테이블용 테이블 스테이지에 있는 데이터 파일을 참조합니다.

테이블 스테이지에서 데이터 파일의 유효성을 검사하고 오류를 해결하려면:

  1. LIST 를 사용하여 테이블 스테이지에 있는 모든 파일을 나열합니다.

    예:

    LIST @mydb.public.%mytable;
    
    Copy

    이 섹션의 예시에서는 데이터 파일에 대한 소스 형식을 JSON으로 가정합니다.

  2. GET 을 사용하여 Kafka 커넥터에서 생성한 파일을 로컬 컴퓨터에 다운로드합니다.

    예를 들어, 파일을 로컬 컴퓨터의 data 디렉터리에 다운로드합니다.

    Linux 또는 macOS
    GET @mydb.public.%mytable file:///data/;
    
    Copy
    Microsoft Windows
    GET @mydb.public.%mytable file://C:\data\;
    
    Copy
  3. 소스 Kafka 파일과 동일한 형식으로 데이터 파일을 저장하는 CREATE STAGE 를 사용하여 명명된 내부 스테이지를 생성합니다.

    예를 들어, JSON 파일을 저장하는 이름이 kafka_json 인 내부 스테이지를 생성합니다.

    CREATE STAGE kafka_json FILE_FORMAT = (TYPE = JSON);
    
    Copy
  4. PUT 을 사용하여 테이블 스테이지에서 다운로드한 파일을 업로드 합니다.

    예를 들어 다운로드한 파일을 로컬 컴퓨터의 data 디렉터리에 업로드합니다.

    Linux 또는 macOS
    PUT file:///data/ @mydb.public.kafka_json;
    
    Copy
    Microsoft Windows
    PUT file://C:\data\ @mydb.public.kafka_json;
    
    Copy
  5. 테스트를 수행하기 위한 목적으로 베리언트 열 2개 포함된 임시 테이블을 생성합니다. 이 테이블은 스테이징된 데이터 파일의 유효성을 확인하기 위한 목적으로만 사용됩니다. 데이터가 테이블에 로드되지 않습니다. 현재 사용자 세션이 종료되면 테이블이 자동으로 삭제됩니다.

    CREATE TEMPORARY TABLE t1 (col1 variant);
    
    Copy
  6. COPY INTO *테이블* … VALIDATION_MODE = 〈RETURN_ALL_ERRORS〉 문을 실행하여 데이터 파일에서 발생한 모든 오류를 검색합니다. 이 문은 지정된 스테이지에서 파일의 유효성을 검사합니다. 데이터가 테이블에 로드되지 않습니다.

    COPY INTO mydb.public.t1
      FROM @mydb.public.kafka_json
      FILE_FORMAT = (TYPE = JSON)
      VALIDATION_MODE = 'RETURN_ALL_ERRORS';
    
    Copy
  7. 로컬 컴퓨터의 데이터 파일에서 보고된 모든 오류를 수정합니다.

  8. PUT 을 사용하여 수정된 파일을 테이블 스테이지 또는 명명된 내부 스테이지에 업로드합니다.

    다음 예시에서는 파일을 테이블 스테이지로 업로드하며, 다음과 같이 기존 파일을 덮어씁니다.

    Linux 또는 macOS
    PUT file:///tmp/myfile.csv @mydb.public.%mytable OVERWRITE = TRUE;
    
    Copy
    Windows
    PUT file://C:\temp\myfile.csv @mydb.public.%mytable OVERWRITE = TRUE;
    
    Copy
  9. VALIDATION_MODE 옵션을 제외하고 COPY INTO 테이블 을 사용하여 대상 테이블에 데이터를 로드합니다.

    선택적으로 PURGE = TRUE 복사 옵션을 사용하여 데이터가 로드되면 스테이지에서 데이터 파일을 삭제하거나 REMOVE 를 사용하여 테이블 스테이지에서 수동으로 파일을 삭제할 수 있습니다.

    COPY INTO mydb.public.mytable(RECORD_METADATA, RECORD_CONTENT)
      FROM (SELECT $1:meta, $1:content FROM @mydb.public.%mytable)
      FILE_FORMAT = (TYPE = 'JSON')
      PURGE = TRUE;
    
    Copy

2단계: Kafka Connector 로그 파일 분석

COPY_HISTORY 뷰에 데이터 로드 기록이 없으면 Kafka 커넥터에 대한 로그 파일을 분석합니다. 커넥터는 이벤트를 로그 파일에 기록합니다. Snowflake Kafka 커넥터는 모든 Kafka 커넥터 플러그인과 동일한 로그 파일을 공유한다는 점에 유의하십시오. 이 로그 파일의 이름과 위치는 Kafka Connect 구성 파일에 있어야 합니다. 자세한 내용은 Apache Kafka 소프트웨어용으로 제공되는 설명서를 참조하십시오.

Kafka 커넥터 로그 파일에서 Snowflake 관련 오류 메시지를 검색합니다. 대부분의 메시지에는 ERROR 문자열이 있으며 파일 이름 com.snowflake.kafka.connector... 가 포함되어 이러한 메시지를 더 쉽게 찾을 수 있습니다.

발생할 수 있는 오류는 다음과 같습니다.

구성 오류

오류의 가능한 원인:

  • 항목을 구독하기 위해 필요한 적절한 정보가 커넥터에 없습니다.

  • Snowflake 테이블에 쓸 적절한 정보가 커넥터에 없습니다(예: 인증을 위한 키 페어가 잘못되었을 수 있음).

Kafka 커넥터는 매개 변수의 유효성을 검사한다는 점에 유의하십시오. 커넥터는 호환되지 않는 각 구성 매개 변수에 대한 오류를 throw합니다. 오류 메시지는 Kafka Connect 클러스터의 로그 파일에 기록됩니다. 구성 문제가 의심되는 경우에는 해당 로그 파일의 오류를 확인하십시오.

읽기 오류

커넥터가 Kafka에서 읽지 못했을 수 있는 오류는 다음과 같습니다.

  • Kafka 또는 Kafka Connect가 실행 중이 아닐 수 있습니다.

  • 메시지가 아직 전송되지 않았을 수 있습니다.

  • 메시지가 삭제(만료)되었을 수 있습니다.

쓰기 오류(스테이지)

오류의 가능한 원인:

  • 스테이지에 대한 권한이 부족합니다.

  • 스테이지 공간이 부족합니다.

  • 스테이지가 삭제되었습니다.

  • 일부 다른 사용자 또는 프로세스가 스테이지에 예기치 않은 파일을 생성했습니다.

쓰기 오류(테이블)

오류의 가능한 원인:

  • 테이블에 대한 권한이 부족합니다.

3단계: Kafka Connect 확인

Kafka 연결 로그 파일에 오류가 보고되지 않은 경우에는 Kafka Connect를 확인해야 합니다. 문제 해결 지침은 Apache Kafka 소프트웨어 공급자가 제공하는 설명서를 참조하십시오.

특정 문제 해결하기

항목 파티션 및 오프셋이 동일한 중복 행

Kafka 커넥터 버전 1.4 이상을 사용하여 데이터를 로드할 때 항목 파티션 및 오프셋이 동일한 대상 테이블의 중복 행이 있으면 이는 로드 작업에서 기본 실행 제한 시간인 300000밀리초(300초)가 초과되었음을 나타낼 수 있습니다. 원인을 확인하려면 Kafka Connect 로그 파일에서 다음 오류를 확인하십시오.

org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member.

This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing max.poll.interval.ms or by reducing the maximum size of batches returned in poll() with max.poll.records.

at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.sendOffsetCommitRequest(ConsumerCoordinator.java:1061)
Copy

오류를 해결하려면 Kafka 구성 파일(예: <kafka_dir>/config/connect-distributed.properties)에서 다음 속성 중 1개 를 변경하십시오.

consumer.max.poll.interval.ms

실행 제한 시간을 900000 (900초)로 늘립니다.

consumer.max.poll.records

각 작업에서 로드되는 레코드 수를 50 개로 줄입니다.

문제 보고하기

Snowflake 지원 에 문의할 때 준비해야 하는 파일은 다음과 같습니다.

  • Kafka 커넥터에 대한 구성 파일.

    중요

    개인 키를 제거한 후 파일을 Snowflake에 제공해야 합니다.

  • Kafka Connector 로그의 복사본. 파일에 기밀 정보나 민감한 정보가 포함되어 있지 않은지 확인해야 합니다.

  • JDBC 로그 파일.

    로그 파일을 생성하려면 Kafka Connect 클러스터에서 JDBC_TRACE = true 환경 변수를 설정한 후 Kafka 커넥터를 실행하십시오.

    JDBC 로그 파일에 대한 자세한 내용은 Snowflake Community의 이 문서 를 참조하십시오.

  • Connect 로그 파일.

    로그 파일을 생성하려면 etc/kafka/connect-log4j.properties 파일을 편집합니다. log4j.appender.stdout.layout.ConversionPattern 속성을 다음과 같이 설정합니다.

    log4j.appender.stdout.layout.ConversionPattern=[%d] %p %X{connector.context}%m (%c:%L)%n

    커넥터 컨텍스트는 Kafka 버전 2.3 이상에서 사용할 수 있습니다.

    자세한 내용은 Confluent 웹 사이트에서 로깅 개선 사항 정보를 참조하십시오.