Paramétrage du connecteur Openflow pour Kafka

Note

Ce connecteur est soumis aux conditions d’utilisation de Snowflake Connector.

Conditions préalables

  1. Assurez-vous d’avoir consulté Connecteur Openflow Snowflake pour Kafka.

  2. Assurez-vous d’avoir Configuration d’Openflow - BYOC ou de Configurer Openflow - Déploiements Snowflake.

  3. Si vous utilisez Openflow - Déploiements Snowflake, assurez-vous d’avoir examiné la configuration des domaines requis et d’avoir accordé l’accès aux domaines requis pour le connecteur Kafka. Le connecteur doit pouvoir se connecter à tous les brokers Kafka du cluster.

Paramétrage du compte Snowflake

En tant qu’administrateur de compte Snowflake, effectuez les tâches suivantes :

  1. Créez un nouvel utilisateur du service Snowflake avec le type SERVICE.

  2. Créez un nouveau rôle ou utilisez un rôle existant et accordez les:ref:privilèges de base de données <label-database_privileges>.

    Le connecteur nécessite un utilisateur pour créer la table de destination. Assurez-vous que l’utilisateur dispose des privilèges nécessaires pour gérer les objets Snowflake :

    Objet

    Privilège

    Remarques

    Base de données

    USAGE

    Schéma

    USAGE

    Table

    OWNERSHIP

    Nécessaire pour que le connecteur ingère des données dans une table.

    Snowflake recommande de créer un utilisateur et un rôle distincts pour chaque cluster Kafka pour un meilleur contrôle d’accès.

    Vous pouvez utiliser le script suivant pour créer et configurer un rôle personnalisé (exigence : SECURITYADMIN ou équivalent) :

    USE ROLE securityadmin;
    CREATE ROLE openflow_kafka_connector_role_1;
    
    GRANT USAGE ON DATABASE kafka_db TO ROLE openflow_kafka_connector_role_1;
    GRANT USAGE ON SCHEMA kafka_schema TO ROLE openflow_kafka_connector_role_1;
    

    Note

    Les privilèges doivent être accordés directement au rôle de connecteur et ne peuvent pas être hérités.

  3. Configurer la table de destination

    Snowflake recommande fortement d’utiliser l’évolution de schéma côté serveur pour les modifications de schéma et une :doc:` table d’erreurs pour la journalisation des erreurs DML</user-guide/data-load-overview>`. L’exemple suivant montre comment créer une table et ajouter les autorisations OWNERSHIP adaptées.

    USE ROLE openflow_kafka_connector_role_1;
    
    CREATE TABLE kafka_db.kafka_schema.<DESTINATION_TABLE_NAME> (
      kafkaMetadata variant
    )
    ENABLE_SCHEMA_EVOLUTION = TRUE
    ERROR_LOGGING = TRUE;
    
    USE ROLE securityadmin;
    GRANT OWNERSHIP ON TABLE existing_table1 TO ROLE openflow_kafka_connector_role_1;
    

    Ce connecteur prend en charge la détection et l’évolution automatiques des schémas. La structure des tables dans Snowflake est définie et évolue automatiquement pour prendre en charge la structure des nouvelles données chargées par le connecteur. Elle mappe automatiquement les clés de premier niveau du contenu de l’enregistrement aux colonnes de la table correspondant par leur nom (non sensible à la casse).

    Lorsque l’évolution de schéma est activée, Snowflake peut automatiquement développer la table de destination en ajoutant de nouvelles colonnes détectées dans le flux entrant et en supprimant des contraintes NOT NULL pour prendre en charge de nouveaux modèles de données. Pour plus d’informations, consultez Évolution du schéma de la table.

    Si ENABLE_SCHEMA_EVOLUTION n’est pas activé, vous devez créer le schéma manuellement en étendant la définition de table. Le connecteur tente de faire correspondre les clés de premier niveau du contenu de l’enregistrement aux colonnes de la table par leur nom. Si les clés du fichier JSON ne correspondent pas aux colonnes de la table, le connecteur ignore les clés.

  4. (Facultatif) Configurer un gestionnaire de secrets

    Snowflake recommande vivement cette étape. Configurez un gestionnaire de secrets pris en charge par Openflow, par exemple AWS, Azure et Hashicorp, et stockez les clés publiques et privées dans le magasin de secrets.

    1. Déterminez comment vous vous authentifierez auprès du gestionnaire de secrets une fois celui-ci configuré. SurAWS, Snowflake recommande d’utiliser le rôle d’instance EC2 associé à Openflow, de sorte qu’aucun autre secret ne doit être conservé.

    2. Configurez un fournisseur de paramètres associé à ce gestionnaire de secrets dans Openflow, à partir du menu hamburger en haut à droite. Accédez à Paramètres du contrôleur > Fournisseur de paramètres et récupérez les valeurs de vos paramètres.

    3. Référencez tous les identifiants de connexion avec les chemins de paramètres associés afin qu’aucune valeur sensible ne doive être conservée dans Openflow.

  5. Octroyer l’accès aux utilisateurs

    Pour tout autre utilisateur de Snowflake qui a besoin d’accéder aux données ingérées brutes par le connecteur (par exemple, pour un traitement personnalisé dans Snowflake), accordez à ces utilisateurs le rôle créé à l’étape 1.

Définir le connecteur

En tant qu’ingénieur des données, effectuez les tâches suivantes pour installer et configurer le connecteur :

Installer le connecteur

Pour installer le connecteur, procédez comme suit :

  1. Accédez à la page d’aperçu d’Openflow. Dans la section Connecteurs à la une, sélectionnez Voir plus de connecteurs.

  2. Sur la page des connecteurs Openflow, trouvez le connecteur et sélectionnez Ajouter à l’environnement d’exécution.

  3. Dans la boîte de dialogue Sélectionner un environnement d’exécution, sélectionnez votre environnement d’exécution dans la liste déroulante Environnements d’exécution disponibles et sélectionnez Ajouter.

    Note

    Avant d’installer le connecteur, assurez-vous que vous avez créé une base de données, un schéma et une table dans Snowflake pour que le connecteur puisse stocker les données ingérées.

  4. Authentifiez-vous au déploiement avec les identifiants de votre compte Snowflake et sélectionnez Autoriser lorsque vous êtes invité à autoriser l’application d’exécution à accéder à votre compte Snowflake. Le processus d’installation du connecteur prend quelques minutes.

  5. Authentifiez-vous auprès de l’environnement d’exécution avec les identifiants de votre compte Snowflake.

    Le canevas Openflow apparaît avec le groupe de processus du connecteur ajouté.

Configuration du connecteur

  1. Si nécessaire, personnalisez la configuration du connecteur avant de configurer les paramètres intégrés.

  2. Renseigner les paramètres du groupe de processus

    1. Cliquez avec le bouton droit de la souris sur le groupe de processus importé et sélectionnez Paramètres.

    2. Remplir les valeurs de paramètres requises

Paramètres

Le tableau suivant décrit les paramètres du connecteur Openflow pour Kafka :

Paramètre

Description

Obligatoire

Réinitialisation du décalage automatique de Kafka

Configuration automatique du décalage appliquée lorsqu’aucun décalage de consommateur antérieur n’est trouvé correspondant à la propriété Kafka auto.offset.reset.

Valeurs possibles : le plus tôt : réinitialiser automatiquement le décalage au décalage précédent, le plus récent : réinitialiser automatiquement le décalage au dernier décalage, aucun : lancer une exception vers le consommateur si aucun décalage antérieur n’est trouvé pour le groupe de consommateurs.

Par défaut : dernier

Oui

Serveurs Bootstrap Kafka

Une liste de serveurs Bootstrap Kafka, séparés par des virgules, devant contenir un port, par exemple kafka-broker:9092.

Oui

ID de groupe de consommateurs Kafka

L’ID d’un groupe de consommateurs utilisé par le connecteur. Il peut être arbitraire mais doit être unique.

Oui

Mot de passe Kafka SASL

Mot de passe fourni avec le mot de passe configuré lors de l’utilisation du mécanisme SASL512 SCRAM

Nom d’utilisateur Kafka SASL

Nom d’utilisateur fourni avec un mot de passe configuré lors de l’utilisation deSASL512SCRAM

Format des sujets Kafka

Un des éléments suivants : noms / modèle. Indique si les sujets Kafka fournis sont une liste de noms séparés par des virgules ou une expression régulière unique.

Oui

Sujets Kafka

Une liste de sujets Kafka séparés par des virgules ou une expression régulière.

Oui

Base de données de destination Snowflake

La base de données dans laquelle les données sont conservées. Elle doit déjà exister dans Snowflake. Le nom est sensible à la casse. Pour les identificateurs sans guillemets, indiquez le nom en majuscules.

Oui

Schéma de destination Snowflake

Le schéma dans lequel les données sont conservées, qui doit déjà exister dans Snowflake. Le nom est sensible à la casse. Pour les identificateurs sans guillemets, indiquez le nom en majuscules.

Voir l’exemple suivant :

CREATE SCHEMA SCHEMA_NAME ou CREATE SCHEMA schema_name : utilisez SCHEMA_NAME.

CREATE SCHEMA "schema_name" ou CREATE SCHEMA "SCHEMA_NAME" : utilisez schema_name ou SCHEMA_NAME, respectivement.

Oui

Table de destination Snowflake

La table dans laquelle les données sont conservées. Elle doit déjà exister dans Snowflake. Le nom est sensible à la casse. Pour les identificateurs sans guillemets, indiquez le nom en majuscules.

Oui

Démarrer le connecteur

  1. Cliquez avec le bouton droit de la souris sur l’avion et sélectionnez Enable all Controller Services.

  2. Cliquez avec le bouton droit de la souris sur l’avion et sélectionnez Start. Le connecteur commence l’ingestion des données.

Comprendre la colonne KAFKAMETADATA

Le connecteur remplit la structure KAFKAMETADATA avec des métadonnées sur l’enregistrement Kafka. La structure contient les informations suivantes :

Champ

Type de données

Description

topic

Chaîne

Le nom du sujet Kafka d’où provient l’enregistrement.

partition

number

Le numéro de la partition dans le sujet. (Notez qu’il s’agit de la partition Kafka, pas de la micro-partition Snowflake.)

offset

number

Le décalage dans cette partition.

timestamp

number

Horodatage correspondant au moment où l’enregistrement a été ajouté à Kafka.

key

Chaîne

Si le message est un KeyedMessage Kafka, il s’agit de la clé de ce message. Pour que le connecteur enregistre la clé dans RECORD_METADATA, le paramètre key.converter dans les propriétés de la configuration Kafka doit être défini sur org.apache.kafka.connect.storage.StringConverter; sinon, le connecteur ignore les clés.

headers

Objet

Un en-tête est une paire clé-valeur définie par l’utilisateur associée à l’enregistrement. Chaque enregistrement peut avoir 0, 1 ou plusieurs en-têtes.

Mesure de la latence d’ingestion

Pour le suivi des modifications, le traitement incrémentiel et les requêtes Time Travel basées sur l’heure de modification des lignes, la fonctionnalité ROW_TIMESTAMP peut être utilisée.

Activez-la en exécutant la commande suivante sur votre table de destination :

ALTER TABLE <DESTINATION_TABLE> SET ROW_TIMESTAMP = TRUE;

Une fois les horodatages de lignes activés, les tables exposent la colonne METADATA$ROW_LAST_COMMIT_TIME, qui renvoie l’horodatage de la dernière modification de chaque ligne.

Pour plus d’informations, voir METADATA$ROW_LAST_COMMIT_TIME.

Note

L’horodatage de la ligne n’est pas disponible pour les tables interactives. Pour plus d’informations, voir Tables interactives et entrepôts interactifs Snowflake.

Utilisation du connecteur avec des tables Apache Iceberg™

Le connecteur peut ingérer des données dans des tables Apache Iceberg™ gérées par Snowflake, mais doit répondre aux exigences suivantes :

  • Vous devez avoir obtenu le privilège USAGE sur le volume externe associé à votre table Apache Iceberg™.

  • Vous devez créer une table Apache Iceberg™ avant d’exécuter le connecteur.

Accorder l’utilisation sur un volume externe

Par exemple, si votre table Iceberg utilise le volume externe kafka_external_volume et que le connecteur utilise le rôle openflow_kafka_connector_role, exécutez l’instruction suivante :

USE ROLE ACCOUNTADMIN;
GRANT USAGE ON EXTERNAL VOLUME kafka_external_volume TO ROLE openflow_kafka_connector_role;

Créer une table Apache Iceberg™ pour l’ingestion

Le connecteur ne crée pas automatiquement des tables Iceberg et ne prend pas en charge l’évolution des schémas. Avant d’exécuter le connecteur, vous devez créer manuellement une table Iceberg.

Lorsque vous créez une table Iceberg, vous pouvez utiliser des types de données Iceberg (notamment VARIANT) ou des types Snowflake compatibles </user-guide/tables-iceberg-data-types>.

Prenons l’exemple du message suivant :

{
  "id": 1,
  "name": "Steve",
  "body_temperature": 36.6,
  "approved_coffee_types": ["Espresso", "Doppio", "Ristretto", "Lungo"],
  "animals_possessed": {
    "dogs": true,
    "cats": false
  },
  "options": {
    "can_walk": true,
    "can_talk": false
  },
  "date_added": "2024-10-15"
}

Pour créer une table Iceberg pour le message d’exemple, utilisez les instructions suivantes :

CREATE OR REPLACE ICEBERG TABLE my_iceberg_table (
  kafkaMetadata OBJECT(
    topic STRING,
    partition INTEGER,
    offset INTEGER,
    key STRING,
    headers variant,
    timestamp INTEGER
  ),
  id INT,
  name string,
  body_temperature float,
  approved_coffee_types array(string),
  animals_possessed variant,
  date_added date,
  options object(can_walk boolean, can_talk boolean)
)
EXTERNAL_VOLUME = 'my_volume'
CATALOG = 'SNOWFLAKE'
BASE_LOCATION = 'my_location/my_iceberg_table'
ICEBERG_VERSION = 3;

Utilisation du connecteur avec des tables interactives

Les tables interactives sont un type spécial de table Snowflake optimisée pour les requêtes à faible latence et à forte concurrence. Pour plus d’informations, voir Tables interactives et entrepôts interactifs Snowflake.

  1. Créez une table interactive :

    CREATE INTERACTIVE TABLE REALTIME_METRICS (
      metric_name VARCHAR,
      metric_value NUMBER,
      source_topic VARCHAR,
      timestamp TIMESTAMP_NTZ
    ) AS (SELECT
          $1:M_NAME::VARCHAR,
          $1:M_VALUE::NUMBER,
          $1:RECORD_METADATA.topic::VARCHAR,
          $1:RECORD_METADATA.timestamp::TIMESTAMP_NTZ
          from TABLE(DATA_SOURCE(TYPE => 'STREAMING')));
    

Considérations importantes :

  • Les tables interactives ont des limitations et des restrictions de requêtes spécifiques. Consultez Tables interactives et entrepôts interactifs Snowflake avant de les utiliser avec le connecteur.

  • Pour les tables interactives, toutes les transformations requises doivent être gérées dans la définition de la table.

  • Des entrepôts interactifs sont nécessaires pour interroger efficacement les tables interactives.

Utilisation du connecteur avec un schéma défini par le client pour la table de destination

Le connecteur traite chaque enregistrement Kafka comme une ligne à insérer dans une table Snowflake. Par exemple, si vous avez un sujet Kafka avec le contenu du message structuré comme le fichier JSON :

{
  "order_id": 12345,
  "customer_name": "John",
  "order_total": 100.00,
  "isPaid": true
}

Par défaut, vous n’êtes pas obligé de spécifier tous les champs de JSON grâce à la fonctionnalité ENABLE_SCHEMA_EVOLUTION = TRUE. Toutefois, si vous préférez un schéma statique, il peut être créé en exécutant :

CREATE TABLE ORDERS (
  kafkaMetadata OBJECT,
  order_id NUMBER,
  customer_name VARCHAR,
  order_total FLOAT,
  ispaid BOOLEAN
);

Utilisation du connecteur avec un PIPE défini par le client

Si vous choisissez de créer votre propre canal, vous pouvez définir la logique de transformation des données dans l’instruction COPY INTO du canal. Vous pouvez renommer les colonnes si nécessaire et convertir les types de données selon vos besoins. Par exemple :

CREATE TABLE ORDERS (
  order_id VARCHAR,
  customer_name VARCHAR,
  order_total VARCHAR,
  ispaid VARCHAR
);

CREATE PIPE ORDERS AS
COPY INTO ORDERS
SELECT
  $1:order_id::STRING,
  $1:customer_name,
  $1:order_total::STRING,
  $1:isPaid::STRING
FROM TABLE(DATA_SOURCE(TYPE => 'STREAMING'));

Lorsque vous définissez votre propre canal, les colonnes de votre table de destination ne doivent pas nécessairement correspondre aux clés JSON. Vous pouvez renommer les colonnes à l’aide des noms de votre choix et convertir les types de données selon vos besoins.

Pour ajuster le connecteur afin qu’il fonctionne avec un canal personnalisé, effectuez les tâches suivantes :

  1. Faites un clic droit sur le processeur PublishSnowpipeStreaming utilisé dans votre flux d’ingestion Kafka dans le canevas Openflow.

  2. Sélectionnez Configurer dans le menu contextuel.

  3. Accédez à l’onglet Propriétés.

  4. Dans le champ Type de destination, sélectionnez Canal.

  5. Dans le champ Canal, tapez le nom de votre PIPE.

  6. Sélectionnez Appliquer pour enregistrer la configuration.

Personnalisation de la gestion des erreurs

Le traitement des erreurs est réparti entre les échecs côté Openflow et les échecs côté serveur au sein du service Snowpipe Streaming.

  • Erreurs Openflow (échecs côté client) : Des erreurs telles que des charges utiles impossibles à analyser ou des échecs de transformations personnalisées se produisent avant que les enregistrements n’atteignent Snowflake. Par défaut, ces enregistrements sont ignorés. Il est possible de traiter ces erreurs dans Openflow - utilisez FlowFiles de la relation d’échec d’analyse dans le processeur ConsumeKafka.

  • Erreurs Snowpipe Streaming (échecs côté serveur) : Les erreurs des enregistrements qui atteignent Snowflake, mais qui sont incompatibles avec le schéma de la table de destination (par exemple, les discordances de type), sont capturées par l’infrastructure Snowflake. Lorsque la journalisation des erreurs est activée sur la table de destination (error_logging = true), ces lignes qui ont échoué sont automatiquement ingérées dans la table d’erreur de destination.

Réglage des performances

Optimisation des performances du connecteur Openflow pour Kafka