Kafka 커넥터 문제 해결하기¶
이 섹션에서는 Kafka 커넥터를 사용하여 데이터를 수집하는 동안 발생하는 문제를 해결하는 방법에 대해 설명합니다.
이 항목의 내용:
오류 알림¶
Snowpipe에 대한 오류 알림을 구성합니다. Snowpipe에서 로딩 중에 파일 오류가 발생하면 이 기능은 구성된 클라우드 메시징 서비스로 알림을 푸시하여 데이터 파일을 분석할 수 있게 해줍니다. 자세한 내용은 Snowpipe 오류 알림 섹션을 참조하십시오.
일반 문제 해결 단계¶
Kafka 커넥터를 사용하여 로드 관련 문제를 해결하려면 다음 단계를 완료하십시오.
1단계: 테이블의 COPY 기록 보기¶
대상 테이블에 대한 로드 활동 내역을 쿼리합니다. 자세한 내용은 COPY_HISTORY 뷰 섹션을 참조하십시오. COPY_HISTORY 출력에 예상되는 파일 세트가 포함되어 있지 않은 경우 이전 기간을 쿼리합니다. 파일이 이전 파일과 중복되는 경우 원본 파일 로드 시 로드 내역에 활동이 기록된 것일 수 있습니다. STATUS
열은 특정 파일 세트의 로드 완료, 부분 로드 또는 로드 실패 여부를 보여줍니다. FIRST_ERROR_MESSAGE
열은 부분 로드 또는 로드 실패 시의 이유를 보여줍니다.
로드할 수 없는 파일의 경우 Kafka 커넥터가 대상 테이블과 연결된 스테이지로 이동시킵니다. 테이블 스테이지를 참조하는 구문은 @[namespace.]%table_name
입니다.
LIST 를 사용하여 테이블 스테이지에 있는 모든 파일을 나열합니다.
예:
LIST @mydb.public.%mytable;
파일 이름의 형식은 다음 중 하나와 같습니다. 각 형식을 생성하는 조건에 대한 설명은 테이블에서 제공됩니다.
파일 타입 |
설명 |
---|---|
원시 바이트 |
이러한 파일은 다음 패턴과 일치합니다.
이러한 파일의 경우 Kafka 레코드는 원시 바이트에서 원본 파일 형식(Avro, JSON 또는 Protobuf)으로 변환할 수 없습니다. 이러한 문제는 일반적으로 레코드에서 문자가 삭제되는 네트워크 오류로 인해 발생합니다. Kafka 커넥터가 더 이상 원시 바이트의 구문을 분석할 수 없어 레코드가 손상되었습니다. |
소스 파일 형식(Avro, JSON 또는 Protobuf) |
이러한 파일은 다음 패턴과 일치합니다.
이러한 파일의 경우 Kafka 커넥터가 원시 바이트를 원본 파일 형식으로 다시 변환한 후 Snowpipe에서 오류가 발생하여 파일을 로드할 수 없습니다. |
다음 섹션에서는 각 파일 타입의 문제를 해결하기 위한 지침을 제공합니다.
원시 바이트¶
파일 이름 <커넥터_이름>/<테이블_이름>/<파티션>/offset_(<키>/<값>_)<타임스탬프>.gz
에는 원시 바이트에서 소스 파일 형식으로 변환되지 않은 레코드의 정확한 오프셋이 포함됩니다. 문제를 해결하려면 Kafka 커넥터에 레코드를 새 레코드로 다시 전송합니다.
원본 파일 형식(Avro, JSON 또는 프로토콜 버퍼)¶
Snowpipe가 Kafka 항목에 대해 생성된 내부 스테이지의 파일에서 데이터를 로드할 수 없는 경우 Kafka 커넥터는 소스 파일 형식의 대상 테이블에 대한 스테이지로 파일을 이동합니다.
파일 세트에서 문제가 여러 개 발생한 경우 COPY_HISTORY 출력의 FIRST_ERROR_MESSAGE
열은 발생한 첫 번째 오류만 나타냅니다. 파일의 모든 오류를 확인하려면 테이블 스테이지에서 파일을 검색하여 명명된 스테이지로 업로드한 후 VALIDATION_MODE 문을 실행하여 COPY INTO <테이블> 복사 옵션을 RETURN_ALL_ERRORS
로 설정합니다. VALIDATION_MODE 복사 옵션은 COPY 문에 로드할 데이터의 유효성을 검사하고 지정된 유효성 검사 옵션에 따라 결과를 반환하도록 지시합니다. 이 복사 옵션이 지정되면 데이터가 로드되지 않습니다. 이 문에서 Kafka 커넥터를 사용하여 로드하려고 시도한 파일 세트를 참조합니다.
데이터 파일의 문제가 해결되면 1개 이상의 COPY 문을 사용하여 데이터를 수동으로 로드할 수 있습니다.
다음 예에서는 mydb.public
데이터베이스 및 스키마의 mytable
테이블용 테이블 스테이지에 있는 데이터 파일을 참조합니다.
테이블 스테이지에서 데이터 파일의 유효성을 검사하고 오류를 해결하려면:
LIST 를 사용하여 테이블 스테이지에 있는 모든 파일을 나열합니다.
예:
LIST @mydb.public.%mytable;
이 섹션의 예시에서는 데이터 파일에 대한 소스 형식을 JSON으로 가정합니다.
GET 을 사용하여 Kafka 커넥터에서 생성한 파일을 로컬 컴퓨터에 다운로드합니다.
예를 들어, 파일을 로컬 컴퓨터의
data
디렉터리에 다운로드합니다.- Linux 또는 macOS:
GET @mydb.public.%mytable file:///data/;
- Microsoft Windows:
GET @mydb.public.%mytable file://C:\data\;
소스 Kafka 파일과 동일한 형식으로 데이터 파일을 저장하는 CREATE STAGE 를 사용하여 명명된 내부 스테이지를 생성합니다.
예를 들어, JSON 파일을 저장하는 이름이
kafka_json
인 내부 스테이지를 생성합니다.CREATE STAGE kafka_json FILE_FORMAT = (TYPE = JSON);
PUT 을 사용하여 테이블 스테이지에서 다운로드한 파일을 업로드 합니다.
예를 들어 다운로드한 파일을 로컬 컴퓨터의
data
디렉터리에 업로드합니다.- Linux 또는 macOS:
PUT file:///data/ @mydb.public.kafka_json;
- Microsoft Windows:
PUT file://C:\data\ @mydb.public.kafka_json;
테스트를 수행하기 위한 목적으로 베리언트 열 2개 포함된 임시 테이블을 생성합니다. 이 테이블은 스테이징된 데이터 파일의 유효성을 확인하기 위한 목적으로만 사용됩니다. 데이터가 테이블에 로드되지 않습니다. 현재 사용자 세션이 종료되면 테이블이 자동으로 삭제됩니다.
CREATE TEMPORARY TABLE t1 (col1 variant);
COPY INTO *테이블* … VALIDATION_MODE = ‘RETURN_ALL_ERRORS’ 문을 실행하여 데이터 파일에서 발생한 모든 오류를 검색합니다. 이 문은 지정된 스테이지에서 파일의 유효성을 검사합니다. 데이터가 테이블에 로드되지 않습니다.
COPY INTO mydb.public.t1 FROM @mydb.public.kafka_json FILE_FORMAT = (TYPE = JSON) VALIDATION_MODE = 'RETURN_ALL_ERRORS';
로컬 컴퓨터의 데이터 파일에서 보고된 모든 오류를 수정합니다.
PUT 을 사용하여 수정된 파일을 테이블 스테이지 또는 명명된 내부 스테이지에 업로드합니다.
다음 예시에서는 파일을 테이블 스테이지로 업로드하며, 다음과 같이 기존 파일을 덮어씁니다.
- Linux 또는 macOS:
PUT file:///tmp/myfile.csv @mydb.public.%mytable OVERWRITE = TRUE;
- Windows:
PUT file://C:\temp\myfile.csv @mydb.public.%mytable OVERWRITE = TRUE;
VALIDATION_MODE 옵션을 제외하고 COPY INTO 테이블 을 사용하여 대상 테이블에 데이터를 로드합니다.
선택적으로 PURGE = TRUE 복사 옵션을 사용하여 데이터가 로드되면 스테이지에서 데이터 파일을 삭제하거나 REMOVE 를 사용하여 테이블 스테이지에서 수동으로 파일을 삭제할 수 있습니다.
COPY INTO mydb.public.mytable(RECORD_METADATA, RECORD_CONTENT) FROM (SELECT $1:meta, $1:content FROM @mydb.public.%mytable) FILE_FORMAT = (TYPE = 'JSON') PURGE = TRUE;
2단계: Kafka 커넥터 로그 파일 분석¶
COPY_HISTORY 뷰에 데이터 로드 기록이 없으면 Kafka 커넥터에 대한 로그 파일을 분석합니다. 커넥터는 이벤트를 로그 파일에 기록합니다. Snowflake Kafka 커넥터는 모든 Kafka 커넥터 플러그인과 동일한 로그 파일을 공유한다는 점에 유의하십시오. 이 로그 파일의 이름과 위치는 Kafka Connect 구성 파일에 있어야 합니다. 자세한 내용은 Apache Kafka 소프트웨어용으로 제공되는 설명서를 참조하십시오.
Kafka 커넥터 로그 파일에서 Snowflake 관련 오류 메시지를 검색합니다. 대부분의 메시지에는 ERROR
문자열이 있으며 파일 이름 com.snowflake.kafka.connector...
가 포함되어 이러한 메시지를 더 쉽게 찾을 수 있습니다.
발생할 수 있는 오류는 다음과 같습니다.
- 구성 오류:
오류의 가능한 원인:
항목을 구독하기 위해 필요한 적절한 정보가 커넥터에 없습니다.
Snowflake 테이블에 쓸 적절한 정보가 커넥터에 없습니다(예: 인증을 위한 키 페어가 잘못되었을 수 있음).
Kafka 커넥터는 매개 변수의 유효성을 검사한다는 점에 유의하십시오. 커넥터는 호환되지 않는 각 구성 매개 변수에 대한 오류를 throw합니다. 오류 메시지는 Kafka Connect 클러스터의 로그 파일에 기록됩니다. 구성 문제가 의심되는 경우에는 해당 로그 파일의 오류를 확인하십시오.
- 읽기 오류:
커넥터가 Kafka에서 읽지 못했을 수 있는 오류는 다음과 같습니다.
Kafka 또는 Kafka Connect가 실행 중이 아닐 수 있습니다.
메시지가 아직 전송되지 않았을 수 있습니다.
메시지가 삭제(만료)되었을 수 있습니다.
- 쓰기 오류(스테이지):
오류의 가능한 원인:
스테이지에 대한 권한이 부족합니다.
스테이지 공간이 부족합니다.
스테이지가 삭제되었습니다.
일부 다른 사용자 또는 프로세스가 스테이지에 예기치 않은 파일을 생성했습니다.
- 쓰기 오류(테이블):
오류의 가능한 원인:
테이블에 대한 권한이 부족합니다.
3단계: Kafka Connect 확인¶
Kafka 연결 로그 파일에 오류가 보고되지 않은 경우에는 Kafka Connect를 확인해야 합니다. 문제 해결 지침은 Apache Kafka 소프트웨어 공급자가 제공하는 설명서를 참조하십시오.
특정 문제 해결하기¶
항목 파티션 및 오프셋이 동일한 중복 행¶
Kafka 커넥터 버전 1.4 이상을 사용하여 데이터를 로드할 때 항목 파티션 및 오프셋이 동일한 대상 테이블의 중복 행이 있으면 이는 로드 작업에서 기본 실행 제한 시간인 300000밀리초(300초)가 초과되었음을 나타낼 수 있습니다. 원인을 확인하려면 Kafka Connect 로그 파일에서 다음 오류를 확인하십시오.
org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member.
This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing max.poll.interval.ms or by reducing the maximum size of batches returned in poll() with max.poll.records.
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.sendOffsetCommitRequest(ConsumerCoordinator.java:1061)
오류를 해결하려면 Kafka 구성 파일(예: <kafka_dir>/config/connect-distributed.properties
)에서 다음 속성 중 1개 를 변경하십시오.
consumer.max.poll.interval.ms
실행 제한 시간을
900000
(900초)로 늘립니다.consumer.max.poll.records
각 작업에서 로드되는 레코드 수를
50
개로 줄입니다.
스트리밍 채널 오프셋 마이그레이션 실패 응답 오류 코드: 5023¶
v2.1.0 이상 커넥터 버전으로 업그레이드하면서, Snowpipe 스트리밍 채널 이름 형식이 변경되었습니다. 결과적으로, 이전에 커밋된 오프셋에 대한 정보를 감지하는 논리는 이전에 커밋된 오프셋에 대한 정보를 찾을 수 없습니다. 이는 다음과 같은 예외로 표시됩니다.
com.snowflake.kafka.connector.internal.SnowflakeKafkaConnectorException: [SF_KAFKA_CONNECTOR] Exception: Failure in Streaming Channel Offset Migration Response Error Code: 5023
Detail: Streaming Channel Offset Migration from Source to Destination Channel has no/invalid response, please contact Snowflake Support
Message: Snowflake experienced a transient exception, please retry the migration request.
이 오류를 해결하려면 Kafka 구성 파일(예: <kafka_dir>/config/connect-distributed.properties
)에 다음 구성 속성을 추가합니다.
enable.streaming.channel.offset.migration
자동 오프셋 마이그레이션을
false
로 설정하여 비활성화합니다.
여러 항목을 지원하도록 커넥터 구성¶
여러 개의 파티션이 있는 다수의 항목을 지원하는 단일 kafka 커넥터 인스턴스에서 문제가 발생했습니다. 커넥터의 구성은 유효해 보였지만, Snowflake로 데이터를 수집할 수 없어 끝없는 재조정 주기가 발생했습니다. 해당 이슈는 Snowpipe Streaming 수집 모드(snowflake.ingestion.method=SNOWPIPE_STREAMING
)에 국한된 것이지만, 가이드라인은 Snowpipe 수집 모드(snowflake.ingestion.method=SNOWPIPE
)에도 적용될 수 있습니다. 이 문제는 이 로그 메시지를 반복적으로 기록하여 로그 파일에 표시됩니다.
[Worker-xyz] [timestamp] INFO [my-connector|task-id] [SF_INGEST] Channel is marked as closed
이는 일반적으로 정규식을 통해 항목을 수집하도록 커넥터를 구성할 때 발생할 수 있습니다. Snowflake는 다음 옵션 세트를 Kafka 구성 파일(예: <kafka_dir>/config/connect-distributed.properties
)에 적용하는 것을 권장합니다
consumer.override.partition.assignment.strategy
작업에 대한 파티션 할당 전략을
org.apache.kafka.clients.consumer.CooperativeStickyAssignor
로 구성 - 그러면 수집된 채널이 사용 가능한 작업에 균등하게 분배되어 밸런싱 재조정 위험이 줄어듭니다.tasks.max
커넥터당 인스턴스화된 작업의 수는 사용 가능한 CPU의 수를 초과해서는 안 되며, 기본 드라이버는 사용 가능한 CPU를 기반으로 스로틀링 메커니즘을 구현합니다. 동시 요청 수가 증가하면 시스템의 메모리 부담이 늘어나고, 삽입 처리 시간도 길어져 커넥터 하트비트가 누락되는 문제가 발생합니다.
커넥터의 시간 제한 값에 대해 설명할 때, 이 값에 직접 영향을 미치는 구성 속성 세트가 있습니다.
consumer.override.heartbeat.interval.ms
모니터 스레드(각 작업과 연결된 스레드가 하나씩 있음)가 Kafka에 하트비트를 보내는 빈도를 정의합니다. 기본값은
3000
ms이지만 시스템 부하가 높은 경우5000
ms까지 늘려 실험해 볼 수 있습니다.consumer.override.session.timeout.ms
브로커가 컨슈머가 유효하지 않은 상태에 있다고 가정하고 재조정을 시도하기 전에 기다리는 시간을 정의합니다. 이 설정은 일반적으로 하트비트 간격보다 3배 높아야 하므로 하트비트를
5000
ms로 구성한 경우 이 설정도15000
ms로 설정합니다.consumer.override.max.poll.interval.ms
기본 Kafka에서
poll()
로 호출하는 최대 간격을 정의합니다. 폴 사이에 소요된 시간은 기본적으로 커넥터가 데이터 배치를 처리하는 데 걸리는 시간(Snowflake에 업로드하고 커밋하는 것을 포함)에 매핑됩니다. 여러 작업이 데이터를 처리하는 시나리오에서는 기본 Snowflake 연결이 요청을 제한하여 처리 시간이 더 길어질 수 있습니다. 시나리오에 따라, 특히 수집할 초기 레코드 수가 많은 상태에서 커넥터를 시작하는 경우 이 값을 20분(1200000
ms)까지 늘릴 수 있습니다.consumer.override.rebalance.timeout.ms
작업당 채널 수가 많은 시나리오에서 재밸런싱이 발생하면 처리를 재개할 위치를 파악하기 위해 채널당 많은 기본 논리가 필요합니다. 이 코드는 순차적으로 실행되므로 작업당 채널이 많을수록 초기 설정이 더 오래 지속됩니다. 이 속성을 충분히 큰 값으로 구성하여 각 채널이 초기화를 완료할 수 있도록 합니다. 3분(
180000
ms) 값이 좋은 시작점입니다.
커넥터에서 사용 가능한 힙 메모리를 알아두는 것도 중요합니다. 이는 여러 커넥터가 동시에 실행되거나 하나의 커넥터가 여러 항목의 데이터를 수집하는 시나리오에서 특히 중요합니다. 각 항목의 파티션은 단일 채널에 매핑되므로 메모리가 필요합니다.
Xmx 설정을 통해 Kafka 연결 프로세스 메모리 설정을 조정합니다. 이를 수행하는 한 가지 방법은 KAFKA_OPTS
환경 변수를 정의하고 그에 따라(즉, KAFKA_OPTS=-Xmx4G
) 설정하는 것입니다.
파일 클리너가 예기치 않게 파일을 제거함¶
SNOWPIPE와 함께 Kafka 커넥터를 사용할 때 여러 항목에서 단일 테이블로 데이터를 수집하는 문제가 발생할 수 있습니다. 구성에 snowflake.topic2table.map
항목이 없거나 항목과 테이블 간에 1:1 매핑이 있는 경우에는 이 문제가 적용되지 않습니다.
Kafka 커넥터는 스테이지에 업로드할 레코드가 포함된 파일을 생성합니다. 이러한 파일은 snowflake_kafka_connector_<connector-name>_stage_<table-name>/<connector-name>/<table-name>/<partition-id>/<low-watermark>_<high-watermark>_<timestamp>.json.gz
패턴으로 형식이 지정됩니다. <partition-id>
에 문제 있음: 여러 항목이 하나의 테이블에 데이터를 로드하는 경우 partition-id
값에 중복이 있을 수 있습니다. 이는 일반적인 커넥터 작동에서는 문제가 되지 않습니다. 그러나 커넥터가 다시 시작되거나 재조정되면 클리너 프로세스가 스테이지에 로드되었지만 아직 수집되지 않은 파일을 잘못된 파티션에 잘못 연결하여 삭제하도록 결정하여 데이터 손실 이벤트가 발생할 수 있습니다.
버전 2.4.x의 커넥터는 소스 항목의 해시코드를 partition-id
에 포함시켜 단일 항목의 파티션과 정확히 일치하는 고유한 파일 이름을 보장함으로써 이 문제를 해결합니다. 이 수정 사항은 기본적으로 활성화되며 snowflake.snowpipe.stageFileNameExtensionEnabled
snowflake.topic2table.map
에 대상 테이블이 두 번 이상 목록에 있는 구성에만 영향을 줍니다.
이 기능으로 인해 구성이 영향을 받으면 스테이지에 부실한 파일이 업로드될 수 있습니다. 커넥터가 시작되면 스테이지에 해당 파일이 있는지 확인합니다. NOTE: For table
로 시작하는 로그 항목과 탐지된 파일 목록을 찾아야 합니다.
또한 수동으로 스테이지에서 영향을 받는 파일이 있는지 확인할 수도 있습니다.
영향을 받은 스테이지를 찾습니다.
show stages like 'snowflake_kafka_connector%<your table name>';
스테이징된 파일을 나열합니다.
list @<your stage name> pattern = '.+/<your-table-name>/[0-9]{1,4}/[0-9]+_[0-9]+_[0-9]+\.json\.gz$';
위의 명령은 테이블의 스테이지와 일치하고 파티션 IDs가 0-9999 범위인 모든 파일을 나열합니다. 이러한 파일은 더 이상 수집되지 않으므로 다운로드하거나 삭제할 수 있습니다.
문제 보고하기¶
Snowflake 지원 에 문의할 때 준비해야 하는 파일은 다음과 같습니다.
Kafka 커넥터에 대한 구성 파일.
중요
개인 키를 제거한 후 파일을 Snowflake에 제공해야 합니다.
Kafka Connector 로그의 복사본. 파일에 기밀 정보나 민감한 정보가 포함되어 있지 않은지 확인해야 합니다.
JDBC 로그 파일.
로그 파일을 생성하려면 Kafka Connect 클러스터에서
JDBC_TRACE = true
환경 변수를 설정한 후 Kafka 커넥터를 실행하십시오.JDBC 로그 파일에 대한 자세한 내용은 Snowflake Community의 이 문서 를 참조하십시오.
Connect 로그 파일.
로그 파일을 생성하려면
etc/kafka/connect-log4j.properties
파일을 편집합니다.log4j.appender.stdout.layout.ConversionPattern
속성을 다음과 같이 설정합니다.log4j.appender.stdout.layout.ConversionPattern=[%d] %p %X{connector.context}%m (%c:%L)%n
커넥터 컨텍스트는 Kafka 버전 2.3 이상에서 사용할 수 있습니다.
자세한 내용은 Confluent 웹 사이트에서 로깅 개선 사항 정보를 참조하십시오.