Configuration des Openflow Connector for Amazon Kinesis Data Streams¶
Note
Ce connecteur est soumis aux conditions d’utilisation de Snowflake Connector.
Cette rubrique explique comment configurer Openflow Connector for Amazon Kinesis Data Streams.
Openflow Connector for Amazon Kinesis Data Streams est conçu pour l’ingestion de messages JSON des flux Kinesis vers des tables Snowflake, avec des capacités d’évolution des schémas.
Paramétrage du connecteur Openflow pour Kinesis¶
Conditions préalables¶
Consulter Openflow Connector for Amazon Kinesis Data Streams
Assurez-vous d’avoir Configuration d’Openflow - BYOC ou de Configurer Openflow - Déploiements Snowflake.
Si vous utilisez OpenFlow - Déploiements Snowflake, assurez-vous d’avoir pris connaissance de la configuration des domaines requis et d’avoir accordé l’accès aux domaines requis pour le connecteur Kinesis.
Configurer les rôles et les stratégies IAM dans AWS¶
En tant qu’administrateur AWS, effectuez les actions suivantes dans votre compte AWS:
Créez un utilisateur ou rôle IAM AWS qu’Openflow utilisera pour accéder au flux de données Kinesis. Pour plus d’informations, voir Création d’utilisateurs IAM dans la documentation AWS.
Assurez-vous que l’utilisateur AWS a configuré les identifiants de connexion de la clé d’accès.
Accordez les autorisations IAM suivantes à l’utilisateur AWS :
Service
Actions
Ressources (ARNs)
But
Flux de données Amazon Kinesis
kinesis:DescribeStream,kinesis:DescribeStreamConsumer,kinesis:GetRecords,kinesis:GetShardIterator,kinesis:ListShards,kinesis:RegisterStreamConsumerarn:aws:kinesis:${REGION}:${ACCOUNT_ID}:stream/${STREAM_NAME}Détecte les fragments, lit les enregistrements via une interrogation à débit partagé, résout le flux ARN, enregistre un consommateur Enhanced Fan-Out et interroge l’état du consommateur lors de l’enregistrement.
Flux de données Amazon Kinesis
kinesis:DeregisterStreamConsumer,kinesis:DescribeStreamConsumer,kinesis:SubscribeToShardarn:aws:kinesis:${REGION}:${ACCOUNT_ID}:stream/${STREAM_NAME}/consumer/*Décrit, s’abonne à et désenregistre les consommateurs Enhanced Fan-Out par consommateur ARN.
Amazon DynamoDB
dynamodb:CreateTable,dynamodb:DeleteTable,dynamodb:DescribeTable,dynamodb:GetItem,dynamodb:PutItem,dynamodb:Query,dynamodb:Scan,dynamodb:UpdateItemarn:aws:dynamodb:${REGION}:${ACCOUNT_ID}:table/${APPLICATION_NAME},arn:aws:dynamodb:${REGION}:${ACCOUNT_ID}:table/${APPLICATION_NAME}_migrationCrée et gère la table des points de contrôle/baux (baux de fragments, battements de cœur des nœuds, points de contrôle) ainsi qu’une table de migration temporaire utilisée lors d’une migration ponctuelle à partir des anciennes tables de points de contrôle.
Exemple de stratégie IAM :
Avant d’utiliser l’exemple de stratégie, remplacez les caractères de remplacement suivants :
Caractère de remplacement
Description
${REGION}Votre région AWS (par exemple,
us-east-1)${ACCOUNT_ID}Votre ID de compte AWS (par exemple,
123456789012)${STREAM_NAME}La valeur du paramètre du connecteur AWS Kinesis Stream Name
${APPLICATION_NAME}La valeur du paramètre du connecteur AWS Kinesis Application Name Utilisée comme nom de la table de points de contrôle DynamoDB et comme nom du consommateur enregistré Enhanced Fan-Out.
Note
La table
${APPLICATION_NAME}_migrationest une table DynamoDB temporaire créée uniquement lors d’une migration unique des tables de point de contrôle héritées vers le nouveau schéma. Elle est automatiquement supprimée lorsque la migration est terminée. Si votre déploiement n’a jamais utilisé l’ancien connecteur basé sur KCL, vous pouvez omettre la table de migration ARN de la stratégie.L’action
dynamodb:DeleteTableest utilisée pendant le processus de migration et peut être supprimée de la stratégie une fois que la migration a été confirmée comme étant terminée.L’action
kinesis:DeregisterStreamConsumerest déclenchée lorsque le processeur est supprimé du canevas. Si l’entité IAM ne dispose pas de cette autorisation, le consommateur doit être désenregistré manuellement via la console AWS ou la CLI.
Paramétrage du compte Snowflake¶
En tant qu’administrateur de compte Snowflake, effectuez les tâches suivantes :
Créez un nouvel utilisateur du service Snowflake avec le type SERVICE.
Créez un nouveau rôle ou utilisez un rôle existant et accordez les privilèges de base de données.
Le connecteur nécessite que l’utilisateur crée la table de destination. Assurez-vous que l’utilisateur dispose des privilèges nécessaires pour gérer les objets Snowflake :
Objet
Privilège
Remarques
Base de données
USAGE
Schéma
USAGE
Table
OWNERSHIP
Nécessaire pour que le connecteur ingère des données dans une table.
Snowflake recommande de créer un utilisateur et un rôle distincts pour chaque flux Kinesis afin d’assurer un meilleur contrôle des accès.
Vous pouvez utiliser le script suivant pour créer et configurer un rôle personnalisé (exigence : SECURITYADMIN ou équivalent) :
Note
Notez que les privilèges doivent être accordés directement au rôle de connecteur et ne peuvent pas être hérités.
Configurer la table de destination
Nous vous recommandons fortement d’utiliser l’évolution de schéma côté serveur pour les modifications de schéma et une table d’erreurs pour la journalisation des erreurs DML.
L’exemple ci-dessous montre comment créer une table et ajouter des autorisations OWNERSHIP.
Ces connecteurs prennent en charge la détection et l’évolution automatiques des schémas. La structure des tables dans Snowflake est définie et évolue automatiquement pour prendre en charge la structure des nouvelles données chargées par le connecteur. Les clés de premier niveau du contenu de l’enregistrement seront automatiquement mappées avec les colonnes de la table correspondant par leur nom (insensibles à la casse).
Lorsque l’évolution de schéma est activée, Snowflake peut automatiquement développer la table de destination en ajoutant de nouvelles colonnes détectées dans le flux entrant et en supprimant des contraintes NOT NULL pour prendre en charge de nouveaux modèles de données. Pour plus d’informations, consultez Évolution du schéma de la table.
Si ENABLE_SCHEMA_EVOLUTION n’est pas activée, vous devez créer le schéma manuellement en étendant la définition de la table. Le connecteur tente de faire correspondre les clés de premier niveau du contenu de l’enregistrement aux colonnes de la table par leur nom. Si les clés du fichier JSON ne correspondent pas aux colonnes de la table, le connecteur ignore les clés.
(Facultatif) Configurer un gestionnaire de secrets
Snowflake recommande vivement cette étape. Configurez un gestionnaire de secrets pris en charge par Openflow, par exemple AWS, Azure et Hashicorp, et stockez les clés publiques et privées dans le magasin de secrets.
Une fois le gestionnaire de secrets configuré, déterminez comment vous vous y authentifierez. Sur AWS, il est recommandé d’utiliser le rôle de l’instance EC2 associé à Openflow, car de cette manière, aucun autre secret ne doit être conservé.
Dans le canevas Openflow, configurez un fournisseur de paramètres associé à ce gestionnaire de secrets, à partir du menu hamburger en haut à droite. Naviguez jusqu’à Controller Settings » Parameter Provider, puis récupérez vos valeurs de paramètres.
À ce stade, tous les identifiants peuvent être référencés avec les chemins de paramètres associés et aucune valeur sensible ne doit être conservée dans Openflow.
Octroyer l’accès aux utilisateurs
Tous les autres utilisateurs de Snowflake qui ont besoin d’accéder aux données brutes ingérées par le connecteur (par exemple, pour un traitement personnalisé dans Snowflake), doivent se voir attribuer le rôle créé à l’étape 2.
Définir le connecteur¶
En tant qu’ingénieur des données, effectuez les tâches suivantes pour installer et configurer le connecteur :
Installer le connecteur¶
Accédez à la page d’aperçu d’Openflow. Dans la section Connecteurs à la une, sélectionnez Afficher plus de connecteurs.
Sur la page des connecteurs Openflow, recherchez le connecteur Openflow pour Amazon Kinesis Data Streams, puis sélectionnez Ajouter à l’environnement d’exécution.
Dans la boîte de dialogue Sélectionner un environnement d’exécution, sélectionnez votre environnement d’exécution dans la liste déroulante Environnements d’exécution disponibles, puis cliquez sur Ajouter.
Note
Avant d’installer le connecteur, assurez-vous que vous avez créé une base de données, un schéma et une table dans Snowflake pour que le connecteur puisse stocker les données ingérées.
Authentifiez-vous au déploiement avec les identifiants de votre compte Snowflake et sélectionnez Autoriser lorsque vous êtes invité à autoriser l’application d’exécution à accéder à votre compte Snowflake. Le processus d’installation du connecteur prend quelques minutes.
Authentifiez-vous auprès de l’environnement d’exécution avec les identifiants de votre compte Snowflake.
Le canevas Openflow apparaît avec le groupe de processus du connecteur ajouté.
Configuration du connecteur¶
Si nécessaire, personnalisez la configuration du connecteur avant de configurer les paramètres intégrés.
Renseigner les paramètres du groupe de processus
Cliquez avec le bouton droit de la souris sur le groupe de processus importé et sélectionnez Parameters.
Remplissez les valeurs de paramètres requises.
Paramètres communs¶
Paramètre |
Description |
Obligatoire |
|---|---|---|
ID de clé d’accès AWS |
L’AWS de clé d’accès ID pour vous connecter à votre Kinesis Stream et DynamoDB. |
Oui |
Région Kinesis AWS |
La Région AWS à laquelle se connecter. Utilisez un format de région AWS ordinaire, par exemple : |
Oui |
Clé d’accès secrète AWS |
La clé d’accès secrète AWS pour se connecter à votre flux Kinesis et DynamoDB. |
Oui |
Nom de l’application Kinesis AWS |
Le nom utilisé comme nom de table DynamoDB pour suivre la progression de l’application en matière de consommation de flux Kinesis. |
Oui |
Type de consommateur Kinesis AWS |
La stratégie utilisée pour lire des enregistrements à partir d’un flux Kinesis. Doit être l’une des valeurs suivantes : SHARED_THROUGHPUT, ENHANCED_FAN_OUT. Pour plus d’informations, voir Différences entre le consommateur Shared Throughput et le consommateur Enhanced Fan-out. |
Oui |
Position initiale du flux Kinesis AWS |
Position initiale du flux à partir de laquelle les données commencent la réplication. Cela ne prend effet que lors du premier démarrage d’un Nom d’application Kinesis AWS donné. Les valeurs possibles sont les suivantes : LATEST : Dernier enregistrement stocké, TRIM_HORIZON : Enregistrement le plus ancien stocké. |
Oui |
Nom du flux Kinesis AWS |
Le nom du flux Kinesis AWS à partir duquel les données sont consommées. |
Oui |
Base de données de destination Snowflake |
La base de données dans laquelle les données seront conservées. Elle doit déjà exister dans Snowflake. Le nom est sensible à la casse. Pour les identificateurs sans guillemets, indiquez le nom en majuscules. |
Oui |
Schéma de destination Snowflake |
Le schéma dans lequel les données seront conservées, qui doit déjà exister dans Snowflake. Le nom est sensible à la casse. Pour les identificateurs sans guillemets, indiquez le nom en majuscules. Voir l’exemple suivant :
|
Oui |
Table de destination Snowflake |
La table dans laquelle les données seront conservées. Elle doit déjà exister dans Snowflake. Le nom est sensible à la casse. Pour les identificateurs sans guillemets, indiquez le nom en majuscules. |
Oui |
Démarrer le connecteur¶
Cliquez avec le bouton droit de la souris sur l’avion et sélectionnez Enable all Controller Services.
Cliquez avec le bouton droit sur le plan et sélectionnez Start. Le connecteur lance l’ingestion des données.
Comprendre la colonne KINESISMETADATA¶
Le connecteur remplit la structure KINESISMETADATA avec des métadonnées sur l’enregistrement Kinesis. La structure contient les informations suivantes :
Nom du champ |
Type de champ |
Exemple de valeur |
Description |
|---|---|---|---|
stream |
Chaîne |
|
Nom du flux Kinesis d’où provient l’enregistrement. |
shardId |
Chaîne |
|
L’identificateur du fragment dans le flux d’où provient l’enregistrement. |
approximateArrival |
Chaîne |
|
Heure approximative à laquelle l’enregistrement a été inséré dans le flux (format ISO 8601). |
partitionKey |
Chaîne |
|
La clé de partition spécifiée par le producteur de données pour l’enregistrement. |
sequenceNumber |
Chaîne |
|
Le numéro de séquence unique attribué par Kinesis Data Streams à l’enregistrement dans le fragment. |
subSequenceNumber |
Nombre |
|
Le numéro de sous-séquence de l’enregistrement (utilisé pour les enregistrements agrégés avec le même numéro de séquence). |
shardedSequenceNumber |
Chaîne |
|
Une combinaison du numéro de séquence et du numéro de sous-séquence de l’enregistrement. |
Mesure de la latence d’ingestion¶
Pour le suivi des modifications, le traitement incrémentiel et les requêtes Time Travel basées sur l’heure de modification des lignes, la fonctionnalité ROW_TIMESTAMP peut être utilisée.
Elle peut être activée en exécutant la commande suivante sur votre table de destination :
Une fois les horodatages de lignes activés, les tables exposent la colonne METADATA$ROW_LAST_COMMIT_TIME, qui renvoie l’horodatage de la dernière modification de chaque ligne.
Pour plus d’informations, voir Horodatages de ligne.
Note
L’horodatage de la ligne n’est pas disponible pour les tables interactives. Pour plus d’informations, voir Limitations des tables interactives.
Utilisation du connecteur avec des tables Apache Iceberg™¶
Le connecteur peut ingérer des données dans des tables Apache Iceberg™ gérées par Snowflake, mais doit répondre aux exigences suivantes :
Vous devez avoir obtenu le privilège USAGE sur le volume externe associé à votre table Apache Iceberg™.
Vous devez créer une table Apache Iceberg™ avant d’exécuter le connecteur.
Accorder l’utilisation sur un volume externe¶
Par exemple, si votre table Iceberg utilise le volume externe kinesis_external_volume et que le connecteur utilise le rôle openflow_kinesis_connector_role_1, exécutez l’instruction suivante :
Créer une table Apache Iceberg™ pour l’ingestion¶
Le connecteur ne crée pas automatiquement des tables Iceberg et ne prend pas en charge l’évolution des schémas. Avant d’exécuter le connecteur, vous devez créer manuellement une table Iceberg.
Lorsque vous créez une table Iceberg, vous pouvez utiliser des types de données Iceberg (notamment VARIANT) ou des types Snowflake compatibles </user-guide/tables-iceberg-data-types>.
Prenons l’exemple du message suivant :
Pour créer une table Iceberg pour le message d’exemple, utilisez les instructions suivantes :
Utilisation du connecteur avec des tables interactives¶
Les tables interactives sont un type spécial de table Snowflake optimisée pour les requêtes à faible latence et à forte concurrence. Vous trouverez plus d’informations sur les tables interactives dans la documentation sur les tables interactives.
Créez une table interactive :
Considérations importantes :
Les tables interactives ont des limitations et des restrictions de requêtes spécifiques. Consultez la documentation sur les tables interactives avant de les utiliser avec le connecteur.
Pour les tables interactives, toutes les transformations requises doivent être gérées dans la définition de la table.
Des entrepôts interactifs sont nécessaires pour interroger efficacement les tables interactives.
Utilisation du connecteur avec un schéma défini par le client pour la table de destination¶
Le connecteur traite chaque enregistrement Kinesis comme une ligne à insérer dans une table Snowflake. Par exemple, si vous disposez d’un sujet Kinesis dont le contenu du message est structuré comme dans le JSON suivant :
Par défaut, vous n’êtes pas obligé de spécifier tous les champs du JSON. L’évolution du schéma s’en chargera. Toutefois, si vous préférez un schéma statique, il peut être créé en exécutant :
Utilisation du connecteur avec un PIPE défini par le client¶
Si vous choisissez de créer votre propre canal, vous pouvez définir la logique de transformation des données dans l’instruction COPY INTO du canal. Vous pouvez renommer les colonnes si nécessaire et convertir les types de données selon vos besoins. Par exemple :
Lorsque vous définissez votre propre canal, les colonnes de votre table de destination ne doivent pas nécessairement correspondre aux clés JSON. Vous pouvez renommer les colonnes à l’aide des noms de votre choix et convertir les types de données selon vos besoins.
Pour ajuster le connecteur afin qu’il fonctionne avec un canal personnalisé, effectuez les tâches suivantes :
Faites un clic droit sur le processeur PublishSnowpipeStreaming utilisé dans votre flux d’ingestion Kinesis dans le canevas Openflow.
Sélectionnez Configure dans le menu contextuel.
Accédez à l’onglet Properties.
Dans le champ Type de destination, sélectionnez Pipe.
Dans le champ Canal, saisissez le nom de votre canal.
Sélectionnez Apply pour enregistrer la configuration.
Personnalisation de la gestion des erreurs¶
Le traitement des erreurs est réparti entre les échecs côté Openflow et les échecs côté serveur au sein du service Snowpipe Streaming.
Erreurs Openflow (échecs côté client) : Des erreurs telles que des charges utiles impossibles à analyser ou des échecs de transformations personnalisées se produisent avant que les enregistrements n’atteignent Snowflake. Par défaut, ces enregistrements sont ignorés. Il est possible de traiter ces erreurs dans Openflow - utilisez FlowFiles de la relation d’échec d’analyse dans le processeur ConsumeKinesis.
Erreurs Snowpipe Streaming (échecs côté serveur) : Les erreurs des enregistrements qui atteignent Snowflake, mais qui sont incompatibles avec le schéma de la table de destination (par exemple, les discordances de type), sont capturées par l’infrastructure Snowflake. Lorsque la journalisation des erreurs est activée sur la table de destination (
error_logging = true), ces lignes qui ont échoué sont automatiquement ingérées dans la table d’erreur de destination.