Solução de problemas do conector Kafka

Esta seção descreve como solucionar problemas encontrados durante a ingestão de dados usando o conector Kafka.

Neste tópico:

Notificações de erro

Configure notificações de erro do Snowpipe. Quando o Snowpipe encontra erros de arquivo durante uma carga, o recurso envia uma notificação para um serviço de mensagens em nuvem configurado, permitindo a análise de seus arquivos de dados. Para obter mais informações, consulte Notificações de erro do Snowpipe.

Etapas gerais de solução de problemas

Complete as etapas a seguir para solucionar problemas de carregamento usando o conector Kafka.

Etapa 1: ver o histórico de COPY da tabela

Consulte o histórico de atividade de carregamento da tabela de destino. Para obter mais informações, consulte Exibição COPY_HISTORY. Se a saída COPY_HISTORY não incluir um conjunto de arquivos esperados, consulte um período anterior. Se os arquivos fossem duplicatas de arquivos anteriores, o histórico de carregamento pode ter registrado a atividade quando foi feita a tentativa de carregar os arquivos originais. A coluna STATUS indica se um determinado conjunto de arquivos foi carregado, parcialmente carregado, ou não foi carregado. A coluna FIRST_ERROR_MESSAGE apresenta uma razão quando uma tentativa é parcialmente carregada ou quando não há carregamento.

O conector Kafka move os arquivos que não conseguiu carregar para o estágio associado à tabela de destino. A sintaxe para referenciar o estágio da tabela é @[namespace.]%table_name.

Liste todos os arquivos localizados no estágio da tabela usando LIST.

Por exemplo:

LIST @mydb.public.%mytable;
Copy

Os nomes dos arquivos estão em um dos seguintes formatos. As condições que produzem cada formato estão descritas na tabela:

Tipo de arquivo

Descrição

Bytes brutos

Estes arquivos correspondem ao seguinte padrão:

<nome_conector>/<nome_tabela>/<partição>/offset_(<chave>/<valor>_)<carimbo_data_hora>.gz

Para esses arquivos, não foi possível converter os registros do Kafka de bytes brutos para o formato do arquivo de origem (Avro, JSON ou Protobuf).

Uma causa comum para esse problema é uma falha de rede que resulta no descarte de um caractere do registro. O conector Kafka não conseguiu concluir a análise dos bytes brutos, resultando em um registro corrompido.

Formato do arquivo de origem (Avro, JSON ou Protobuf)

Estes arquivos correspondem ao seguinte padrão:

<nome_conector>/<nome_tabela>/<partition>/<início_offset>_<término_offset>_<carimbo_data_hora>.<tipo_arquivo>.gz

Para esses arquivos, depois que o conector Kafka converteu os bytes brutos de volta ao formato do arquivo de origem, o Snowpipe encontrou um erro e não foi possível carregar o arquivo.

As seções seguintes fornecem instruções para a resolução de problemas relativos a cada um dos tipos de arquivo:

Bytes brutos

O nome de arquivo <nome_conector>/<nome_tabela>/<partição>/offset_(<chave>/<valor>_)<carimbo_data_hora>.gz inclui o offset exato do registro que não foi convertido de bytes brutos para o formato do arquivo de origem. Para resolver os problemas, reenvie o registro para o conector Kafka como um novo registro.

Formato do arquivo de origem (Avro, JSON ou Protobuf)

Se o Snowpipe não tiver conseguido carregar os dados dos arquivos no estágio interno criado para o tópico Kafka, o conector Kafka move os arquivos para o estágio da tabela de destino no formato do arquivo de origem.

Se um conjunto de arquivos tiver vários problemas, a coluna FIRST_ERROR_MESSAGE na saída COPY_HISTORY indica apenas o primeiro erro encontrado. Para visualizar todos os erros nos arquivos, é necessário recuperar os arquivos do estágio de tabela, carregá-los para um estágio nomeado, e depois executar uma instrução COPY INTO <tabela> com a opção de cópia VALIDATION_MODE definida como RETURN_ALL_ERRORS. A opção de cópia VALIDATION_MODE instrui uma instrução COPY para validar os dados a serem carregados e retornar resultados com base na opção de validação especificada. Nenhum dado é carregado quando essa opção de cópia é especificada. Na instrução, referencie o conjunto de arquivos que você tentou carregar usando o conector Kafka.

Quando o problema com os arquivos de dados for resolvido, você pode carregar os dados manualmente usando uma ou mais instruções COPY.

Os exemplos a seguir referenciam arquivos de dados localizados no estágio de tabela da tabela mytable no banco de dados e esquema mydb.public.

Para validar arquivos de dados no estágio de tabela e resolver erros:

  1. Liste todos os arquivos localizados no estágio da tabela usando LIST.

    Por exemplo:

    LIST @mydb.public.%mytable;
    
    Copy

    Os exemplos nesta seção presumem que JSON é o formato dos arquivos de dados de origem.

  2. Faça o download dos arquivos criados pelo conector Kafka na sua máquina local usando GET.

    Por exemplo, baixe os arquivos para um diretório chamado data na sua máquina local:

    Linux ou macOS:
    GET @mydb.public.%mytable file:///data/;
    
    Copy
    Microsoft Windows:
    GET @mydb.public.%mytable file://C:\data\;
    
    Copy
  3. Usando CREATE STAGE, crie um estágio interno nomeado que armazene arquivos de dados com o mesmo formato do arquivos Kafka de origem.

    Por exemplo, crie um estágio interno chamado kafka_json que armazene arquivos JSON:

    CREATE STAGE kafka_json FILE_FORMAT = (TYPE = JSON);
    
    Copy
  4. Carregue os arquivos que você baixou do estágio de tabela usando PUT.

    Por exemplo, carregue os arquivos baixados para o diretório data em sua máquina local:

    Linux ou macOS:
    PUT file:///data/ @mydb.public.kafka_json;
    
    Copy
    Microsoft Windows:
    PUT file://C:\data\ @mydb.public.kafka_json;
    
    Copy
  5. Crie uma tabela temporária com duas colunas de variantes para fins de teste. A tabela é usada apenas para validar o arquivo de dados preparado. Nenhum dado é carregado na tabela. A tabela é descartada automaticamente quando a sessão atual do usuário termina:

    CREATE TEMPORARY TABLE t1 (col1 variant);
    
    Copy
  6. Recupere todos os erros encontrados no arquivo de dados executando uma instrução COPY INTO *tabela* … VALIDATION_MODE = “RETURN_ALL_ERRORS”. A instrução valida o arquivo no estágio especificado. Nenhum dado é carregado na tabela:

    COPY INTO mydb.public.t1
      FROM @mydb.public.kafka_json
      FILE_FORMAT = (TYPE = JSON)
      VALIDATION_MODE = 'RETURN_ALL_ERRORS';
    
    Copy
  7. Corrija todos os erros relatados nos arquivos de dados na sua máquina local.

  8. Carregue os arquivos fixos para o estágio de tabela ou para o estágio interno nomeado usando PUT.

    O seguinte exemplo carrega os arquivos para o estágio de tabela, substituindo os arquivos existentes:

    Linux ou macOS:
    PUT file:///tmp/myfile.csv @mydb.public.%mytable OVERWRITE = TRUE;
    
    Copy
    Windows:
    PUT file://C:\temp\myfile.csv @mydb.public.%mytable OVERWRITE = TRUE;
    
    Copy
  9. Carregue os dados na tabela de destino usando COPY INTO tabela sem a opção VALIDATION_MODE.

    Como alternativa, você pode usar a opção de cópia PURGE = TRUE para excluir os arquivos de dados do estágio quando os dados tiverem sido carregados com sucesso, ou apagar manualmente os arquivos do estágio de tabela usando REMOVE:

    COPY INTO mydb.public.mytable(RECORD_METADATA, RECORD_CONTENT)
      FROM (SELECT $1:meta, $1:content FROM @mydb.public.%mytable)
      FILE_FORMAT = (TYPE = 'JSON')
      PURGE = TRUE;
    
    Copy

Etapa 2: analisar o arquivo de log do conector Kafka

Se a visualização COPY_HISTORY não tiver registro do carregamento de dados, analise o arquivo de log do conector Kafka. O conector grava os eventos no arquivo de log. Observe que o conector Kafka do Snowflake compartilha o mesmo arquivo de log com todos os plugins do conector Kafka. O nome e a localização desse arquivo de log deve estar no arquivo de configuração do Kafka Connect. Para obter mais informações, consulte a documentação do software Apache Kafka.

Pesquise mensagens de erro relacionadas ao Snowflake no arquivo de log do conector Kafka. A maioria das mensagens terá a cadeia de caracteres ERROR e conterá o nome de arquivo com.snowflake.kafka.connector... para facilitar a localização.

Possíveis erros que você pode encontrar:

Erro de configuração:

Possíveis causas do erro:

  • O conector não tem as informações adequadas para se inscrever no tópico.

  • O conector não tem as informações adequadas para gravar na tabela do Snowflake (por exemplo, o par de chaves de autenticação pode estar errado).

Observe que o conector Kafka valida seus parâmetros. O conector gera um erro para cada parâmetro de configuração incompatível. A mensagem de erro é gravada no arquivo de log do cluster do Kafka Connect. Se você suspeitar de um problema de configuração, verifique os erros nesse arquivo de log.

Erro de leitura:

O conector pode não ter conseguido ler a partir do Kafka pelas seguintes razões:

  • O Kafka ou Kafka Connect pode não estar em execução.

  • A mensagem pode ainda não ter sido enviada.

  • A mensagem pode ter sido excluída (expirou).

Erro de gravação (estágio):

Possíveis causas do erro:

  • Privilégios insuficientes no estágio.

  • O estágio está sem espaço.

  • O estágio foi descartado.

  • Algum outro usuário ou processo gravou arquivos inesperados no estágio.

Erro de gravação (tabela):

Possíveis causas do erro:

  • Privilégios insuficientes na tabela.

Etapa 3: Verificar o Kafka Connect

Se nenhum erro tiver sido relatado no arquivo de log do Kafka Connect, verifique o Kafka Connect. Para obter instruções de solução de problemas, consulte a documentação do fornecedor do software Apache Kafka.

Resolução de problemas específicos

Linhas duplicadas com o mesmo offset e a mesma partição de tópico

Ao carregar dados usando a versão 1.4 do conector Kafka (ou superior), as linhas duplicadas na tabela de destino com a mesma partição de tópico e offset podem indicar que a operação de carregamento excedeu o tempo de execução padrão de 300.000 milissegundos (300 segundos). Para confirmar a causa, pesquise o seguinte erro no arquivo de log do Kafka Connect:

org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member.

This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing max.poll.interval.ms or by reducing the maximum size of batches returned in poll() with max.poll.records.

at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.sendOffsetCommitRequest(ConsumerCoordinator.java:1061)
Copy

Para resolver o erro, no arquivo de configuração do Kafka (por exemplo, <dir_kafka>/config/connect-distributed.properties), altere uma das seguintes propriedades:

consumer.max.poll.interval.ms

Aumente o tempo de execução para 900000 (900 segundos).

consumer.max.poll.records

Diminua o número de registros carregados em cada operação para 50.

Código de erro de falha na resposta de migração de deslocamento do canal de streaming: 5023

Ao atualizar para a versão do conector v2.1.0 (ou superior), houve uma alteração introduzida no formato do nome do canal do Snowpipe Streaming. Como resultado, a lógica que detecta informações sobre deslocamentos previamente confirmados não encontrará nenhuma informação sobre os previamente confirmados. Isso se manifestará como a seguinte exceção:

com.snowflake.kafka.connector.internal.SnowflakeKafkaConnectorException: [SF_KAFKA_CONNECTOR] Exception: Failure in Streaming Channel Offset Migration Response Error Code: 5023

Detail: Streaming Channel Offset Migration from Source to Destination Channel has no/invalid response, please contact Snowflake Support

Message: Snowflake experienced a transient exception, please retry the migration request.
Copy

Para resolver esse erro, no arquivo de configuração do Kafka (por exemplo, <kafka_dir>/config/connect-distributed.properties), adicione a seguinte propriedade de configuração:

enable.streaming.channel.offset.migration

Desabilite a migração automática de deslocamento configurando-a como false.

Configuração do conector para oferecer suporte a vários tópicos

Encontramos um problema com uma única instância do conector Kafka que aceita um grande número de tópicos, cada um com várias partições. A configuração do conector, embora parecesse válida, resultou em um ciclo de rebalanceamento infinito, sem possibilidade de ingestão de dados no Snowflake. O problema era específico do modo de ingestão do Snowpipe Streaming (snowflake.ingestion.method=SNOWPIPE_STREAMING), mas as diretrizes também são aplicáveis ao modo de ingestão do Snowpipe (snowflake.ingestion.method=SNOWPIPE). O problema se manifesta no arquivo de log registrando repetidamente esta mensagem de log:

[Worker-xyz] [timestamp] INFO [my-connector|task-id] [SF_INGEST] Channel is marked as closed

Isso normalmente pode acontecer quando você configura seu conector para ingerir tópicos via regex. Recomendamos aplicar o seguinte conjunto de opções ao arquivo de configuração do Kafka (por exemplo, <kafka_dir>/config/connect-distributed.properties):

consumer.override.partition.assignment.strategy

Configure a estratégia de atribuição de partição para tarefas como org.apache.kafka.clients.consumer.CooperativeStickyAssignor, isso causará uma distribuição uniforme dos canais ingeridos para as tarefas disponíveis, reduzindo o risco de rebalanceamento.

tasks.max

O número de tarefas instanciadas por conector não deve exceder o número de CPUs disponíveis: o driver subjacente implementa um mecanismo de limitação com base nas CPUs disponíveis. O aumento do número de solicitações simultâneas aumentará a pressão de memória no seu sistema, mas também resultará em tempos de processamento de inserção mais longos, levando diretamente à perda de heartbeats do conector.

Ao falar sobre os valores de tempo limite do conector, há um conjunto de propriedades de configuração que os afetam diretamente:

consumer.override.heartbeat.interval.ms

Define com que frequência o thread do monitor (há um associado a cada tarefa) enviará o heartbeat para o Kafka. O padrão é 3000 ms, mas em caso de maior carga do sistema, você pode experimentar aumentá-la para 5000 ms.

consumer.override.session.timeout.ms

Define quanto tempo o broker esperará antes de presumir que o consumidor está em um estado inválido e tentar o rebalanceamento. Essa configuração normalmente deve ser 3 vezes maior que o intervalo de heartbeat, então se você configurou o heartbeat como 5000 ms, defina-o como 15000 ms.

consumer.override.max.poll.interval.ms

Define o intervalo máximo entre chamadas para poll() do Kafka subjacente. O tempo gasto entre as pesquisas basicamente é mapeado para o conector que processa o lote de dados (incluindo upload para o Snowflake e confirmação). Em cenários em que você tem várias tarefas processando dados, o Snowflake Connection subjacente pode começar a limitar as solicitações, resultando em tempos de processamento mais longos. Dependendo do seu cenário, você pode aumentar esse valor para até 20 minutos (1200000 ms), especialmente quando você inicia o conector com uma grande contagem inicial de registros a serem ingeridos.

consumer.override.rebalance.timeout.ms

Quando o rebalanceamento acontece, em um cenário com um grande número de canais por tarefa, há muita lógica subjacente por canal para descobrir onde retomar o processamento. Esse código é executado sequencialmente, portanto, quanto mais canais por tarefa, mais tempo durará a configuração inicial. Configure essa propriedade com um valor grande o suficiente para permitir que cada canal conclua sua inicialização. O valor de 3 minutos (180000 ms) é um bom ponto de partida.

Também é importante estar ciente da memória heap disponível para o conector. Isso é especialmente importante em cenários onde há vários conectores em execução simultaneamente ou um conector ingerindo dados de vários tópicos. A partição de cada tópico é mapeada para um único canal e, como tal, requer memória.

Certifique-se de ajustar as configurações de memória do processo de conexão do Kafka por meio da configuração Xmx. Uma maneira de fazer isso é configurar a variável de ambiente KAFKA_OPTS e defini-la adequadamente (ou seja, KAFKA_OPTS=-Xmx4G).

O limpador de arquivos está limpando arquivos inesperadamente

Ao usar o conector Kafka com o SNOWPIPE, você pode encontrar um problema ao ingerir dados em uma única tabela de vários tópicos. Se sua configuração não tiver a entrada snowflake.topic2table.map ou houver um mapeamento 1:1 entre o tópico e a tabela, esse problema não se aplica.

O conector Kafka está gerando arquivos com registros para serem enviados para um estágio. Esses arquivos são formatados de acordo com o seguinte padrão: snowflake_kafka_connector_<connector-name>_stage_<table-name>/<connector-name>/<table-name>/<partition-id>/<low-watermark>_<high-watermark>_<timestamp>.json.gz. O problema está no <partition-id>: se vários tópicos carregarem dados em uma única tabela, é provável que haja duplicatas no valor partition-id. Isso não é um problema na operação normal do conector. No entanto, se o conector for reiniciado ou rebalanceado, o processo de limpeza poderá associar incorretamente os arquivos carregados no estágio (mas ainda não ingeridos) à partição errada e decidir excluí-los, o que pode resultar em um evento de perda de dados.

O conector com a versão 2.4.x corrige esse problema incluindo o código hash do tópico de origem no partition-id para garantir nomes de arquivo exclusivos que correspondam exatamente à partição de um único tópico. Essa correção é habilitada por padrão — snowflake.snowpipe.stageFileNameExtensionEnabled — e afeta apenas configurações em que uma tabela de destino é listada mais de uma vez em snowflake.topic2table.map.

Se sua configuração for afetada por essa funcionalidade, você poderá acabar tendo arquivos desatualizados enviados para seu estágio. Quando o conector iniciar, ele verificará se seu estágio contém esses arquivos. Você precisa procurar as entradas de log que começam com NOTE: For table, seguidas pela lista de arquivos detectados.

Você também pode verificar manualmente se há alguns arquivos afetados no estágio:

  1. Encontrar o estágio afetado:

    show stages like 'snowflake_kafka_connector%<your table name>';
    
    Copy
  2. Listar os arquivos do estágio:

    list @<your stage name> pattern = '.+/<your-table-name>/[0-9]{1,4}/[0-9]+_[0-9]+_[0-9]+\.json\.gz$';
    
    Copy

O comando acima lista todos os arquivos que correspondem ao estágio da sua tabela e que têm IDs de partição no intervalo de 0 a 9999. Esses arquivos não serão mais ingeridos, então você pode baixá-los ou excluí-los.

Como relatar um problema

Ao entrar em contato com o suporte Snowflake para pedir ajuda, tenha os seguintes arquivos disponíveis:

  • Arquivo de configuração do conector Kafka.

    Importante

    Remova a chave privada antes de fornecer o arquivo ao Snowflake.

  • Cópia do log do conector Kafka. Certifique-se de que o arquivo não contenha informações confidenciais ou sensíveis.

  • Arquivo de log do JDBC.

    Para gerar o arquivo de log, defina a variável de ambiente JDBC_TRACE = true no cluster do Kafka Connect antes de executar o conector Kafka.

    Para obter mais informações sobre o arquivo de log do JDBC, consulte este artigo na Comunidade Snowflake.

  • Arquivo de log do Connect.

    Para gerar o arquivo de log, edite o arquivo etc/kafka/connect-log4j.properties. Defina a propriedade log4j.appender.stdout.layout.ConversionPattern da seguinte forma:

    log4j.appender.stdout.layout.ConversionPattern=[%d] %p %X{connector.context}%m (%c:%L)%n

    Os contextos dos conectores estão disponíveis na versão Kafka 2.3 ou superior.

    Para obter mais informações, consulte Logging Improvements no site do Confluent.