Configurer Openflow Connector for Kinesis pour le format de données JSON¶
Note
Ce connecteur est soumis aux conditions d’utilisation de Snowflake Connector.
Ce chapitre décrit comment configurer Openflow Connector for Kinesis pour le format de données JSON. Il s’agit d’un connecteur simplifié, optimisé pour l’ingestion de messages de base avec des capacités d’évolution des schémas.
Le Openflow Connector for Kinesis pour le format de données JSON est conçu pour l’ingestion de messages JSON simples des flux Kinesis vers des tables Snowflake.
Conditions préalables¶
Assurez-vous d’avoir Configurer Openflow avec BYOC ou Configurer Openflow avec les déploiements Snowflake.
Si vous utilisez Openflow - Snowflake Deployments, assurez-vous d’avoir consulté la configuration des domaines requis et d’avoir accordé l’accès aux domaines requis pour le connecteur Kinesis.
Note
Si vous avez besoin de la prise en charge d’autres formats de données ou de fonctionnalités, comme DLQ, contactez votre représentant Snowflake.
Configurez un flux Kinesis¶
En tant qu’administrateur AWS, effectuez les actions suivantes dans votre compte AWS:
Assurez-vous que vous disposez d’un utilisateur AWS avec des autorisations IAM d’accès à Kinesis Streams et DynamoDB.
Assurez-vous que l’utilisateur AWS a configuré les identifiants de connexion de la clé d’accès.
Paramétrage du compte Snowflake¶
En tant qu’administrateur de compte Snowflake, effectuez les tâches suivantes :
Créez un nouveau rôle ou utilisez un rôle existant et accordez le Privilèges de base de données.
Créez une base de données de destination et un schéma de destination qui serviront à créer les tables de destination pour le stockage des données.
Si vous planifiez d’utiliser la capacité du connecteur à créer automatiquement la table de destination si elle n’existe pas encore, assurez-vous que l’utilisateur dispose des privilèges requis pour la création et la gestion d’objets Snowflake :
Objet
Privilège
Remarques
Base de données
USAGE
Schéma
USAGE . CREATE TABLE .
Une fois les objets au niveau du schéma ont été créés, les privilèges CREATE
objectpeuvent être révoqués.Table
OWNERSHIP
Uniquement requis lorsque vous utilisez le connecteur Kinesis pour ingérer des données dans une table existante. . Si le connecteur crée une nouvelle table cible pour les enregistrements du flux Kinesis, le rôle par défaut de l’utilisateur spécifié dans la configuration devient le propriétaire de la table.
Vous pouvez utiliser le script suivant pour créer et configurer un rôle personnalisé (exigence : SECURITYADMIN ou équivalent) :
USE ROLE SECURITYADMIN; CREATE ROLE kinesis_connector_role; GRANT USAGE ON DATABASE kinesis_db TO ROLE kinesis_connector_role; GRANT USAGE ON SCHEMA kinesis_schema TO ROLE kinesis_connector_role; GRANT CREATE TABLE ON SCHEMA kinesis_schema TO ROLE kinesis_connector_role; -- Only for existing tables. GRANT OWNERSHIP ON TABLE existing_table TO ROLE kinesis_connector_role;
Créez un nouvel utilisateur du service Snowflake avec le type SERVICE.
Autorisez l’utilisateur du service Snowflake le rôle que vous avez créé dans les étapes précédentes.
GRANT ROLE kinesis_connector_role TO USER kinesis_connector_user; ALTER USER kinesis_connector_user SET DEFAULT_ROLE = kinesis_connector_role;
Configurez avec l’authentification par paire de clés pour l’utilisateur de Snowflake SERVICE de l’étape 3.
Snowflake recommande vivement cette étape. Configurez un gestionnaire de secrets pris en charge par Openflow, par exemple AWS, Azure et Hashicorp, et stockez les clés publiques et privées dans le magasin de secrets.
Note
Si, pour une raison quelconque, vous ne souhaitez pas utiliser un gestionnaire de secrets, il vous incombe de protéger les fichiers de clés publiques et privées utilisés pour l’authentification par paires de clés conformément aux politiques de sécurité de votre organisation.
Une fois le gestionnaire de secrets configuré, déterminez comment vous vous y authentifierez. Sur AWS, il est recommandé d’utiliser le rôle de l’instance EC2 associé à Openflow, car de cette manière, aucun autre secret ne doit être conservé.
Dans Openflow, configurez un fournisseur de paramètres associé à ce gestionnaire de secrets, à partir du menu hamburger en haut à droite. Naviguez jusqu’à Controller Settings » Parameter Provider, puis récupérez vos valeurs de paramètres.
À ce stade, tous les identifiants peuvent être référencés avec les chemins de paramètres associés et aucune valeur sensible ne doit être conservée dans Openflow.
Si d’autres utilisateurs de Snowflake ont besoin d’accéder aux données ingérées et aux tables créées (par exemple, pour un traitement personnalisé dans Snowflake), accordez à ces utilisateurs le rôle créé à l’étape 2.
Définir le connecteur¶
En tant qu’ingénieur des données, effectuez les tâches suivantes pour installer et configurer le connecteur :
Installer le connecteur¶
Accédez à la page d’aperçu d’Openflow. Dans la section Featured connectors, sélectionnez View more connectors.
Sur la page des connecteurs Openflow, trouvez le connecteur et sélectionnez Add to runtime.
Dans la boîte de dialogue Select runtime, sélectionnez votre environnement d’exécution dans la liste déroulante Available runtimes, puis cliquez sur Add.
Note
Avant d’installer le connecteur, assurez-vous que vous avez créé une base de données et un schéma dans Snowflake pour que le connecteur puisse stocker les données ingérées.
Authentifiez-vous au déploiement avec les identifiants de votre compte Snowflake et sélectionnez Allow lorsque vous êtes invité à autoriser l’application d’exécution à accéder à votre compte Snowflake. Le processus d’installation du connecteur prend quelques minutes.
Authentifiez-vous auprès de l’environnement d’exécution avec les identifiants de votre compte Snowflake.
Le canevas Openflow apparaît avec le groupe de processus du connecteur ajouté.
Configuration du connecteur¶
Cliquez avec le bouton droit de la souris sur le groupe de processus importé et sélectionnez Parameters.
Renseignez les valeurs des paramètres requis comme décrit dans Paramètres.
Paramètres¶
Cette section décrit tous les paramètres de Openflow Connector for Kinesis pour le format de données JSON.
Le connecteur se compose de plusieurs modules. Pour voir l’ensemble, double-cliquez sur le groupe de processus du connecteur. Vous pouvez définir les paramètres de chaque module dans le contexte de paramètres du module.
Paramètres de la destination Snowflake¶
Paramètre |
Description |
Obligatoire |
|---|---|---|
Base de données de destination |
La base de données dans laquelle les données seront conservées. Elle doit déjà exister dans Snowflake. Le nom est sensible à la casse. Pour les identificateurs sans guillemets, indiquez le nom en majuscules. |
Oui |
Schéma de destination |
Le schéma dans lequel les données seront conservées, qui doit déjà exister dans Snowflake. Le nom est sensible à la casse. Pour les identificateurs sans guillemets, indiquez le nom en majuscules. Voir l’exemple suivant :
|
Oui |
Iceberg activé |
Si Iceberg est activé pour les opérations de table. Une des valeurs |
Oui |
Évolution du schéma activée |
Active ou désactive l’évolution du schéma au niveau du connecteur. Lorsque l’option est activée, autorise les modifications automatiques du schéma pour les tables. Notez que l’évolution des schémas peut également être contrôlée au niveau des tables individuelles par le biais de paramètres spécifiques aux tables. Une des options suivantes : |
Oui |
Évolution du schéma pour les nouvelles tables activées |
Contrôle si l’évolution du schéma est activée lors de la création de nouvelles tables. Si la valeur est définie sur “true”, de nouvelles tables seront créées avec le paramètre ENABLE_SCHEMA_EVOLUTION = TRUE. Si défini sur “false”, de nouvelles tables seront créées avec le paramètre ENABLE_SCHEMA_EVOLUTION = FALSE. Ne s’applique pas aux tables Iceberg, car elles ne sont pas créées automatiquement. Ce paramètre affecte uniquement la création de tables, pas les tables existantes. Une des options suivantes : |
Oui |
Identificateur de compte Snowflake |
Lorsque vous utilisez :
|
Oui |
Stratégie d’authentification Snowflake |
Lorsque vous utilisez :
|
Oui |
Clé privée de Snowflake |
Lorsque vous utilisez :
|
Non |
Fichier de clé privée de Snowflake |
Lorsque vous utilisez :
|
Non |
Mot de passe de la clé privée de Snowflake |
Lorsque vous utilisez :
|
Non |
Rôle Snowflake |
Lorsque vous utilisez :
|
Oui |
Nom d’utilisateur Snowflake |
Lorsque vous utilisez :
|
Oui |
Paramètres de la source JSON Kinesis¶
Paramètre |
Description |
Obligatoire |
|---|---|---|
Code de la région AWS |
La région AWS où se trouve votre flux Kinesis, par exemple |
Oui |
ID de clé d’accès AWS |
L’ID de la clé d’accès AWS pour vous connecter à votre flux Kinesis DynamoDB et, en option, à CloudWatch. |
Oui |
Clé d’accès secrète AWS |
La clé d’accès secrète AWS pour se connecter à votre flux Kinesis DynamoDB et, en option, à CloudWatch. |
Oui |
Nom de l’application Kinesis |
Le nom qui est utilisé pour le nom de la table DynamoDB pour le suivi de la progression de l’application sur la consommation de flux Kinesis. |
Oui |
Type de consommateur Kinesis |
La stratégie utilisée pour lire des enregistrements à partir d’un flux Kinesis. Doit être l’une des valeurs suivantes : |
Oui |
Position initiale du flux Kinesis |
Position initiale du flux à partir de laquelle les données commencent la réplication. Les valeurs possibles sont les suivantes :
|
Oui |
Nom du flux Kinesis |
Nom du flux Kinesis AWS à partir duquel les données sont consommées. |
Oui |
Publication de métriques |
Spécifie où les métriques de la bibliothèque client Kinesis sont publiées. Valeurs possibles : |
Oui |
Exécutez le flux¶
Cliquez avec le bouton droit de la souris sur le Plan et sélectionnez Enable all Controller Services.
Cliquez à droite sur le groupe de processus du connecteur et sélectionnez Start.
Le connecteur démarre l’ingestion des données.
Schéma de table¶
La table Snowflake chargée par le connecteur contient des colonnes nommées par les clés de vos messages Kinesis. Le connecteur ajoute également une colonne KINESISMETADATA qui stocke les métadonnées sur l’enregistrement.
Vous trouverez ci-dessous un exemple de table Snowflake chargée par le connecteur :
Ligne |
ACCOUNT |
SYMBOL |
SIDE |
QUANTITY |
KINESISMETADATA |
|---|---|---|---|---|---|
1 |
ABC123 |
ZTEST |
BUY |
3572 |
{ … KINESISMETADATA object … } |
2 |
XYZ789 |
ZABZX |
SELL |
3024 |
{ … KINESISMETADATA object … } |
3 |
XYZ789 |
ZTEST |
SELL |
799 |
{ … KINESISMETADATA object … } |
4 |
ABC123 |
ZABZX |
BUY |
2033 |
{ … KINESISMETADATA object … } |
La colonne KINESISMETADATA contient un objet avec les champs suivants :
Nom du champ |
Type de champ |
Exemple de valeur |
Description |
|---|---|---|---|
|
Chaîne |
|
Nom du flux Kinesis d’où provient l’enregistrement. |
|
Chaîne |
|
L’identificateur du fragment dans le flux d’où provient l’enregistrement. |
|
Chaîne |
|
Heure approximative à laquelle l’enregistrement a été inséré dans le flux (format ISO 8601). |
|
Chaîne |
|
La clé de partition spécifiée par le producteur de données pour l’enregistrement. |
|
Chaîne |
|
Le numéro de séquence unique attribué par Kinesis Data Streams à l’enregistrement dans le fragment. |
|
Nombre |
|
Le numéro de sous-séquence de l’enregistrement (utilisé pour les enregistrements agrégés avec le même numéro de séquence). |
|
Chaîne |
|
Une combinaison du numéro de séquence et du numéro de sous-séquence de l’enregistrement. |
Évolution du schéma¶
Ce connecteur prend en charge la détection et l’évolution automatiques des schémas. La structure des tables dans Snowflake est définie et évolue automatiquement pour prendre en charge la structure des nouvelles données chargées par le connecteur.
Snowflake détecte le schéma des données entrantes et charge les données dans des tables qui correspondent à n’importe quel schéma défini par l’utilisateur. Snowflake permet également d’ajouter de nouvelles colonnes ou de supprimer la contrainte NOT NULL des colonnes manquantes dans les nouveaux fichiers de données.
La détection des schémas avec le connecteur déduit les types de données sur la base des données JSON fournies.
Si le connecteur crée la table cible, l’évolution du schéma est activée par défaut.
Si vous souhaitez activer ou désactiver l’évolution du schéma sur la table existante, utilisez la commande ALTER TABLE afin de définir le paramètre ENABLE_SCHEMA_EVOLUTION. Vous devez également utiliser un rôle qui a le privilège OWNERSHIP sur la table. Pour plus d’informations, voir Évolution du schéma de table.
Toutefois, si l’évolution des schémas est désactivée pour une table existante, le connecteur essaie d’envoyer les lignes dont les schémas ne correspondent pas au port de sortie en échec configuré.
Prise en charge des tables Iceberg¶
Openflow Connector for Kinesis peut ingérer des données dans une Table Apache Iceberg™ gérée par Snowflake lorsque Iceberg activé est défini sur true.
Exigences et limitations¶
Avant de configurer Openflow Connector for Kinesis pour l’ingestion de tables Iceberg, notez les exigences et limites suivantes :
Vous devez créer une table Iceberg avant d’exécuter le connecteur.
Assurez-vous que l’utilisateur a accès à l’insertion de données dans les tables créées.
Configuration et définition¶
Pour configurer Openflow Connector for Kinesis pour l’ingestion de tables Iceberg, suivez les étapes de Configurer Openflow Connector for Kinesis pour le format de données JSON avec quelques différences notées dans les sections suivantes.
Activer l’ingestion dans la table Iceberg¶
Pour permettre l’ingestion dans une table Iceberg, vous devez donner au paramètre Iceberg Enabled la valeur true.
Créer une table Iceberg pour l’ingestion¶
Avant d’exécuter le connecteur, vous devez créer une table Iceberg. Le schéma de table initial dépend des paramètres de propriété Évolution du schéma activée de votre connecteur.
Lorsque l’évolution du schéma est activée, vous devez créer une table avec une colonne nommée kinesisMetadata. Le connecteur crée automatiquement les colonnes des champs de message et modifie le schéma de colonnes kinesisMetadata.
CREATE OR REPLACE ICEBERG TABLE my_iceberg_table (
kinesisMetadata OBJECT()
)
EXTERNAL_VOLUME = 'my_volume'
CATALOG = 'SNOWFLAKE'
BASE_LOCATION = 'my_location/my_iceberg_table'
ENABLE_SCHEMA_EVOLUTION = true;
L’évolution du schéma est désactivée, vous devez créer la table avec tous les champs que le message Kinesis contient. Lorsque vous créez une table Iceberg, vous pouvez utiliser des types de données Iceberg ou des types Snowflake compatibles. Le type semi-structuré VARIANT n’est pas pris en charge. Utilisez plutôt un OBJECT ou MAP structuré.
Prenons l’exemple du message suivant :
{
"id": 1,
"name": "Steve",
"body_temperature": 36.6,
"approved_coffee_types": ["Espresso", "Doppio", "Ristretto", "Lungo"],
"animals_possessed":
{
"dogs": true,
"cats": false
},
"date_added": "2024-10-15"
}
L’instruction suivante crée une table avec tous les champs que contient le message Kinesis :
CREATE OR REPLACE ICEBERG TABLE my_iceberg_table (
kinesisMetadata OBJECT(
stream STRING,
shardId STRING,
approximateArrival STRING,
partitionKey STRING,
sequenceNumber STRING,
subSequenceNumber INTEGER,
shardedSequenceNumber STRING
),
id INT,
body_temperature FLOAT,
name STRING,
approved_coffee_types ARRAY(STRING),
animals_possessed OBJECT(dogs BOOLEAN, cats BOOLEAN),
date_added DATE
)
EXTERNAL_VOLUME = 'my_volume'
CATALOG = 'SNOWFLAKE'
BASE_LOCATION = 'my_location/my_iceberg_table';
Note
kinesisMetadata doit toujours être créé. Les noms de champ à l’intérieur de structures imbriquées telles que dogs ou cats sont sensibles à la casse.