Détection et évolution de schémas du connecteur Kafka avec Snowpipe Streaming

Le connecteur Kafka avec Snowpipe Streaming prend en charge la détection et l’évolution de schémas. La structure des tables dans Snowflake peut être définie et peut évoluer automatiquement pour prendre en charge la structure des nouvelles données Snowpipe Streaming chargée par le connecteur Kafka.

Sans détection et évolution des schémas, la table Snowflake chargée par le connecteur Kafka ne comporte que deux colonnes VARIANT, RECORD_CONTENT et RECORD_METADATA. Lorsque la détection et l’évolution des schémas sont activées, Snowflake peut détecter le schéma des données en continu et charger les données dans des tables qui correspondent automatiquement à n’importe quel schéma défini par l’utilisateur. Snowflake permet également d’ajouter de nouvelles colonnes ou de supprimer la contrainte NOT NULL des colonnes manquantes dans les nouveaux fichiers de données.

Note

Cette fonction en avant-première ne fonctionne qu’avec le connecteur Kafka avec Snowpipe Streaming. Actuellement, elle ne supporte pas le connecteur Kafka avec Snowpipe basé sur les fichiers.

Dans ce chapitre :

Conditions préalables

Avant d’activer cette fonction, veillez à mettre en place les conditions préalables suivantes.

Configuration des propriétés requises pour Kafka

Configurez les propriétés requises suivantes dans le fichier de propriétés de votre connecteur Kafka :

snowflake.ingestion.method

Indiquez d’utiliser SNOWPIPE_STREAMING pour charger les données de votre sujet Kafka. Notez que cette fonction en avant-première ne prend actuellement pas en charge SNOWPIPE.

snowflake.enable.schematization

Spécifiez TRUE pour activer la détection et l’évolution de schémas pour le connecteur Kafka avec Snowpipe Streaming. La valeur par défaut est FALSE.

schema.registry.url

Indiquez l’URL du service de registre de schéma. La valeur par défaut est vide.

Selon le format du fichier, schema.registry.url est obligatoire ou facultatif. La détection des schémas avec le connecteur Kafka est prise en charge dans l’un ou l’autre des scénarios ci-dessous :

  • Le registre des schémas est nécessaire pour Avro et Protobuf. La colonne est créée avec les types de données définis dans le registre de schémas fourni.

  • Le registre de schéma est facultatif pour JSON. En l’absence de registre des schéma, le type de données sera déduit sur la base des données fournies.

Configurez les propriétés supplémentaires dans le fichier de propriétés de votre connecteur Kafka comme d’habitude. Pour plus d’informations, consultez Configuration du connecteur Kafka.

Convertisseurs

Tous les convertisseurs de données structurées, comme JSON, Avro et Protobuf, sont pris en charge. Notez que nous n’avons testé que les convertisseurs de données structurées suivants :

  • io.confluent.connect.avro.AvroConverter

  • io.confluent.connect.protobuf.ProtobufConverter

  • org.apache.kafka.connect.json.JsonConverter

  • io.confluent.connect.json.JsonSchemaConverter

Les convertisseurs de données non structurées ne sont pas pris en charge. Par exemple,

  • org.apache.kafka.connect.converters.ByteArrayConverter

  • org.apache.kafka.connect.storage.StringConverter

Certains convertisseurs de données personnalisés peuvent ne pas être pris en charge. Contactez le support Snowflake si vous avez besoin d’aide.

Notes sur l’utilisation

  • La détection des schémas avec le connecteur Kafka est prise en charge avec ou sans registre de schéma fourni. Si vous utilisez le registre de schéma (Avro et Protobuf), la colonne sera créée avec les types de données définis dans le registre de schéma fourni. En l’absence de registre de schéma (JSON), le type de données sera déduit sur la base des données fournies.

  • L’évolution du schéma avec le connecteur Kafka prend en charge les modifications de colonnes de table suivantes :

    • Ajout de nouvelles colonnes

    • Suppression de la contrainte NOT NULL si la colonne des données source est manquante.

  • Si le connecteur Kafka crée la table cible, l’évolution du schéma est activée par défaut. Toutefois, si l’évolution des schémas est désactivée pour une table existante, le connecteur Kafka tentera d’envoyer les lignes dont les schémas ne correspondent pas aux files d’attente configurées pour les lettres mortes (DLQ).

Exemples

Les exemples suivants démontrent les tables qui sont créées avant et après l’activation de la détection et de l’évolution de schémas pour le connecteur Kafka avec Snowpipe Streaming.

-- Before schema detection and evolution is enabled, the table only consists of two VARIANT columns, RECORD_CONTENT and RECORD_METADATA, as the following example demonstrates.
+------+---------------------------------------------------------+---------------------------------------------------+
| Row  | RECORD_METADATA                                         | RECORD_CONTENT                                    |
|------+---------------------------------------------------------+---------------------------------------------------|
| 1    |{"CreateTime":1669074170090, "headers": {"current.iter...| "account": "ABC123", "symbol": "ZTEST", "side":...|
| 2    |{"CreateTime":1669074170400, "headers": {"current.iter...| "account": "XYZ789", "symbol": "ZABZX", "side":...|
| 3    |{"CreateTime":1669074170659, "headers": {"current.iter...| "account": "XYZ789", "symbol": "ZTEST", "side":...|
| 4    |{"CreateTime":1669074170904, "headers": {"current.iter...| "account": "ABC123", "symbol": "ZABZX", "side":...|
| 5    |{"CreateTime":1669074171063, "headers": {"current.iter...| "account": "ABC123", "symbol": "ZTEST", "side":...|
+------+---------------------------------------------------------+---------------------------------------------------|

-- After schema detection and evolution is enabled, the table contains the columns that match the user-defined schema. The table can also automatically evolve to support the structure of new Snowpipe streaming data loaded by the Kafka connector.
+------+---------------------------------------------------------+---------+--------+-------+----------+
| Row  | RECORD_METADATA                                         | ACCOUNT | SYMBOL | SIDE  | QUANTITY |
|------+---------------------------------------------------------+---------+--------+-------+----------|
| 1    |{"CreateTime":1669074170090, "headers": {"current.iter...| ABC123  | ZTEST  | BUY   | 3572     |
| 2    |{"CreateTime":1669074170400, "headers": {"current.iter...| XYZ789  | ZABZX  | SELL  | 3024     |
| 3    |{"CreateTime":1669074170659, "headers": {"current.iter...| XYZ789  | ZTEST  | SELL  | 799      |
| 4    |{"CreateTime":1669074170904, "headers": {"current.iter...| ABC123  | ZABZX  | BUY   | 2033     |
| 5    |{"CreateTime":1669074171063, "headers": {"current.iter...| ABC123  | ZTEST  | BUY   | 1558     |
+------+---------------------------------------------------------+---------+--------+-------+----------|
Copy