Set up Openflow Connector for Kinesis for JSON data format¶
Note
This connector is subject to the Snowflake Connector Terms.
This topic describes the set up steps for the Openflow Connector for Kinesis for JSON data format. This is a simplified connector optimized for basic message ingestion with schema evolution capabilities.
The Openflow Connector for Kinesis for JSON data format is designed for straightforward JSON message ingestion from Kinesis streams to Snowflake tables.
Conditions préalables¶
Assurez-vous d’avoir consulté À propos de Openflow Connector for Kinesis.
Ensure that you have Configuration d’Openflow - BYOC or Set up Openflow - Snowflake Deployments.
Si vous utilisez Openflow - Snowflake Deployments, assurez-vous d’avoir examiné la configuration des domaines requis et d’avoir accordé l’accès aux domaines requis pour le connecteur Kinesis.
Note
If you need the support of other data formats or features, such as DLQ, reach out to your Snowflake representative.
Configurez un flux Kinesis¶
En tant qu’administrateur AWS, effectuez les actions suivantes dans votre compte AWS:
Ensure that you have an AWS User with IAM permissions to access Kinesis Streams and DynamoDB.
Ensure that the AWS User has configured Access Key credentials.
Paramétrage du compte Snowflake¶
En tant qu’administrateur de compte Snowflake, effectuez les tâches suivantes :
Créez un nouveau rôle ou utilisez un rôle existant et accordez le Privilèges de base de données.
Créez une base de données de destination et un schéma de destination qui serviront à créer les tables de destination pour le stockage des données.
Si vous planifiez d’utiliser la capacité du connecteur à créer automatiquement la table de destination si elle n’existe pas encore, assurez-vous que l’utilisateur dispose des privilèges requis pour la création et la gestion d’objets Snowflake :
Objet
Privilège
Remarques
Base de données
USAGE
Schéma
USAGE . CREATE TABLE .
Une fois les objets au niveau du schéma ont été créés, les privilèges CREATE
objectpeuvent être révoqués.Table
OWNERSHIP
Uniquement requis lorsque vous utilisez le connecteur Kinesis pour ingérer des données dans une table existante. . Si le connecteur crée une nouvelle table cible pour les enregistrements du flux Kinesis, le rôle par défaut de l’utilisateur spécifié dans la configuration devient le propriétaire de la table.
Vous pouvez utiliser le script suivant pour créer et configurer un rôle personnalisé (exigence : SECURITYADMIN ou équivalent) :
USE ROLE SECURITYADMIN; CREATE ROLE kinesis_connector_role; GRANT USAGE ON DATABASE kinesis_db TO ROLE kinesis_connector_role; GRANT USAGE ON SCHEMA kinesis_schema TO ROLE kinesis_connector_role; GRANT CREATE TABLE ON SCHEMA kinesis_schema TO ROLE kinesis_connector_role; -- Only for existing tables. GRANT OWNERSHIP ON TABLE existing_table TO ROLE kinesis_connector_role;
Créez un nouvel utilisateur du service Snowflake avec le type SERVICE.
Autorisez l’utilisateur du service Snowflake le rôle que vous avez créé dans les étapes précédentes.
GRANT ROLE kinesis_connector_role TO USER kinesis_connector_user; ALTER USER kinesis_connector_user SET DEFAULT_ROLE = kinesis_connector_role;
Configurez avec l’authentification par paire de clés pour l’utilisateur de Snowflake SERVICE de l’étape 3.
Snowflake recommande vivement cette étape. Configurez un gestionnaire de secrets pris en charge par Openflow, par exemple AWS, Azure et Hashicorp, et stockez les clés publiques et privées dans le magasin de secrets.
Note
Si, pour une raison quelconque, vous ne souhaitez pas utiliser un gestionnaire de secrets, il vous incombe de protéger les fichiers de clés publiques et privées utilisés pour l’authentification par paires de clés conformément aux politiques de sécurité de votre organisation.
Une fois le gestionnaire de secrets configuré, déterminez comment vous vous y authentifierez. Sur AWS, il est recommandé d’utiliser le rôle de l’instance EC2 associée à Openflow, car de cette manière, aucun autre secret ne doit être conservé.
Dans Openflow, configurez un fournisseur de paramètres associé à ce gestionnaire de secrets, à partir du menu hamburger en haut à droite. Naviguez jusqu’à Controller Settings » Parameter Provider, puis récupérez vos valeurs de paramètres.
À ce stade, tous les identifiants peuvent être référencés avec les chemins de paramètres associés et aucune valeur sensible ne doit être conservée dans Openflow.
If any other Snowflake users require access to the ingested data and created tables (for example, for custom processing in Snowflake), then grant those users the role created in step 2.
Définir le connecteur¶
En tant qu’ingénieur des données, effectuez les tâches suivantes pour installer et configurer le connecteur :
Installer le connecteur¶
Navigate to the Openflow overview page. In the Featured connectors section, select View more connectors.
Sur la page des connecteurs Openflow, trouvez le connecteur et sélectionnez Add to runtime.
In the Select runtime dialog, select your runtime from the Available runtimes drop-down list and click Add.
Note
Avant d’installer le connecteur, assurez-vous que vous avez créé une base de données et un schéma dans Snowflake pour que le connecteur puisse stocker les données ingérées.
Authentifiez-vous au déploiement avec les identifiants de votre compte Snowflake et sélectionnez Allow lorsque vous êtes invité à autoriser l’application d’exécution à accéder à votre compte Snowflake. Le processus d’installation du connecteur prend quelques minutes.
Authentifiez-vous auprès de l’environnement d’exécution avec les identifiants de votre compte Snowflake.
Le canevas Openflow apparaît avec le groupe de processus du connecteur ajouté.
Configuration du connecteur¶
Cliquez avec le bouton droit de la souris sur le groupe de processus importé et sélectionnez Parameters.
Populate the required parameter values as described in Parameters section below.
Parameters¶
This section describes all parameters for the Openflow Connector for Kinesis for JSON data format.
The connector consists of a several modules. To see the set, double click on the connector process group. You will be able to set the parameters for each module in the module’s parameter context.
Snowflake destination parameters¶
Paramètre |
Description |
Obligatoire |
|---|---|---|
Base de données de destination |
La base de données dans laquelle les données seront conservées. Elle doit déjà exister dans Snowflake. Le nom est sensible à la casse. Pour les identificateurs sans guillemets, indiquez le nom en majuscules. |
Oui |
Schéma de destination |
Le schéma dans lequel les données seront conservées, qui doit déjà exister dans Snowflake. Le nom est sensible à la casse. Pour les identificateurs sans guillemets, indiquez le nom en majuscules. Voir l’exemple suivant :
|
Oui |
Iceberg activé |
Whether Iceberg is enabled for table operations. One of |
Oui |
Schema Evolution Enabled |
Enables or disables schema evolution at the connector level. When enabled, allows automatic schema changes for tables.
Note that schema evolution can also be controlled at the individual table level through table-specific parameters.
One of: |
Oui |
Schema Evolution For New Tables Enabled |
Controls whether schema evolution is enabled when creating new tables. When set to “true”, new tables will be created with ENABLE_SCHEMA_EVOLUTION = TRUE parameter.
When set to “false”, new tables will be created with ENABLE_SCHEMA_EVOLUTION = FALSE parameter.
Not applicable to Iceberg tables as they are not being created automatically.
This setting only affects table creation, not existing tables. One of: |
Oui |
Identificateur de compte Snowflake |
Lorsque vous utilisez :
|
Oui |
Stratégie d’authentification Snowflake |
Lorsque vous utilisez :
|
Oui |
Clé privée de Snowflake |
Lorsque vous utilisez :
|
Non |
Fichier de clé privée de Snowflake |
Lorsque vous utilisez :
|
Non |
Mot de passe de la clé privée de Snowflake |
Lorsque vous utilisez :
|
Non |
Rôle Snowflake |
Lorsque vous utilisez :
|
Oui |
Nom d’utilisateur Snowflake |
Lorsque vous utilisez :
|
Oui |
Kinesis JSON Source Parameters¶
Paramètre |
Description |
Obligatoire |
|---|---|---|
Code de la région AWS |
La région AWS où se trouve votre flux Kinesis, par exemple |
Oui |
ID de clé d’accès AWS |
The AWS Access Key ID to connect to your Kinesis Stream, DynamoDB, and, optionally, CloudWatch. |
Oui |
Clé d’accès secrète AWS |
The AWS Secret Access Key to connect to your Kinesis Stream, DynamoDB, and, optionally, CloudWatch. |
Oui |
Nom de l’application Kinesis |
Le nom qui est utilisé pour le nom de la table DynamoDB pour le suivi de la progression de l’application sur la consommation de flux Kinesis. |
Oui |
Position initiale du flux Kinesis |
Position initiale du flux à partir de laquelle les données commencent la réplication.
|
Oui |
Nom du flux Kinesis |
Nom du flux Kinesis AWS à partir duquel les données sont consommées. |
Oui |
Metrics Publishing |
Specifies where Kinesis Client Library metrics are published to. Possible values: |
Oui |
Exécutez le flux¶
Cliquez avec le bouton droit de la souris sur l’avion et sélectionnez Enable all Controller Services.
Right-click on the connector’s process group and select Start.
Le connecteur démarre l’ingestion des données.
Table Schema¶
The Snowflake table loaded by the connector contains columns named by the keys of your Kinesis messages.
The connector also adds a KINESISMETADATA column which stores metadata about the record.
Below is an example of a Snowflake table loaded by the connector:
Ligne |
ACCOUNT |
SYMBOL |
SIDE |
QUANTITY |
KINESISMETADATA |
|---|---|---|---|---|---|
1 |
ABC123 |
ZTEST |
BUY |
3572 |
{ … KINESISMETADATA object … } |
2 |
XYZ789 |
ZABZX |
SELL |
3024 |
{ … KINESISMETADATA object … } |
3 |
XYZ789 |
ZTEST |
SELL |
799 |
{ … KINESISMETADATA object … } |
4 |
ABC123 |
ZABZX |
BUY |
2033 |
{ … KINESISMETADATA object … } |
The KINESISMETADATA column contains an object with the following fields:
Field Name |
Field Type |
Example Value |
Description |
|---|---|---|---|
|
String |
|
The name of the Kinesis stream the record came from. |
|
String |
|
The identifier of the shard in the stream the record came from. |
|
String |
|
The approximate time that the record was inserted into the stream (ISO 8601 format). |
|
String |
|
The partition key specified by the data producer for the record. |
|
String |
|
The unique sequence number assigned by Kinesis Data Streams to the record in the shard. |
|
Number |
|
The subsequence number for the record (used for aggregated records with the same sequence number). |
|
String |
|
A combination of the sequence number and the subsequence number for the record. |
Évolution du schéma¶
This connector supports automatic schema detection and evolution. The structure of tables in Snowflake is defined and evolved automatically to support the structure of new data loaded by the connector.
Snowflake detects the schema of the incoming data and loads data into tables
that match any user-defined schema. Snowflake also allows adding
new columns or dropping the NOT NULL constraint from columns missing in new incoming records.
Schema detection with the connector infers data types based on the JSON data provided.
If the connector creates the target table, schema evolution is enabled by default.
If you want to enable or disable schema evolution on an existing table,
use the ALTER TABLE command to set the ENABLE_SCHEMA_EVOLUTION parameter.
You must also use a role that has the OWNERSHIP privilege on the table. For more information, see Évolution du schéma de table.
However, if schema evolution is disabled for an existing table, then the connector will try to send the rows with mismatched schemas to the configured failure output port.
Iceberg table support¶
Openflow Connector for Kinesis can ingest data into a Snowflake-managed Apache Iceberg™ table when Iceberg Enabled is set to true.
Exigences et limitations¶
Before you configure the Openflow Connector for Kinesis for Iceberg table ingestion, note the following requirements and limitations:
Vous devez créer une table Iceberg avant d’exécuter le connecteur.
Assurez-vous que l’utilisateur a accès à l’insertion de données dans les tables créées.
Configuration et définition¶
To configure the Openflow Connector for Kinesis for Iceberg table ingestion, follow the steps in Set up Openflow Connector for Kinesis for JSON data format with a few differences noted in the following sections.
Activer l’ingestion dans la table Iceberg¶
Pour permettre l’ingestion dans une table Iceberg, vous devez donner au paramètre Iceberg Enabled la valeur true.
Créer une table Iceberg pour l’ingestion¶
Before you run the connector, you must create an Iceberg table.
The initial table schema depends on your connector Schema Evolution Enabled property settings.
With enabled schema evolution, you must create a table with a column named kinesisMetadata.
The connector automatically creates the columns for message fields and alters the kinesisMetadata column schema.
CREATE OR REPLACE ICEBERG TABLE my_iceberg_table (
kinesisMetadata OBJECT()
)
EXTERNAL_VOLUME = 'my_volume'
CATALOG = 'SNOWFLAKE'
BASE_LOCATION = 'my_location/my_iceberg_table'
ENABLE_SCHEMA_EVOLUTION = true;
If schema evolution is disabled, you must create the table with all fields the Kinesis message contains. When you create an Iceberg table, you can use Iceberg data types or compatible Snowflake types. The semi-structured VARIANT type isn’t supported. Instead, use a structured OBJECT or MAP.
Prenons l’exemple du message suivant :
{
"id": 1,
"name": "Steve",
"body_temperature": 36.6,
"approved_coffee_types": ["Espresso", "Doppio", "Ristretto", "Lungo"],
"animals_possessed":
{
"dogs": true,
"cats": false
},
"date_added": "2024-10-15"
}
The following statement creates a table with all fields the Kinesis message contains:
CREATE OR REPLACE ICEBERG TABLE my_iceberg_table (
kinesisMetadata OBJECT(
stream STRING,
shardId STRING,
approximateArrival STRING,
partitionKey STRING,
sequenceNumber STRING,
subSequenceNumber INTEGER,
shardedSequenceNumber STRING
),
id INT,
body_temperature FLOAT,
name STRING,
approved_coffee_types ARRAY(STRING),
animals_possessed OBJECT(dogs BOOLEAN, cats BOOLEAN),
date_added DATE
)
EXTERNAL_VOLUME = 'my_volume'
CATALOG = 'SNOWFLAKE'
BASE_LOCATION = 'my_location/my_iceberg_table';
Note
kinesisMetadata must always be created. Field names inside nested structures such as dogs or cats are case sensitive.