Paramétrage du connecteur Openflow pour Kafka¶
Note
Ce connecteur est soumis aux conditions d’utilisation de Snowflake Connector.
Conditions préalables¶
Assurez-vous d’avoir consulté Connecteur Openflow Snowflake pour Kafka.
Assurez-vous d’avoir Configuration d’Openflow - BYOC ou de Configurer Openflow - Déploiements Snowflake.
Si vous utilisez Openflow - Déploiements Snowflake, assurez-vous d’avoir examiné la configuration des domaines requis et d’avoir accordé l’accès aux domaines requis pour le connecteur Kafka. Le connecteur doit pouvoir se connecter à tous les brokers Kafka du cluster.
Paramétrage du compte Snowflake¶
En tant qu’administrateur de compte Snowflake, effectuez les tâches suivantes :
Créez un nouvel utilisateur du service Snowflake avec le type SERVICE.
Créez un nouveau rôle ou utilisez un rôle existant et accordez les:ref:
privilèges de base de données <label-database_privileges>.Le connecteur nécessite un utilisateur pour créer la table de destination. Assurez-vous que l’utilisateur dispose des privilèges nécessaires pour gérer les objets Snowflake :
Objet
Privilège
Remarques
Base de données
USAGE
Schéma
USAGE
Table
OWNERSHIP
Nécessaire pour que le connecteur ingère des données dans une table.
Snowflake recommande de créer un utilisateur et un rôle distincts pour chaque cluster Kafka pour un meilleur contrôle d’accès.
Vous pouvez utiliser le script suivant pour créer et configurer un rôle personnalisé (exigence : SECURITYADMIN ou équivalent) :
Note
Les privilèges doivent être accordés directement au rôle de connecteur et ne peuvent pas être hérités.
Configurer la table de destination
Snowflake recommande fortement d’utiliser l’évolution de schéma côté serveur pour les modifications de schéma et une :doc:` table d’erreurs pour la journalisation des erreurs DML</user-guide/data-load-overview>`. L’exemple suivant montre comment créer une table et ajouter les autorisations OWNERSHIP adaptées.
Ce connecteur prend en charge la détection et l’évolution automatiques des schémas. La structure des tables dans Snowflake est définie et évolue automatiquement pour prendre en charge la structure des nouvelles données chargées par le connecteur. Elle mappe automatiquement les clés de premier niveau du contenu de l’enregistrement aux colonnes de la table correspondant par leur nom (non sensible à la casse).
Lorsque l’évolution de schéma est activée, Snowflake peut automatiquement développer la table de destination en ajoutant de nouvelles colonnes détectées dans le flux entrant et en supprimant des contraintes NOT NULL pour prendre en charge de nouveaux modèles de données. Pour plus d’informations, consultez Évolution du schéma de la table.
Si ENABLE_SCHEMA_EVOLUTION n’est pas activé, vous devez créer le schéma manuellement en étendant la définition de table. Le connecteur tente de faire correspondre les clés de premier niveau du contenu de l’enregistrement aux colonnes de la table par leur nom. Si les clés du fichier JSON ne correspondent pas aux colonnes de la table, le connecteur ignore les clés.
(Facultatif) Configurer un gestionnaire de secrets
Snowflake recommande vivement cette étape. Configurez un gestionnaire de secrets pris en charge par Openflow, par exemple AWS, Azure et Hashicorp, et stockez les clés publiques et privées dans le magasin de secrets.
Déterminez comment vous vous authentifierez auprès du gestionnaire de secrets une fois celui-ci configuré. SurAWS, Snowflake recommande d’utiliser le rôle d’instance EC2 associé à Openflow, de sorte qu’aucun autre secret ne doit être conservé.
Configurez un fournisseur de paramètres associé à ce gestionnaire de secrets dans Openflow, à partir du menu hamburger en haut à droite. Accédez à Paramètres du contrôleur > Fournisseur de paramètres et récupérez les valeurs de vos paramètres.
Référencez tous les identifiants de connexion avec les chemins de paramètres associés afin qu’aucune valeur sensible ne doive être conservée dans Openflow.
Octroyer l’accès aux utilisateurs
Pour tout autre utilisateur de Snowflake qui a besoin d’accéder aux données ingérées brutes par le connecteur (par exemple, pour un traitement personnalisé dans Snowflake), accordez à ces utilisateurs le rôle créé à l’étape 1.
Définir le connecteur¶
En tant qu’ingénieur des données, effectuez les tâches suivantes pour installer et configurer le connecteur :
Installer le connecteur¶
Pour installer le connecteur, procédez comme suit :
Accédez à la page d’aperçu d’Openflow. Dans la section Connecteurs à la une, sélectionnez Voir plus de connecteurs.
Sur la page des connecteurs Openflow, trouvez le connecteur et sélectionnez Ajouter à l’environnement d’exécution.
Dans la boîte de dialogue Sélectionner un environnement d’exécution, sélectionnez votre environnement d’exécution dans la liste déroulante Environnements d’exécution disponibles et sélectionnez Ajouter.
Note
Avant d’installer le connecteur, assurez-vous que vous avez créé une base de données, un schéma et une table dans Snowflake pour que le connecteur puisse stocker les données ingérées.
Authentifiez-vous au déploiement avec les identifiants de votre compte Snowflake et sélectionnez Autoriser lorsque vous êtes invité à autoriser l’application d’exécution à accéder à votre compte Snowflake. Le processus d’installation du connecteur prend quelques minutes.
Authentifiez-vous auprès de l’environnement d’exécution avec les identifiants de votre compte Snowflake.
Le canevas Openflow apparaît avec le groupe de processus du connecteur ajouté.
Configuration du connecteur¶
Si nécessaire, personnalisez la configuration du connecteur avant de configurer les paramètres intégrés.
Renseigner les paramètres du groupe de processus
Cliquez avec le bouton droit de la souris sur le groupe de processus importé et sélectionnez Paramètres.
Remplir les valeurs de paramètres requises
Paramètres¶
Le tableau suivant décrit les paramètres du connecteur Openflow pour Kafka :
Paramètre |
Description |
Obligatoire |
|---|---|---|
Réinitialisation du décalage automatique de Kafka |
Configuration automatique du décalage appliquée lorsqu’aucun décalage de consommateur antérieur n’est trouvé correspondant à la propriété Kafka Valeurs possibles : le plus tôt : réinitialiser automatiquement le décalage au décalage précédent, le plus récent : réinitialiser automatiquement le décalage au dernier décalage, aucun : lancer une exception vers le consommateur si aucun décalage antérieur n’est trouvé pour le groupe de consommateurs. Par défaut : dernier |
Oui |
Serveurs Bootstrap Kafka |
Une liste de serveurs Bootstrap Kafka, séparés par des virgules, devant contenir un port, par exemple |
Oui |
ID de groupe de consommateurs Kafka |
L’ID d’un groupe de consommateurs utilisé par le connecteur. Il peut être arbitraire mais doit être unique. |
Oui |
Mot de passe Kafka SASL |
Mot de passe fourni avec le mot de passe configuré lors de l’utilisation du mécanisme SASL512 SCRAM |
|
Nom d’utilisateur Kafka SASL |
Nom d’utilisateur fourni avec un mot de passe configuré lors de l’utilisation deSASL512SCRAM |
|
Format des sujets Kafka |
Un des éléments suivants : noms / modèle. Indique si les sujets Kafka fournis sont une liste de noms séparés par des virgules ou une expression régulière unique. |
Oui |
Sujets Kafka |
Une liste de sujets Kafka séparés par des virgules ou une expression régulière. |
Oui |
Base de données de destination Snowflake |
La base de données dans laquelle les données sont conservées. Elle doit déjà exister dans Snowflake. Le nom est sensible à la casse. Pour les identificateurs sans guillemets, indiquez le nom en majuscules. |
Oui |
Schéma de destination Snowflake |
Le schéma dans lequel les données sont conservées, qui doit déjà exister dans Snowflake. Le nom est sensible à la casse. Pour les identificateurs sans guillemets, indiquez le nom en majuscules. Voir l’exemple suivant :
|
Oui |
Table de destination Snowflake |
La table dans laquelle les données sont conservées. Elle doit déjà exister dans Snowflake. Le nom est sensible à la casse. Pour les identificateurs sans guillemets, indiquez le nom en majuscules. |
Oui |
Démarrer le connecteur¶
Cliquez avec le bouton droit de la souris sur l’avion et sélectionnez Enable all Controller Services.
Cliquez avec le bouton droit de la souris sur l’avion et sélectionnez Start. Le connecteur commence l’ingestion des données.
Comprendre la colonne KAFKAMETADATA¶
Le connecteur remplit la structure KAFKAMETADATA avec des métadonnées sur l’enregistrement Kafka. La structure contient les informations suivantes :
Champ |
Type de données |
Description |
|---|---|---|
topic |
Chaîne |
Le nom du sujet Kafka d’où provient l’enregistrement. |
partition |
number |
Le numéro de la partition dans le sujet. (Notez qu’il s’agit de la partition Kafka, pas de la micro-partition Snowflake.) |
offset |
number |
Le décalage dans cette partition. |
timestamp |
number |
Horodatage correspondant au moment où l’enregistrement a été ajouté à Kafka. |
key |
Chaîne |
Si le message est un KeyedMessage Kafka, il s’agit de la clé de ce message. Pour que le connecteur enregistre la clé dans RECORD_METADATA, le paramètre |
headers |
Objet |
Un en-tête est une paire clé-valeur définie par l’utilisateur associée à l’enregistrement. Chaque enregistrement peut avoir 0, 1 ou plusieurs en-têtes. |
Mesure de la latence d’ingestion¶
Pour le suivi des modifications, le traitement incrémentiel et les requêtes Time Travel basées sur l’heure de modification des lignes, la fonctionnalité ROW_TIMESTAMP peut être utilisée.
Activez-la en exécutant la commande suivante sur votre table de destination :
Une fois les horodatages de lignes activés, les tables exposent la colonne METADATA$ROW_LAST_COMMIT_TIME, qui renvoie l’horodatage de la dernière modification de chaque ligne.
Pour plus d’informations, voir METADATA$ROW_LAST_COMMIT_TIME.
Note
L’horodatage de la ligne n’est pas disponible pour les tables interactives. Pour plus d’informations, voir Tables interactives et entrepôts interactifs Snowflake.
Utilisation du connecteur avec des tables Apache Iceberg™¶
Le connecteur peut ingérer des données dans des tables Apache Iceberg™ gérées par Snowflake, mais doit répondre aux exigences suivantes :
Vous devez avoir obtenu le privilège USAGE sur le volume externe associé à votre table Apache Iceberg™.
Vous devez créer une table Apache Iceberg™ avant d’exécuter le connecteur.
Accorder l’utilisation sur un volume externe¶
Par exemple, si votre table Iceberg utilise le volume externe kafka_external_volume et que le connecteur utilise le rôle openflow_kafka_connector_role, exécutez l’instruction suivante :
Créer une table Apache Iceberg™ pour l’ingestion¶
Le connecteur ne crée pas automatiquement des tables Iceberg et ne prend pas en charge l’évolution des schémas. Avant d’exécuter le connecteur, vous devez créer manuellement une table Iceberg.
Lorsque vous créez une table Iceberg, vous pouvez utiliser des types de données Iceberg (notamment VARIANT) ou des types Snowflake compatibles </user-guide/tables-iceberg-data-types>.
Prenons l’exemple du message suivant :
Pour créer une table Iceberg pour le message d’exemple, utilisez les instructions suivantes :
Utilisation du connecteur avec des tables interactives¶
Les tables interactives sont un type spécial de table Snowflake optimisée pour les requêtes à faible latence et à forte concurrence. Pour plus d’informations, voir Tables interactives et entrepôts interactifs Snowflake.
Créez une table interactive :
Considérations importantes :
Les tables interactives ont des limitations et des restrictions de requêtes spécifiques. Consultez Tables interactives et entrepôts interactifs Snowflake avant de les utiliser avec le connecteur.
Pour les tables interactives, toutes les transformations requises doivent être gérées dans la définition de la table.
Des entrepôts interactifs sont nécessaires pour interroger efficacement les tables interactives.
Utilisation du connecteur avec un schéma défini par le client pour la table de destination¶
Le connecteur traite chaque enregistrement Kafka comme une ligne à insérer dans une table Snowflake. Par exemple, si vous avez un sujet Kafka avec le contenu du message structuré comme le fichier JSON :
Par défaut, vous n’êtes pas obligé de spécifier tous les champs de JSON grâce à la fonctionnalité ENABLE_SCHEMA_EVOLUTION = TRUE. Toutefois, si vous préférez un schéma statique, il peut être créé en exécutant :
Utilisation du connecteur avec un PIPE défini par le client¶
Si vous choisissez de créer votre propre canal, vous pouvez définir la logique de transformation des données dans l’instruction COPY INTO du canal. Vous pouvez renommer les colonnes si nécessaire et convertir les types de données selon vos besoins. Par exemple :
Lorsque vous définissez votre propre canal, les colonnes de votre table de destination ne doivent pas nécessairement correspondre aux clés JSON. Vous pouvez renommer les colonnes à l’aide des noms de votre choix et convertir les types de données selon vos besoins.
Pour ajuster le connecteur afin qu’il fonctionne avec un canal personnalisé, effectuez les tâches suivantes :
Faites un clic droit sur le processeur PublishSnowpipeStreaming utilisé dans votre flux d’ingestion Kafka dans le canevas Openflow.
Sélectionnez Configurer dans le menu contextuel.
Accédez à l’onglet Propriétés.
Dans le champ Type de destination, sélectionnez Canal.
Dans le champ Canal, tapez le nom de votre PIPE.
Sélectionnez Appliquer pour enregistrer la configuration.
Personnalisation de la gestion des erreurs¶
Le traitement des erreurs est réparti entre les échecs côté Openflow et les échecs côté serveur au sein du service Snowpipe Streaming.
Erreurs Openflow (échecs côté client) : Des erreurs telles que des charges utiles impossibles à analyser ou des échecs de transformations personnalisées se produisent avant que les enregistrements n’atteignent Snowflake. Par défaut, ces enregistrements sont ignorés. Il est possible de traiter ces erreurs dans Openflow - utilisez FlowFiles de la relation d’échec d’analyse dans le processeur ConsumeKafka.
Erreurs Snowpipe Streaming (échecs côté serveur) : Les erreurs des enregistrements qui atteignent Snowflake, mais qui sont incompatibles avec le schéma de la table de destination (par exemple, les discordances de type), sont capturées par l’infrastructure Snowflake. Lorsque la journalisation des erreurs est activée sur la table de destination (
error_logging = true), ces lignes qui ont échoué sont automatiquement ingérées dans la table d’erreur de destination.
Réglage des performances¶
Optimisation des performances du connecteur Openflow pour Kafka