Paramétrage du connecteur Openflow pour Kafka

Note

Le connecteur est soumis aux conditions d’utilisation du connecteur.

Conditions préalables

  1. Assurez-vous d’avoir consulté Connecteur Openflow pour Kafka.

  2. Assurez-vous que vous avez paramétré Openflow.

Types de connecteurs

Le connecteur Openflow pour Kafka est disponible dans trois configurations différentes, chacune étant optimisée pour des cas d’utilisation spécifiques. Vous pouvez télécharger ces définitions de connecteurs à partir de la galerie des connecteurs :

Apache Kafka pour le format de données JSON

Connecteur simplifié pour l’ingestion de messages JSON avec évolution des schémas et mappage des sujets vers les tables

Apache Kafka pour le format de données AVRO

Connecteur simplifié pour l’ingestion de messages AVRO avec évolution des schémas et mappage des sujets vers les tables

Apache Kafka avec DLQ et métadonnées

Connecteur complet avec prise en charge de la file d’attente des messages non distribués (DLQ), la gestion des métadonnées et fonctionnalités identiques à celle du Snowflake Connector pour Kafka

Pour la configuration détaillée de types de connecteurs spécifiques, voir :

Quel connecteur choisir ?

Choisissez la variante de connecteur qui correspond le mieux à votre format de données, à vos exigences opérationnelles et à vos besoins en matière de fonctionnalité :

Choisissez Apache Kafka pour le format de données JSON ou AVRO quand :

  • Vos messages Kafka sont au format JSON ou AVRO

  • Vous avez besoin de capacités de base pour l’évolution des schémas

  • Vous voulez une configuration simple avec un minimum de configuration

  • Vous n’avez pas besoin d’une fonctionnalité avancée de gestion des erreurs ou de mise en file d’attente des messages non distribués

  • Vous configurez une nouvelle intégration et souhaitez commencer rapidement

Considérations spécifiques au format :

  • Format JSON : Plus flexible pour les structures de données diverses, plus facile à déboguer et à inspecter

  • Format AVRO : Données fortement typées avec intégration native du registre de schémas, mieux adaptées aux pipelines de données structurées

Choisissez Apache Kafka avec DLQ et métadonnées quand :

  • Vous effectuez la migration depuis le Snowflake Connector pour Kafka et vous avez besoin d’une parité des fonctionnalités avec des fonctionnalités compatibles

  • Vous avez besoin d’une gestion robuste des erreurs et d’une prise en charge de la file d’attente des messages non distribués pour les messages ayant échoué

  • Vous avez besoin de métadonnées détaillées sur l’ingestion des messages (horodatages, décalages, en-têtes)

Considérations relatives à la migration

Si vous utilisez actuellement Snowflake Connector pour Kafka, choisissez le connecteur Apache Kafka avec DLQ et métadonnées pour une expérience de migration transparente avec une compatibilité des fonctionnalités.

Différences de traitement des noms de champs : Openflow Connector pour Kafka traite les caractères spéciaux dans les noms de champs différemment de Snowflake Connector pour Kafka. Après la migration, Openflow Connector pour Kafka peut créer de nouvelles colonnes Snowflake avec des noms différents en raison de ces différences de convention de dénomination. Pour des informations détaillées sur la façon dont les noms de champs sont transformés, voir Mappage des noms de champs et traitement des caractères spéciaux.

Considérations en matière de performances

  • Les connecteurs au format JSON et AVRO offrent de meilleures performances pour les cas d’utilisation simples en raison de leur conception rationalisée.

  • Le connecteur DLQ avec métadonnées offre une surveillance et une gestion des erreurs plus complètes au prix d’une utilisation légèrement supérieure des ressources

Paramétrage du compte Snowflake

En tant qu’administrateur de compte Snowflake, effectuez les tâches suivantes :

  1. Créez un nouvel utilisateur du service Snowflake avec le type SERVICE.

  2. Créez un nouveau rôle ou utilisez un rôle existant et accordez le Privilèges de base de données.

    Étant donné que le connecteur a la capacité de créer automatiquement la table de destination si elle n’existe pas encore, assurez-vous que l’utilisateur dispose des privilèges nécessaires pour créer et gérer des objets Snowflake :

    Objet

    Privilège

    Remarques

    Base de données

    USAGE

    Schéma

    USAGE . CREATE TABLE .

    Une fois les objets au niveau du schéma ont été créés, les privilèges CREATE object peuvent être révoqués.

    Table

    OWNERSHIP

    Requis uniquement lors de l’utilisation du connecteur Kafka pour ingérer des données dans une table existante. . Si le connecteur crée une nouvelle table cible pour les enregistrements provenant du sujet Kafka, le rôle par défaut de l’utilisateur spécifié dans la configuration devient le propriétaire de la table.

    Snowflake recommande de créer un utilisateur et un rôle distincts pour chaque instance Kafka pour un meilleur contrôle d’accès.

    Vous pouvez utiliser le script suivant pour créer et configurer un rôle personnalisé (exigence : SECURITYADMIN ou équivalent) :

    USE ROLE securityadmin;
    
    CREATE ROLE kafka_connector_role_1;
    GRANT USAGE ON DATABASE kafka_db TO ROLE kafka_connector_role_1;
    GRANT USAGE ON SCHEMA kafka_schema TO ROLE kafka_connector_role_1;
    GRANT CREATE TABLE ON SCHEMA kafka_schema TO ROLE kafka_connector_role_1;
    
    -- Only for existing tables
    GRANT OWNERSHIP ON TABLE existing_table1 TO ROLE kafka_connector_role_1;
    
    Copy

    Notez que les privilèges doivent être accordés directement au rôle de connecteur et ne peuvent pas être hérités.

  3. Autorisez l’utilisateur du service Snowflake le rôle que vous avez créé dans les étapes précédentes.

    Le rôle doit être attribué en tant que rôle par défaut pour l’utilisateur :

    GRANT ROLE kafka_connector_role_1 TO USER kafka_connector_user_1;
    ALTER USER kafka_connector_user_1 SET DEFAULT_ROLE = kafka_connector_role_1;
    
    Copy
  4. Configurez l’authentification par paire de clés pour l’utilisateur SERVICE Snowflake de l’étape 1.

  5. Snowflake recommande vivement cette étape. Configurez un gestionnaire de secrets pris en charge par Openflow, par exemple AWS, Azure et Hashicorp, et stockez les clés publiques et privées dans le magasin de secrets.

    Note

    Si, pour une raison quelconque, vous ne souhaitez pas utiliser un gestionnaire de secrets, il vous incombe de protéger les fichiers de clés publiques et privées utilisés pour l’authentification par paires de clés conformément aux politiques de sécurité de votre organisation.

    1. Une fois le gestionnaire de secrets configuré, déterminez comment vous vous y authentifierez. Sur AWS, il est recommandé d’utiliser le rôle de l’instance EC2 associée à Openflow, car de cette manière, aucun autre secret ne doit être conservé.

    2. Dans Openflow, configurez un fournisseur de paramètres associé à ce gestionnaire de secrets, à partir du menu hamburger en haut à droite. Naviguez vers Controller Settings » Parameter Provider et récupérez les valeurs de vos paramètres.

    3. À ce stade, tous les identifiants peuvent être référencés avec les chemins de paramètres associés et aucune valeur sensible ne doit être conservée dans Openflow.

  6. Si d’autres utilisateurs de Snowflake ont besoin d’accéder aux documents bruts ingérés et aux tables ingérées par le connecteur (par exemple, pour un traitement personnalisé dans Snowflake), accordez à ces utilisateurs le rôle créé à l’étape 1.

Définir le connecteur

En tant qu’ingénieur des données, effectuez les tâches suivantes pour installer et configurer le connecteur :

Installer le connecteur

  1. Naviguez jusqu’à la page d’aperçu d’Openflow. Dans la section Featured connectors, sélectionnez View more connectors.

  2. Sur la page des connecteurs Openflow, trouvez le connecteur et sélectionnez Add to runtime.

  3. Dans la boîte de dialogue Select runtime, sélectionnez votre environnement d’exécution dans la liste déroulante Available runtimes.

  4. Sélectionnez Add.

    Note

    Avant d’installer le connecteur, assurez-vous que vous avez créé une base de données et un schéma dans Snowflake pour que le connecteur puisse stocker les données ingérées.

  5. Authentifiez-vous au déploiement avec les identifiants de votre compte Snowflake et sélectionnez Allow lorsque vous êtes invité à autoriser l’application d’exécution à accéder à votre compte Snowflake. Le processus d’installation du connecteur prend quelques minutes.

  6. Authentifiez-vous auprès de l’environnement d’exécution avec les identifiants de votre compte Snowflake.

Le canevas Openflow apparaît avec le groupe de processus du connecteur ajouté.

Configuration du connecteur

  1. Renseigner les paramètres du groupe de processus

    1. Cliquez avec le bouton droit de la souris sur le groupe de processus importé et sélectionnez Paramètres.

    2. Complétez les valeurs des paramètres requis comme décrit dans Paramètres communs.

Paramètres communs

Toutes les variantes de connecteurs Kafka partagent des contextes de paramètres communs pour la connectivité et l’authentification de base.

Paramètres de la destination Snowflake

Paramètre

Description

Obligatoire

Base de données de destination

La base de données dans laquelle les données seront conservées. Elle doit déjà exister dans Snowflake

Oui

Schéma de destination

Le schéma dans lequel les données seront conservées. Elle doit déjà exister dans Snowflake. Ce paramètre est sensible à la casse. Indiquez ce paramètre en majuscules, sauf si le schéma a été créé avec un nom entre guillemets doubles, puis assurez-vous que la casse correspond, mais n’incluez pas les guillemets doubles. Voir l’exemple suivant :

  • CREATE SCHEMA SCHEMA_NAME ou CREATE SCHEMA schema_name : utilisez SCHEMA_NAME

  • CREATE SCHEMA "schema_name" ou CREATE SCHEMA "SCHEMA_NAME" : utilisez schema_name ou SCHEMA_NAME respectivement

Oui

Identificateur de compte Snowflake

Nom du compte Snowflake formaté comme suit : [[nom de l’organisation] -[[nom du compte] où les données seront conservées

Oui

Stratégie d’authentification Snowflake

Stratégie d’authentification auprès de Snowflake. Valeurs possibles : SNOWFLAKE_SESSION_TOKEN - lorsque nous exécutons le flux sur SPCS, KEY_PAIR lorsque nous voulons configurer l’accès à l’aide d’une clé privée

Oui

Clé privée de Snowflake

La clé privée RSA utilisée pour l’authentification. La clé RSA doit être formatée selon les normes PKCS8 et comporter les en-têtes et pieds de page standard PEM. Notez que vous devez définir soit le fichier de clé privée de Snowflake, soit la clé privée de Snowflake

Non

Fichier de clé privée de Snowflake

Le fichier qui contient la clé privée RSA utilisée pour l’authentification à Snowflake, formaté selon les normes PKCS8 et comportant les en-têtes et pieds de page standard PEM. La ligne d’en-tête commence par -----BEGIN PRIVATE. Cochez la case Reference asset pour télécharger le fichier de la clé privée.

Non

Mot de passe de la clé privée de Snowflake

Le mot de passe associé au fichier de la clé privée de Snowflake

Non

Rôle Snowflake

Rôle Snowflake utilisé lors de l’exécution de la requête

Oui

Nom d’utilisateur Snowflake

Nom d’utilisateur utilisé pour se connecter à l’instance de Snowflake

Oui

Paramètres de la source Kafka (authentificationSASL)

Paramètre

Description

Obligatoire

Protocole de sécurité de Kafka

Protocole de sécurité utilisé pour communiquer avec les courtiers. Correspond à la propriété security.protocol du client Kafka. Une des options : SASL_PLAINTEXT / SASL_SSL

Oui

Mécanisme Kafka SASL

Mécanisme SASL utilisé pour l’authentification. Correspond à la propriété sasl.mechanism du client Kafka. L’un d’entre eux : PLAIN / SCRAM-SHA-256 / SCRAM-SHA-512

Oui

Nom d’utilisateur Kafka SASL

Le nom d’utilisateur pour s’authentifier auprès de Kafka

Oui

Mot de passe Kafka SASL

Le mot de passe pour s’authentifier auprès de Kafka

Oui

Serveurs Bootstrap Kafka

Une liste séparée par des virgules du courtier Kafka pour récupérer les données, doit contenir le port, par exemple kafka-broker:9092. La même instance est utilisée pour le sujet DLQ.

Oui

Paramètres d’ingestion Kafka

Paramètre

Description

Obligatoire

Format des sujets Kafka

Un des éléments suivants : noms / modèle. Indique si les sujets Kafka fournis sont une liste de noms séparés par des virgules ou une expression régulière unique.

Oui

Sujets Kafka

Une liste de sujets Kafka séparés par des virgules ou une expression régulière.

Oui

Id du groupe Kafka

L’ID d’un groupe de consommateurs utilisé par le connecteur. Il peut être arbitraire mais doit être unique.

Oui

Réinitialisation du décalage automatique de Kafka

Configuration automatique du décalage appliquée lorsqu’aucun décalage de consommateur antérieur n’est trouvé correspondant à la propriété Kafka auto.offset.reset. L’un des éléments suivants : le plus ancien / le plus récent. Valeur par défaut : latest

Oui

Section To Table Map

Ce paramètre facultatif permet à l’utilisateur de spécifier quels sujets doivent être mappés à quelles tables. Chaque sujet et son nom de table doivent être séparés par le signe deux-points (voir exemple ci-dessous). Ce nom de table doit être un identificateur non spécifié Snowflake et valide. Les expressions régulières ne peuvent pas être ambiguës — toute rubrique correspondante doit correspondre à une seule table cible. S’il est vide ou si aucune correspondance n’est trouvée, le nom de la rubrique sera utilisé comme nom de table. Remarque : le mappage ne peut pas contenir d’espaces après les virgules.

Non

Topic To Table Map exemples de valeurs :

  • topic1:low_range,topic2:low_range,topic5:high_range,topic6:high_range

  • topic[0-4]:low_range,topic[5-9]:high_range

  • .*: destination_table - mappe tous les sujets vers destination_table

Configurer les paramètres spécifiques à la variante

Après avoir configuré les paramètres communs, vous devez configurer les paramètres spécifiques à la variante de connecteur que vous avez choisie :

Pour les connecteurs Apache Kafka pour le format de données JSON et Apache Kafka pour le format de données AVRO :

Voir Apache Kafka pour le format de données JSON/AVRO pour les paramètres spécifiques à JSON/AVRO.

Pour Apache Kafka avec DLQ et le connecteur de métadonnées:

Voir Apache Kafka avec DLQ et métadonnées pour les paramètres avancés, notamment la configuration de DLQ, les paramètres de schématisation, la prise en charge de la table Iceberg et les options de format de message.

Authentification

Toutes les variantes de connecteur prennent en charge l’authentification SASL configurée par le biais de contextes de paramètres, comme décrit dans Paramètres de la source Kafka (authentificationSASL).

Pour les autres méthodes d’authentification, y compris mTLS et AWS MSK IAM, voir Configurer d’autres méthodes d’authentification pour le connecteur Openflow pour Kafka.

Exécutez le flux

  1. Cliquez avec le bouton droit de la souris sur l’avion et cliquez sur Activer tous les Controller Service.

  2. Faites un clic droit sur l’avion et cliquez sur Démarrer. Le connecteur commence l’ingestion des données.