Utiliser Snowflake Connector for Kafka avec les tables Apache Iceberg™

À partir de la version 3.0.0, Snowflake Connector for Kafka peut ingérer des données dans une table Apache Iceberg™ gérée par Snowflake.

Exigences et limitations

Avant de configurer le connecteur Kafka pour l’ingestion de tables Iceberg, notez les exigences et limites suivantes :

  • L’ingestion de tables Iceberg exige la version 3.0.0 ou ultérieure du connecteur Kafka.

  • L’ingestion de tables Iceberg est prise en charge par le connecteur Kafka avec Snowpipe Streaming. Elle n’est pas prise en charge par le connecteur Kafka avec Snowpipe.

  • L’ingestion de tables Iceberg n’est pas prise en charge lorsque snowflake.streaming.enable.single.buffer est défini sur false.

  • Vous devez créer une table Iceberg avant d’exécuter le connecteur. Pour plus d’informations, voir Configuration et définition dans cette rubrique.

Limites de l’évolution des schémas

L’évolution des schémas pour Iceberg est entièrement prise en charge pour les formats de données schématisés tels que AVRO ou Protobuf.

Pour un JSON brut sans schéma, le connecteur considère les types de messages suivants comme invalides et les envoie dans des files d’attente de lettres mortes (DLQ) :

  • Messages avec une nouvelle colonne si la valeur correspondante est null ou []

  • Messages avec un nouveau champ dans un objet structuré si la valeur correspondante est null ou []

Pour modifier manuellement le schéma de la table afin que le connecteur puisse ingérer ces types de messages, utilisez une instruction ALTER TABLE.

Configuration et définition

Pour configurer le connecteur Kafka pour l’ingestion de tables Iceberg, vous suivez les étapes de définition habituelles pour un connecteur basé sur Snowpipe Streaming avec quelques différences notées dans les sections suivantes.

Accorder l’utilisation sur un volume externe

Vous devez accorder le privilège USAGE sur le volume externe associé à votre table Iceberg à votre rôle pour le connecteur Kafka.

Par exemple, si votre table Iceberg utilise le volume externe kafka_external_volume et que le connecteur utilise le rôle kafka_connector_role, exécutez l’instruction suivante :

USE ROLE ACCOUNTADMIN;
GRANT USAGE ON EXTERNAL VOLUME kafka_external_volume TO ROLE kafka_connector_role;
Copy

Créer une table Iceberg pour l’ingestion

Avant d’exécuter le connecteur, vous devez créer une table Iceberg. Le schéma de table initial dépend des paramètres de votre connecteur snowflake.enable.schematization.

Si vous activez la schématisation, vous pouvez créer une table avec une colonne nommée record_metadata :

CREATE OR REPLACE ICEBERG TABLE my_iceberg_table (
    record_metadata OBJECT()
  )
  EXTERNAL_VOLUME = 'my_volume'
  CATALOG = 'SNOWFLAKE'
  BASE_LOCATION = 'my_location/my_iceberg_table';
Copy

Le connecteur crée automatiquement la colonne record_content et modifie le schéma de la colonne record_metadata.

Si vous n’activez pas la schématisation, vous pouvez créer une table avec une colonne nommée record_content d’un type correspondant au contenu réel du message Kafka. Le connecteur crée automatiquement la colonne record_metadata.

Lorsque vous créez une table Iceberg, vous pouvez utiliser des types de données Iceberg ou des types Snowflake compatibles. Le type semi-structuré VARIANT n’est pas pris en charge. Utilisez plutôt un OBJECT ou MAP structuré.

Prenons l’exemple du message suivant :

{
    "id": 1,
    "name": "Steve",
    "body_temperature": 36.6,
    "approved_coffee_types": ["Espresso", "Doppio", "Ristretto", "Lungo"],
    "animals_possessed":
    {
        "dogs": true,
        "cats": false
    },
    "date_added": "2024-10-15"
}
Copy

Pour créer une table Iceberg pour le message d’exemple, utilisez l’instruction suivante :

CREATE OR REPLACE ICEBERG TABLE my_iceberg_table (
    record_content OBJECT(
        id INT,
        body_temperature FLOAT,
        name STRING,
        approved_coffee_types ARRAY(STRING),
        animals_possessed OBJECT(dogs BOOLEAN, cats BOOLEAN),
        date_added DATE
    )
  )
  EXTERNAL_VOLUME = 'my_volume'
  CATALOG = 'SNOWFLAKE'
  BASE_LOCATION = 'my_location/my_iceberg_table';
Copy

Note

Les noms de champ à l’intérieur de structures imbriquées telles que dogs ou cats sont sensibles à la casse.

Propriétés de configuration

snowflake.streaming.iceberg.enabled

Spécifie si le connecteur ingère des données dans une table Iceberg. Le connecteur échoue si cette propriété ne correspond pas au type de table.

Valeurs:

  • true

  • false

Par défaut:

false