Solução de problemas do conector Kafka¶
Esta seção descreve como solucionar problemas encontrados durante a ingestão de dados usando o conector Kafka.
Notificações de erro¶
Configure notificações de erro do Snowpipe. Quando o Snowpipe encontra erros de arquivo durante uma carga, o recurso envia uma notificação para um serviço de mensagens em nuvem configurado, permitindo a análise de seus arquivos de dados. Para obter mais informações, consulte Notificações de erro do Snowpipe.
Etapas gerais de solução de problemas¶
Complete as etapas a seguir para solucionar problemas de carregamento usando o conector Kafka.
Etapa 1: ver o histórico de COPY da tabela¶
Consulte o histórico de atividade de carregamento da tabela de destino. Para obter mais informações, consulte Exibição COPY_HISTORY. Se a saída COPY_HISTORY não incluir um conjunto de arquivos esperados, consulte um período anterior. Se os arquivos fossem duplicatas de arquivos anteriores, o histórico de carregamento pode ter registrado a atividade quando foi feita a tentativa de carregar os arquivos originais. A coluna STATUS indica se um determinado conjunto de arquivos foi carregado, parcialmente carregado, ou não foi carregado. A coluna FIRST_ERROR_MESSAGE apresenta uma razão quando uma tentativa é parcialmente carregada ou quando não há carregamento.
O conector Kafka move os arquivos que não conseguiu carregar para o estágio associado à tabela de destino. A sintaxe para referenciar o estágio da tabela é @[namespace.]%table_name.
Liste todos os arquivos localizados no estágio da tabela usando LIST.
Por exemplo:
Os nomes dos arquivos estão em um dos seguintes formatos. As condições que produzem cada formato estão descritas na tabela:
Tipo de arquivo |
Descrição |
|---|---|
Bytes brutos |
Estes arquivos correspondem ao seguinte padrão:
Para esses arquivos, não foi possível converter os registros do Kafka de bytes brutos para o formato do arquivo de origem (Avro, JSON ou Protobuf). Uma causa comum para esse problema é uma falha de rede que resulta no descarte de um caractere do registro. O conector Kafka não conseguiu concluir a análise dos bytes brutos, resultando em um registro corrompido. |
Formato do arquivo de origem (Avro, JSON ou Protobuf) |
Estes arquivos correspondem ao seguinte padrão:
Para esses arquivos, depois que o conector Kafka converteu os bytes brutos de volta ao formato do arquivo de origem, o Snowpipe encontrou um erro e não foi possível carregar o arquivo. |
As seções seguintes fornecem instruções para a resolução de problemas relativos a cada um dos tipos de arquivo:
Bytes brutos¶
O nome de arquivo <connector_name>/<table_name>/<partition>/offset_(<key>/<value>_)<timestamp>.gz inclui o offset exato do registro que não foi convertido de bytes brutos para o formato do arquivo de origem. Para resolver os problemas, reenvie o registro para o conector Kafka como um novo registro.
Formato do arquivo de origem (Avro, JSON ou Protobuf)¶
Se o Snowpipe não tiver conseguido carregar os dados dos arquivos no estágio interno criado para o tópico Kafka, o conector Kafka move os arquivos para o estágio da tabela de destino no formato do arquivo de origem.
Se um conjunto de arquivos tiver vários problemas, a coluna FIRST_ERROR_MESSAGE na saída COPY_HISTORY indica apenas o primeiro erro encontrado. Para visualizar todos os erros nos arquivos, é necessário recuperar os arquivos do estágio de tabela, carregá-los para um estágio nomeado, e depois executar uma instrução COPY INTO <tabela> com a opção de cópia VALIDATION_MODE definida como RETURN_ALL_ERRORS. A opção de cópia VALIDATION_MODE instrui uma instrução COPY para validar os dados a serem carregados e retornar resultados com base na opção de validação especificada. Nenhum dado é carregado quando essa opção de cópia é especificada. Na instrução, referencie o conjunto de arquivos que você tentou carregar usando o conector Kafka.
Quando o problema com os arquivos de dados for resolvido, você pode carregar os dados manualmente usando uma ou mais instruções COPY.
Os exemplos a seguir referenciam arquivos de dados localizados no estágio de tabela da tabela mytable no banco de dados e esquema mydb.public.
Para validar arquivos de dados no estágio de tabela e resolver erros:
Liste todos os arquivos localizados no estágio da tabela usando LIST.
Por exemplo:
Os exemplos nesta seção presumem que JSON é o formato dos arquivos de dados de origem.
Faça o download dos arquivos criados pelo conector Kafka na sua máquina local usando GET.
Por exemplo, baixe os arquivos para um diretório chamado
datana sua máquina local:- Linux ou macOS:
- Microsoft Windows:
Usando CREATE STAGE, crie um estágio interno nomeado que armazene arquivos de dados com o mesmo formato do arquivos Kafka de origem.
Por exemplo, crie um estágio interno chamado
kafka_jsonque armazene arquivos JSON:Carregue os arquivos que você baixou do estágio de tabela usando PUT.
Por exemplo, carregue os arquivos baixados para o diretório
dataem sua máquina local:- Linux ou macOS:
- Microsoft Windows:
Crie uma tabela temporária com duas colunas de variantes para fins de teste. A tabela é usada apenas para validar o arquivo de dados preparado. Nenhum dado é carregado na tabela. A tabela é descartada automaticamente quando a sessão atual do usuário termina:
Recupere todos os erros encontrados no arquivo de dados executando uma instrução COPY INTO *tabela* … VALIDATION_MODE = “RETURN_ALL_ERRORS”. A instrução valida o arquivo no estágio especificado. Nenhum dado é carregado na tabela:
Corrija todos os erros relatados nos arquivos de dados na sua máquina local.
Carregue os arquivos fixos para o estágio de tabela ou para o estágio interno nomeado usando PUT.
O seguinte exemplo carrega os arquivos para o estágio de tabela, substituindo os arquivos existentes:
- Linux ou macOS:
- Windows:
Carregue os dados na tabela de destino usando COPY INTO tabela sem a opção VALIDATION_MODE.
Como alternativa, você pode usar a opção de cópia PURGE = TRUE para excluir os arquivos de dados do estágio quando os dados tiverem sido carregados com sucesso, ou apagar manualmente os arquivos do estágio de tabela usando REMOVE:
Etapa 2: analisar o arquivo de log do conector Kafka¶
Se a visualização COPY_HISTORY não tiver registro do carregamento de dados, analise o arquivo de log do conector Kafka. O conector grava os eventos no arquivo de log. Observe que o conector Kafka do Snowflake compartilha o mesmo arquivo de log com todos os plugins do conector Kafka. O nome e a localização desse arquivo de log deve estar no arquivo de configuração do Kafka Connect. Para obter mais informações, consulte a documentação do software Apache Kafka.
Pesquise mensagens de erro relacionadas ao Snowflake no arquivo de log do conector Kafka. A maioria das mensagens terá a cadeia de caracteres ERROR e conterá o nome de arquivo com.snowflake.kafka.connector... para facilitar a localização.
Possíveis erros que você pode encontrar:
- Erro de configuração:
Possíveis causas do erro:
O conector não tem as informações adequadas para se inscrever no tópico.
O conector não tem as informações adequadas para gravar na tabela do Snowflake (por exemplo, o par de chaves de autenticação pode estar errado).
Observe que o conector Kafka valida seus parâmetros. O conector gera um erro para cada parâmetro de configuração incompatível. A mensagem de erro é gravada no arquivo de log do cluster do Kafka Connect. Se você suspeitar de um problema de configuração, verifique os erros nesse arquivo de log.
- Erro de leitura:
O conector pode não ter conseguido ler a partir do Kafka pelas seguintes razões:
O Kafka ou Kafka Connect pode não estar em execução.
A mensagem pode ainda não ter sido enviada.
A mensagem pode ter sido excluída (expirou).
- Erro de gravação (estágio):
Possíveis causas do erro:
Privilégios insuficientes no estágio.
O estágio está sem espaço.
O estágio foi descartado.
Algum outro usuário ou processo gravou arquivos inesperados no estágio.
- Erro de gravação (tabela):
Possíveis causas do erro:
Privilégios insuficientes na tabela.
Etapa 3: Verificar o Kafka Connect¶
Se nenhum erro tiver sido relatado no arquivo de log do Kafka Connect, verifique o Kafka Connect. Para obter instruções de solução de problemas, consulte a documentação do fornecedor do software Apache Kafka.
Resolução de problemas específicos¶
Linhas duplicadas com o mesmo offset e a mesma partição de tópico¶
Ao carregar dados usando a versão 1.4 do conector Kafka (ou superior), as linhas duplicadas na tabela de destino com a mesma partição de tópico e offset podem indicar que a operação de carregamento excedeu o tempo de execução padrão de 300.000 milissegundos (300 segundos). Para confirmar a causa, pesquise o seguinte erro no arquivo de log do Kafka Connect:
Para resolver o erro, no arquivo de configuração do Kafka (por exemplo, <dir_kafka>/config/connect-distributed.properties), altere uma das seguintes propriedades:
consumer.max.poll.interval.msAumente o tempo de execução para
900000(900 segundos).consumer.max.poll.recordsDiminua o número de registros carregados em cada operação para
50.
Código de erro de falha na resposta de migração de deslocamento do canal de streaming: 5023¶
Ao atualizar para a versão do conector v2.1.0 (ou superior), houve uma alteração introduzida no formato do nome do canal do Snowpipe Streaming. Como resultado, a lógica que detecta informações sobre deslocamentos previamente confirmados não encontrará nenhuma informação sobre os previamente confirmados. Isso se manifestará como a seguinte exceção:
Para resolver esse erro, no arquivo de configuração do Kafka (por exemplo, <kafka_dir>/config/connect-distributed.properties), adicione a seguinte propriedade de configuração:
enable.streaming.channel.offset.migrationDesabilite a migração automática de deslocamento configurando-a como
false.
Configuração do conector para oferecer suporte a vários tópicos¶
Encontramos um problema com uma única instância do conector Kafka que aceita um grande número de tópicos, cada um com várias partições. A configuração do conector, embora parecesse válida, resultou em um ciclo de rebalanceamento infinito, sem possibilidade de ingestão de dados no Snowflake. O problema era específico do modo de ingestão do Snowpipe Streaming (snowflake.ingestion.method=SNOWPIPE_STREAMING), mas as diretrizes também são aplicáveis ao modo de ingestão do Snowpipe (snowflake.ingestion.method=SNOWPIPE). O problema se manifesta no arquivo de log registrando repetidamente esta mensagem de log:
[Worker-xyz] [timestamp] INFO [my-connector|task-id] [SF_INGEST] Channel is marked as closed
Isso normalmente pode acontecer quando você configura seu conector para ingerir tópicos via regex. Recomendamos aplicar o seguinte conjunto de opções ao arquivo de configuração do Kafka (por exemplo, <kafka_dir>/config/connect-distributed.properties):
consumer.override.partition.assignment.strategyConfigure a estratégia de atribuição de partição para tarefas como
org.apache.kafka.clients.consumer.CooperativeStickyAssignor, isso causará uma distribuição uniforme dos canais ingeridos para as tarefas disponíveis, reduzindo o risco de rebalanceamento. Observe queCooperativeStickyAssignorrequer o Kafka Connect versão 3.0.1 ou posterior devido a esse problema conhecido.tasks.maxO número de tarefas instanciadas por conector não deve exceder o número de CPUs disponíveis: o driver subjacente implementa um mecanismo de limitação com base nas CPUs disponíveis. O aumento do número de solicitações simultâneas aumentará a pressão de memória no seu sistema, mas também resultará em tempos de processamento de inserção mais longos, levando diretamente à perda de heartbeats do conector.
Ao falar sobre os valores de tempo limite do conector, há um conjunto de propriedades de configuração que os afetam diretamente:
consumer.override.heartbeat.interval.msDefine com que frequência o thread do monitor (há um associado a cada tarefa) enviará o heartbeat para o Kafka. O padrão é
3000ms, mas em caso de maior carga do sistema, você pode experimentar aumentá-la para5000ms.consumer.override.session.timeout.msDefine quanto tempo o broker esperará antes de presumir que o consumidor está em um estado inválido e tentar o rebalanceamento. Essa configuração normalmente deve ser 3 vezes maior que o intervalo de heartbeat, então se você configurou o heartbeat como
5000ms, defina-o como15000ms.consumer.override.max.poll.interval.msDefine o intervalo máximo entre chamadas para
poll()do Kafka subjacente. O tempo gasto entre as pesquisas basicamente é mapeado para o conector que processa o lote de dados (incluindo upload para o Snowflake e confirmação). Em cenários em que você tem várias tarefas processando dados, o Snowflake Connection subjacente pode começar a limitar as solicitações, resultando em tempos de processamento mais longos. Dependendo do seu cenário, você pode aumentar esse valor para até 20 minutos (1200000ms), especialmente quando você inicia o conector com uma grande contagem inicial de registros a serem ingeridos.consumer.override.rebalance.timeout.msQuando o rebalanceamento acontece, em um cenário com um grande número de canais por tarefa, há muita lógica subjacente por canal para descobrir onde retomar o processamento. Esse código é executado sequencialmente, portanto, quanto mais canais por tarefa, mais tempo durará a configuração inicial. Configure essa propriedade com um valor grande o suficiente para permitir que cada canal conclua sua inicialização. O valor de 3 minutos (
180000ms) é um bom ponto de partida.
Também é importante estar ciente da memória heap disponível para o conector. Isso é especialmente importante em cenários onde há vários conectores em execução simultaneamente ou um conector ingerindo dados de vários tópicos. A partição de cada tópico é mapeada para um único canal e, como tal, requer memória.
Certifique-se de ajustar as configurações de memória do processo de conexão do Kafka por meio da configuração Xmx. Uma maneira de fazer isso é configurar a variável de ambiente KAFKA_OPTS e defini-la adequadamente (ou seja, KAFKA_OPTS=-Xmx4G).
O limpador de arquivos está limpando arquivos inesperadamente¶
Ao usar o conector Kafka com o SNOWPIPE, você pode encontrar um problema ao ingerir dados em uma única tabela de vários tópicos. Se sua configuração não tiver a entrada snowflake.topic2table.map ou houver um mapeamento 1:1 entre o tópico e a tabela, esse problema não se aplica.
O conector Kafka está gerando arquivos com registros para serem enviados para um estágio. Esses arquivos são formatados de acordo com o seguinte padrão: snowflake_kafka_connector_<connector-name>_stage_<table-name>/<connector-name>/<table-name>/<partition-id>/<low-watermark>_<high-watermark>_<timestamp>.json.gz. O problema está no <partition-id>: se vários tópicos carregarem dados em uma única tabela, é provável que haja duplicatas no valor partition-id. Isso não é um problema na operação normal do conector. No entanto, se o conector for reiniciado ou rebalanceado, o processo de limpeza poderá associar incorretamente os arquivos carregados no estágio (mas ainda não ingeridos) à partição errada e decidir excluí-los, o que pode resultar em um evento de perda de dados.
O conector com a versão 2.5.0 corrige esse problema incluindo o código hash do tópico de origem em partition-id para garantir nomes de arquivos exclusivos que correspondam exatamente à partição de um único tópico. Essa correção é habilitada por padrão — snowflake.snowpipe.stageFileNameExtensionEnabled — e afeta apenas configurações em que uma tabela de destino é listada mais de uma vez em snowflake.topic2table.map.
Se sua configuração for afetada por essa funcionalidade, você poderá acabar tendo arquivos desatualizados enviados para seu estágio. Quando o conector iniciar, ele verificará se seu estágio contém esses arquivos. Você precisa procurar as entradas de log que começam com NOTE: For table, seguidas pela lista de arquivos detectados.
Você também pode verificar manualmente se há alguns arquivos afetados no estágio:
Encontrar o estágio afetado:
Listar os arquivos do estágio:
O comando acima lista todos os arquivos que correspondem ao estágio da sua tabela e que têm IDs de partição no intervalo de 0 a 9999. Esses arquivos não serão mais ingeridos, então você pode baixá-los ou excluí-los.
Como relatar um problema¶
Ao entrar em contato com o suporte Snowflake para pedir ajuda, tenha os seguintes arquivos disponíveis:
Arquivo de configuração do conector Kafka.
Importante
Remova a chave privada antes de fornecer o arquivo ao Snowflake.
Cópia do log do conector Kafka. Certifique-se de que o arquivo não contenha informações confidenciais ou sensíveis.
Arquivo de log do JDBC.
Para gerar o arquivo de log, defina a variável de ambiente
JDBC_TRACE = trueno cluster do Kafka Connect antes de executar o conector Kafka.Para obter mais informações sobre o arquivo de log do JDBC, consulte este artigo na Comunidade Snowflake.
Arquivo de log do Connect.
Para gerar o arquivo de log, edite o arquivo
etc/kafka/connect-log4j.properties. Defina a propriedadelog4j.appender.stdout.layout.ConversionPatternda seguinte forma:log4j.appender.stdout.layout.ConversionPattern=[%d] %p %X{connector.context}%m (%c:%L)%nOs contextos dos conectores estão disponíveis na versão Kafka 2.3 ou superior.
Para obter mais informações, consulte Logging Improvements no site do Confluent.