Set up Openflow Connector for Kinesis for JSON data format

Note

This connector is subject to the Snowflake Connector Terms.

This topic describes the set up steps for the Openflow Connector for Kinesis for JSON data format. This is a simplified connector optimized for basic message ingestion with schema evolution capabilities.

The Openflow Connector for Kinesis for JSON data format is designed for straightforward JSON message ingestion from Kinesis streams to Snowflake tables.

Conditions préalables

  1. Assurez-vous d’avoir consulté À propos de Openflow Connector for Kinesis.

  2. Ensure that you have Configuration d’Openflow - BYOC or Set up Openflow - Snowflake Deployments.

  3. Si vous utilisez Openflow - Snowflake Deployments, assurez-vous d’avoir examiné la configuration des domaines requis et d’avoir accordé l’accès aux domaines requis pour le connecteur Kinesis.

Note

If you need the support of other data formats or features, such as DLQ, reach out to your Snowflake representative.

Configurez un flux Kinesis

En tant qu’administrateur AWS, effectuez les actions suivantes dans votre compte AWS:

  1. Ensure that you have an AWS User with IAM permissions to access Kinesis Streams and DynamoDB.

  2. Ensure that the AWS User has configured Access Key credentials.

Paramétrage du compte Snowflake

En tant qu’administrateur de compte Snowflake, effectuez les tâches suivantes :

  1. Créez un nouveau rôle ou utilisez un rôle existant et accordez le Privilèges de base de données.

  2. Créez une base de données de destination et un schéma de destination qui serviront à créer les tables de destination pour le stockage des données.

    1. Si vous planifiez d’utiliser la capacité du connecteur à créer automatiquement la table de destination si elle n’existe pas encore, assurez-vous que l’utilisateur dispose des privilèges requis pour la création et la gestion d’objets Snowflake :

      Objet

      Privilège

      Remarques

      Base de données

      USAGE

      Schéma

      USAGE . CREATE TABLE .

      Une fois les objets au niveau du schéma ont été créés, les privilèges CREATE object peuvent être révoqués.

      Table

      OWNERSHIP

      Uniquement requis lorsque vous utilisez le connecteur Kinesis pour ingérer des données dans une table existante. . Si le connecteur crée une nouvelle table cible pour les enregistrements du flux Kinesis, le rôle par défaut de l’utilisateur spécifié dans la configuration devient le propriétaire de la table.

      Vous pouvez utiliser le script suivant pour créer et configurer un rôle personnalisé (exigence : SECURITYADMIN ou équivalent) :

      USE ROLE SECURITYADMIN;
      
      CREATE ROLE kinesis_connector_role;
      GRANT USAGE ON DATABASE kinesis_db TO ROLE kinesis_connector_role;
      GRANT USAGE ON SCHEMA kinesis_schema TO ROLE kinesis_connector_role;
      GRANT CREATE TABLE ON SCHEMA kinesis_schema TO ROLE kinesis_connector_role;
      
      -- Only for existing tables.
      GRANT OWNERSHIP ON TABLE existing_table TO ROLE kinesis_connector_role;
      
      Copy
  3. Créez un nouvel utilisateur du service Snowflake avec le type SERVICE.

  4. Autorisez l’utilisateur du service Snowflake le rôle que vous avez créé dans les étapes précédentes.

    GRANT ROLE kinesis_connector_role TO USER kinesis_connector_user;
    ALTER USER kinesis_connector_user SET DEFAULT_ROLE = kinesis_connector_role;
    
    Copy
  5. Configurez avec l’authentification par paire de clés pour l’utilisateur de Snowflake SERVICE de l’étape 3.

  6. Snowflake recommande vivement cette étape. Configurez un gestionnaire de secrets pris en charge par Openflow, par exemple AWS, Azure et Hashicorp, et stockez les clés publiques et privées dans le magasin de secrets.

    Note

    Si, pour une raison quelconque, vous ne souhaitez pas utiliser un gestionnaire de secrets, il vous incombe de protéger les fichiers de clés publiques et privées utilisés pour l’authentification par paires de clés conformément aux politiques de sécurité de votre organisation.

    1. Une fois le gestionnaire de secrets configuré, déterminez comment vous vous y authentifierez. Sur AWS, il est recommandé d’utiliser le rôle de l’instance EC2 associée à Openflow, car de cette manière, aucun autre secret ne doit être conservé.

    2. Dans Openflow, configurez un fournisseur de paramètres associé à ce gestionnaire de secrets, à partir du menu hamburger en haut à droite. Naviguez jusqu’à Controller Settings » Parameter Provider, puis récupérez vos valeurs de paramètres.

    3. À ce stade, tous les identifiants peuvent être référencés avec les chemins de paramètres associés et aucune valeur sensible ne doit être conservée dans Openflow.

  7. If any other Snowflake users require access to the ingested data and created tables (for example, for custom processing in Snowflake), then grant those users the role created in step 2.

Définir le connecteur

En tant qu’ingénieur des données, effectuez les tâches suivantes pour installer et configurer le connecteur :

Installer le connecteur

  1. Navigate to the Openflow overview page. In the Featured connectors section, select View more connectors.

  2. Sur la page des connecteurs Openflow, trouvez le connecteur et sélectionnez Add to runtime.

  3. In the Select runtime dialog, select your runtime from the Available runtimes drop-down list and click Add.

    Note

    Avant d’installer le connecteur, assurez-vous que vous avez créé une base de données et un schéma dans Snowflake pour que le connecteur puisse stocker les données ingérées.

  4. Authentifiez-vous au déploiement avec les identifiants de votre compte Snowflake et sélectionnez Allow lorsque vous êtes invité à autoriser l’application d’exécution à accéder à votre compte Snowflake. Le processus d’installation du connecteur prend quelques minutes.

  5. Authentifiez-vous auprès de l’environnement d’exécution avec les identifiants de votre compte Snowflake.

Le canevas Openflow apparaît avec le groupe de processus du connecteur ajouté.

Configuration du connecteur

  1. Cliquez avec le bouton droit de la souris sur le groupe de processus importé et sélectionnez Parameters.

  2. Populate the required parameter values as described in Parameters section below.

Parameters

This section describes all parameters for the Openflow Connector for Kinesis for JSON data format.

The connector consists of a several modules. To see the set, double click on the connector process group. You will be able to set the parameters for each module in the module’s parameter context.

Snowflake destination parameters

Paramètre

Description

Obligatoire

Base de données de destination

La base de données dans laquelle les données seront conservées. Elle doit déjà exister dans Snowflake. Le nom est sensible à la casse. Pour les identificateurs sans guillemets, indiquez le nom en majuscules.

Oui

Schéma de destination

Le schéma dans lequel les données seront conservées, qui doit déjà exister dans Snowflake. Le nom est sensible à la casse. Pour les identificateurs sans guillemets, indiquez le nom en majuscules.

Voir l’exemple suivant :

  • CREATE SCHEMA SCHEMA_NAME ou CREATE SCHEMA schema_name : utilisez SCHEMA_NAME.

  • CREATE SCHEMA "schema_name" ou CREATE SCHEMA "SCHEMA_NAME" : utilisez schema_name ou SCHEMA_NAME, respectivement.

Oui

Iceberg activé

Whether Iceberg is enabled for table operations. One of true / false.

Oui

Schema Evolution Enabled

Enables or disables schema evolution at the connector level. When enabled, allows automatic schema changes for tables. Note that schema evolution can also be controlled at the individual table level through table-specific parameters. One of: true / false.

Oui

Schema Evolution For New Tables Enabled

Controls whether schema evolution is enabled when creating new tables. When set to “true”, new tables will be created with ENABLE_SCHEMA_EVOLUTION = TRUE parameter. When set to “false”, new tables will be created with ENABLE_SCHEMA_EVOLUTION = FALSE parameter. Not applicable to Iceberg tables as they are not being created automatically. This setting only affects table creation, not existing tables. One of: true / false.

Oui

Identificateur de compte Snowflake

Lorsque vous utilisez :

  • Stratégie d’authentification par jeton de session : doit être vide.

  • KEY_PAIR : nom du compte Snowflake au format [nom-organisation]-[nom-compte] où les données seront conservées.

Oui

Stratégie d’authentification Snowflake

Lorsque vous utilisez :

  • Déploiement Snowflake Openflow ou BYOC : Utilisez SNOWFLAKE_SESSION_TOKEN. Ce jeton est géré automatiquement par Snowflake. Les déploiements BYOC doivent disposer de rôles d’exécution configurés au préalable pour utiliser SNOWFLAKE_SESSION_TOKEN.

  • BYOC: Alternatively BYOC can use KEY_PAIR as the value for authentication strategy.

Oui

Clé privée de Snowflake

Lorsque vous utilisez :

  • Stratégie d’authentification par jeton de session : doit être vide.

  • KEY_PAIR : Doit correspondre à la clé privée RSA utilisée pour l’authentification.

    The RSA key must be formatted according to PKCS8 standards and have standard PEM headers and footers. Note that either a Snowflake Private Key File or a Snowflake Private Key must be defined.

Non

Fichier de clé privée de Snowflake

Lorsque vous utilisez :

  • Stratégie d’authentification par jeton de session : Le fichier de la clé privée doit être vide.

  • KEY_PAIR : Chargez le fichier qui contient la clé privée RSA utilisée pour l’authentification auprès de Snowflake, formatée conformément aux normes PKCS8 et possédant des en-têtes et des pieds de page PEM standards. La ligne d’en-tête commence par -----BEGIN PRIVATE. Pour charger le fichier de la clé privée, cochez la case Reference asset.

Non

Mot de passe de la clé privée de Snowflake

Lorsque vous utilisez :

  • Stratégie d’authentification par jeton de session : doit être vide.

  • KEY_PAIR : fournissez le mot de passe associé au fichier de la clé privée Snowflake.

Non

Rôle Snowflake

Lorsque vous utilisez :

  • Session Token Authentication Strategy: Use your Snowflake Role. You can find your Snowflake Role in the Openflow UI, by navigating to View Details for your Runtime.

  • Stratégie d’authentification KEY_PAIR : Utilisez un rôle valide configuré pour votre utilisateur de service.

Oui

Nom d’utilisateur Snowflake

Lorsque vous utilisez :

  • Stratégie d’authentification par jeton de session : doit être vide.

  • KEY_PAIR : indiquez le nom d’utilisateur utilisé pour vous connecter à l’instance Snowflake.

Oui

Kinesis JSON Source Parameters

Paramètre

Description

Obligatoire

Code de la région AWS

La région AWS où se trouve votre flux Kinesis, par exemple us-west-2.

Oui

ID de clé d’accès AWS

The AWS Access Key ID to connect to your Kinesis Stream, DynamoDB, and, optionally, CloudWatch.

Oui

Clé d’accès secrète AWS

The AWS Secret Access Key to connect to your Kinesis Stream, DynamoDB, and, optionally, CloudWatch.

Oui

Nom de l’application Kinesis

Le nom qui est utilisé pour le nom de la table DynamoDB pour le suivi de la progression de l’application sur la consommation de flux Kinesis.

Oui

Position initiale du flux Kinesis

Position initiale du flux à partir de laquelle les données commencent la réplication.

Les valeurs possibles sont les suivantes :
  • LATEST: Dernier enregistrement stocké

  • TRIM_HORIZON: Enregistrement le plus ancien stocké

Oui

Nom du flux Kinesis

Nom du flux Kinesis AWS à partir duquel les données sont consommées.

Oui

Metrics Publishing

Specifies where Kinesis Client Library metrics are published to. Possible values: DISABLED, LOGS, CLOUDWATCH.

Oui

Exécutez le flux

  1. Cliquez avec le bouton droit de la souris sur l’avion et sélectionnez Enable all Controller Services.

  2. Right-click on the connector’s process group and select Start.

Le connecteur démarre l’ingestion des données.

Table Schema

The Snowflake table loaded by the connector contains columns named by the keys of your Kinesis messages. The connector also adds a KINESISMETADATA column which stores metadata about the record.

Below is an example of a Snowflake table loaded by the connector:

Ligne

ACCOUNT

SYMBOL

SIDE

QUANTITY

KINESISMETADATA

1

ABC123

ZTEST

BUY

3572

{ … KINESISMETADATA object … }

2

XYZ789

ZABZX

SELL

3024

{ … KINESISMETADATA object … }

3

XYZ789

ZTEST

SELL

799

{ … KINESISMETADATA object … }

4

ABC123

ZABZX

BUY

2033

{ … KINESISMETADATA object … }

The KINESISMETADATA column contains an object with the following fields:

Field Name

Field Type

Example Value

Description

stream

String

stream-name

The name of the Kinesis stream the record came from.

shardId

String

shardId-000000000001

The identifier of the shard in the stream the record came from.

approximateArrival

String

2025-11-05T09:12:15.300

The approximate time that the record was inserted into the stream (ISO 8601 format).

partitionKey

String

key-1234

The partition key specified by the data producer for the record.

sequenceNumber

String

123456789

The unique sequence number assigned by Kinesis Data Streams to the record in the shard.

subSequenceNumber

Number

2

The subsequence number for the record (used for aggregated records with the same sequence number).

shardedSequenceNumber

String

12345678900002

A combination of the sequence number and the subsequence number for the record.

Évolution du schéma

This connector supports automatic schema detection and evolution. The structure of tables in Snowflake is defined and evolved automatically to support the structure of new data loaded by the connector.

Snowflake detects the schema of the incoming data and loads data into tables that match any user-defined schema. Snowflake also allows adding new columns or dropping the NOT NULL constraint from columns missing in new incoming records.

Schema detection with the connector infers data types based on the JSON data provided.

If the connector creates the target table, schema evolution is enabled by default.

If you want to enable or disable schema evolution on an existing table, use the ALTER TABLE command to set the ENABLE_SCHEMA_EVOLUTION parameter. You must also use a role that has the OWNERSHIP privilege on the table. For more information, see Évolution du schéma de table.

However, if schema evolution is disabled for an existing table, then the connector will try to send the rows with mismatched schemas to the configured failure output port.

Iceberg table support

Openflow Connector for Kinesis can ingest data into a Snowflake-managed Apache Iceberg™ table when Iceberg Enabled is set to true.

Exigences et limitations

Before you configure the Openflow Connector for Kinesis for Iceberg table ingestion, note the following requirements and limitations:

  • Vous devez créer une table Iceberg avant d’exécuter le connecteur.

  • Assurez-vous que l’utilisateur a accès à l’insertion de données dans les tables créées.

Configuration et définition

To configure the Openflow Connector for Kinesis for Iceberg table ingestion, follow the steps in Set up Openflow Connector for Kinesis for JSON data format with a few differences noted in the following sections.

Activer l’ingestion dans la table Iceberg

Pour permettre l’ingestion dans une table Iceberg, vous devez donner au paramètre Iceberg Enabled la valeur true.

Créer une table Iceberg pour l’ingestion

Before you run the connector, you must create an Iceberg table. The initial table schema depends on your connector Schema Evolution Enabled property settings.

With enabled schema evolution, you must create a table with a column named kinesisMetadata. The connector automatically creates the columns for message fields and alters the kinesisMetadata column schema.

CREATE OR REPLACE ICEBERG TABLE my_iceberg_table (
    kinesisMetadata OBJECT()
  )
  EXTERNAL_VOLUME = 'my_volume'
  CATALOG = 'SNOWFLAKE'
  BASE_LOCATION = 'my_location/my_iceberg_table'
  ENABLE_SCHEMA_EVOLUTION = true;
Copy

If schema evolution is disabled, you must create the table with all fields the Kinesis message contains. When you create an Iceberg table, you can use Iceberg data types or compatible Snowflake types. The semi-structured VARIANT type isn’t supported. Instead, use a structured OBJECT or MAP.

Prenons l’exemple du message suivant :

{
    "id": 1,
    "name": "Steve",
    "body_temperature": 36.6,
    "approved_coffee_types": ["Espresso", "Doppio", "Ristretto", "Lungo"],
    "animals_possessed":
    {
        "dogs": true,
        "cats": false
    },
    "date_added": "2024-10-15"
}
Copy

The following statement creates a table with all fields the Kinesis message contains:

CREATE OR REPLACE ICEBERG TABLE my_iceberg_table (
    kinesisMetadata OBJECT(
        stream STRING,
        shardId STRING,
        approximateArrival STRING,
        partitionKey STRING,
        sequenceNumber STRING,
        subSequenceNumber INTEGER,
        shardedSequenceNumber STRING
    ),
    id INT,
    body_temperature FLOAT,
    name STRING,
    approved_coffee_types ARRAY(STRING),
    animals_possessed OBJECT(dogs BOOLEAN, cats BOOLEAN),
    date_added DATE
  )
  EXTERNAL_VOLUME = 'my_volume'
  CATALOG = 'SNOWFLAKE'
  BASE_LOCATION = 'my_location/my_iceberg_table';
Copy

Note

kinesisMetadata must always be created. Field names inside nested structures such as dogs or cats are case sensitive.