Détection et évolution de schémas du connecteur Kafka avec Snowpipe Streaming¶
Le connecteur Kafka avec Snowpipe Streaming prend en charge la détection et l’évolution de schémas. La structure des tables dans Snowflake peut être définie et peut évoluer automatiquement pour prendre en charge la structure des nouvelles données Snowpipe Streaming chargée par le connecteur Kafka.
Sans détection et évolution des schémas, la table Snowflake chargée par le connecteur Kafka ne comporte que deux colonnes VARIANT, RECORD_CONTENT et RECORD_METADATA. Lorsque la détection et l’évolution des schémas sont activées, Snowflake peut détecter le schéma des données en continu et charger les données dans des tables qui correspondent automatiquement à n’importe quel schéma défini par l’utilisateur. Snowflake permet également d’ajouter de nouvelles colonnes ou de supprimer la contrainte NOT NULL des colonnes manquantes dans les nouveaux fichiers de données.
Note
Cette fonction en avant-première ne fonctionne qu’avec le connecteur Kafka avec Snowpipe Streaming. Actuellement, elle ne supporte pas le connecteur Kafka avec Snowpipe basé sur les fichiers.
Dans ce chapitre :
Conditions préalables¶
Avant d’activer cette fonction, veillez à mettre en place les conditions préalables suivantes.
Téléchargez la version du connecteur Kafka 2.0.0 ou ultérieure. Pour plus d’informations, consultez Installation et configuration du connecteur Kafka.
Utilisez la commande ALTER TABLE pour définir le paramètre
ENABLE_SCHEMA_EVOLUTION
sur TRUE sur la table. Vous devez également utiliser un rôle qui a le privilège EVOLVE SCHEMA sur la table. Pour plus d’informations, consultez Évolution du schéma de table.
Configuration des propriétés requises pour Kafka¶
Configurez les propriétés requises suivantes dans le fichier de propriétés de votre connecteur Kafka :
snowflake.ingestion.method
Indiquez d’utiliser
SNOWPIPE_STREAMING
pour charger les données de votre sujet Kafka. Notez que cette fonction en avant-première ne prend actuellement pas en chargeSNOWPIPE
.snowflake.enable.schematization
Spécifiez
TRUE
pour activer la détection et l’évolution de schémas pour le connecteur Kafka avec Snowpipe Streaming. La valeur par défaut estFALSE
.schema.registry.url
Indiquez l’URL du service de registre de schéma. La valeur par défaut est vide.
Selon le format du fichier,
schema.registry.url
est obligatoire ou facultatif. La détection des schémas avec le connecteur Kafka est prise en charge dans l’un ou l’autre des scénarios ci-dessous :Le registre des schémas est nécessaire pour Avro et Protobuf. La colonne est créée avec les types de données définis dans le registre de schémas fourni.
Le registre de schéma est facultatif pour JSON. En l’absence de registre des schéma, le type de données sera déduit sur la base des données fournies.
Configurez les propriétés supplémentaires dans le fichier de propriétés de votre connecteur Kafka comme d’habitude. Pour plus d’informations, consultez Configuration du connecteur Kafka.
Convertisseurs¶
Tous les convertisseurs de données structurées, comme JSON, Avro et Protobuf, sont pris en charge. Notez que nous n’avons testé que les convertisseurs de données structurées suivants :
io.confluent.connect.avro.AvroConverter
io.confluent.connect.protobuf.ProtobufConverter
org.apache.kafka.connect.json.JsonConverter
io.confluent.connect.json.JsonSchemaConverter
Les convertisseurs de données non structurées ne sont pas pris en charge. Par exemple,
org.apache.kafka.connect.converters.ByteArrayConverter
org.apache.kafka.connect.storage.StringConverter
Certains convertisseurs de données personnalisés peuvent ne pas être pris en charge. Contactez le support Snowflake si vous avez besoin d’aide.
Notes sur l’utilisation¶
La détection des schémas avec le connecteur Kafka est prise en charge avec ou sans registre de schéma fourni. Si vous utilisez le registre de schéma (Avro et Protobuf), la colonne sera créée avec les types de données définis dans le registre de schéma fourni. En l’absence de registre de schéma (JSON), le type de données sera déduit sur la base des données fournies.
L’évolution du schéma avec le connecteur Kafka prend en charge les modifications de colonnes de table suivantes :
Ajout de nouvelles colonnes
Suppression de la contrainte NOT NULL si la colonne des données source est manquante.
Si le connecteur Kafka crée la table cible, l’évolution du schéma est activée par défaut. Toutefois, si l’évolution des schémas est désactivée pour une table existante, le connecteur Kafka tentera d’envoyer les lignes dont les schémas ne correspondent pas aux files d’attente configurées pour les lettres mortes (DLQ).
Exemples¶
Les exemples suivants démontrent les tables qui sont créées avant et après l’activation de la détection et de l’évolution de schémas pour le connecteur Kafka avec Snowpipe Streaming.
-- Before schema detection and evolution is enabled, the table only consists of two VARIANT columns, RECORD_CONTENT and RECORD_METADATA, as the following example demonstrates. +------+---------------------------------------------------------+---------------------------------------------------+ | Row | RECORD_METADATA | RECORD_CONTENT | |------+---------------------------------------------------------+---------------------------------------------------| | 1 |{"CreateTime":1669074170090, "headers": {"current.iter...| "account": "ABC123", "symbol": "ZTEST", "side":...| | 2 |{"CreateTime":1669074170400, "headers": {"current.iter...| "account": "XYZ789", "symbol": "ZABZX", "side":...| | 3 |{"CreateTime":1669074170659, "headers": {"current.iter...| "account": "XYZ789", "symbol": "ZTEST", "side":...| | 4 |{"CreateTime":1669074170904, "headers": {"current.iter...| "account": "ABC123", "symbol": "ZABZX", "side":...| | 5 |{"CreateTime":1669074171063, "headers": {"current.iter...| "account": "ABC123", "symbol": "ZTEST", "side":...| +------+---------------------------------------------------------+---------------------------------------------------| -- After schema detection and evolution is enabled, the table contains the columns that match the user-defined schema. The table can also automatically evolve to support the structure of new Snowpipe streaming data loaded by the Kafka connector. +------+---------------------------------------------------------+---------+--------+-------+----------+ | Row | RECORD_METADATA | ACCOUNT | SYMBOL | SIDE | QUANTITY | |------+---------------------------------------------------------+---------+--------+-------+----------| | 1 |{"CreateTime":1669074170090, "headers": {"current.iter...| ABC123 | ZTEST | BUY | 3572 | | 2 |{"CreateTime":1669074170400, "headers": {"current.iter...| XYZ789 | ZABZX | SELL | 3024 | | 3 |{"CreateTime":1669074170659, "headers": {"current.iter...| XYZ789 | ZTEST | SELL | 799 | | 4 |{"CreateTime":1669074170904, "headers": {"current.iter...| ABC123 | ZABZX | BUY | 2033 | | 5 |{"CreateTime":1669074171063, "headers": {"current.iter...| ABC123 | ZTEST | BUY | 1558 | +------+---------------------------------------------------------+---------+--------+-------+----------|