Troubleshooting the Kafka Connector¶
This section describes how to troubleshoot issues encountered while ingesting data using the Kafka connector.
The following steps describe a methodical approach to troubleshooting loads that use the Kafka connector.
Step 1: View the COPY History for the Table¶
Query the load activity history for the target table. For more information, see COPY_HISTORY View. If the COPY_HISTORY output does not include a set of expected files, query an earlier time period. If the files were duplicates of earlier files, the load history might have recorded the activity when the attempt to load the original files was made.
The STATUS column indicates whether a particular set of files was loaded, partially loaded, or failed to load. The FIRST_ERROR_MESSAGE column provides a reason when an attempt partially loaded or failed.
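As a minimal sketch of this step, the query below reads recent load activity through the INFORMATION_SCHEMA.COPY_HISTORY table function. The table name mydb.public.mytable is borrowed from the example later in this topic, and the 24-hour window is an illustrative assumption; widen it if the expected files do not appear.

```sql
-- Load activity for the target table over the past 24 hours.
-- The table name and the time window are illustrative assumptions.
SELECT file_name,
       last_load_time,
       status,
       row_count,
       row_parsed,
       first_error_message
FROM TABLE(mydb.information_schema.copy_history(
       TABLE_NAME => 'mydb.public.mytable',
       START_TIME => DATEADD(hours, -24, CURRENT_TIMESTAMP())))
ORDER BY last_load_time DESC;
```

Rows whose STATUS shows a partial or failed load are the ones to examine in the validation steps that follow.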
Validating Data Files and Resolving Errors¶
If Snowpipe could not load data from files in the internal stage created for the Kafka topic, the Kafka connector moves the files to the table stage associated with the target table. In the example below, the table stage is referenced as @mydb.public.%mytable.
If a set of files has multiple issues, the FIRST_ERROR_MESSAGE column in the COPY_HISTORY output indicates only the first error encountered. To view all errors in the files, execute a COPY INTO <table> statement with the VALIDATION_MODE copy option set to RETURN_ALL_ERRORS. The VALIDATION_MODE copy option instructs a COPY statement to validate the data to be loaded and return results based on the validation option specified. No data is loaded when this copy option is specified. In the statement, reference the set of files you had attempted to load using the Kafka connector.
When any issues with the data files are resolved, you can load the data manually using one or more COPY statements.
The following example references data files located in the table stage for the mytable table in the mydb.public database and schema.
To validate data files in the table stage and resolve errors:
List all files located in the table stage (using LIST):
LIST @mydb.public.%mytable;

+-----------------+------+----------------------------------+-------------------------------+
| name            | size | md5                              | last_modified                 |
|-----------------+------+----------------------------------+-------------------------------|
| myfile.csv.gz   |  512 | a123cdef1234567890abcdefghijk123 | Tue, 22 Oct 2019 14:20:31 GMT |
+-----------------+------+----------------------------------+-------------------------------+
Retrieve all errors encountered in the file (using COPY INTO table … VALIDATION_MODE = 'RETURN_ALL_ERRORS'):
COPY INTO mydb.public.mytable FROM @mydb.public.%mytable VALIDATION_MODE = 'RETURN_ALL_ERRORS';
Download the data files from the table stage to a local directory (using GET):
Linux or macOS:
GET @mydb.public.%mytable file:///tmp/;
Windows:
GET @mydb.public.%mytable file://C:\temp\;
Fix all errors in the data files.
Stage the files to either the table stage or the named internal stage for the Kafka topic (using PUT). In this example, we are staging the files to the table stage, overwriting the existing files:
Linux or macOS:
PUT file:///tmp/myfile.csv @mydb.public.%mytable OVERWRITE = TRUE;
Windows:
PUT file://C:\temp\myfile.csv @mydb.public.%mytable OVERWRITE = TRUE;
Load the data into the target table (using COPY INTO table without the VALIDATION_MODE option). You can optionally use the PURGE = TRUE copy option to delete the data files from the stage once the data is loaded successfully, or manually delete the files from the table stage (using REMOVE):
COPY INTO mydb.public.mytable FROM @mydb.public.%mytable PURGE = TRUE;
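If you load without PURGE = TRUE, the manual cleanup mentioned in the last step might look like the following sketch. The PATTERN value is an assumption chosen to match the myfile.csv.gz file from the LIST output above; adjust it to the files you actually reloaded.

```sql
-- Check which files remain in the table stage after the load.
LIST @mydb.public.%mytable;

-- Remove the reloaded file. PATTERN is a regular expression matched
-- against the staged file path; this value is illustrative.
REMOVE @mydb.public.%mytable PATTERN = '.*myfile[.]csv[.]gz';
```

Running LIST again afterward confirms that only the intended files were removed.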
Step 2: Analyze the Kafka Connector Log File¶
If the COPY_HISTORY view has no record of the data load, then analyze the log file for the Kafka connector. The connector writes events to the log file. Note that the Snowflake Kafka connector shares the same log file with all Kafka connector plugins. The name and location of this log file should be in your Kafka Connect configuration file. For more information, see the documentation provided for your Apache Kafka software.
Search the Kafka connector log file for Snowflake-related error messages. Most messages will have the string ERROR and will contain the file name com.snowflake.kafka.connector... to make these messages easier to find.
Possible errors that you might encounter include:
- Configuration error
  Possible causes of the error:
  - The connector doesn’t have the proper information to subscribe to the topic.
  - The connector doesn’t have the proper information to write to the Snowflake table (e.g., the key pair for authentication might be wrong).
  Note that the Kafka connector validates its parameters. The connector throws an error for each incompatible configuration parameter. The error message is written to the Kafka Connect cluster’s log file. If you suspect a configuration problem, check the errors in that log file.
- Read error
  The connector might not have been able to read from Kafka for the following reasons:
  - Kafka or Kafka Connect might not be running.
  - The message might not have been sent yet.
  - The message might have been deleted (expired).
- Write error (stage)
  Possible causes of the error:
  - Insufficient privileges on the stage.
  - Stage is out of space.
  - Stage was dropped.
  - Some other user or process wrote unexpected files to the stage.
- Write error (table)
  Possible causes of the error:
  - Insufficient privileges on the table.
Step 3: Check Kafka Connect¶
If no error is reported in the Kafka Connect log file, check Kafka Connect itself. For troubleshooting instructions, see the documentation provided by your Apache Kafka software vendor.
Resolving Specific Issues¶
Duplicate Rows with the Same Topic Partition and Offset¶
When loading data using version 1.4 of the Kafka connector (or higher), duplicate rows in the target table with the same topic partition and offset can indicate that the load operation exceeded the default execution timeout of 300000 milliseconds (300 seconds). To verify the cause, check the Kafka Connect log file for the following error:
org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing max.poll.interval.ms or by reducing the maximum size of batches returned in poll() with max.poll.records.
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.sendOffsetCommitRequest(ConsumerCoordinator.java:1061)
To resolve the error, in the Kafka configuration file (e.g., <kafka_dir>/config/connect-distributed.properties), change either of the following properties:
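Increase the execution timeout (the max.poll.interval.ms setting named in the error message).
Decrease the number of records loaded with each operation (the max.poll.records setting named in the error message).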
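As a hedged sketch, the two settings named in the error message could be set in the Kafka Connect worker file as shown below. The consumer. prefix is the standard Kafka Connect way to pass a setting to the consumers that the worker creates for sink connectors. Both values are illustrative assumptions, not recommendations taken from this topic; size them to your own load times.

```properties
# Illustrative values only (assumptions); tune for your workload.

# Allow more time between poll() calls than the 300000 ms default.
consumer.max.poll.interval.ms=900000

# Return fewer records per poll() so that each batch finishes sooner.
consumer.max.poll.records=50
```

Changing either setting is usually enough. Restart the Kafka Connect worker so that the new values take effect.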
When contacting Snowflake Support for assistance, please have the following files available:
Configuration file for your Kafka connector.
Remove the private key before providing the file to Snowflake.
Copy of the Kafka connector log. Ensure that the file does not contain confidential or sensitive information.