Set up the Openflow Connector for Kinesis¶
Note
The connector is subject to the Connector Terms.
This topic describes the steps to set up the Openflow Connector for Kinesis.
Prerequisites¶
Ensure that you have reviewed About Openflow Connector for Kinesis.
Ensure that you have set up Openflow.
Set up a Kinesis stream¶
As an AWS administrator, perform the following actions in your AWS account:
Ensure that you have an AWS Account with IAM permissions to access Kinesis Streams and DynamoDB.
Optionally, create a dead-letter queue (DLQ) Kinesis Stream. Messages that cannot be successfully parsed can be redirected to a designated DLQ.
Set up Snowflake account¶
As a Snowflake account administrator, perform the following tasks:
Create a new role or use an existing role and grant the Database privileges.
Create a destination database and destination schema that will be used to create destination tables for storing the data.
If you plan to use the connector’s capability to automatically create destination table if it does not already exist, make sure the user has the required privileges for creating and managing Snowflake objects:
Object
Privilege
Notes
Database
USAGE
Schema
USAGE . CREATE TABLE .
After the schema-level objects have been created, the CREATE
object
privileges can be revoked.Table
OWNERSHIP
Only required when using the Kinesis connector to ingest data into an existing table. . If the connector creates a new target table for records from the Kinesis stream, the default role for the user specified in the configuration becomes the table owner.
You can use the following script to create and configure a custom role (requires SECURITYADMIN or equivalent):
USE ROLE SECURITYADMIN; CREATE ROLE kinesis_connector_role_1; GRANT USAGE ON DATABASE kinesis_db TO ROLE kinesis_connector_role_1; GRANT USAGE ON SCHEMA kinesis_schema TO ROLE kinesis_connector_role_1; GRANT CREATE TABLE ON SCHEMA kinesis_schema TO ROLE kinesis_connector_role_1; -- Only for existing tables GRANT OWNERSHIP ON TABLE existing_table1 TO ROLE kinesis_connector_role_1;
Create a new Snowflake service user with the type as SERVICE.
Grant the Snowflake service user the role you created in the previous steps.
GRANT ROLE kinesis_connector_role_1 TO USER kinesis_connector_user_1; ALTER USER kinesis_connector_user_1 SET DEFAULT_ROLE = kinesis_connector_role_1;
Configure with key-pair auth for the Snowflake SERVICE user from step 3.
Snowflake strongly recommends this step. Configure a secrets manager supported by Openflow, for example, AWS, Azure, and Hashicorp, and store the public and private keys in the secret store.
Note
If for any reason, you do not wish to use a secrets manager, then you are responsible for safeguarding the public key and private key files used for key-pair authentication according to the security policies of your organization.
Once the secrets manager is configured, determine how you will authenticate to it. On AWS, it’s recommended that you the EC2 instance role associated with Openflow as this way no other secrets have to be persisted.
In Openflow, configure a Parameter Provider associated with this Secrets Manager, from the hamburger menu in the upper right. Navigate to Controller Settings » Parameter Provider and then fetch your parameter values.
At this point all credentials can be referenced with the associated parameter paths and no sensitive values need to be persisted within Openflow.
If any other Snowflake users require access to the raw ingested documents and tables ingested by the connector (for example, for custom processing in Snowflake), then grant those users the role created in step 1.
Designate a warehouse for the connector to use. Start with the smallest warehouse size, then experiment with size depending on the number of tables being replicated, and the amount of data transferred. Large table numbers typically scale better with multi-cluster warehouses, rather than larger warehouse sizes.
Set up the connector¶
As a data engineer, perform the following tasks to install and configure the connector:
Install the connector¶
Navigate to the Openflow Overview page. In the Featured connectors section, select View more connectors.
On the Openflow connectors page, find the connector and select Add to runtime.
In the Select runtime dialog, select your runtime from the Available runtimes drop-down list.
Select Add.
Note
Before you install the connector, ensure that you have created a database and schema in Snowflake for the connector to store ingested data.
Authenticate to the deployment with your Snowflake account credentials and select Allow when prompted to allow the runtime application to access your Snowflake account. The connector installation process takes a few minutes to complete.
Authenticate to the runtime with your Snowflake account credentials.
The Openflow canvas appears with the connector process group added to it.
Configure the connector¶
Right-click on the imported process group and select Parameters.
Populate the required parameter values as described in Flow parameters.
Flow parameters¶
This section describes the flow parameters that you can configure based on the following parameter contexts:
Kinesis Source Parameters: Used to establish connection with Kinesis.
Kinesis Destination Parameters: Used to establish connection with Snowflake.
Kinesis Ingestion Parameters: Used to define the configuration of data downloaded from Kinesis.
Kinesis Source Parameters¶
Parameter |
Description |
---|---|
AWS Region Code |
The AWS region where your Kinesis Stream is located, for example |
AWS Access Key ID |
The AWS Access Key ID to connect to your Kinesis Stream and DynamoDB. |
AWS Secret Access Key |
The AWS Secret Access Key to connect to your Kinesis Stream and DynamoDB. |
Schema Registry URL |
The URL of the AVRO Schema Registry. This is required if the AVRO Schema Access Strategy parameter is set to |
Schema Registry Authentication Type |
The authentication type used by the AVRO Schema Registry. This is required if the AVRO Schema Access Strategy parameter is set to
|
Schema Registry Username |
The username used for |
Schema Registry Password |
The password used for |
Kinesis Destination Parameters¶
Parameter |
Description |
---|---|
Destination Database |
The database where data will be persisted. It must already exist in Snowflake. |
Destination Schema |
The schema where data will be persisted. It must already exist in Snowflake. This parameter is case-sensitive. |
Snowflake Account Identifier |
Snowflake account name formatted as [organization-name]-[account-name] where data will be persisted. |
Snowflake Authentication Strategy |
Strategy of authentication to Snowflake. Possible values: |
Snowflake Private Key |
The RSA private key used for authentication. The RSA key must be formatted according to PKCS8 standards and have standard PEM headers and footers. Note that either Snowflake Private Key File or Snowflake Private Key must be defined. |
Snowflake Private Key File |
The file that contains the RSA Private Key used for authentication to Snowflake, formatted according to PKCS8 standards and having standard PEM headers and footers. The header line starts with |
Snowflake Private Key Password |
The password associated with the Snowflake Private Key File. |
Snowflake Role |
Snowflake Role used during query execution. |
Snowflake Username |
User name used to connect to Snowflake instance. |
Snowflake Warehouse |
Snowflake warehouse used to run queries. This parameter is case-sensitive. |
Kinesis Ingestion Parameters¶
Parameter |
Description |
---|---|
Kinesis Application Name |
The name that is used for DynamoDB table name for tracking application’s progress on Kinesis Stream consumption. |
Kinesis Stream Name |
AWS Kinesis Stream Name to consume data from. |
Kinesis Initial Stream Position |
The initial stream position from which the data starts replication.
|
Kinesis DLQ Stream Name |
The stream name where all records that failed processing are sent. If this parameter is not added, you can expect a warning sign in the DLQ-related part of the connector on the Openflow canvas. |
Message Format |
The format of messages in Kinesis.
|
AVRO Schema Access Strategy |
To access data in AVRO message format, the schema is required. This parameter defines the strategy to access the AVRO schema of a particular message.
If the Message Format parameter is set to
|
Kinesis Stream To Table Map |
This optional parameter lets a user specify which streams should be mapped to which tables. Each stream and its table name should be separated by a colon. This table name must be a valid Snowflake unquoted identifier. The regular expressions cannot be ambiguous and any matched stream must match only a single target table. If empty or no matches are found, the stream name is used as the table name.
|
Iceberg Enabled |
Specifies whether the processor ingests data into an Iceberg table. The processor fails if this property doesn’t match the actual table type.
|
Run the flow¶
Right-click on the plane and select Enable all Controller Services.
Right-click on the imported process group and select Start.
The connector starts the data ingestion.
Schema¶
The Snowflake table loaded by the connector consists of only two OBJECT
columns: VALUE
and METADATA
. Below is an example how such table looks.
Row |
METADATA |
VALUE |
---|---|---|
1 |
{“sequenceNumber”:1669074170090, “subSequenceNumber”: … |
{“account”: “ABC123”, “symbol”: “ZTEST”, “side”:… |
2 |
{“sequenceNumber”:1669074170092, “subSequenceNumber”: … |
{“account”: “XYZ789”, “symbol”: “ZABZX”, “side”:… |
3 |
{“sequenceNumber”:1669074170095, “subSequenceNumber”: … |
{“account”: “XYZ789”, “symbol”: “ZTEST”, “side”:… |
4 |
{“sequenceNumber”:1669074170096, “subSequenceNumber”: … |
{“account”: “ABC123”, “symbol”: “ZABZX”, “side”:… |
5 |
{“sequenceNumber”:1669074170098, “subSequenceNumber”: … |
{“account”: “ABC123”, “symbol”: “ZTEST”, “side”:… |
Schema evolution¶
Currently, when Iceberg Enabled
is set to false
, the connector uses two OBJECT type columns which will accept any data that will be passed in the messages.
For case when Iceberg Enabled
is set to true
, see the paragraph Schema evolution for Apache Iceberg™ tables.
Using the Openflow Connector for Kinesis with Apache Iceberg™ tables¶
Openflow Connector for Kinesis can ingest data into a Snowflake-managed Apache Iceberg™ table.
Requirements and limitations¶
Before you configure the connector for Iceberg table ingestion, note the following requirements and limitations:
You must create an Iceberg table before running the connector.
Make sure that the user has access to inserting data into the created tables.
Schema evolution is not supported for Iceberg tables.
Configuration and setup¶
To configure the connector for Iceberg table ingestion, follow the instructions in Configure the connector with a few differences that are described in the following sections.
Enable ingestion into Iceberg table¶
To enable ingestion into an Iceberg table, you must set the Iceberg Enabled
parameter to true
.
Create an Iceberg table for ingestion¶
Before you run the connector, you must create an Iceberg table. As schema evolution is not supported, you must create the table with the following:
The
METADATA
columnThe
VALUE
column of type object containing all Kinesis message fields
When you create an Iceberg table, you can use Iceberg data types or compatible Snowflake types. The semi-structured VARIANT type isn’t supported. Instead, use a structured OBJECT or MAP.
For example, consider the following message:
{
"id": 1,
"name": "Steve",
"body_temperature": 36.6,
"approved_coffee_types": ["Espresso", "Doppio", "Ristretto", "Lungo"],
"animals_possessed":
{
"dogs": true,
"cats": false
},
"date_added": "2024-10-15"
}
To create an Iceberg table for the example message, use the following statement:
CREATE OR REPLACE ICEBERG TABLE my_iceberg_table ( METADATA OBJECT( sequenceNumber STRING, subSequenceNumber LONG, shardedSequenceNumber STRING, stream STRING, shardId STRING, partitionKey STRING, approximateArrival TIMESTAMP ), VALUE OBJECT( id INT, body_temperature FLOAT, name STRING, approved_coffee_types ARRAY(STRING), animals_possessed OBJECT(dogs BOOLEAN, cats BOOLEAN), date_added DATE ) ) EXTERNAL_VOLUME = 'my_volume' CATALOG = 'SNOWFLAKE' BASE_LOCATION = 'my_location/my_iceberg_table';
Note
METADATA must always be created. Field names inside nested structures such as dogs
or cats
are case sensitive.
Schema evolution for Apache Iceberg™ tables¶
Currently the connector does not support evolving the schema for Apache Iceberg™ tables.
Known issues¶
The connector’s process group has one output port named as “Upload Failure”. It can be used to handle FlowFiles that were not successfully uploaded to Snowflake. If this port is not connected outside of the connector’s process group, it will display a warning sign that can be ignored.
All processors, when stopped, can be ordered to run once. ConsumeKinesisStream processor, due to internal architecture, will do no meaningful work when ordered to run once. For the processor to start working, it has to be started and run for about two minutes.