Set up Openflow Connector for Kinesis for JSON data format¶
Note
This connector is subject to the Snowflake Connector Terms.
This topic describes the setup steps for the Openflow Connector for Kinesis for JSON data format. This is a simplified connector optimized for basic message ingestion with schema evolution capabilities.
The Openflow Connector for Kinesis for JSON data format is designed for straightforward JSON message ingestion from Kinesis streams to Snowflake tables.
Prerequisites¶
Ensure that you have reviewed About Openflow Connector for Kinesis.
Ensure that you have Set up Openflow - BYOC or Set up Openflow - Snowflake Deployments.
If using Openflow - Snowflake Deployments, ensure that you’ve reviewed configuring required domains and have granted access to the required domains for the Kinesis connector.
Note
If you need the support of other data formats or features, such as DLQ, reach out to your Snowflake representative.
Set up a Kinesis stream¶
As an AWS administrator, perform the following actions in your AWS account:
Ensure that you have an AWS User with IAM permissions to access Kinesis Streams and DynamoDB.
Ensure that the AWS User has configured Access Key credentials.
Set up Snowflake account¶
As a Snowflake account administrator, perform the following tasks:
Create a new role or use an existing role and grant it the database privileges.
Create a destination database and destination schema that will be used to create destination tables for storing the data.
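For example, a minimal sketch using the hypothetical names kinesis_db and kinesis_schema, which are also used in the role script later in this section:
CREATE DATABASE IF NOT EXISTS kinesis_db;
CREATE SCHEMA IF NOT EXISTS kinesis_db.kinesis_schema;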
If you plan to use the connector’s capability to automatically create the destination table if it does not already exist, make sure the user has the required privileges for creating and managing Snowflake objects:

| Object | Privilege | Notes |
|---|---|---|
| Database | USAGE | |
| Schema | USAGE, CREATE TABLE | After the schema-level objects have been created, the CREATE object privileges can be revoked. |
| Table | OWNERSHIP | Only required when using the Kinesis connector to ingest data into an existing table. If the connector creates a new target table for records from the Kinesis stream, the default role of the user specified in the configuration becomes the table owner. |
You can use the following script to create and configure a custom role (requires SECURITYADMIN or equivalent):
USE ROLE SECURITYADMIN;
CREATE ROLE kinesis_connector_role;
GRANT USAGE ON DATABASE kinesis_db TO ROLE kinesis_connector_role;
GRANT USAGE ON SCHEMA kinesis_schema TO ROLE kinesis_connector_role;
GRANT CREATE TABLE ON SCHEMA kinesis_schema TO ROLE kinesis_connector_role;
-- Only for existing tables.
GRANT OWNERSHIP ON TABLE existing_table TO ROLE kinesis_connector_role;
Create a new Snowflake service user with the type as SERVICE.
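For example, a minimal sketch assuming the hypothetical user name kinesis_connector_user used in the grants below:
CREATE USER kinesis_connector_user TYPE = SERVICE COMMENT = 'Service user for the Openflow Connector for Kinesis';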
Grant the Snowflake service user the role you created in the previous steps.
GRANT ROLE kinesis_connector_role TO USER kinesis_connector_user;
ALTER USER kinesis_connector_user SET DEFAULT_ROLE = kinesis_connector_role;
Configure key-pair authentication for the Snowflake SERVICE user from step 3.
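After generating an RSA key pair according to your organization’s process, register the public key on the service user. A sketch, assuming the user name from the previous steps and a placeholder key value:
-- Paste the public key contents, excluding the PEM header and footer lines.
ALTER USER kinesis_connector_user SET RSA_PUBLIC_KEY = '<public_key>';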
Snowflake strongly recommends this step. Configure a secrets manager supported by Openflow, for example AWS, Azure, or HashiCorp, and store the public and private keys in the secret store.
Note
If, for any reason, you do not wish to use a secrets manager, then you are responsible for safeguarding the public key and private key files used for key-pair authentication according to the security policies of your organization.
Once the secrets manager is configured, determine how you will authenticate to it. On AWS, it’s recommended that you use the EC2 instance role associated with Openflow, since this way no other secrets have to be persisted.
In Openflow, configure a Parameter Provider associated with this Secrets Manager, from the hamburger menu in the upper right. Navigate to Controller Settings » Parameter Provider and then fetch your parameter values.
At this point all credentials can be referenced with the associated parameter paths and no sensitive values need to be persisted within Openflow.
If any other Snowflake users require access to the ingested data and created tables (for example, for custom processing in Snowflake), then grant those users the role created in step 2.
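For example, assuming a hypothetical analyst_user:
GRANT ROLE kinesis_connector_role TO USER analyst_user;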
Set up the connector¶
As a data engineer, perform the following tasks to install and configure the connector:
Install the connector¶
Navigate to the Openflow overview page. In the Featured connectors section, select View more connectors.
On the Openflow connectors page, find the connector and select Add to runtime.
In the Select runtime dialog, select your runtime from the Available runtimes drop-down list and click Add.
Note
Before you install the connector, ensure that you have created a database and schema in Snowflake for the connector to store ingested data.
Authenticate to the deployment with your Snowflake account credentials and select Allow when prompted to allow the runtime application to access your Snowflake account. The connector installation process takes a few minutes to complete.
Authenticate to the runtime with your Snowflake account credentials.
The Openflow canvas appears with the connector process group added to it.
Configure the connector¶
Right-click on the imported process group and select Parameters.
Populate the required parameter values as described in the Parameters section below.
Parameters¶
This section describes all parameters for the Openflow Connector for Kinesis for JSON data format.
The connector consists of several modules. To see them, double-click the connector process group. You can set the parameters for each module in the module’s parameter context.
Snowflake destination parameters¶
| Parameter | Description | Required |
|---|---|---|
| Destination Database | The database where data will be persisted. It must already exist in Snowflake. The name is case-sensitive. For unquoted identifiers, provide the name in uppercase. | Yes |
| Destination Schema | The schema where data will be persisted, which must already exist in Snowflake. The name is case-sensitive. For unquoted identifiers, provide the name in uppercase. See the example following this table. | Yes |
| Iceberg Enabled | Whether Iceberg is enabled for table operations. One of: true, false. | Yes |
| Schema Evolution Enabled | Enables or disables schema evolution at the connector level. When enabled, allows automatic schema changes for tables. Note that schema evolution can also be controlled at the individual table level through table-specific parameters. One of: true, false. | Yes |
| Schema Evolution For New Tables Enabled | Controls whether schema evolution is enabled when creating new tables. When set to true, new tables are created with ENABLE_SCHEMA_EVOLUTION = TRUE; when set to false, new tables are created with ENABLE_SCHEMA_EVOLUTION = FALSE. Not applicable to Iceberg tables, as they are not created automatically. This setting only affects table creation, not existing tables. One of: true, false. | Yes |
| Snowflake Account Identifier | When using: | Yes |
| Snowflake Authentication Strategy | When using: | Yes |
| Snowflake Private Key | When using: | No |
| Snowflake Private Key File | When using: | No |
| Snowflake Private Key Password | When using: | No |
| Snowflake Role | When using: | Yes |
| Snowflake Username | When using: | Yes |
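As an illustration of the case-sensitivity rules above, an unquoted schema identifier is stored in uppercase, while a quoted identifier preserves its case; the parameter value must match the stored name:
-- Stored as KINESIS_SCHEMA; set Destination Schema to KINESIS_SCHEMA.
CREATE SCHEMA kinesis_schema;
-- Stored as kinesis_schema; set Destination Schema to kinesis_schema.
CREATE SCHEMA "kinesis_schema";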
Kinesis JSON Source Parameters¶
| Parameter | Description | Required |
|---|---|---|
| AWS Region Code | The AWS region where your Kinesis stream is located, for example: | Yes |
| AWS Access Key ID | The AWS access key ID used to connect to your Kinesis stream, DynamoDB, and, optionally, CloudWatch. | Yes |
| AWS Secret Access Key | The AWS secret access key used to connect to your Kinesis stream, DynamoDB, and, optionally, CloudWatch. | Yes |
| Kinesis Application Name | The name used for the DynamoDB table that tracks the application’s progress on Kinesis stream consumption. | Yes |
| Kinesis Initial Stream Position | The initial stream position from which data replication starts. | Yes |
| Kinesis Stream Name | The name of the AWS Kinesis stream to consume data from. | Yes |
| Metrics Publishing | Specifies where Kinesis Client Library metrics are published. Possible values: | Yes |
Run the flow¶
Right-click on the canvas and select Enable all Controller Services.
Right-click on the connector’s process group and select Start.
The connector starts the data ingestion.
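To confirm that records are arriving, you can query the destination table; a sketch, assuming the hypothetical table name my_destination_table in the configured database and schema:
SELECT * FROM kinesis_db.kinesis_schema.my_destination_table LIMIT 10;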
Table Schema¶
The Snowflake table loaded by the connector contains columns named after the keys of your Kinesis messages.
The connector also adds a KINESISMETADATA column, which stores metadata about each record.
Below is an example of a Snowflake table loaded by the connector:
| Row | ACCOUNT | SYMBOL | SIDE | QUANTITY | KINESISMETADATA |
|---|---|---|---|---|---|
| 1 | ABC123 | ZTEST | BUY | 3572 | { … KINESISMETADATA object … } |
| 2 | XYZ789 | ZABZX | SELL | 3024 | { … KINESISMETADATA object … } |
| 3 | XYZ789 | ZTEST | SELL | 799 | { … KINESISMETADATA object … } |
| 4 | ABC123 | ZABZX | BUY | 2033 | { … KINESISMETADATA object … } |
The KINESISMETADATA column contains an object with the following fields:
| Field Name | Field Type | Description |
|---|---|---|
| stream | String | The name of the Kinesis stream the record came from. |
| shardId | String | The identifier of the shard in the stream the record came from. |
| approximateArrival | String | The approximate time that the record was inserted into the stream (ISO 8601 format). |
| partitionKey | String | The partition key specified by the data producer for the record. |
| sequenceNumber | String | The unique sequence number assigned by Kinesis Data Streams to the record in the shard. |
| subSequenceNumber | Number | The subsequence number for the record (used for aggregated records with the same sequence number). |
| shardedSequenceNumber | String | A combination of the sequence number and the subsequence number for the record. |
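For example, a query sketch that extracts individual metadata fields, assuming a hypothetical table named my_destination_table and that the KINESISMETADATA column is queryable with Snowflake’s semi-structured path syntax:
SELECT
  account,
  symbol,
  kinesismetadata:"shardId"::STRING        AS shard_id,
  kinesismetadata:"sequenceNumber"::STRING AS sequence_number
FROM my_destination_table;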
Schema evolution¶
This connector supports automatic schema detection and evolution. The structure of tables in Snowflake is defined and evolved automatically to support the structure of new data loaded by the connector.
Snowflake detects the schema of the incoming data and loads data into tables
that match any user-defined schema. Snowflake also allows adding
new columns or dropping the NOT NULL constraint from columns missing in new incoming records.
Schema detection with the connector infers data types based on the JSON data provided.
If the connector creates the target table, schema evolution is enabled by default.
If you want to enable or disable schema evolution on an existing table,
use the ALTER TABLE command to set the ENABLE_SCHEMA_EVOLUTION parameter.
You must also use a role that has the OWNERSHIP privilege on the table. For more information, see Table schema evolution.
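For example, to enable schema evolution on an existing table with ALTER TABLE (hypothetical table name):
ALTER TABLE my_destination_table SET ENABLE_SCHEMA_EVOLUTION = TRUE;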
However, if schema evolution is disabled for an existing table, then the connector will try to send the rows with mismatched schemas to the configured failure output port.
Iceberg table support¶
Openflow Connector for Kinesis can ingest data into a Snowflake-managed Apache Iceberg™ table when Iceberg Enabled is set to true.
Requirements and limitations¶
Before you configure the Openflow Connector for Kinesis for Iceberg table ingestion, note the following requirements and limitations:
You must create an Iceberg table before running the connector.
Make sure that the user has the privileges required to insert data into the created tables.
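A minimal sketch of such a grant, assuming the role and table names used elsewhere in this topic; your security model may require different or additional privileges:
GRANT INSERT, SELECT ON TABLE my_iceberg_table TO ROLE kinesis_connector_role;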
Configuration and setup¶
To configure the Openflow Connector for Kinesis for Iceberg table ingestion, follow the steps in Set up Openflow Connector for Kinesis for JSON data format with a few differences noted in the following sections.
Enable ingestion into Iceberg table¶
To enable ingestion into an Iceberg table, you must set the Iceberg Enabled parameter to true.
Create an Iceberg table for ingestion¶
Before you run the connector, you must create an Iceberg table.
The initial table schema depends on your connector Schema Evolution Enabled property settings.
With schema evolution enabled, you must create a table with a column named kinesisMetadata.
The connector automatically creates the columns for message fields and alters the kinesisMetadata column schema.
CREATE OR REPLACE ICEBERG TABLE my_iceberg_table (
kinesisMetadata OBJECT()
)
EXTERNAL_VOLUME = 'my_volume'
CATALOG = 'SNOWFLAKE'
BASE_LOCATION = 'my_location/my_iceberg_table'
ENABLE_SCHEMA_EVOLUTION = true;
If schema evolution is disabled, you must create the table with all fields the Kinesis message contains. When you create an Iceberg table, you can use Iceberg data types or compatible Snowflake types. The semi-structured VARIANT type isn’t supported. Instead, use a structured OBJECT or MAP.
For example, consider the following message:
{
"id": 1,
"name": "Steve",
"body_temperature": 36.6,
"approved_coffee_types": ["Espresso", "Doppio", "Ristretto", "Lungo"],
"animals_possessed":
{
"dogs": true,
"cats": false
},
"date_added": "2024-10-15"
}
The following statement creates a table with all fields the Kinesis message contains:
CREATE OR REPLACE ICEBERG TABLE my_iceberg_table (
kinesisMetadata OBJECT(
stream STRING,
shardId STRING,
approximateArrival STRING,
partitionKey STRING,
sequenceNumber STRING,
subSequenceNumber INTEGER,
shardedSequenceNumber STRING
),
id INT,
body_temperature FLOAT,
name STRING,
approved_coffee_types ARRAY(STRING),
animals_possessed OBJECT(dogs BOOLEAN, cats BOOLEAN),
date_added DATE
)
EXTERNAL_VOLUME = 'my_volume'
CATALOG = 'SNOWFLAKE'
BASE_LOCATION = 'my_location/my_iceberg_table';
Note
kinesisMetadata must always be created. Field names inside nested structures such as dogs or cats are case sensitive.