Set up Openflow Connector for Kinesis for JSON data format¶
Note
This connector is subject to the Snowflake Connector Terms.
Ce chapitre décrit les étapes de configuration de Openflow Connector for Kinesis pour le format de données JSON. Il s’agit d’un connecteur simplifié, optimisé pour l’ingestion de messages de base avec des capacités d’évolution des schémas.
Le Openflow Connector for Kinesis pour le format de données JSON est conçu pour l’ingestion de messages JSON simples des flux Kinesis vers des tables Snowflake.
Conditions préalables¶
Assurez-vous d’avoir consulté À propos de Openflow Connector for Kinesis.
Ensure that you have Configuration d’Openflow - BYOC or Set up Openflow - Snowflake Deployments.
Si vous utilisez Openflow - Snowflake Deployments, assurez-vous d’avoir examiné la configuration des domaines requis et d’avoir accordé l’accès aux domaines requis pour le connecteur Kinesis.
Note
Si vous avez besoin de la prise en charge d’autres formats de données ou de fonctionnalités, comme DLQ, contactez votre représentant Snowflake.
Configurez un flux Kinesis¶
En tant qu’administrateur AWS, effectuez les actions suivantes dans votre compte AWS:
Ensure that you have an AWS User with IAM permissions to access Kinesis Streams and DynamoDB.
Ensure that the AWS User has configured Access Key credentials.
Paramétrage du compte Snowflake¶
En tant qu’administrateur de compte Snowflake, effectuez les tâches suivantes :
Créez un nouveau rôle ou utilisez un rôle existant et accordez le Privilèges de base de données.
Créez une base de données de destination et un schéma de destination qui serviront à créer les tables de destination pour le stockage des données.
Si vous planifiez d’utiliser la capacité du connecteur à créer automatiquement la table de destination si elle n’existe pas encore, assurez-vous que l’utilisateur dispose des privilèges requis pour la création et la gestion d’objets Snowflake :
Objet
Privilège
Remarques
Base de données
USAGE
Schéma
USAGE . CREATE TABLE .
Une fois les objets au niveau du schéma ont été créés, les privilèges CREATE
objectpeuvent être révoqués.Table
OWNERSHIP
Uniquement requis lorsque vous utilisez le connecteur Kinesis pour ingérer des données dans une table existante. . Si le connecteur crée une nouvelle table cible pour les enregistrements du flux Kinesis, le rôle par défaut de l’utilisateur spécifié dans la configuration devient le propriétaire de la table.
Vous pouvez utiliser le script suivant pour créer et configurer un rôle personnalisé (exigence : SECURITYADMIN ou équivalent) :
USE ROLE SECURITYADMIN; CREATE ROLE kinesis_connector_role; GRANT USAGE ON DATABASE kinesis_db TO ROLE kinesis_connector_role; GRANT USAGE ON SCHEMA kinesis_schema TO ROLE kinesis_connector_role; GRANT CREATE TABLE ON SCHEMA kinesis_schema TO ROLE kinesis_connector_role; -- Only for existing tables. GRANT OWNERSHIP ON TABLE existing_table TO ROLE kinesis_connector_role;
Créez un nouvel utilisateur du service Snowflake avec le type SERVICE.
Autorisez l’utilisateur du service Snowflake le rôle que vous avez créé dans les étapes précédentes.
GRANT ROLE kinesis_connector_role TO USER kinesis_connector_user; ALTER USER kinesis_connector_user SET DEFAULT_ROLE = kinesis_connector_role;
Configurez avec l’authentification par paire de clés pour l’utilisateur de Snowflake SERVICE de l’étape 3.
Snowflake recommande vivement cette étape. Configurez un gestionnaire de secrets pris en charge par Openflow, par exemple AWS, Azure et Hashicorp, et stockez les clés publiques et privées dans le magasin de secrets.
Note
Si, pour une raison quelconque, vous ne souhaitez pas utiliser un gestionnaire de secrets, il vous incombe de protéger les fichiers de clés publiques et privées utilisés pour l’authentification par paires de clés conformément aux politiques de sécurité de votre organisation.
Une fois le gestionnaire de secrets configuré, déterminez comment vous vous y authentifierez. Sur AWS, il est recommandé d’utiliser le rôle de l’instance EC2 associée à Openflow, car de cette manière, aucun autre secret ne doit être conservé.
Dans Openflow, configurez un fournisseur de paramètres associé à ce gestionnaire de secrets, à partir du menu hamburger en haut à droite. Naviguez jusqu’à Controller Settings » Parameter Provider, puis récupérez vos valeurs de paramètres.
À ce stade, tous les identifiants peuvent être référencés avec les chemins de paramètres associés et aucune valeur sensible ne doit être conservée dans Openflow.
If any other Snowflake users require access to the ingested data and created tables (for example, for custom processing in Snowflake), then grant those users the role created in step 2.
Définir le connecteur¶
En tant qu’ingénieur des données, effectuez les tâches suivantes pour installer et configurer le connecteur :
Installer le connecteur¶
Navigate to the Openflow overview page. In the Featured connectors section, select View more connectors.
Sur la page des connecteurs Openflow, trouvez le connecteur et sélectionnez Add to runtime.
In the Select runtime dialog, select your runtime from the Available runtimes drop-down list and click Add.
Note
Avant d’installer le connecteur, assurez-vous que vous avez créé une base de données et un schéma dans Snowflake pour que le connecteur puisse stocker les données ingérées.
Authentifiez-vous au déploiement avec les identifiants de votre compte Snowflake et sélectionnez Allow lorsque vous êtes invité à autoriser l’application d’exécution à accéder à votre compte Snowflake. Le processus d’installation du connecteur prend quelques minutes.
Authentifiez-vous auprès de l’environnement d’exécution avec les identifiants de votre compte Snowflake.
Le canevas Openflow apparaît avec le groupe de processus du connecteur ajouté.
Configuration du connecteur¶
Cliquez avec le bouton droit de la souris sur le groupe de processus importé et sélectionnez Parameters.
Populate the required parameter values as described in Parameters section below.
Parameters¶
Cette section décrit tous les paramètres de Openflow Connector for Kinesis pour le format de données JSON.
Le connecteur se compose de plusieurs modules. Pour voir l’ensemble, double-cliquez sur le groupe de processus du connecteur. Vous pourrez définir les paramètres de chaque module dans le contexte de paramètres du module.
Snowflake destination parameters¶
Paramètre |
Description |
Obligatoire |
|---|---|---|
Base de données de destination |
La base de données dans laquelle les données seront conservées. Elle doit déjà exister dans Snowflake. Le nom est sensible à la casse. Pour les identificateurs sans guillemets, indiquez le nom en majuscules. |
Oui |
Schéma de destination |
Le schéma dans lequel les données seront conservées, qui doit déjà exister dans Snowflake. Le nom est sensible à la casse. Pour les identificateurs sans guillemets, indiquez le nom en majuscules. Voir l’exemple suivant :
|
Oui |
Iceberg activé |
Si Iceberg est activé pour les opérations de table. Une des valeurs |
Oui |
Schema Evolution Enabled |
Active ou désactive l’évolution du schéma au niveau du connecteur. Lorsque l’option est activée, autorise les modifications automatiques du schéma pour les tables. Notez que l’évolution des schémas peut également être contrôlée au niveau des tables individuelles par le biais de paramètres spécifiques aux tables. Une des options suivantes : |
Oui |
Schema Evolution For New Tables Enabled |
Contrôle si l’évolution du schéma est activée lors de la création de nouvelles tables. Si la valeur est définie sur “true”, de nouvelles tables seront créées avec le paramètre ENABLE_SCHEMA_EVOLUTION = TRUE. Si défini sur “false”, de nouvelles tables seront créées avec le paramètre ENABLE_SCHEMA_EVOLUTION = FALSE. Ne s’applique pas aux tables Iceberg, car elles ne sont pas créées automatiquement. Ce paramètre affecte uniquement la création de tables, pas les tables existantes. Une des options suivantes : |
Oui |
Identificateur de compte Snowflake |
Lorsque vous utilisez :
|
Oui |
Stratégie d’authentification Snowflake |
Lorsque vous utilisez :
|
Oui |
Clé privée de Snowflake |
Lorsque vous utilisez :
|
Non |
Fichier de clé privée de Snowflake |
Lorsque vous utilisez :
|
Non |
Mot de passe de la clé privée de Snowflake |
Lorsque vous utilisez :
|
Non |
Rôle Snowflake |
Lorsque vous utilisez :
|
Oui |
Nom d’utilisateur Snowflake |
Lorsque vous utilisez :
|
Oui |
Kinesis JSON Source Parameters¶
Paramètre |
Description |
Obligatoire |
|---|---|---|
Code de la région AWS |
La région AWS où se trouve votre flux Kinesis, par exemple |
Oui |
ID de clé d’accès AWS |
The AWS Access Key ID to connect to your Kinesis Stream, DynamoDB, and, optionally, CloudWatch. |
Oui |
Clé d’accès secrète AWS |
The AWS Secret Access Key to connect to your Kinesis Stream, DynamoDB, and, optionally, CloudWatch. |
Oui |
Nom de l’application Kinesis |
Le nom qui est utilisé pour le nom de la table DynamoDB pour le suivi de la progression de l’application sur la consommation de flux Kinesis. |
Oui |
Position initiale du flux Kinesis |
Position initiale du flux à partir de laquelle les données commencent la réplication.
|
Oui |
Nom du flux Kinesis |
Nom du flux Kinesis AWS à partir duquel les données sont consommées. |
Oui |
Publication de métriques |
Spécifie où les métriques de la bibliothèque client Kinesis sont publiées. Valeurs possibles : |
Oui |
Exécutez le flux¶
Cliquez avec le bouton droit de la souris sur l’avion et sélectionnez Enable all Controller Services.
Right-click on the connector’s process group and select Start.
Le connecteur démarre l’ingestion des données.
Table Schema¶
The Snowflake table loaded by the connector contains columns named by the keys of your Kinesis messages.
The connector also adds a KINESISMETADATA column which stores metadata about the record.
Vous trouverez ci-dessous un exemple de table Snowflake chargée par le connecteur :
Ligne |
ACCOUNT |
SYMBOL |
SIDE |
QUANTITY |
KINESISMETADATA |
|---|---|---|---|---|---|
1 |
ABC123 |
ZTEST |
BUY |
3572 |
{ … KINESISMETADATA object … } |
2 |
XYZ789 |
ZABZX |
SELL |
3024 |
{ … KINESISMETADATA object … } |
3 |
XYZ789 |
ZTEST |
SELL |
799 |
{ … KINESISMETADATA object … } |
4 |
ABC123 |
ZABZX |
BUY |
2033 |
{ … KINESISMETADATA object … } |
La colonne KINESISMETADATA contient un objet avec les champs suivants :
Nom du champ |
Type de champ |
Example Value |
Description |
|---|---|---|---|
|
Chaîne |
|
Nom du flux Kinesis d’où provient l’enregistrement. |
|
Chaîne |
|
L’identificateur du fragment dans le flux d’où provient l’enregistrement. |
|
Chaîne |
|
Heure approximative à laquelle l’enregistrement a été inséré dans le flux (format ISO 8601). |
|
Chaîne |
|
La clé de partition spécifiée par le producteur de données pour l’enregistrement. |
|
Chaîne |
|
Le numéro de séquence unique attribué par Kinesis Data Streams à l’enregistrement dans le fragment. |
|
Nombre |
|
Le numéro de sous-séquence de l’enregistrement (utilisé pour les enregistrements agrégés avec le même numéro de séquence). |
|
Chaîne |
|
Une combinaison du numéro de séquence et du numéro de sous-séquence de l’enregistrement. |
Évolution du schéma¶
Ce connecteur prend en charge la détection et l’évolution automatiques des schémas. La structure des tables dans Snowflake est définie et évolue automatiquement pour prendre en charge la structure des nouvelles données chargées par le connecteur.
Snowflake détecte le schéma des données entrantes et charge les données dans des tables qui correspondent à n’importe quel schéma défini par l’utilisateur. Snowflake permet également d’ajouter de nouvelles colonnes ou de supprimer la contrainte NOT NULL des colonnes manquantes dans les nouveaux fichiers de données.
La détection des schémas avec le connecteur déduit les types de données sur la base des données JSON fournies.
Si le connecteur crée la table cible, l’évolution du schéma est activée par défaut.
If you want to enable or disable schema evolution on an existing table,
use the ALTER TABLE command to set the ENABLE_SCHEMA_EVOLUTION parameter.
You must also use a role that has the OWNERSHIP privilege on the table. For more information, see Évolution du schéma de table.
However, if schema evolution is disabled for an existing table, then the connector will try to send the rows with mismatched schemas to the configured failure output port.
Iceberg table support¶
Openflow Connector for Kinesis can ingest data into a Snowflake-managed Apache Iceberg™ table when Iceberg Enabled is set to true.
Exigences et limitations¶
Before you configure the Openflow Connector for Kinesis for Iceberg table ingestion, note the following requirements and limitations:
Vous devez créer une table Iceberg avant d’exécuter le connecteur.
Assurez-vous que l’utilisateur a accès à l’insertion de données dans les tables créées.
Configuration et définition¶
To configure the Openflow Connector for Kinesis for Iceberg table ingestion, follow the steps in Set up Openflow Connector for Kinesis for JSON data format with a few differences noted in the following sections.
Activer l’ingestion dans la table Iceberg¶
Pour permettre l’ingestion dans une table Iceberg, vous devez donner au paramètre Iceberg Enabled la valeur true.
Créer une table Iceberg pour l’ingestion¶
Avant d’exécuter le connecteur, vous devez créer une table Iceberg. Le schéma de table initial dépend des paramètres de propriété Évolution du schéma activée de votre connecteur.
Lorsque l’évolution du schéma est activée, vous devez créer une table avec une colonne nommée kinesisMetadata. Le connecteur crée automatiquement les colonnes des champs de message et modifie le schéma de colonnes kinesisMetadata.
CREATE OR REPLACE ICEBERG TABLE my_iceberg_table (
kinesisMetadata OBJECT()
)
EXTERNAL_VOLUME = 'my_volume'
CATALOG = 'SNOWFLAKE'
BASE_LOCATION = 'my_location/my_iceberg_table'
ENABLE_SCHEMA_EVOLUTION = true;
If schema evolution is disabled, you must create the table with all fields the Kinesis message contains. When you create an Iceberg table, you can use Iceberg data types or compatible Snowflake types. The semi-structured VARIANT type isn’t supported. Instead, use a structured OBJECT or MAP.
Prenons l’exemple du message suivant :
{
"id": 1,
"name": "Steve",
"body_temperature": 36.6,
"approved_coffee_types": ["Espresso", "Doppio", "Ristretto", "Lungo"],
"animals_possessed":
{
"dogs": true,
"cats": false
},
"date_added": "2024-10-15"
}
L’instruction suivante crée une table avec tous les champs que contient le message Kinesis :
CREATE OR REPLACE ICEBERG TABLE my_iceberg_table (
kinesisMetadata OBJECT(
stream STRING,
shardId STRING,
approximateArrival STRING,
partitionKey STRING,
sequenceNumber STRING,
subSequenceNumber INTEGER,
shardedSequenceNumber STRING
),
id INT,
body_temperature FLOAT,
name STRING,
approved_coffee_types ARRAY(STRING),
animals_possessed OBJECT(dogs BOOLEAN, cats BOOLEAN),
date_added DATE
)
EXTERNAL_VOLUME = 'my_volume'
CATALOG = 'SNOWFLAKE'
BASE_LOCATION = 'my_location/my_iceberg_table';
Note
kinesisMetadata must always be created. Field names inside nested structures such as dogs or cats are case sensitive.