Configurer Openflow Connector for Kinesis pour le format de données JSON

Note

Ce connecteur est soumis aux conditions d’utilisation de Snowflake Connector.

Ce chapitre décrit comment configurer Openflow Connector for Kinesis pour le format de données JSON. Il s’agit d’un connecteur simplifié, optimisé pour l’ingestion de messages de base avec des capacités d’évolution des schémas.

Le Openflow Connector for Kinesis pour le format de données JSON est conçu pour l’ingestion de messages JSON simples des flux Kinesis vers des tables Snowflake.

Conditions préalables

  1. Consulter À propos de Openflow Connector for Kinesis

  2. Assurez-vous d’avoir Configurer Openflow avec BYOC ou Configurer Openflow avec les déploiements Snowflake.

  3. Si vous utilisez Openflow - Snowflake Deployments, assurez-vous d’avoir consulté la configuration des domaines requis et d’avoir accordé l’accès aux domaines requis pour le connecteur Kinesis.

Note

Si vous avez besoin de la prise en charge d’autres formats de données ou de fonctionnalités, comme DLQ, contactez votre représentant Snowflake.

Configurez un flux Kinesis

En tant qu’administrateur AWS, effectuez les actions suivantes dans votre compte AWS:

  1. Assurez-vous que vous disposez d’un utilisateur AWS avec des autorisations IAM d’accès à Kinesis Streams et DynamoDB.

  2. Assurez-vous que l’utilisateur AWS a configuré les identifiants de connexion de la clé d’accès.

Paramétrage du compte Snowflake

En tant qu’administrateur de compte Snowflake, effectuez les tâches suivantes :

  1. Créez un nouveau rôle ou utilisez un rôle existant et accordez le Privilèges de base de données.

  2. Créez une base de données de destination et un schéma de destination qui serviront à créer les tables de destination pour le stockage des données.

    1. Si vous planifiez d’utiliser la capacité du connecteur à créer automatiquement la table de destination si elle n’existe pas encore, assurez-vous que l’utilisateur dispose des privilèges requis pour la création et la gestion d’objets Snowflake :

      Objet

      Privilège

      Remarques

      Base de données

      USAGE

      Schéma

      USAGE . CREATE TABLE .

      Une fois les objets au niveau du schéma ont été créés, les privilèges CREATE object peuvent être révoqués.

      Table

      OWNERSHIP

      Uniquement requis lorsque vous utilisez le connecteur Kinesis pour ingérer des données dans une table existante. . Si le connecteur crée une nouvelle table cible pour les enregistrements du flux Kinesis, le rôle par défaut de l’utilisateur spécifié dans la configuration devient le propriétaire de la table.

      Vous pouvez utiliser le script suivant pour créer et configurer un rôle personnalisé (exigence : SECURITYADMIN ou équivalent) :

      USE ROLE SECURITYADMIN;
      
      CREATE ROLE kinesis_connector_role;
      GRANT USAGE ON DATABASE kinesis_db TO ROLE kinesis_connector_role;
      GRANT USAGE ON SCHEMA kinesis_schema TO ROLE kinesis_connector_role;
      GRANT CREATE TABLE ON SCHEMA kinesis_schema TO ROLE kinesis_connector_role;
      
      -- Only for existing tables.
      GRANT OWNERSHIP ON TABLE existing_table TO ROLE kinesis_connector_role;
      
      Copy
  3. Créez un nouvel utilisateur du service Snowflake avec le type SERVICE.

  4. Autorisez l’utilisateur du service Snowflake le rôle que vous avez créé dans les étapes précédentes.

    GRANT ROLE kinesis_connector_role TO USER kinesis_connector_user;
    ALTER USER kinesis_connector_user SET DEFAULT_ROLE = kinesis_connector_role;
    
    Copy
  5. Configurez avec l’authentification par paire de clés pour l’utilisateur de Snowflake SERVICE de l’étape 3.

  6. Snowflake recommande vivement cette étape. Configurez un gestionnaire de secrets pris en charge par Openflow, par exemple AWS, Azure et Hashicorp, et stockez les clés publiques et privées dans le magasin de secrets.

    Note

    Si, pour une raison quelconque, vous ne souhaitez pas utiliser un gestionnaire de secrets, il vous incombe de protéger les fichiers de clés publiques et privées utilisés pour l’authentification par paires de clés conformément aux politiques de sécurité de votre organisation.

    1. Une fois le gestionnaire de secrets configuré, déterminez comment vous vous y authentifierez. Sur AWS, il est recommandé d’utiliser le rôle de l’instance EC2 associé à Openflow, car de cette manière, aucun autre secret ne doit être conservé.

    2. Dans Openflow, configurez un fournisseur de paramètres associé à ce gestionnaire de secrets, à partir du menu hamburger en haut à droite. Naviguez jusqu’à Controller Settings » Parameter Provider, puis récupérez vos valeurs de paramètres.

    3. À ce stade, tous les identifiants peuvent être référencés avec les chemins de paramètres associés et aucune valeur sensible ne doit être conservée dans Openflow.

  7. Si d’autres utilisateurs de Snowflake ont besoin d’accéder aux données ingérées et aux tables créées (par exemple, pour un traitement personnalisé dans Snowflake), accordez à ces utilisateurs le rôle créé à l’étape 2.

Définir le connecteur

En tant qu’ingénieur des données, effectuez les tâches suivantes pour installer et configurer le connecteur :

Installer le connecteur

  1. Accédez à la page d’aperçu d’Openflow. Dans la section Featured connectors, sélectionnez View more connectors.

  2. Sur la page des connecteurs Openflow, trouvez le connecteur et sélectionnez Add to runtime.

  3. Dans la boîte de dialogue Select runtime, sélectionnez votre environnement d’exécution dans la liste déroulante Available runtimes, puis cliquez sur Add.

    Note

    Avant d’installer le connecteur, assurez-vous que vous avez créé une base de données et un schéma dans Snowflake pour que le connecteur puisse stocker les données ingérées.

  4. Authentifiez-vous au déploiement avec les identifiants de votre compte Snowflake et sélectionnez Allow lorsque vous êtes invité à autoriser l’application d’exécution à accéder à votre compte Snowflake. Le processus d’installation du connecteur prend quelques minutes.

  5. Authentifiez-vous auprès de l’environnement d’exécution avec les identifiants de votre compte Snowflake.

Le canevas Openflow apparaît avec le groupe de processus du connecteur ajouté.

Configuration du connecteur

  1. Cliquez avec le bouton droit de la souris sur le groupe de processus importé et sélectionnez Parameters.

  2. Renseignez les valeurs des paramètres requis comme décrit dans Paramètres.

Paramètres

Cette section décrit tous les paramètres de Openflow Connector for Kinesis pour le format de données JSON.

Le connecteur se compose de plusieurs modules. Pour voir l’ensemble, double-cliquez sur le groupe de processus du connecteur. Vous pouvez définir les paramètres de chaque module dans le contexte de paramètres du module.

Paramètres de la destination Snowflake

Paramètre

Description

Obligatoire

Base de données de destination

La base de données dans laquelle les données seront conservées. Elle doit déjà exister dans Snowflake. Le nom est sensible à la casse. Pour les identificateurs sans guillemets, indiquez le nom en majuscules.

Oui

Schéma de destination

Le schéma dans lequel les données seront conservées, qui doit déjà exister dans Snowflake. Le nom est sensible à la casse. Pour les identificateurs sans guillemets, indiquez le nom en majuscules.

Voir l’exemple suivant :

  • CREATE SCHEMA SCHEMA_NAME ou CREATE SCHEMA schema_name : utilisez SCHEMA_NAME.

  • CREATE SCHEMA "schema_name" ou CREATE SCHEMA "SCHEMA_NAME" : utilisez schema_name ou SCHEMA_NAME, respectivement.

Oui

Iceberg activé

Si Iceberg est activé pour les opérations de table. Une des valeurs true / false.

Oui

Évolution du schéma activée

Active ou désactive l’évolution du schéma au niveau du connecteur. Lorsque l’option est activée, autorise les modifications automatiques du schéma pour les tables. Notez que l’évolution des schémas peut également être contrôlée au niveau des tables individuelles par le biais de paramètres spécifiques aux tables. Une des options suivantes : true / false.

Oui

Évolution du schéma pour les nouvelles tables activées

Contrôle si l’évolution du schéma est activée lors de la création de nouvelles tables. Si la valeur est définie sur “true”, de nouvelles tables seront créées avec le paramètre ENABLE_SCHEMA_EVOLUTION = TRUE. Si défini sur “false”, de nouvelles tables seront créées avec le paramètre ENABLE_SCHEMA_EVOLUTION = FALSE. Ne s’applique pas aux tables Iceberg, car elles ne sont pas créées automatiquement. Ce paramètre affecte uniquement la création de tables, pas les tables existantes. Une des options suivantes : true / false.

Oui

Identificateur de compte Snowflake

Lorsque vous utilisez :

  • Stratégie d’authentification par jeton de session : doit être vide.

  • KEY_PAIR : nom du compte Snowflake au format [nom-organisation]-[nom-compte] où les données seront conservées.

Oui

Stratégie d’authentification Snowflake

Lorsque vous utilisez :

  • Déploiement Snowflake Openflow ou BYOC : Utilisez SNOWFLAKE_MANAGED_TOKEN. Ce jeton est géré automatiquement par Snowflake. Les déploiements BYOC doivent disposer de rôles d’exécution pour utiliser SNOWFLAKE_MANAGED_TOKEN.

  • BYOC : BYOC peut également utiliser KEY_PAIR comme valeur pour la stratégie d’authentification.

Oui

Clé privée de Snowflake

Lorsque vous utilisez :

  • Stratégie d’authentification par jeton de session : doit être vide.

  • KEY_PAIR : Doit correspondre à la clé privée RSA utilisée pour l’authentification.

    La clé RSA doit être formatée conformément aux normes PKCS8 et posséder des en-têtes et des pieds de page PEM standards. Notez qu’un fichier de clé privée Snowflake ou une clé privée Snowflake doit être défini.

Non

Fichier de clé privée de Snowflake

Lorsque vous utilisez :

  • Stratégie d’authentification par jeton de session : Le fichier de la clé privée doit être vide.

  • KEY_PAIR : Chargez le fichier qui contient la clé privée RSA utilisée pour l’authentification auprès de Snowflake, formatée conformément aux normes PKCS8 et possédant des en-têtes et des pieds de page PEM standards. La ligne d’en-tête commence par -----BEGIN PRIVATE. Pour charger le fichier de la clé privée, cochez la case Reference asset.

Non

Mot de passe de la clé privée de Snowflake

Lorsque vous utilisez :

  • Stratégie d’authentification par jeton de session : doit être vide.

  • KEY_PAIR : fournissez le mot de passe associé au fichier de la clé privée Snowflake.

Non

Rôle Snowflake

Lorsque vous utilisez :

  • Stratégie d’authentification par jeton de session : Utilisez votre rôle Snowflake. Vous pouvez trouver votre rôle Snowflake dans l’UI d’Openflow, en naviguant jusqu’à View Details pour votre exécution.

  • Stratégie d’authentification KEY_PAIR : Utilisez un rôle valide configuré pour votre utilisateur de service.

Oui

Nom d’utilisateur Snowflake

Lorsque vous utilisez :

  • Stratégie d’authentification par jeton de session : doit être vide.

  • KEY_PAIR : indiquez le nom d’utilisateur utilisé pour vous connecter à l’instance Snowflake.

Oui

Paramètres de la source JSON Kinesis

Paramètre

Description

Obligatoire

Code de la région AWS

La région AWS où se trouve votre flux Kinesis, par exemple us-west-2.

Oui

ID de clé d’accès AWS

L’ID de la clé d’accès AWS pour vous connecter à votre flux Kinesis DynamoDB et, en option, à CloudWatch.

Oui

Clé d’accès secrète AWS

La clé d’accès secrète AWS pour se connecter à votre flux Kinesis DynamoDB et, en option, à CloudWatch.

Oui

Nom de l’application Kinesis

Le nom qui est utilisé pour le nom de la table DynamoDB pour le suivi de la progression de l’application sur la consommation de flux Kinesis.

Oui

Type de consommateur Kinesis

La stratégie utilisée pour lire des enregistrements à partir d’un flux Kinesis. Doit être l’une des valeurs suivantes : SHARED_THROUGHPUT ou ENHANCED_FAN_OUT. Pour plus d’informations, voir Développer des consommateurs du mode de ventilation amélioré.

Oui

Position initiale du flux Kinesis

Position initiale du flux à partir de laquelle les données commencent la réplication.

Les valeurs possibles sont les suivantes :

  • LATEST: Dernier enregistrement stocké

  • TRIM_HORIZON: Enregistrement le plus ancien stocké

Oui

Nom du flux Kinesis

Nom du flux Kinesis AWS à partir duquel les données sont consommées.

Oui

Publication de métriques

Spécifie où les métriques de la bibliothèque client Kinesis sont publiées. Valeurs possibles : DISABLED, LOGS, CLOUDWATCH.

Oui

Exécutez le flux

  1. Cliquez avec le bouton droit de la souris sur le Plan et sélectionnez Enable all Controller Services.

  2. Cliquez à droite sur le groupe de processus du connecteur et sélectionnez Start.

Le connecteur démarre l’ingestion des données.

Schéma de table

La table Snowflake chargée par le connecteur contient des colonnes nommées par les clés de vos messages Kinesis. Le connecteur ajoute également une colonne KINESISMETADATA qui stocke les métadonnées sur l’enregistrement.

Vous trouverez ci-dessous un exemple de table Snowflake chargée par le connecteur :

Ligne

ACCOUNT

SYMBOL

SIDE

QUANTITY

KINESISMETADATA

1

ABC123

ZTEST

BUY

3572

{ … KINESISMETADATA object … }

2

XYZ789

ZABZX

SELL

3024

{ … KINESISMETADATA object … }

3

XYZ789

ZTEST

SELL

799

{ … KINESISMETADATA object … }

4

ABC123

ZABZX

BUY

2033

{ … KINESISMETADATA object … }

La colonne KINESISMETADATA contient un objet avec les champs suivants :

Nom du champ

Type de champ

Exemple de valeur

Description

stream

Chaîne

stream-name

Nom du flux Kinesis d’où provient l’enregistrement.

shardId

Chaîne

shardId-000000000001

L’identificateur du fragment dans le flux d’où provient l’enregistrement.

approximateArrival

Chaîne

2025-11-05T09:12:15.300

Heure approximative à laquelle l’enregistrement a été inséré dans le flux (format ISO 8601).

partitionKey

Chaîne

key-1234

La clé de partition spécifiée par le producteur de données pour l’enregistrement.

sequenceNumber

Chaîne

123456789

Le numéro de séquence unique attribué par Kinesis Data Streams à l’enregistrement dans le fragment.

subSequenceNumber

Nombre

2

Le numéro de sous-séquence de l’enregistrement (utilisé pour les enregistrements agrégés avec le même numéro de séquence).

shardedSequenceNumber

Chaîne

12345678900002

Une combinaison du numéro de séquence et du numéro de sous-séquence de l’enregistrement.

Évolution du schéma

Ce connecteur prend en charge la détection et l’évolution automatiques des schémas. La structure des tables dans Snowflake est définie et évolue automatiquement pour prendre en charge la structure des nouvelles données chargées par le connecteur.

Snowflake détecte le schéma des données entrantes et charge les données dans des tables qui correspondent à n’importe quel schéma défini par l’utilisateur. Snowflake permet également d’ajouter de nouvelles colonnes ou de supprimer la contrainte NOT NULL des colonnes manquantes dans les nouveaux fichiers de données.

La détection des schémas avec le connecteur déduit les types de données sur la base des données JSON fournies.

Si le connecteur crée la table cible, l’évolution du schéma est activée par défaut.

Si vous souhaitez activer ou désactiver l’évolution du schéma sur la table existante, utilisez la commande ALTER TABLE afin de définir le paramètre ENABLE_SCHEMA_EVOLUTION. Vous devez également utiliser un rôle qui a le privilège OWNERSHIP sur la table. Pour plus d’informations, voir Évolution du schéma de table.

Toutefois, si l’évolution des schémas est désactivée pour une table existante, le connecteur essaie d’envoyer les lignes dont les schémas ne correspondent pas au port de sortie en échec configuré.

Prise en charge des tables Iceberg

Openflow Connector for Kinesis peut ingérer des données dans une Table Apache Iceberg™ gérée par Snowflake lorsque Iceberg activé est défini sur true.

Exigences et limitations

Avant de configurer Openflow Connector for Kinesis pour l’ingestion de tables Iceberg, notez les exigences et limites suivantes :

  • Vous devez créer une table Iceberg avant d’exécuter le connecteur.

  • Assurez-vous que l’utilisateur a accès à l’insertion de données dans les tables créées.

Configuration et définition

Pour configurer Openflow Connector for Kinesis pour l’ingestion de tables Iceberg, suivez les étapes de Configurer Openflow Connector for Kinesis pour le format de données JSON avec quelques différences notées dans les sections suivantes.

Activer l’ingestion dans la table Iceberg

Pour permettre l’ingestion dans une table Iceberg, vous devez donner au paramètre Iceberg Enabled la valeur true.

Créer une table Iceberg pour l’ingestion

Avant d’exécuter le connecteur, vous devez créer une table Iceberg. Le schéma de table initial dépend des paramètres de propriété Évolution du schéma activée de votre connecteur.

Lorsque l’évolution du schéma est activée, vous devez créer une table avec une colonne nommée kinesisMetadata. Le connecteur crée automatiquement les colonnes des champs de message et modifie le schéma de colonnes kinesisMetadata.

CREATE OR REPLACE ICEBERG TABLE my_iceberg_table (
    kinesisMetadata OBJECT()
  )
  EXTERNAL_VOLUME = 'my_volume'
  CATALOG = 'SNOWFLAKE'
  BASE_LOCATION = 'my_location/my_iceberg_table'
  ENABLE_SCHEMA_EVOLUTION = true;
Copy

L’évolution du schéma est désactivée, vous devez créer la table avec tous les champs que le message Kinesis contient. Lorsque vous créez une table Iceberg, vous pouvez utiliser des types de données Iceberg ou des types Snowflake compatibles. Le type semi-structuré VARIANT n’est pas pris en charge. Utilisez plutôt un OBJECT ou MAP structuré.

Prenons l’exemple du message suivant :

{
    "id": 1,
    "name": "Steve",
    "body_temperature": 36.6,
    "approved_coffee_types": ["Espresso", "Doppio", "Ristretto", "Lungo"],
    "animals_possessed":
    {
        "dogs": true,
        "cats": false
    },
    "date_added": "2024-10-15"
}
Copy

L’instruction suivante crée une table avec tous les champs que contient le message Kinesis :

CREATE OR REPLACE ICEBERG TABLE my_iceberg_table (
    kinesisMetadata OBJECT(
        stream STRING,
        shardId STRING,
        approximateArrival STRING,
        partitionKey STRING,
        sequenceNumber STRING,
        subSequenceNumber INTEGER,
        shardedSequenceNumber STRING
    ),
    id INT,
    body_temperature FLOAT,
    name STRING,
    approved_coffee_types ARRAY(STRING),
    animals_possessed OBJECT(dogs BOOLEAN, cats BOOLEAN),
    date_added DATE
  )
  EXTERNAL_VOLUME = 'my_volume'
  CATALOG = 'SNOWFLAKE'
  BASE_LOCATION = 'my_location/my_iceberg_table';
Copy

Note

kinesisMetadata doit toujours être créé. Les noms de champ à l’intérieur de structures imbriquées telles que dogs ou cats sont sensibles à la casse.