Solução de problemas do conector Kafka

Esta seção descreve como solucionar problemas encontrados durante a ingestão de dados usando o conector Kafka.

Neste tópico:

Notificações de erro

Configure notificações de erro do Snowpipe. Quando o Snowpipe encontra erros de arquivo durante uma carga, o recurso envia uma notificação para um serviço de mensagens em nuvem configurado, permitindo a análise de seus arquivos de dados. Para obter mais informações, consulte Notificações de erro do Snowpipe.

Etapas gerais de solução de problemas

Complete as etapas a seguir para solucionar problemas de carregamento usando o conector Kafka.

Etapa 1: ver o histórico de COPY da tabela

Consulte o histórico de atividade de carregamento da tabela de destino. Para obter mais informações, consulte Exibição COPY_HISTORY. Se a saída COPY_HISTORY não incluir um conjunto de arquivos esperados, consulte um período anterior. Se os arquivos fossem duplicatas de arquivos anteriores, o histórico de carregamento pode ter registrado a atividade quando foi feita a tentativa de carregar os arquivos originais. A coluna STATUS indica se um determinado conjunto de arquivos foi carregado, parcialmente carregado, ou não foi carregado. A coluna FIRST_ERROR_MESSAGE apresenta uma razão quando uma tentativa é parcialmente carregada ou quando não há carregamento.

O conector Kafka move os arquivos que não conseguiu carregar para o estágio associado à tabela de destino. A sintaxe para referenciar o estágio da tabela é @[namespace.]%table_name.

Liste todos os arquivos localizados no estágio da tabela usando LIST.

Por exemplo:

LIST @mydb.public.%mytable;
Copy

Os nomes dos arquivos estão em um dos seguintes formatos. As condições que produzem cada formato estão descritas na tabela:

Tipo de arquivo

Descrição

Bytes brutos

Estes arquivos correspondem ao seguinte padrão:

<nome_conector>/<nome_tabela>/<partição>/offset_(<chave>/<valor>_)<carimbo_data_hora>.gz

Para esses arquivos, não foi possível converter os registros do Kafka de bytes brutos para o formato do arquivo de origem (Avro, JSON ou Protobuf).

Uma causa comum para esse problema é uma falha de rede que resulta no descarte de um caractere do registro. O conector Kafka não conseguiu concluir a análise dos bytes brutos, resultando em um registro corrompido.

Formato do arquivo de origem (Avro, JSON ou Protobuf)

Estes arquivos correspondem ao seguinte padrão:

<nome_conector>/<nome_tabela>/<partition>/<início_offset>_<término_offset>_<carimbo_data_hora>.<tipo_arquivo>.gz

Para esses arquivos, depois que o conector Kafka converteu os bytes brutos de volta ao formato do arquivo de origem, o Snowpipe encontrou um erro e não foi possível carregar o arquivo.

As seções seguintes fornecem instruções para a resolução de problemas relativos a cada um dos tipos de arquivo:

Bytes brutos

O nome de arquivo <nome_conector>/<nome_tabela>/<partição>/offset_(<chave>/<valor>_)<carimbo_data_hora>.gz inclui o offset exato do registro que não foi convertido de bytes brutos para o formato do arquivo de origem. Para resolver os problemas, reenvie o registro para o conector Kafka como um novo registro.

Formato do arquivo de origem (Avro, JSON ou Protobuf)

Se o Snowpipe não tiver conseguido carregar os dados dos arquivos no estágio interno criado para o tópico Kafka, o conector Kafka move os arquivos para o estágio da tabela de destino no formato do arquivo de origem.

Se um conjunto de arquivos tiver vários problemas, a coluna FIRST_ERROR_MESSAGE na saída COPY_HISTORY indica apenas o primeiro erro encontrado. Para visualizar todos os erros nos arquivos, é necessário recuperar os arquivos do estágio de tabela, carregá-los para um estágio nomeado, e depois executar uma instrução COPY INTO <tabela> com a opção de cópia VALIDATION_MODE definida como RETURN_ALL_ERRORS. A opção de cópia VALIDATION_MODE instrui uma instrução COPY para validar os dados a serem carregados e retornar resultados com base na opção de validação especificada. Nenhum dado é carregado quando essa opção de cópia é especificada. Na instrução, referencie o conjunto de arquivos que você tentou carregar usando o conector Kafka.

Quando o problema com os arquivos de dados for resolvido, você pode carregar os dados manualmente usando uma ou mais instruções COPY.

Os exemplos a seguir referenciam arquivos de dados localizados no estágio de tabela da tabela mytable no banco de dados e esquema mydb.public.

Para validar arquivos de dados no estágio de tabela e resolver erros:

  1. Liste todos os arquivos localizados no estágio da tabela usando LIST.

    Por exemplo:

    LIST @mydb.public.%mytable;
    
    Copy

    Os exemplos nesta seção presumem que JSON é o formato dos arquivos de dados de origem.

  2. Faça o download dos arquivos criados pelo conector Kafka na sua máquina local usando GET.

    Por exemplo, baixe os arquivos para um diretório chamado data na sua máquina local:

    Linux ou macOS:
    GET @mydb.public.%mytable file:///data/;
    
    Copy
    Microsoft Windows:
    GET @mydb.public.%mytable file://C:\data\;
    
    Copy
  3. Usando CREATE STAGE, crie um estágio interno nomeado que armazene arquivos de dados com o mesmo formato do arquivos Kafka de origem.

    Por exemplo, crie um estágio interno chamado kafka_json que armazene arquivos JSON:

    CREATE STAGE kafka_json FILE_FORMAT = (TYPE = JSON);
    
    Copy
  4. Carregue os arquivos que você baixou do estágio de tabela usando PUT.

    Por exemplo, carregue os arquivos baixados para o diretório data em sua máquina local:

    Linux ou macOS:
    PUT file:///data/ @mydb.public.kafka_json;
    
    Copy
    Microsoft Windows:
    PUT file://C:\data\ @mydb.public.kafka_json;
    
    Copy
  5. Crie uma tabela temporária com duas colunas de variantes para fins de teste. A tabela é usada apenas para validar o arquivo de dados preparado. Nenhum dado é carregado na tabela. A tabela é descartada automaticamente quando a sessão atual do usuário termina:

    CREATE TEMPORARY TABLE t1 (col1 variant);
    
    Copy
  6. Recupere todos os erros encontrados no arquivo de dados executando uma instrução COPY INTO *tabela* … VALIDATION_MODE = “RETURN_ALL_ERRORS”. A instrução valida o arquivo no estágio especificado. Nenhum dado é carregado na tabela:

    COPY INTO mydb.public.t1
      FROM @mydb.public.kafka_json
      FILE_FORMAT = (TYPE = JSON)
      VALIDATION_MODE = 'RETURN_ALL_ERRORS';
    
    Copy
  7. Corrija todos os erros relatados nos arquivos de dados na sua máquina local.

  8. Carregue os arquivos fixos para o estágio de tabela ou para o estágio interno nomeado usando PUT.

    O seguinte exemplo carrega os arquivos para o estágio de tabela, substituindo os arquivos existentes:

    Linux ou macOS:
    PUT file:///tmp/myfile.csv @mydb.public.%mytable OVERWRITE = TRUE;
    
    Copy
    Windows:
    PUT file://C:\temp\myfile.csv @mydb.public.%mytable OVERWRITE = TRUE;
    
    Copy
  9. Carregue os dados na tabela de destino usando COPY INTO tabela sem a opção VALIDATION_MODE.

    Como alternativa, você pode usar a opção de cópia PURGE = TRUE para excluir os arquivos de dados do estágio quando os dados tiverem sido carregados com sucesso, ou apagar manualmente os arquivos do estágio de tabela usando REMOVE:

    COPY INTO mydb.public.mytable(RECORD_METADATA, RECORD_CONTENT)
      FROM (SELECT $1:meta, $1:content FROM @mydb.public.%mytable)
      FILE_FORMAT = (TYPE = 'JSON')
      PURGE = TRUE;
    
    Copy

Etapa 2: analisar o arquivo de log do conector Kafka

Se a visualização COPY_HISTORY não tiver registro do carregamento de dados, analise o arquivo de log do conector Kafka. O conector grava os eventos no arquivo de log. Observe que o conector Kafka do Snowflake compartilha o mesmo arquivo de log com todos os plugins do conector Kafka. O nome e a localização desse arquivo de log deve estar no arquivo de configuração do Kafka Connect. Para obter mais informações, consulte a documentação do software Apache Kafka.

Pesquise mensagens de erro relacionadas ao Snowflake no arquivo de log do conector Kafka. A maioria das mensagens terá a cadeia de caracteres ERROR e conterá o nome de arquivo com.snowflake.kafka.connector... para facilitar a localização.

Possíveis erros que você pode encontrar:

Erro de configuração:

Possíveis causas do erro:

  • O conector não tem as informações adequadas para se inscrever no tópico.

  • O conector não tem as informações adequadas para gravar na tabela do Snowflake (por exemplo, o par de chaves de autenticação pode estar errado).

Observe que o conector Kafka valida seus parâmetros. O conector gera um erro para cada parâmetro de configuração incompatível. A mensagem de erro é gravada no arquivo de log do cluster do Kafka Connect. Se você suspeitar de um problema de configuração, verifique os erros nesse arquivo de log.

Erro de leitura:

O conector pode não ter conseguido ler a partir do Kafka pelas seguintes razões:

  • O Kafka ou Kafka Connect pode não estar em execução.

  • A mensagem pode ainda não ter sido enviada.

  • A mensagem pode ter sido excluída (expirou).

Erro de gravação (estágio):

Possíveis causas do erro:

  • Privilégios insuficientes no estágio.

  • O estágio está sem espaço.

  • O estágio foi descartado.

  • Algum outro usuário ou processo gravou arquivos inesperados no estágio.

Erro de gravação (tabela):

Possíveis causas do erro:

  • Privilégios insuficientes na tabela.

Etapa 3: Verificar o Kafka Connect

Se nenhum erro tiver sido relatado no arquivo de log do Kafka Connect, verifique o Kafka Connect. Para obter instruções de solução de problemas, consulte a documentação do fornecedor do software Apache Kafka.

Resolução de problemas específicos

Linhas duplicadas com o mesmo offset e a mesma partição de tópico

Ao carregar dados usando a versão 1.4 do conector Kafka (ou superior), as linhas duplicadas na tabela de destino com a mesma partição de tópico e offset podem indicar que a operação de carregamento excedeu o tempo de execução padrão de 300.000 milissegundos (300 segundos). Para confirmar a causa, pesquise o seguinte erro no arquivo de log do Kafka Connect:

org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member.

This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing max.poll.interval.ms or by reducing the maximum size of batches returned in poll() with max.poll.records.

at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.sendOffsetCommitRequest(ConsumerCoordinator.java:1061)
Copy

Para resolver o erro, no arquivo de configuração do Kafka (por exemplo, <dir_kafka>/config/connect-distributed.properties), altere uma das seguintes propriedades:

consumer.max.poll.interval.ms

Aumente o tempo de execução para 900000 (900 segundos).

consumer.max.poll.records

Diminua o número de registros carregados em cada operação para 50.

Como relatar um problema

Ao entrar em contato com o suporte Snowflake para pedir ajuda, tenha os seguintes arquivos disponíveis:

  • Arquivo de configuração do conector Kafka.

    Importante

    Remova a chave privada antes de fornecer o arquivo ao Snowflake.

  • Cópia do log do conector Kafka. Certifique-se de que o arquivo não contenha informações confidenciais ou sensíveis.

  • Arquivo de log do JDBC.

    Para gerar o arquivo de log, defina a variável de ambiente JDBC_TRACE = true no cluster do Kafka Connect antes de executar o conector Kafka.

    Para obter mais informações sobre o arquivo de log do JDBC, consulte este artigo na Comunidade Snowflake.

  • Arquivo de log do Connect.

    Para gerar o arquivo de log, edite o arquivo etc/kafka/connect-log4j.properties. Defina a propriedade log4j.appender.stdout.layout.ConversionPattern da seguinte forma:

    log4j.appender.stdout.layout.ConversionPattern=[%d] %p %X{connector.context}%m (%c:%L)%n

    Os contextos dos conectores estão disponíveis na versão Kafka 2.3 ou superior.

    Para obter mais informações, consulte Logging Improvements no site do Confluent.