Troubleshooting the Kafka connector

This section describes how to troubleshoot issues encountered while ingesting data using the Kafka connector.

Error notifications

Configure error notifications for Snowpipe. When Snowpipe encounters file errors during a load, the feature pushes a notification to a configured cloud messaging service, enabling analysis of your data files. For more information, see Snowpipe error notifications.

General troubleshooting steps

Complete the following steps to troubleshoot issues with loads using the Kafka connector.

Step 1: View the COPY history for the table

Query the load activity history for the target table. For information, see COPY_HISTORY View. If the COPY_HISTORY output does not include a set of expected files, query an earlier time period. If the files were duplicates of earlier files, the load history might have recorded the activity when the attempt to load the original files was made. The STATUS column indicates whether a particular set of files was loaded, partially loaded, or failed to load. The FIRST_ERROR_MESSAGE column provides a reason when an attempt partially loaded or failed.

The Kafka connector moves files it could not load to the stage associated with the target table. The syntax for referencing a table stage is @[namespace.]%table_name.

List all files located in the table stage using LIST.

For example:

LIST @mydb.public.%mytable;
Copy

File names are in one of the following formats. The conditions that produce each format are described in the table:

File Type

Description

Raw bytes

These files match the following pattern:

<connector_name>/<table_name>/<partition>/offset_(<key>/<value>_)<timestamp>.gz

For these files, the Kafka records could not be converted from raw bytes to the source file format (Avro, JSON, or Protobuf).

A common cause for this issue is a network failure that resulted in a character getting dropped from the record. The Kafka connector could no longer parse the raw bytes, resulting in a broken record.

Source file format (Avro, JSON, or Protobuf)

These files match the following pattern:

<connector_name>/<table_name>/<partition>/<start_offset>_<end_offset>_<timestamp>.<file_type>.gz

For these files, after the Kafka connector converted the raw bytes back to the source file format, Snowpipe encountered an error and could not load the file.

The following sections provide instructions for resolving issues with each of the file types:

Raw bytes

The filename <connector_name>/<table_name>/<partition>/offset_(<key>/<value>_)<timestamp>.gz includes the exact offset of the record that was not converted from raw bytes to the source file format. To resolve issues, resend the record to the Kafka connector as a new record.

Source file format (Avro, JSON, or protobuf)

If Snowpipe could not load data from files in the internal stage created for the Kafka topic, the Kafka connector moves the files to the stage for the target table in the source file format.

If a set of files has multiple issues, the FIRST_ERROR_MESSAGE column in the COPY_HISTORY output only indicates the first error encountered. To view all errors in the files, it is necessary to retrieve the files from the table stage, upload them to a named stage, and then execute a COPY INTO <table> statement with the VALIDATION_MODE copy option set to RETURN_ALL_ERRORS. The VALIDATION_MODE copy option instructs a COPY statement to validate the data to be loaded and return results based on the validation option specified. No data is loaded when this copy option is specified. In the statement, reference the set of files you had attempted to load using the Kafka connector.

When any issues with the data files are resolved, you can load the data manually using one or more COPY statements.

The following example references data files located in the table stage for the mytable table in the mydb.public database and schema.

To validate data files in the table stage and resolve errors:

  1. List all files located in the table stage using LIST.

    For example:

    LIST @mydb.public.%mytable;
    
    Copy

    The examples in this section presume that JSON is the source format for the data files.

  2. Download the files created by Kafka connector to your local machine using GET.

    For example, download the files to a directory named data on your local machine:

    Linux or macOS:
    GET @mydb.public.%mytable file:///data/;
    
    Copy
    Microsoft Windows:
    GET @mydb.public.%mytable file://C:\data\;
    
    Copy
  3. Create a named internal stage using CREATE STAGE that stores data files with the same format as your source Kafka files.

    For example, create a internal stage named kafka_json that stores JSON files:

    CREATE STAGE kafka_json FILE_FORMAT = (TYPE = JSON);
    
    Copy
  4. Upload the files you downloaded from the table stage using PUT.

    For example, upload the files downloaded to the data directory on your local machine:

    Linux or macOS:
    PUT file:///data/ @mydb.public.kafka_json;
    
    Copy
    Microsoft Windows:
    PUT file://C:\data\ @mydb.public.kafka_json;
    
    Copy
  5. Create a temporary table with two variant columns for testing purposes. The table is only used to validate staged data file. No data is loaded into the table. The table is dropped automatically when the current user session ends:

    CREATE TEMPORARY TABLE t1 (col1 variant);
    
    Copy
  6. Retrieve all errors encountered in the data file by executing a COPY INTO *table* … VALIDATION_MODE = ‘RETURN_ALL_ERRORS’ statement. The statement validates the file in the specified stage. No data is loaded into the table:

    COPY INTO mydb.public.t1
      FROM @mydb.public.kafka_json
      FILE_FORMAT = (TYPE = JSON)
      VALIDATION_MODE = 'RETURN_ALL_ERRORS';
    
    Copy
  7. Fix all reported errors in the data files on your local machine.

  8. Upload the fixed files to either the table stage or the named internal stage using PUT.

    The following example uploads the files to the table stage, overwriting the existing files:

    Linux or macOS:
    PUT file:///tmp/myfile.csv @mydb.public.%mytable OVERWRITE = TRUE;
    
    Copy
    Windows:
    PUT file://C:\temp\myfile.csv @mydb.public.%mytable OVERWRITE = TRUE;
    
    Copy
  9. Load the data into the target table using COPY INTO table without the VALIDATION_MODE option.

    You can optionally use the PURGE = TRUE copy option to delete the data files from the stage once the data is loaded successfully, or manually delete the files from the table stage using REMOVE:

    COPY INTO mydb.public.mytable(RECORD_METADATA, RECORD_CONTENT)
      FROM (SELECT $1:meta, $1:content FROM @mydb.public.%mytable)
      FILE_FORMAT = (TYPE = 'JSON')
      PURGE = TRUE;
    
    Copy

Step 2: Analyze the Kafka connector log file

If the COPY_HISTORY view has no record of the data load, then analyze the log file for the Kafka connector. The connector writes events to the log file. Note that the Snowflake Kafka connector shares the same log file with all Kafka connector plugins. The name and location of this log file should be in your Kafka Connect configuration file. For more information, see the documentation provided for your Apache Kafka software.

Search the Kafka connector log file for Snowflake-related error messages. Most messages will have the string ERROR and will contain the file name com.snowflake.kafka.connector... to make these messages easier to find.

Possible errors that you might encounter include:

Configuration error:

Possible causes of the error:

  • The connector doesn’t have the proper information to subscribe to the topic.

  • The connector doesn’t have the proper information to write to the Snowflake table (e.g. the key pair for authentication might be wrong).

Note that the Kafka connector validates its parameters. The connector throws an error for each incompatible configuration parameter. The error message is written to the Kafka Connect cluster’s log file. If you suspect a configuration problem, check the errors in that log file.

Read error:

The connector might not have been able to read from Kafka for the following reasons:

  • Kafka or Kafka Connect might not be running.

  • The message might not have been sent yet.

  • The message might have been deleted (expired).

Write error (stage):

Possible causes of the error:

  • Insufficient privileges on the stage.

  • Stage is out of space.

  • Stage was dropped.

  • Some other user or process wrote unexpected files to the stage.

Write error (table):

Possible causes of the error:

  • Insufficient privileges on the table.

Step 3: Check Kafka Connect

If no error is reported in the Kafka connect log file, check Kafka Connect. For troubleshooting instructions, see the documentation provided by your Apache Kafka software vendor.

Resolving specific issues

Duplicate rows with the same topic partition and offset

When loading data using version 1.4 of the Kafka connector (or higher), duplicate rows in the target table with the same topic partition and offset can indicate that the load operation exceeded the default execution timeout of 300000 milliseconds (300 seconds). To verify the cause, check the Kafka Connect log file for the following error:

org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member.

This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing max.poll.interval.ms or by reducing the maximum size of batches returned in poll() with max.poll.records.

at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.sendOffsetCommitRequest(ConsumerCoordinator.java:1061)
Copy

To resolve the error, in the Kafka configuration file (e.g. <kafka_dir>/config/connect-distributed.properties), change either of the following properties:

consumer.max.poll.interval.ms

Increase the execution timeout to 900000 (900 seconds).

consumer.max.poll.records

Decrease the number of records loaded with each operation to 50.

Reporting issues

When contacting Snowflake Support for assistance, please have the following files available:

  • Configuration file for your Kafka connector.

    Important

    Remove the private key before providing the file to Snowflake.

  • Copy of the Kafka Connector log. Ensure that the file does not contain confidential or sensitive information.

  • JDBC log file.

    To generate the log file, set the JDBC_TRACE = true environment variable on your Kafka Connect cluster before you run the Kafka connector.

    For more information about the JDBC log file, see this article in the Snowflake Community.

  • Connect log file.

    To produce the log file, edit the etc/kafka/connect-log4j.properties file. Set the log4j.appender.stdout.layout.ConversionPattern property as follows:

    log4j.appender.stdout.layout.ConversionPattern=[%d] %p %X{connector.context}%m (%c:%L)%n

    Connector contexts are available in Kafka version 2.3 and higher.

    For more information, see the Logging Improvements information on the Confluent website.