Configuration des Openflow Connector for Amazon Kinesis Data Streams

Note

Ce connecteur est soumis aux conditions d’utilisation de Snowflake Connector.

Cette rubrique explique comment configurer Openflow Connector for Amazon Kinesis Data Streams.

Openflow Connector for Amazon Kinesis Data Streams est conçu pour l’ingestion de messages JSON des flux Kinesis vers des tables Snowflake, avec des capacités d’évolution des schémas.

Paramétrage du connecteur Openflow pour Kinesis

Conditions préalables

  1. Consulter Openflow Connector for Amazon Kinesis Data Streams

  2. Assurez-vous d’avoir Configuration d’Openflow - BYOC ou de Configurer Openflow - Déploiements Snowflake.

  3. Si vous utilisez OpenFlow - Déploiements Snowflake, assurez-vous d’avoir pris connaissance de la configuration des domaines requis et d’avoir accordé l’accès aux domaines requis pour le connecteur Kinesis.

Configurer les rôles et les stratégies IAM dans AWS

En tant qu’administrateur AWS, effectuez les actions suivantes dans votre compte AWS:

  1. Créez un utilisateur ou rôle IAM AWS qu’Openflow utilisera pour accéder au flux de données Kinesis. Pour plus d’informations, voir Création d’utilisateurs IAM dans la documentation AWS.

  2. Assurez-vous que l’utilisateur AWS a configuré les identifiants de connexion de la clé d’accès.

  3. Accordez les autorisations IAM suivantes à l’utilisateur AWS :

    Service

    Actions

    Ressources (ARNs)

    But

    Flux de données Amazon Kinesis

    kinesis:DescribeStream, kinesis:DescribeStreamConsumer, kinesis:GetRecords, kinesis:GetShardIterator, kinesis:ListShards, kinesis:RegisterStreamConsumer

    arn:aws:kinesis:${REGION}:${ACCOUNT_ID}:stream/${STREAM_NAME}

    Détecte les fragments, lit les enregistrements via une interrogation à débit partagé, résout le flux ARN, enregistre un consommateur Enhanced Fan-Out et interroge l’état du consommateur lors de l’enregistrement.

    Flux de données Amazon Kinesis

    kinesis:DeregisterStreamConsumer, kinesis:DescribeStreamConsumer, kinesis:SubscribeToShard

    arn:aws:kinesis:${REGION}:${ACCOUNT_ID}:stream/${STREAM_NAME}/consumer/*

    Décrit, s’abonne à et désenregistre les consommateurs Enhanced Fan-Out par consommateur ARN.

    Amazon DynamoDB

    dynamodb:CreateTable, dynamodb:DeleteTable, dynamodb:DescribeTable, dynamodb:GetItem, dynamodb:PutItem, dynamodb:Query, dynamodb:Scan, dynamodb:UpdateItem

    arn:aws:dynamodb:${REGION}:${ACCOUNT_ID}:table/${APPLICATION_NAME}, arn:aws:dynamodb:${REGION}:${ACCOUNT_ID}:table/${APPLICATION_NAME}_migration

    Crée et gère la table des points de contrôle/baux (baux de fragments, battements de cœur des nœuds, points de contrôle) ainsi qu’une table de migration temporaire utilisée lors d’une migration ponctuelle à partir des anciennes tables de points de contrôle.

    Exemple de stratégie IAM :

    {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Sid": "KinesisStreamAccess",
                "Effect": "Allow",
                "Action": [
                    "kinesis:DescribeStream",
                    "kinesis:DescribeStreamConsumer",
                    "kinesis:GetRecords",
                    "kinesis:GetShardIterator",
                    "kinesis:ListShards",
                    "kinesis:RegisterStreamConsumer"
                ],
                "Resource": "arn:aws:kinesis:${REGION}:${ACCOUNT_ID}:stream/${STREAM_NAME}"
            },
            {
                "Sid": "KinesisConsumerAccess",
                "Effect": "Allow",
                "Action": [
                    "kinesis:DeregisterStreamConsumer",
                    "kinesis:DescribeStreamConsumer",
                    "kinesis:SubscribeToShard"
                ],
                "Resource": "arn:aws:kinesis:${REGION}:${ACCOUNT_ID}:stream/${STREAM_NAME}/consumer/*"
            },
            {
                "Sid": "DynamoDBTableAccess",
                "Effect": "Allow",
                "Action": [
                    "dynamodb:CreateTable",
                    "dynamodb:DeleteTable",
                    "dynamodb:DescribeTable",
                    "dynamodb:GetItem",
                    "dynamodb:PutItem",
                    "dynamodb:Query",
                    "dynamodb:Scan",
                    "dynamodb:UpdateItem"
                ],
                "Resource": [
                    "arn:aws:dynamodb:${REGION}:${ACCOUNT_ID}:table/${APPLICATION_NAME}",
                    "arn:aws:dynamodb:${REGION}:${ACCOUNT_ID}:table/${APPLICATION_NAME}_migration"
                ]
            }
        ]
    }
    

    Avant d’utiliser l’exemple de stratégie, remplacez les caractères de remplacement suivants :

    Caractère de remplacement

    Description

    ${REGION}

    Votre région AWS (par exemple, us-east-1)

    ${ACCOUNT_ID}

    Votre ID de compte AWS (par exemple, 123456789012)

    ${STREAM_NAME}

    La valeur du paramètre du connecteur AWS Kinesis Stream Name

    ${APPLICATION_NAME}

    La valeur du paramètre du connecteur AWS Kinesis Application Name Utilisée comme nom de la table de points de contrôle DynamoDB et comme nom du consommateur enregistré Enhanced Fan-Out.

    Note

    • La table ${APPLICATION_NAME}_migration est une table DynamoDB temporaire créée uniquement lors d’une migration unique des tables de point de contrôle héritées vers le nouveau schéma. Elle est automatiquement supprimée lorsque la migration est terminée. Si votre déploiement n’a jamais utilisé l’ancien connecteur basé sur KCL, vous pouvez omettre la table de migration ARN de la stratégie.

    • L’action dynamodb:DeleteTable est utilisée pendant le processus de migration et peut être supprimée de la stratégie une fois que la migration a été confirmée comme étant terminée.

    • L’action kinesis:DeregisterStreamConsumer est déclenchée lorsque le processeur est supprimé du canevas. Si l’entité IAM ne dispose pas de cette autorisation, le consommateur doit être désenregistré manuellement via la console AWS ou la CLI.

Paramétrage du compte Snowflake

En tant qu’administrateur de compte Snowflake, effectuez les tâches suivantes :

  1. Créez un nouvel utilisateur du service Snowflake avec le type SERVICE.

  2. Créez un nouveau rôle ou utilisez un rôle existant et accordez les privilèges de base de données.

    Le connecteur nécessite que l’utilisateur crée la table de destination. Assurez-vous que l’utilisateur dispose des privilèges nécessaires pour gérer les objets Snowflake :

    Objet

    Privilège

    Remarques

    Base de données

    USAGE

    Schéma

    USAGE

    Table

    OWNERSHIP

    Nécessaire pour que le connecteur ingère des données dans une table.

    Snowflake recommande de créer un utilisateur et un rôle distincts pour chaque flux Kinesis afin d’assurer un meilleur contrôle des accès.

    Vous pouvez utiliser le script suivant pour créer et configurer un rôle personnalisé (exigence : SECURITYADMIN ou équivalent) :

    USE ROLE securityadmin;
    
    CREATE ROLE openflow_kinesis_connector_role_1;
    GRANT USAGE ON DATABASE kinesis_db TO ROLE openflow_kinesis_connector_role_1;
    GRANT USAGE ON SCHEMA kinesis_schema TO ROLE openflow_kinesis_connector_role_1;
    

    Note

    Notez que les privilèges doivent être accordés directement au rôle de connecteur et ne peuvent pas être hérités.

  3. Configurer la table de destination

    Nous vous recommandons fortement d’utiliser l’évolution de schéma côté serveur pour les modifications de schéma et une table d’erreurs pour la journalisation des erreurs DML.

    L’exemple ci-dessous montre comment créer une table et ajouter des autorisations OWNERSHIP.

    USE ROLE openflow_kinesis_connector_role_1;
    
    CREATE TABLE kinesis_db.kinesis_schema.<DESTINATION_TABLE_NAME> (
      kinesisMetadata object
    )
    ENABLE_SCHEMA_EVOLUTION = TRUE
    ERROR_LOGGING = TRUE;
    
    USE ROLE securityadmin;
    GRANT OWNERSHIP ON TABLE <DESTINATION_TABLE_NAME> TO ROLE openflow_kinesis_connector_role_1;
    

    Ces connecteurs prennent en charge la détection et l’évolution automatiques des schémas. La structure des tables dans Snowflake est définie et évolue automatiquement pour prendre en charge la structure des nouvelles données chargées par le connecteur. Les clés de premier niveau du contenu de l’enregistrement seront automatiquement mappées avec les colonnes de la table correspondant par leur nom (insensibles à la casse).

    Lorsque l’évolution de schéma est activée, Snowflake peut automatiquement développer la table de destination en ajoutant de nouvelles colonnes détectées dans le flux entrant et en supprimant des contraintes NOT NULL pour prendre en charge de nouveaux modèles de données. Pour plus d’informations, consultez Évolution du schéma de la table.

    Si ENABLE_SCHEMA_EVOLUTION n’est pas activée, vous devez créer le schéma manuellement en étendant la définition de la table. Le connecteur tente de faire correspondre les clés de premier niveau du contenu de l’enregistrement aux colonnes de la table par leur nom. Si les clés du fichier JSON ne correspondent pas aux colonnes de la table, le connecteur ignore les clés.

  4. (Facultatif) Configurer un gestionnaire de secrets

    Snowflake recommande vivement cette étape. Configurez un gestionnaire de secrets pris en charge par Openflow, par exemple AWS, Azure et Hashicorp, et stockez les clés publiques et privées dans le magasin de secrets.

    1. Une fois le gestionnaire de secrets configuré, déterminez comment vous vous y authentifierez. Sur AWS, il est recommandé d’utiliser le rôle de l’instance EC2 associé à Openflow, car de cette manière, aucun autre secret ne doit être conservé.

    2. Dans le canevas Openflow, configurez un fournisseur de paramètres associé à ce gestionnaire de secrets, à partir du menu hamburger en haut à droite. Naviguez jusqu’à Controller Settings » Parameter Provider, puis récupérez vos valeurs de paramètres.

    3. À ce stade, tous les identifiants peuvent être référencés avec les chemins de paramètres associés et aucune valeur sensible ne doit être conservée dans Openflow.

  5. Octroyer l’accès aux utilisateurs

    Tous les autres utilisateurs de Snowflake qui ont besoin d’accéder aux données brutes ingérées par le connecteur (par exemple, pour un traitement personnalisé dans Snowflake), doivent se voir attribuer le rôle créé à l’étape 2.

Définir le connecteur

En tant qu’ingénieur des données, effectuez les tâches suivantes pour installer et configurer le connecteur :

Installer le connecteur

  1. Accédez à la page d’aperçu d’Openflow. Dans la section Connecteurs à la une, sélectionnez Afficher plus de connecteurs.

  2. Sur la page des connecteurs Openflow, recherchez le connecteur Openflow pour Amazon Kinesis Data Streams, puis sélectionnez Ajouter à l’environnement d’exécution.

  3. Dans la boîte de dialogue Sélectionner un environnement d’exécution, sélectionnez votre environnement d’exécution dans la liste déroulante Environnements d’exécution disponibles, puis cliquez sur Ajouter.

    Note

    Avant d’installer le connecteur, assurez-vous que vous avez créé une base de données, un schéma et une table dans Snowflake pour que le connecteur puisse stocker les données ingérées.

  4. Authentifiez-vous au déploiement avec les identifiants de votre compte Snowflake et sélectionnez Autoriser lorsque vous êtes invité à autoriser l’application d’exécution à accéder à votre compte Snowflake. Le processus d’installation du connecteur prend quelques minutes.

  5. Authentifiez-vous auprès de l’environnement d’exécution avec les identifiants de votre compte Snowflake.

    Le canevas Openflow apparaît avec le groupe de processus du connecteur ajouté.

Configuration du connecteur

  1. Si nécessaire, personnalisez la configuration du connecteur avant de configurer les paramètres intégrés.

  2. Renseigner les paramètres du groupe de processus

    1. Cliquez avec le bouton droit de la souris sur le groupe de processus importé et sélectionnez Parameters.

    2. Remplissez les valeurs de paramètres requises.

Paramètres communs

Paramètre

Description

Obligatoire

ID de clé d’accès AWS

L’AWS de clé d’accès ID pour vous connecter à votre Kinesis Stream et DynamoDB.

Oui

Région Kinesis AWS

La Région AWS à laquelle se connecter. Utilisez un format de région AWS ordinaire, par exemple : us-west-2, ap-southeast-1, eu-west-1. Consultez la page Régions AWS.

Oui

Clé d’accès secrète AWS

La clé d’accès secrète AWS pour se connecter à votre flux Kinesis et DynamoDB.

Oui

Nom de l’application Kinesis AWS

Le nom utilisé comme nom de table DynamoDB pour suivre la progression de l’application en matière de consommation de flux Kinesis.

Oui

Type de consommateur Kinesis AWS

La stratégie utilisée pour lire des enregistrements à partir d’un flux Kinesis.

Doit être l’une des valeurs suivantes : SHARED_THROUGHPUT, ENHANCED_FAN_OUT.

Pour plus d’informations, voir Différences entre le consommateur Shared Throughput et le consommateur Enhanced Fan-out.

Oui

Position initiale du flux Kinesis AWS

Position initiale du flux à partir de laquelle les données commencent la réplication. Cela ne prend effet que lors du premier démarrage d’un Nom d’application Kinesis AWS donné.

Les valeurs possibles sont les suivantes :

LATEST : Dernier enregistrement stocké,

TRIM_HORIZON : Enregistrement le plus ancien stocké.

Oui

Nom du flux Kinesis AWS

Le nom du flux Kinesis AWS à partir duquel les données sont consommées.

Oui

Base de données de destination Snowflake

La base de données dans laquelle les données seront conservées. Elle doit déjà exister dans Snowflake. Le nom est sensible à la casse. Pour les identificateurs sans guillemets, indiquez le nom en majuscules.

Oui

Schéma de destination Snowflake

Le schéma dans lequel les données seront conservées, qui doit déjà exister dans Snowflake. Le nom est sensible à la casse. Pour les identificateurs sans guillemets, indiquez le nom en majuscules.

Voir l’exemple suivant :

CREATE SCHEMA SCHEMA_NAME ou CREATE SCHEMA schema_name : utilisez SCHEMA_NAME.

CREATE SCHEMA "schema_name" ou CREATE SCHEMA "SCHEMA_NAME" : utilisez schema_name ou SCHEMA_NAME, respectivement.

Oui

Table de destination Snowflake

La table dans laquelle les données seront conservées. Elle doit déjà exister dans Snowflake. Le nom est sensible à la casse. Pour les identificateurs sans guillemets, indiquez le nom en majuscules.

Oui

Démarrer le connecteur

  1. Cliquez avec le bouton droit de la souris sur l’avion et sélectionnez Enable all Controller Services.

  2. Cliquez avec le bouton droit sur le plan et sélectionnez Start. Le connecteur lance l’ingestion des données.

Comprendre la colonne KINESISMETADATA

Le connecteur remplit la structure KINESISMETADATA avec des métadonnées sur l’enregistrement Kinesis. La structure contient les informations suivantes :

Nom du champ

Type de champ

Exemple de valeur

Description

stream

Chaîne

stream-name

Nom du flux Kinesis d’où provient l’enregistrement.

shardId

Chaîne

shardId-000000000001

L’identificateur du fragment dans le flux d’où provient l’enregistrement.

approximateArrival

Chaîne

2025-11-05T09:12:15.300

Heure approximative à laquelle l’enregistrement a été inséré dans le flux (format ISO 8601).

partitionKey

Chaîne

key-1234

La clé de partition spécifiée par le producteur de données pour l’enregistrement.

sequenceNumber

Chaîne

123456789

Le numéro de séquence unique attribué par Kinesis Data Streams à l’enregistrement dans le fragment.

subSequenceNumber

Nombre

2

Le numéro de sous-séquence de l’enregistrement (utilisé pour les enregistrements agrégés avec le même numéro de séquence).

shardedSequenceNumber

Chaîne

12345678900002

Une combinaison du numéro de séquence et du numéro de sous-séquence de l’enregistrement.

Mesure de la latence d’ingestion

Pour le suivi des modifications, le traitement incrémentiel et les requêtes Time Travel basées sur l’heure de modification des lignes, la fonctionnalité ROW_TIMESTAMP peut être utilisée.

Elle peut être activée en exécutant la commande suivante sur votre table de destination :

ALTER TABLE <DESTINATION_TABLE> SET ROW_TIMESTAMP = TRUE;

Une fois les horodatages de lignes activés, les tables exposent la colonne METADATA$ROW_LAST_COMMIT_TIME, qui renvoie l’horodatage de la dernière modification de chaque ligne.

Pour plus d’informations, voir Horodatages de ligne.

Note

L’horodatage de la ligne n’est pas disponible pour les tables interactives. Pour plus d’informations, voir Limitations des tables interactives.

Utilisation du connecteur avec des tables Apache Iceberg™

Le connecteur peut ingérer des données dans des tables Apache Iceberg™ gérées par Snowflake, mais doit répondre aux exigences suivantes :

  • Vous devez avoir obtenu le privilège USAGE sur le volume externe associé à votre table Apache Iceberg™.

  • Vous devez créer une table Apache Iceberg™ avant d’exécuter le connecteur.

Accorder l’utilisation sur un volume externe

Par exemple, si votre table Iceberg utilise le volume externe kinesis_external_volume et que le connecteur utilise le rôle openflow_kinesis_connector_role_1, exécutez l’instruction suivante :

USE ROLE ACCOUNTADMIN;
GRANT USAGE ON EXTERNAL VOLUME kinesis_external_volume TO ROLE openflow_kinesis_connector_role_1;

Créer une table Apache Iceberg™ pour l’ingestion

Le connecteur ne crée pas automatiquement des tables Iceberg et ne prend pas en charge l’évolution des schémas. Avant d’exécuter le connecteur, vous devez créer manuellement une table Iceberg.

Lorsque vous créez une table Iceberg, vous pouvez utiliser des types de données Iceberg (notamment VARIANT) ou des types Snowflake compatibles </user-guide/tables-iceberg-data-types>.

Prenons l’exemple du message suivant :

{
  "id": 1,
  "name": "Steve",
  "body_temperature": 36.6,
  "approved_coffee_types": ["Espresso", "Doppio", "Ristretto", "Lungo"],
  "animals_possessed": {
    "dogs": true,
    "cats": false
  },
  "options": {
    "can_walk": true,
    "can_talk": false
  },
  "date_added": "2024-10-15"
}

Pour créer une table Iceberg pour le message d’exemple, utilisez les instructions suivantes :

CREATE OR REPLACE ICEBERG TABLE my_iceberg_table (
  kinesisMetadata OBJECT(
    stream STRING,
    shardId STRING,
    approximateArrival STRING,
    partitionKey STRING,
    sequenceNumber STRING,
    subSequenceNumber INTEGER,
    shardedSequenceNumber STRING
  ),
  id INT,
  name string,
  body_temperature float,
  approved_coffee_types array(string),
  animals_possessed variant,
  date_added date,
  options object(can_walk boolean, can_talk boolean)
)
EXTERNAL_VOLUME = 'my_volume'
CATALOG = 'SNOWFLAKE'
BASE_LOCATION = 'my_location/my_iceberg_table'
ICEBERG_VERSION = 3;

Utilisation du connecteur avec des tables interactives

Les tables interactives sont un type spécial de table Snowflake optimisée pour les requêtes à faible latence et à forte concurrence. Vous trouverez plus d’informations sur les tables interactives dans la documentation sur les tables interactives.

  1. Créez une table interactive :

    CREATE INTERACTIVE TABLE REALTIME_METRICS (
      metric_name VARCHAR,
      metric_value NUMBER,
      source_topic VARCHAR,
      timestamp TIMESTAMP_NTZ
    ) CLUSTER BY (metric_name)
    AS (SELECT
      $1:M_NAME::VARCHAR,
      $1:M_VALUE::NUMBER,
      $1:RECORD_METADATA.topic::VARCHAR,
      $1:RECORD_METADATA.timestamp::TIMESTAMP_NTZ
    from TABLE(DATA_SOURCE(TYPE => 'STREAMING')));
    

Considérations importantes :

  • Les tables interactives ont des limitations et des restrictions de requêtes spécifiques. Consultez la documentation sur les tables interactives avant de les utiliser avec le connecteur.

  • Pour les tables interactives, toutes les transformations requises doivent être gérées dans la définition de la table.

  • Des entrepôts interactifs sont nécessaires pour interroger efficacement les tables interactives.

Utilisation du connecteur avec un schéma défini par le client pour la table de destination

Le connecteur traite chaque enregistrement Kinesis comme une ligne à insérer dans une table Snowflake. Par exemple, si vous disposez d’un sujet Kinesis dont le contenu du message est structuré comme dans le JSON suivant :

{
  "order_id": 12345,
  "customer_name": "John",
  "order_total": 100.00,
  "isPaid": true
}

Par défaut, vous n’êtes pas obligé de spécifier tous les champs du JSON. L’évolution du schéma s’en chargera. Toutefois, si vous préférez un schéma statique, il peut être créé en exécutant :

CREATE TABLE ORDERS (
  kinesisMetadata OBJECT,
  order_id NUMBER,
  customer_name VARCHAR,
  order_total FLOAT,
  ispaid BOOLEAN
);

Utilisation du connecteur avec un PIPE défini par le client

Si vous choisissez de créer votre propre canal, vous pouvez définir la logique de transformation des données dans l’instruction COPY INTO du canal. Vous pouvez renommer les colonnes si nécessaire et convertir les types de données selon vos besoins. Par exemple :

CREATE TABLE ORDERS (
  order_id VARCHAR,
  customer_name VARCHAR,
  order_total VARCHAR,
  ispaid VARCHAR
);

CREATE PIPE ORDERS AS
COPY INTO ORDERS
FROM (
  SELECT
    $1:order_id::STRING,
    $1:customer_name,
    $1:order_total::STRING,
    $1:isPaid::STRING
  FROM TABLE(DATA_SOURCE(TYPE => 'STREAMING'))
);

Lorsque vous définissez votre propre canal, les colonnes de votre table de destination ne doivent pas nécessairement correspondre aux clés JSON. Vous pouvez renommer les colonnes à l’aide des noms de votre choix et convertir les types de données selon vos besoins.

Pour ajuster le connecteur afin qu’il fonctionne avec un canal personnalisé, effectuez les tâches suivantes :

  1. Faites un clic droit sur le processeur PublishSnowpipeStreaming utilisé dans votre flux d’ingestion Kinesis dans le canevas Openflow.

  2. Sélectionnez Configure dans le menu contextuel.

  3. Accédez à l’onglet Properties.

  4. Dans le champ Type de destination, sélectionnez Pipe.

  5. Dans le champ Canal, saisissez le nom de votre canal.

  6. Sélectionnez Apply pour enregistrer la configuration.

Personnalisation de la gestion des erreurs

Le traitement des erreurs est réparti entre les échecs côté Openflow et les échecs côté serveur au sein du service Snowpipe Streaming.

  • Erreurs Openflow (échecs côté client) : Des erreurs telles que des charges utiles impossibles à analyser ou des échecs de transformations personnalisées se produisent avant que les enregistrements n’atteignent Snowflake. Par défaut, ces enregistrements sont ignorés. Il est possible de traiter ces erreurs dans Openflow - utilisez FlowFiles de la relation d’échec d’analyse dans le processeur ConsumeKinesis.

  • Erreurs Snowpipe Streaming (échecs côté serveur) : Les erreurs des enregistrements qui atteignent Snowflake, mais qui sont incompatibles avec le schéma de la table de destination (par exemple, les discordances de type), sont capturées par l’infrastructure Snowflake. Lorsque la journalisation des erreurs est activée sur la table de destination (error_logging = true), ces lignes qui ont échoué sont automatiquement ingérées dans la table d’erreur de destination.

Prochaines étapes