Détection et évolution de schémas du connecteur Kafka avec Snowpipe Streaming¶
Le connecteur Kafka avec Snowpipe Streaming prend en charge la détection et l’évolution de schémas. La structure des tables dans Snowflake peut être définie et peut évoluer automatiquement pour prendre en charge la structure des nouvelles données Snowpipe Streaming chargée par le connecteur Kafka.
Sans détection et évolution des schémas, la table Snowflake chargée par le connecteur Kafka ne comporte que deux colonnes VARIANT, RECORD_CONTENT et RECORD_METADATA. Lorsque la détection et l’évolution des schémas sont activées, Snowflake peut détecter le schéma des données en continu et charger les données dans des tables qui correspondent automatiquement à n’importe quel schéma défini par l’utilisateur. Snowflake permet également d’ajouter de nouvelles colonnes ou de supprimer la contrainte NOT NULL des colonnes manquantes dans les nouveaux fichiers de données.
Note
Cette fonction ne fonctionne qu’avec le connecteur Kafka avec Snowpipe Streaming. Actuellement, elle ne prend pas en charge le connecteur Kafka avec Snowpipe basé sur les fichiers.
Dans ce chapitre :
Conditions préalables¶
Avant d’activer cette fonction, veillez à mettre en place les conditions préalables suivantes.
Téléchargez la version du connecteur Kafka 2.0.0 ou ultérieure. Pour plus d’informations, consultez Installation et configuration du connecteur Kafka.
Utilisez la commande ALTER TABLE pour définir le paramètre
ENABLE_SCHEMA_EVOLUTION
sur TRUE sur la table. Vous devez également utiliser un rôle qui a le privilège OWNERSHIP sur la table. Pour plus d’informations, voir Évolution du schéma de table.
Configuration des propriétés requises pour Kafka¶
Configurez les propriétés requises suivantes dans le fichier de propriétés de votre connecteur Kafka :
snowflake.ingestion.method
Indiquez d’utiliser
SNOWPIPE_STREAMING
pour charger les données de votre sujet Kafka. Notez que cette fonction ne prend pas en chargeSNOWPIPE
.snowflake.enable.schematization
Spécifiez
TRUE
pour activer la détection et l’évolution de schémas pour le connecteur Kafka avec Snowpipe Streaming. La valeur par défaut estFALSE
.Lorsque cette propriété est définie sur
TRUE
,Pour toute nouvelle table créée par le connecteur Kafka, le paramètre de table
ENABLE_SCHEMA_EVOLUTION
est automatiquement défini surTRUE
.Pour toutes les tables existantes, vous devez encore définir manuellement le paramètre de table
ENABLE_SCHEMA_EVOLUTION
surTRUE
.
schema.registry.url
Indiquez l’URL du service de registre de schéma. La valeur par défaut est vide.
Selon le format du fichier,
schema.registry.url
est obligatoire ou facultatif. La détection des schémas avec le connecteur Kafka est prise en charge dans l’un ou l’autre des scénarios ci-dessous :Le registre des schémas est nécessaire pour Avro et Protobuf. La colonne est créée avec les types de données définis dans le registre de schémas fourni.
Le registre de schéma est facultatif pour JSON. En l’absence de registre des schéma, le type de données sera déduit sur la base des données fournies.
Configurez les propriétés supplémentaires dans le fichier de propriétés de votre connecteur Kafka comme d’habitude. Pour plus d’informations, consultez Configuration du connecteur Kafka.
Convertisseurs¶
Les convertisseurs de données structurées, comme JSON, Avro et Protobuf, sont pris en charge. Notez que nous n’avons testé que les convertisseurs de données structurées suivants :
io.confluent.connect.avro.AvroConverter
io.confluent.connect.protobuf.ProtobufConverter
org.apache.kafka.connect.json.JsonConverter
io.confluent.connect.json.JsonSchemaConverter
Les convertisseurs de données non structurées ne sont pas pris en charge avec la schématisation. Par exemple,
org.apache.kafka.connect.converters.ByteArrayConverter
org.apache.kafka.connect.storage.StringConverter
Les convertisseurs Snowflake ne sont pas pris en charge avec Snowpipe Streaming. Certains convertisseurs de données personnalisés n’ont pas été testés et peuvent ne pas être pris en charge.
Notes sur l’utilisation¶
La détection des schémas avec le connecteur Kafka est prise en charge avec ou sans registre de schéma fourni. Si vous utilisez le registre de schéma (Avro et Protobuf), la colonne sera créée avec les types de données définis dans le registre de schéma fourni. En l’absence de registre de schéma (JSON), le type de données sera déduit sur la base des données fournies.
L’évolution du schéma avec le connecteur Kafka prend en charge les modifications de colonnes de table suivantes :
Ajout de nouvelles colonnes
Suppression de la contrainte NOT NULL si la colonne des données source est manquante.
Si le connecteur Kafka crée la table cible, l’évolution du schéma est activée par défaut. Toutefois, si l’évolution des schémas est désactivée pour une table existante, le connecteur Kafka tentera d’envoyer les lignes dont les schémas ne correspondent pas aux files d’attente configurées pour les lettres mortes (DLQ).
Le ARRAY JSON n’est pas pris en charge pour une schématisation plus poussée.
Pour le connecteur Kafka avec Snowpipe Streaming, l’évolution du schéma n’est pas suivie par la sortie
SchemaEvolutionRecord
dans les vues et commandes suivantes : vue INFORMATION_SCHEMA COLUMNS, vue ACCOUNT_USAGE COLUMNS, commande DESCRIBE TABLE et commande SHOW COLUMNS. La sortieSchemaEvolutionRecord
affiche toujours NULL.
Exemples¶
Les exemples suivants démontrent les tables qui sont créées avant et après l’activation de la détection et de l’évolution de schémas pour le connecteur Kafka avec Snowpipe Streaming.
-- Before schema detection and evolution is enabled, the table only consists of two VARIANT columns, RECORD_CONTENT and RECORD_METADATA, as the following example demonstrates. +------+---------------------------------------------------------+---------------------------------------------------+ | Row | RECORD_METADATA | RECORD_CONTENT | |------+---------------------------------------------------------+---------------------------------------------------| | 1 |{"CreateTime":1669074170090, "headers": {"current.iter...| "account": "ABC123", "symbol": "ZTEST", "side":...| | 2 |{"CreateTime":1669074170400, "headers": {"current.iter...| "account": "XYZ789", "symbol": "ZABZX", "side":...| | 3 |{"CreateTime":1669074170659, "headers": {"current.iter...| "account": "XYZ789", "symbol": "ZTEST", "side":...| | 4 |{"CreateTime":1669074170904, "headers": {"current.iter...| "account": "ABC123", "symbol": "ZABZX", "side":...| | 5 |{"CreateTime":1669074171063, "headers": {"current.iter...| "account": "ABC123", "symbol": "ZTEST", "side":...| +------+---------------------------------------------------------+---------------------------------------------------| -- After schema detection and evolution is enabled, the table contains the columns that match the user-defined schema. The table can also automatically evolve to support the structure of new Snowpipe streaming data loaded by the Kafka connector. +------+---------------------------------------------------------+---------+--------+-------+----------+ | Row | RECORD_METADATA | ACCOUNT | SYMBOL | SIDE | QUANTITY | |------+---------------------------------------------------------+---------+--------+-------+----------| | 1 |{"CreateTime":1669074170090, "headers": {"current.iter...| ABC123 | ZTEST | BUY | 3572 | | 2 |{"CreateTime":1669074170400, "headers": {"current.iter...| XYZ789 | ZABZX | SELL | 3024 | | 3 |{"CreateTime":1669074170659, "headers": {"current.iter...| XYZ789 | ZTEST | SELL | 799 | | 4 |{"CreateTime":1669074170904, "headers": {"current.iter...| ABC123 | ZABZX | BUY | 2033 | | 5 |{"CreateTime":1669074171063, "headers": {"current.iter...| ABC123 | ZTEST | BUY | 1558 | +------+---------------------------------------------------------+---------+--------+-------+----------|