Détection et évolution de schémas du connecteur Kafka avec Snowpipe Streaming

Le connecteur Kafka avec Snowpipe Streaming prend en charge la détection et l’évolution de schémas. La structure des tables dans Snowflake peut être définie et peut évoluer automatiquement pour prendre en charge la structure des nouvelles données Snowpipe Streaming chargée par le connecteur Kafka.

Sans détection et évolution des schémas, la table Snowflake chargée par le connecteur Kafka ne comporte que deux colonnes VARIANT, RECORD_CONTENT et RECORD_METADATA. Lorsque la détection et l’évolution des schémas sont activées, Snowflake peut détecter le schéma des données en continu et charger les données dans des tables qui correspondent automatiquement à n’importe quel schéma défini par l’utilisateur. Snowflake permet également d’ajouter de nouvelles colonnes ou de supprimer la contrainte NOT NULL des colonnes manquantes dans les nouveaux fichiers de données.

Note

Cette fonction ne fonctionne qu’avec le connecteur Kafka avec Snowpipe Streaming. Actuellement, elle ne prend pas en charge le connecteur Kafka avec Snowpipe basé sur les fichiers.

Dans ce chapitre :

Conditions préalables

Avant d’activer cette fonction, veillez à mettre en place les conditions préalables suivantes.

Configuration des propriétés requises pour Kafka

Configurez les propriétés requises suivantes dans le fichier de propriétés de votre connecteur Kafka :

snowflake.ingestion.method

Indiquez d’utiliser SNOWPIPE_STREAMING pour charger les données de votre sujet Kafka. Notez que cette fonction ne prend pas en charge SNOWPIPE.

snowflake.enable.schematization

Spécifiez TRUE pour activer la détection et l’évolution de schémas pour le connecteur Kafka avec Snowpipe Streaming. La valeur par défaut est FALSE.

Lorsque cette propriété est définie sur TRUE,

  • Pour toute nouvelle table créée par le connecteur Kafka, le paramètre de table ENABLE_SCHEMA_EVOLUTION est automatiquement défini sur TRUE.

  • Pour toutes les tables existantes, vous devez encore définir manuellement le paramètre de table ENABLE_SCHEMA_EVOLUTION sur TRUE.

schema.registry.url

Indiquez l’URL du service de registre de schéma. La valeur par défaut est vide.

Selon le format du fichier, schema.registry.url est obligatoire ou facultatif. La détection des schémas avec le connecteur Kafka est prise en charge dans l’un ou l’autre des scénarios ci-dessous :

  • Le registre des schémas est nécessaire pour Avro et Protobuf. La colonne est créée avec les types de données définis dans le registre de schémas fourni.

  • Le registre de schéma est facultatif pour JSON. En l’absence de registre des schéma, le type de données sera déduit sur la base des données fournies.

Configurez les propriétés supplémentaires dans le fichier de propriétés de votre connecteur Kafka comme d’habitude. Pour plus d’informations, consultez Configuration du connecteur Kafka.

Convertisseurs

Les convertisseurs de données structurées, comme JSON, Avro et Protobuf, sont pris en charge. Notez que nous n’avons testé que les convertisseurs de données structurées suivants :

  • io.confluent.connect.avro.AvroConverter

  • io.confluent.connect.protobuf.ProtobufConverter

  • org.apache.kafka.connect.json.JsonConverter

  • io.confluent.connect.json.JsonSchemaConverter

Les convertisseurs de données non structurées ne sont pas pris en charge avec la schématisation. Par exemple,

  • org.apache.kafka.connect.converters.ByteArrayConverter

  • org.apache.kafka.connect.storage.StringConverter

Les convertisseurs Snowflake ne sont pas pris en charge avec Snowpipe Streaming. Certains convertisseurs de données personnalisés n’ont pas été testés et peuvent ne pas être pris en charge.

Notes sur l’utilisation

  • La détection des schémas avec le connecteur Kafka est prise en charge avec ou sans registre de schéma fourni. Si vous utilisez le registre de schéma (Avro et Protobuf), la colonne sera créée avec les types de données définis dans le registre de schéma fourni. En l’absence de registre de schéma (JSON), le type de données sera déduit sur la base des données fournies.

  • L’évolution du schéma avec le connecteur Kafka prend en charge les modifications de colonnes de table suivantes :

    • Ajout de nouvelles colonnes

    • Suppression de la contrainte NOT NULL si la colonne des données source est manquante.

  • Si le connecteur Kafka crée la table cible, l’évolution du schéma est activée par défaut. Toutefois, si l’évolution des schémas est désactivée pour une table existante, le connecteur Kafka tentera d’envoyer les lignes dont les schémas ne correspondent pas aux files d’attente configurées pour les lettres mortes (DLQ).

  • Le ARRAY JSON n’est pas pris en charge pour une schématisation plus poussée.

  • Pour le connecteur Kafka avec Snowpipe Streaming, l’évolution du schéma n’est pas suivie par la sortie SchemaEvolutionRecord dans les vues et commandes suivantes : vue INFORMATION_SCHEMA COLUMNS, vue ACCOUNT_USAGE COLUMNS, commande DESCRIBE TABLE et commande SHOW COLUMNS. La sortie SchemaEvolutionRecord affiche toujours NULL.

Exemples

Les exemples suivants démontrent les tables qui sont créées avant et après l’activation de la détection et de l’évolution de schémas pour le connecteur Kafka avec Snowpipe Streaming.

-- Before schema detection and evolution is enabled, the table only consists of two VARIANT columns, RECORD_CONTENT and RECORD_METADATA, as the following example demonstrates.
+------+---------------------------------------------------------+---------------------------------------------------+
| Row  | RECORD_METADATA                                         | RECORD_CONTENT                                    |
|------+---------------------------------------------------------+---------------------------------------------------|
| 1    |{"CreateTime":1669074170090, "headers": {"current.iter...| "account": "ABC123", "symbol": "ZTEST", "side":...|
| 2    |{"CreateTime":1669074170400, "headers": {"current.iter...| "account": "XYZ789", "symbol": "ZABZX", "side":...|
| 3    |{"CreateTime":1669074170659, "headers": {"current.iter...| "account": "XYZ789", "symbol": "ZTEST", "side":...|
| 4    |{"CreateTime":1669074170904, "headers": {"current.iter...| "account": "ABC123", "symbol": "ZABZX", "side":...|
| 5    |{"CreateTime":1669074171063, "headers": {"current.iter...| "account": "ABC123", "symbol": "ZTEST", "side":...|
+------+---------------------------------------------------------+---------------------------------------------------|

-- After schema detection and evolution is enabled, the table contains the columns that match the user-defined schema. The table can also automatically evolve to support the structure of new Snowpipe streaming data loaded by the Kafka connector.
+------+---------------------------------------------------------+---------+--------+-------+----------+
| Row  | RECORD_METADATA                                         | ACCOUNT | SYMBOL | SIDE  | QUANTITY |
|------+---------------------------------------------------------+---------+--------+-------+----------|
| 1    |{"CreateTime":1669074170090, "headers": {"current.iter...| ABC123  | ZTEST  | BUY   | 3572     |
| 2    |{"CreateTime":1669074170400, "headers": {"current.iter...| XYZ789  | ZABZX  | SELL  | 3024     |
| 3    |{"CreateTime":1669074170659, "headers": {"current.iter...| XYZ789  | ZTEST  | SELL  | 799      |
| 4    |{"CreateTime":1669074170904, "headers": {"current.iter...| ABC123  | ZABZX  | BUY   | 2033     |
| 5    |{"CreateTime":1669074171063, "headers": {"current.iter...| ABC123  | ZTEST  | BUY   | 1558     |
+------+---------------------------------------------------------+---------+--------+-------+----------|
Copy