Paramétrez Openflow Connector for Kinesis¶
Note
Le connecteur est soumis aux conditions d’utilisation du connecteur.
Cette rubrique décrit les étapes pour paramétrer Openflow Connector for Kinesis.
Conditions préalables¶
Assurez-vous d’avoir consulté À propos de Openflow Connector for Kinesis.
Assurez-vous que vous avez paramétré Openflow.
Configurez un flux Kinesis¶
En tant qu’administrateur AWS, effectuez les actions suivantes dans votre compte AWS:
Assurez-vous que vous disposez d’un compte AWS avec des autorisations IAM d’accès à Kinesis Streams et DynamoDB.
Optionnellement, créez une file d’attente de lettres mortes (DLQ) Kinesis Stream. Les messages qui ne peuvent pas être analysés avec succès peuvent être redirigés vers un DLQ désigné.
Paramétrage du compte Snowflake¶
En tant qu’administrateur de compte Snowflake, effectuez les tâches suivantes :
Créez un nouveau rôle ou utilisez un rôle existant et accordez le Privilèges de base de données.
Créez une base de données de destination et un schéma de destination qui serviront à créer les tables de destination pour le stockage des données.
Si vous planifiez d’utiliser la capacité du connecteur à créer automatiquement la table de destination si elle n’existe pas encore, assurez-vous que l’utilisateur dispose des privilèges requis pour la création et la gestion d’objets Snowflake :
Objet
Privilège
Remarques
Base de données
USAGE
Schéma
USAGE . CREATE TABLE .
Une fois les objets au niveau du schéma ont été créés, les privilèges CREATE
object
peuvent être révoqués.Table
OWNERSHIP
Uniquement requis lorsque vous utilisez le connecteur Kinesis pour ingérer des données dans une table existante. . Si le connecteur crée une nouvelle table cible pour les enregistrements du flux Kinesis, le rôle par défaut de l’utilisateur spécifié dans la configuration devient le propriétaire de la table.
Vous pouvez utiliser le script suivant pour créer et configurer un rôle personnalisé (exigence : SECURITYADMIN ou équivalent) :
USE ROLE SECURITYADMIN; CREATE ROLE kinesis_connector_role_1; GRANT USAGE ON DATABASE kinesis_db TO ROLE kinesis_connector_role_1; GRANT USAGE ON SCHEMA kinesis_schema TO ROLE kinesis_connector_role_1; GRANT CREATE TABLE ON SCHEMA kinesis_schema TO ROLE kinesis_connector_role_1; -- Only for existing tables GRANT OWNERSHIP ON TABLE existing_table1 TO ROLE kinesis_connector_role_1;
Créez un nouvel utilisateur du service Snowflake avec le type SERVICE.
Autorisez l’utilisateur du service Snowflake le rôle que vous avez créé dans les étapes précédentes.
GRANT ROLE kinesis_connector_role_1 TO USER kinesis_connector_user_1; ALTER USER kinesis_connector_user_1 SET DEFAULT_ROLE = kinesis_connector_role_1;
Configurez avec l’authentification par paire de clés pour l’utilisateur de Snowflake SERVICE de l’étape 3.
Snowflake recommande vivement cette étape. Configurez un gestionnaire de secrets pris en charge par Openflow, par exemple AWS, Azure et Hashicorp, et stockez les clés publiques et privées dans le magasin de secrets.
Note
Si, pour une raison quelconque, vous ne souhaitez pas utiliser un gestionnaire de secrets, il vous incombe de protéger les fichiers de clés publiques et privées utilisés pour l’authentification par paires de clés conformément aux politiques de sécurité de votre organisation.
Une fois le gestionnaire de secrets configuré, déterminez comment vous vous y authentifierez. Sur AWS, il est recommandé d’utiliser le rôle de l’instance EC2 associée à Openflow, car de cette manière, aucun autre secret ne doit être conservé.
Dans Openflow, configurez un fournisseur de paramètres associé à ce gestionnaire de secrets, à partir du menu hamburger en haut à droite. Naviguez vers Controller Settings » Parameter Provider et récupérez les valeurs de vos paramètres.
À ce stade, tous les identifiants peuvent être référencés avec les chemins de paramètres associés et aucune valeur sensible ne doit être conservée dans Openflow.
Si d’autres utilisateurs de Snowflake ont besoin d’accéder aux documents bruts ingérés et aux tables ingérées par le connecteur (par exemple, pour un traitement personnalisé dans Snowflake), accordez à ces utilisateurs le rôle créé à l’étape 1.
Désignez un entrepôt à utiliser par le connecteur. Commencez par la taille d’entrepôt la plus petite, puis faites des essais en fonction du nombre de tables répliquées et de la quantité de données transférées. Les tables de grande taille s’adaptent généralement mieux aux entrepôts multi-clusters, plutôt qu’aux entrepôts de grande taille.
Définir le connecteur¶
En tant qu’ingénieur des données, effectuez les tâches suivantes pour installer et configurer le connecteur :
Installer le connecteur¶
Naviguez jusqu’à la page d’aperçu d’Openflow. Dans la section Featured connectors, sélectionnez View more connectors.
Sur la page des connecteurs Openflow, trouvez le connecteur et sélectionnez Add to runtime.
Dans la boîte de dialogue Select runtime, sélectionnez votre environnement d’exécution dans la liste déroulante Available runtimes.
Sélectionnez Add.
Note
Avant d’installer le connecteur, assurez-vous que vous avez créé une base de données et un schéma dans Snowflake pour que le connecteur puisse stocker les données ingérées.
Authentifiez-vous au déploiement avec les identifiants de votre compte Snowflake et sélectionnez Allow lorsque vous êtes invité à autoriser l’application d’exécution à accéder à votre compte Snowflake. Le processus d’installation du connecteur prend quelques minutes.
Authentifiez-vous auprès de l’environnement d’exécution avec les identifiants de votre compte Snowflake.
Le canevas Openflow apparaît avec le groupe de processus du connecteur ajouté.
Configuration du connecteur¶
Cliquez avec le bouton droit de la souris sur le groupe de processus importé et sélectionnez Parameters.
Renseignez les valeurs des paramètres requis comme décrit dans Paramètres de débit.
Paramètres de débit¶
Cette section décrit les paramètres de flux que vous pouvez configurer en fonction des contextes de paramètres suivants :
Paramètres de la source Kinesis: pour établir la connexion avec Kinesis.
Paramètres de destination Kinesis: pour établir la connexion avec Snowflake.
Paramètres d’ingestion Kinesis: pour définir la configuration des données téléchargées depuis Kinesis.
Paramètres de la source Kinesis¶
Paramètre |
Description |
---|---|
Code de la région AWS |
La région AWS où se trouve votre flux Kinesis, par exemple |
ID de clé d’accès AWS |
L’ID de clé d’accès AWS pour vous connecter à votre Kinesis Stream et DynamoDB. |
Clé d’accès secrète AWS |
La clé d’accès secrète AWS pour se connecter à votre flux Kinesis et DynamoDB. |
URL du registre des schémas |
L’URL du registre des schémas AVRO. C’est une exigence si le paramètre AVRO Schema Access Strategy est défini sur |
Type d’authentification du registre du schéma |
Le type d’authentification utilisé par le registre des schémas AVRO. C’est une exigence si le paramètre AVRO Schema Access Strategy est défini sur
|
Nom d’utilisateur du registre des schémas |
Le nom d’utilisateur utilisé pour l’authentification de |
Mot de passe du registre des schémas |
Le mot de passe utilisé pour l’authentification |
Paramètres de la destination Kinesis¶
Paramètre |
Description |
---|---|
Base de données de destination |
La base de données dans laquelle les données seront conservées. Doit déjà exister dans Snowflake. |
Schéma de destination |
Le schéma dans lequel les données seront conservées. Doit déjà exister dans Snowflake. Ce paramètre est sensible à la casse. |
Identificateur de compte Snowflake |
Nom du compte Snowflake formaté comme suit : [nom de l’organisation]-[nom du compte] où les données seront conservées. |
Stratégie d’authentification Snowflake |
Stratégie d’authentification auprès de Snowflake. Valeurs possibles : |
Clé privée de Snowflake |
La clé privée RSA utilisée pour l’authentification. La clé RSA doit être formatée selon les normes PKCS8 et comporter les en-têtes et pieds de page standard PEM. Notez que vous devez définir soit le fichier de clé privée de Snowflake, soit la clé privée de Snowflake. |
Fichier de clé privée de Snowflake |
Le fichier qui contient la clé privée RSA utilisée pour l’authentification à Snowflake, formaté selon les normes PKCS8 et comportant les en-têtes et pieds de page standard PEM. La ligne d’en-tête commence par |
Mot de passe de la clé privée de Snowflake |
Le mot de passe associé au fichier de clé privée de Snowflake. |
Rôle Snowflake |
Rôle de Snowflake utilisé lors de l’exécution de la requête. |
Nom d’utilisateur Snowflake |
Nom d’utilisateur utilisé pour se connecter à l’instance de Snowflake. |
Entrepôt Snowflake |
L’entrepôt de Snowflake utilisé pour exécuter des requêtes. Ce paramètre est sensible à la casse. |
Paramètres d’ingestion de Kinesis¶
Paramètre |
Description |
---|---|
Nom de l’application Kinesis |
Le nom qui est utilisé pour le nom de la table DynamoDB pour le suivi de la progression de l’application sur la consommation de flux Kinesis. |
Nom du flux Kinesis |
Nom du flux Kinesis AWS à partir duquel les données sont consommées. |
Position initiale du flux Kinesis |
Position initiale du flux à partir de laquelle les données commencent la réplication.
|
Nom du flux DLQ Kinesis |
Nom du flux où sont envoyés tous les enregistrements dont le traitement a échoué. Si ce paramètre n’est pas ajouté, vous pouvez vous attendre à un signe d’avertissement dans la partie du connecteur liée à DLQsur le canevas Openflow. |
Format du message |
Le format des messages dans Kinesis.
|
Stratégie d’accès au schéma AVRO |
Pour accéder aux données dans le format de message AVRO, le schéma est nécessaire. Ce paramètre définit la stratégie d’accès au schéma AVRO d’un message particulier. Si le paramètre Format du message a pour valeur
|
Carte Kinesis Stream To Table |
Ce paramètre facultatif permet à l’utilisateur de spécifier quels flux doivent être mappés à quelles tables. Chaque flux et le nom de sa table doivent être séparés par deux points. Ce nom de table doit être un identificateur non spécifié Snowflake et valide. Les expressions régulières ne peuvent pas être ambiguës et tout flux correspondant ne doit correspondre qu’à une seule table cible. Si le nom du flux est vide ou si aucune correspondance n’est trouvée, le nom du flux est utilisé comme nom de table.
|
Iceberg activé |
Indique si le processeur intègre des données dans une table Iceberg. Le processeur échoue si cette propriété ne correspond pas au type de table réel.
|
Exécutez le flux¶
Cliquez avec le bouton droit de la souris sur l’avion et sélectionnez Enable all Controller Services.
Cliquez avec le bouton droit de la souris sur le groupe de processus importé et sélectionnez Start.
Le connecteur démarre l’ingestion des données.
Schéma¶
La table Snowflake chargée par le connecteur contient des colonnes nommées par les clés de vos messages Kinesis. Vous trouverez ci-dessous un exemple d’une telle table.
Ligne |
ACCOUNT |
SYMBOL |
SIDE |
QUANTITY |
---|---|---|---|---|
1 |
ABC123 |
ZTEST |
BUY |
3572 |
2 |
XYZ789 |
ZABZX |
SELL |
3024 |
3 |
XYZ789 |
ZTEST |
SELL |
799 |
4 |
ABC123 |
ZABZX |
BUY |
2033 |
5 |
ABC123 |
ZTEST |
BUY |
1558 |
Évolution du schéma¶
Actuellement, lorsque Iceberg Enabled
est paramétré sur false
. Si le connecteur crée la table cible, l’évolution du schéma est activée par défaut. Si vous souhaitez activer ou désactiver l’évolution du schéma sur la table existante, utilisez la commande ALTER TABLE afin d’ensemble le paramètre ENABLE_SCHEMA_EVOLUTION
. Vous devez également utiliser un rôle qui a le privilège OWNERSHIP
sur la table. Pour plus d’informations, voir Évolution du schéma de table.
Toutefois, si l’évolution des schémas est désactivée pour une table existante, le connecteur essaiera d’envoyer les lignes dont les schémas ne correspondent pas aux files d’attente de lettres mortes configurées (DLQ).
Dans le cas où Iceberg Enabled
est paramétré sur true
, voir le paragraphe Évolution du schéma des tables Apache Iceberg™.
Utilisation de Openflow Connector for Kinesis avec les tables Apache Iceberg™¶
Openflow Connector for Kinesis peut ingérer des données dans une table Apache Iceberg™ gérée par Snowflake.
Exigences et limitations¶
Avant de configurer le connecteur pour l’ingestion de tables Iceberg, notez les exigences et limites suivantes :
Vous devez créer une table Iceberg avant d’exécuter le connecteur.
Assurez-vous que l’utilisateur a accès à l’insertion de données dans les tables créées.
L’évolution du schéma n’est pas prise en charge pour les tables Iceberg.
Configuration et définition¶
Pour configurer le connecteur pour l’ingestion de tables Iceberg, suivez les instructions de configuration du connecteur avec quelques différences qui sont décrites dans les sections suivantes.
Activer l’ingestion dans la table Iceberg¶
Pour permettre l’ingestion dans une table Iceberg, vous devez donner au paramètre Iceberg Enabled
la valeur true
.
Créer une table Iceberg pour l’ingestion¶
Avant d’exécuter le connecteur, vous devez créer une table Iceberg. L’évolution du schéma n’étant pas prise en charge, vous devez créer la table avec tous les champs que le message Kinesis contient.
Lorsque vous créez une table Iceberg, vous pouvez utiliser des types de données Iceberg ou des types Snowflake compatibles. Le type semi-structuré VARIANT n’est pas pris en charge. Utilisez plutôt un OBJECT ou MAP structuré.
Prenons l’exemple du message suivant :
{
"id": 1,
"name": "Steve",
"body_temperature": 36.6,
"approved_coffee_types": ["Espresso", "Doppio", "Ristretto", "Lungo"],
"animals_possessed":
{
"dogs": true,
"cats": false
},
"date_added": "2024-10-15"
}
Pour créer une table Iceberg pour le message d’exemple, utilisez l’instruction suivante :
CREATE OR REPLACE ICEBERG TABLE my_iceberg_table ( id INT, body_temperature FLOAT, name STRING, approved_coffee_types ARRAY(STRING), animals_possessed OBJECT(dogs BOOLEAN, cats BOOLEAN), date_added DATE ) EXTERNAL_VOLUME = 'my_volume' CATALOG = 'SNOWFLAKE' BASE_LOCATION = 'my_location/my_iceberg_table';
Note
Les noms de champ à l’intérieur de structures imbriquées telles que dogs
ou cats
sont sensibles à la casse.
Évolution du schéma des tables Apache Iceberg™¶
Actuellement, le connecteur ne permet pas de faire évoluer le schéma des tables Apache Iceberg™.
Problèmes connus¶
Le groupe de processus du connecteur possède un port de sortie appelé « Échec du téléchargement ». Il peut être utilisé pour gérer FlowFiles qui n’a pas été téléchargé avec succès vers Snowflake. Si ce port n’est pas connecté en dehors du groupe de processus du connecteur, il affichera un signe d’avertissement qui peut être ignoré.
Tous les processeurs, lorsqu’ils sont à l’arrêt, peuvent être remis en marche une fois. En raison de son architecture interne, le processeur ConsumeKinesisStream n’effectuera aucun travail significatif lorsqu’on lui demandera de fonctionner une seule fois. Pour que le processeur commence à fonctionner, il doit être mis en marche et fonctionner pendant environ deux minutes.