Set up Openflow Connector for Amazon Kinesis Data Streams

Note

This connector is subject to the `Snowflake Connector Terms <https://www.snowflake.com/legal/snowflake-connector-terms/>`_.

This topic describes how to set up Openflow Connector for Amazon Kinesis Data Streams.

Openflow Connector for Amazon Kinesis Data Streams is designed for JSON message ingestion from Kinesis streams to Snowflake tables, with schema evolution capabilities.

Set up the Openflow Connector for Kinesis

Prerequisites

  1. Review the Openflow Connector for Amazon Kinesis Data Streams topic.

  2. Ensure that you have set up Openflow - BYOC or Openflow - Snowflake Deployments.

  3. If you are using Openflow - Snowflake Deployments, ensure that you have reviewed configuring required domains and have granted access to the required domains for the Kinesis connector.

Set up IAM roles and policies in AWS

As an AWS account administrator, perform the following tasks in your AWS account:

  1. Create an AWS IAM user or role that Openflow will use to access the Kinesis data stream. For more information, see Creating IAM users in the AWS documentation.

  2. Ensure that the AWS user has configured Access Key credentials.

  3. Grant the AWS user the following IAM permissions:

    | Service | Actions | Resources (ARNs) | Purpose |
    | --- | --- | --- | --- |
    | Amazon Kinesis Data Streams | kinesis:DescribeStream, kinesis:DescribeStreamConsumer, kinesis:GetRecords, kinesis:GetShardIterator, kinesis:ListShards, kinesis:RegisterStreamConsumer | arn:aws:kinesis:${REGION}:${ACCOUNT_ID}:stream/${STREAM_NAME} | Discovers shards, reads records through shared-throughput polling, resolves the stream ARN, registers an Enhanced Fan-Out consumer, and polls consumer status during registration. |
    | Amazon Kinesis Data Streams | kinesis:DeregisterStreamConsumer, kinesis:DescribeStreamConsumer, kinesis:SubscribeToShard | arn:aws:kinesis:${REGION}:${ACCOUNT_ID}:stream/${STREAM_NAME}/consumer/* | Describes, subscribes to, and deregisters Enhanced Fan-Out consumers by consumer ARN. |
    | Amazon DynamoDB | dynamodb:CreateTable, dynamodb:DeleteTable, dynamodb:DescribeTable, dynamodb:GetItem, dynamodb:PutItem, dynamodb:Query, dynamodb:Scan, dynamodb:UpdateItem | arn:aws:dynamodb:${REGION}:${ACCOUNT_ID}:table/${APPLICATION_NAME}, arn:aws:dynamodb:${REGION}:${ACCOUNT_ID}:table/${APPLICATION_NAME}_migration | Creates and manages the checkpoint/lease table (shard leases, node heartbeats, checkpoints) and a temporary migration table used during one-time migration from legacy checkpoint tables. |

    Example IAM policy:

    {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Sid": "KinesisStreamAccess",
                "Effect": "Allow",
                "Action": [
                    "kinesis:DescribeStream",
                    "kinesis:DescribeStreamConsumer",
                    "kinesis:GetRecords",
                    "kinesis:GetShardIterator",
                    "kinesis:ListShards",
                    "kinesis:RegisterStreamConsumer"
                ],
                "Resource": "arn:aws:kinesis:${REGION}:${ACCOUNT_ID}:stream/${STREAM_NAME}"
            },
            {
                "Sid": "KinesisConsumerAccess",
                "Effect": "Allow",
                "Action": [
                    "kinesis:DeregisterStreamConsumer",
                    "kinesis:DescribeStreamConsumer",
                    "kinesis:SubscribeToShard"
                ],
                "Resource": "arn:aws:kinesis:${REGION}:${ACCOUNT_ID}:stream/${STREAM_NAME}/consumer/*"
            },
            {
                "Sid": "DynamoDBTableAccess",
                "Effect": "Allow",
                "Action": [
                    "dynamodb:CreateTable",
                    "dynamodb:DeleteTable",
                    "dynamodb:DescribeTable",
                    "dynamodb:GetItem",
                    "dynamodb:PutItem",
                    "dynamodb:Query",
                    "dynamodb:Scan",
                    "dynamodb:UpdateItem"
                ],
                "Resource": [
                    "arn:aws:dynamodb:${REGION}:${ACCOUNT_ID}:table/${APPLICATION_NAME}",
                    "arn:aws:dynamodb:${REGION}:${ACCOUNT_ID}:table/${APPLICATION_NAME}_migration"
                ]
            }
        ]
    }
    

    Before using the example policy, replace the following placeholders:

    | Placeholder | Description |
    | --- | --- |
    | ${REGION} | Your AWS region (for example, us-east-1) |
    | ${ACCOUNT_ID} | Your AWS account ID (for example, 123456789012) |
    | ${STREAM_NAME} | The value of the AWS Kinesis Stream Name connector parameter |
    | ${APPLICATION_NAME} | The value of the AWS Kinesis Application Name connector parameter. Used as the DynamoDB checkpoint table name and as the Enhanced Fan-Out registered consumer name. |

    Note

    • The ${APPLICATION_NAME}_migration table is a temporary DynamoDB table created only during a one-time migration from legacy checkpoint tables to the new schema. It’s deleted automatically when migration completes. If your deployment has never used the legacy KCL-based connector, you can omit the migration table ARN from the policy.

    • The dynamodb:DeleteTable action is used during the migration process and can be removed from the policy after migration is confirmed complete.

    • The kinesis:DeregisterStreamConsumer action is invoked when the processor is removed from the canvas. If the IAM principal doesn’t have this permission, the consumer must be deregistered manually through the AWS console or CLI.
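The placeholder substitution can also be scripted. The following Python sketch fills in the example policy from the four placeholder values. The function name `build_policy` and its `include_migration_table` flag are illustrative only and are not part of Openflow or AWS tooling; the sample values in the usage section are hypothetical.

```python
import json


def build_policy(region, account_id, stream_name, application_name,
                 include_migration_table=True):
    """Return the example IAM policy as a dict with the placeholders filled in."""
    stream_arn = f"arn:aws:kinesis:{region}:{account_id}:stream/{stream_name}"
    table_arn = f"arn:aws:dynamodb:{region}:{account_id}:table/{application_name}"

    dynamodb_resources = [table_arn]
    if include_migration_table:
        # Only needed for the one-time migration from legacy checkpoint tables.
        dynamodb_resources.append(f"{table_arn}_migration")

    return {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Sid": "KinesisStreamAccess",
                "Effect": "Allow",
                "Action": [
                    "kinesis:DescribeStream",
                    "kinesis:DescribeStreamConsumer",
                    "kinesis:GetRecords",
                    "kinesis:GetShardIterator",
                    "kinesis:ListShards",
                    "kinesis:RegisterStreamConsumer",
                ],
                "Resource": stream_arn,
            },
            {
                "Sid": "KinesisConsumerAccess",
                "Effect": "Allow",
                "Action": [
                    "kinesis:DeregisterStreamConsumer",
                    "kinesis:DescribeStreamConsumer",
                    "kinesis:SubscribeToShard",
                ],
                "Resource": f"{stream_arn}/consumer/*",
            },
            {
                "Sid": "DynamoDBTableAccess",
                "Effect": "Allow",
                "Action": [
                    "dynamodb:CreateTable",
                    "dynamodb:DeleteTable",
                    "dynamodb:DescribeTable",
                    "dynamodb:GetItem",
                    "dynamodb:PutItem",
                    "dynamodb:Query",
                    "dynamodb:Scan",
                    "dynamodb:UpdateItem",
                ],
                "Resource": dynamodb_resources,
            },
        ],
    }


if __name__ == "__main__":
    # Hypothetical values; replace them with your own.
    policy = build_policy("us-east-1", "123456789012", "my-stream", "my-app")
    print(json.dumps(policy, indent=4))
```

Passing `include_migration_table=False` leaves out the migration table ARN, which matches the case where the deployment has never used the legacy KCL-based connector.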

Set up Snowflake account

As a Snowflake account administrator, perform the following tasks:

  1. Create a new Snowflake service user with the type SERVICE.

  2. Create a new role or use an existing role and grant the database privileges.

    The connector requires the user to create the destination table. Make sure the user has the required privileges for managing Snowflake objects:

    | Object | Privilege | Notes |
    | --- | --- | --- |
    | Database | USAGE | |
    | Schema | USAGE | |
    | Table | OWNERSHIP | Required for the connector to ingest data into a table. |

    Snowflake recommends creating a separate user and role for each Kinesis stream for better access control.

    You can use the following script to create and configure a custom role (requires SECURITYADMIN or equivalent):

    USE ROLE securityadmin;
    
    CREATE ROLE openflow_kinesis_connector_role_1;
    GRANT USAGE ON DATABASE kinesis_db TO ROLE openflow_kinesis_connector_role_1;
    -- Qualify the schema with its database so the grant doesn't depend on the session context.
    GRANT USAGE ON SCHEMA kinesis_db.kinesis_schema TO ROLE openflow_kinesis_connector_role_1;
    

    Note

    Privileges must be granted directly to the connector role and can’t be inherited.

  3. Configure the destination table

    We highly recommend using server-side schema evolution for schema changes and an error table for DML error logging.

    The example below shows how to create a table and add OWNERSHIP permissions.

    USE ROLE openflow_kinesis_connector_role_1;
    
    CREATE TABLE kinesis_db.kinesis_schema.<DESTINATION_TABLE_NAME> (
      kinesisMetadata object
    )
    ENABLE_SCHEMA_EVOLUTION = TRUE
    ERROR_LOGGING = TRUE;
    
    USE ROLE securityadmin;
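    -- Qualify the table name so the grant doesn't depend on the session context.
    GRANT OWNERSHIP ON TABLE kinesis_db.kinesis_schema.<DESTINATION_TABLE_NAME> TO ROLE openflow_kinesis_connector_role_1;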
    

    The connector supports automatic schema detection and evolution. Snowflake defines and evolves the table structure automatically to match the structure of new data loaded by the connector. The connector maps the first-level keys of the record content to table columns by name (case-insensitive).

    With schema evolution enabled, Snowflake can automatically expand the destination table by adding columns for new fields detected in the incoming stream and by dropping NOT NULL constraints to accommodate new data patterns. For more information, see Table schema evolution.

    If ENABLE_SCHEMA_EVOLUTION is not enabled, then you have to create the schema manually by extending the table definition. The connector tries to match the record content’s first-level keys to the table columns by name. If keys from the JSON do not match the table columns, the connector ignores the keys.

  4. (Optional) Configure a secrets manager

    Snowflake strongly recommends this step. Configure a secrets manager supported by Openflow (for example, AWS, Azure, or Hashicorp) and store the public and private keys in the secret store.

    1. Once the secrets manager is configured, determine how you will authenticate to it. On AWS, it's recommended that you use the EC2 instance role associated with Openflow, because no other secrets then have to be persisted.

    2. In the Openflow canvas, configure a Parameter Provider associated with this secrets manager from the hamburger menu in the upper right. Navigate to Controller Settings » Parameter Provider, and then fetch your parameter values.

    3. At this point, all credentials can be referenced with the associated parameter paths, and no sensitive values need to be persisted within Openflow.

  5. Grant access to users

    Grant the role created in step 2 to any other Snowflake users who require access to the raw data ingested by the connector (for example, for custom processing in Snowflake).

Set up the connector

As a data engineer, perform the following tasks to install and configure the connector:

Install the connector

  1. Navigate to the Openflow overview page. In the Featured connectors section, select View more connectors.

  2. On the Openflow connectors page, find the Openflow connector for Amazon Kinesis Data Streams and select Add to runtime.

  3. In the Select runtime dialog, select your runtime from the Available runtimes drop-down list and click Add.

    Note

    Before you install the connector, ensure that you have created a database, schema, and a table in Snowflake for the connector to store ingested data.

  4. Authenticate to the deployment with your Snowflake account credentials and select Allow when prompted to allow the runtime application to access your Snowflake account. The connector installation process takes a few minutes to complete.

  5. Authenticate to the runtime with your Snowflake account credentials.

    The Openflow canvas appears with the connector process group added to it.

Configure the connector

  1. If needed, customize the connector configuration before configuring the built-in parameters.

  2. Populate the process group parameters

    1. Right-click on the imported process group and select Parameters.

    2. Fill out the required parameter values.

Common parameters

Parameter

Description

Required

AWS Access Key ID

The AWS Access Key ID to connect to your Kinesis Stream and DynamoDB.

AWS Kinesis Region

The AWS Region to connect to. Use regular AWS region format, for example: us-west-2, ap-southeast-1, eu-west-1. See the AWS Regions page.

AWS Secret Access Key

The AWS Secret Access Key to connect to your Kinesis Stream and DynamoDB.

AWS Kinesis Application Name

The name that is used as the DynamoDB table name for tracking the application’s progress on Kinesis Stream consumption.

AWS Kinesis Consumer Type

The strategy used to read records from a Kinesis Stream.

Must be one of the following values: SHARED_THROUGHPUT, ENHANCED_FAN_OUT.

For more information, see Differences between shared throughput consumer and enhanced fan-out consumer.

AWS Kinesis Initial Stream Position

The initial stream position from which the data starts replication. This takes effect only during the initial start for a given AWS Kinesis Application Name.

Possible values are:

LATEST: Latest stored record.

TRIM_HORIZON: Earliest stored record.

AWS Kinesis Stream Name

The AWS Kinesis Stream Name to consume data from.

Snowflake Destination Database

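The database where data will be persisted. It must already exist in Snowflake. The name is case-sensitive. For unquoted identifiers, provide the name in uppercase.

Snowflake Destination Schema

The schema where data will be persisted. It must already exist in Snowflake. The name is case-sensitive. For unquoted identifiers, provide the name in uppercase.

See the following examples: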

CREATE SCHEMA SCHEMA_NAME or CREATE SCHEMA schema_name: use SCHEMA_NAME.

CREATE SCHEMA "schema_name" or CREATE SCHEMA "SCHEMA_NAME": use schema_name or SCHEMA_NAME, respectively.

Snowflake Destination Table

The table where data will be persisted. It must already exist in Snowflake. The name is case-sensitive. For unquoted identifiers, provide the name in uppercase.
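The identifier-case rule that applies to the destination database, schema, and table parameters can be sketched in a few lines of Python. The helper name `parameter_value_for` is hypothetical; it takes an identifier as written in the CREATE statement and returns the value to enter in the parameter.

```python
def parameter_value_for(identifier: str) -> str:
    """Return the parameter value for an identifier as written in a CREATE statement."""
    is_quoted = (
        len(identifier) >= 2
        and identifier.startswith('"')
        and identifier.endswith('"')
    )
    if is_quoted:
        # Quoted identifiers keep their exact case; a doubled quote is an escaped quote.
        return identifier[1:-1].replace('""', '"')
    # Unquoted identifiers are stored in uppercase.
    return identifier.upper()


if __name__ == "__main__":
    for written in ("schema_name", "SCHEMA_NAME", '"schema_name"', '"SCHEMA_NAME"'):
        print(f"{written} -> {parameter_value_for(written)}")
```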

Start the connector

  1. Right-click on the canvas and select Enable all Controller Services.

  2. Right-click on the canvas and select Start. The connector starts data ingestion.

Understanding KINESISMETADATA column

The connector populates the KINESISMETADATA structure with metadata about the Kinesis record. The structure contains the following information:

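| Field Name | Field Type | Example Value | Description |
| --- | --- | --- | --- |
| stream | String | stream-name | The name of the Kinesis stream that the record came from. |
| shardId | String | shardId-000000000001 | The identifier of the shard in the stream that the record came from. |
| approximateArrival | String | 2025-11-05T09:12:15.300 | The approximate time that the record was inserted into the stream (ISO 8601 format). |
| partitionKey | String | key-1234 | The partition key specified by the data producer for the record. |
| sequenceNumber | String | 123456789 | The unique sequence number assigned by Kinesis Data Streams to the record in the shard. |
| subSequenceNumber | Number | 2 | The subsequence number of the record (used for aggregated records with the same sequence number). |
| shardedSequenceNumber | String | 12345678900002 | A combination of the sequence number and the subsequence number for the record. |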

Measuring ingestion latency

For change tracking, incremental processing, and Time Travel queries based on row modification time, the ROW_TIMESTAMP feature can be used.

It can be enabled by running the following command on your destination table:

ALTER TABLE <DESTINATION_TABLE> SET ROW_TIMESTAMP = TRUE;

After row timestamps are enabled, tables expose the METADATA$ROW_LAST_COMMIT_TIME column, which returns the timestamp when each row was last modified.

For more information, see Row timestamps.

참고

Row timestamps aren't available for interactive tables. For more information, see Limitations of interactive tables.
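One way to estimate ingestion latency, assuming you select both METADATA$ROW_LAST_COMMIT_TIME and the approximateArrival field of the KINESISMETADATA column for a row, is to subtract the arrival time from the commit time. The Python sketch below does this for values already fetched as ISO 8601 strings. The function name `ingestion_latency` is hypothetical, and the sketch assumes that both timestamps are expressed in the same time zone.

```python
from datetime import datetime, timedelta


def ingestion_latency(approximate_arrival: str, row_last_commit_time: str) -> timedelta:
    """Return commit time minus arrival time for one row.

    Both arguments are ISO 8601 strings and are assumed to share a time zone.
    """
    arrived = datetime.fromisoformat(approximate_arrival)
    committed = datetime.fromisoformat(row_last_commit_time)
    return committed - arrived


if __name__ == "__main__":
    # The commit time here is a made-up value for illustration.
    latency = ingestion_latency("2025-11-05T09:12:15.300", "2025-11-05T09:12:17.800")
    print(latency.total_seconds())  # 2.5
```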

Using the connector with Apache Iceberg™ tables

The connector can ingest data into Snowflake-managed Apache Iceberg™ tables, provided that the following requirements are met:

  • You must have been granted the USAGE privilege on the external volume associated with your Apache Iceberg™ table.

  • You must create an Apache Iceberg™ table before running the connector.

Grant usage on an external volume

For example, if your Iceberg table uses the kinesis_external_volume external volume and the connector uses the role openflow_kinesis_connector_role_1, run the following statement:

USE ROLE ACCOUNTADMIN;
GRANT USAGE ON EXTERNAL VOLUME kinesis_external_volume TO ROLE openflow_kinesis_connector_role_1;

Create an Apache Iceberg™ table for ingestion

The connector does not create Iceberg tables automatically and does not support schema evolution. Before you run the connector, you must create an Iceberg table manually.

When you create an Iceberg table, you can use Iceberg data types (including VARIANT) or compatible Snowflake types.

For example, consider the following message:

{
  "id": 1,
  "name": "Steve",
  "body_temperature": 36.6,
  "approved_coffee_types": ["Espresso", "Doppio", "Ristretto", "Lungo"],
  "animals_possessed": {
    "dogs": true,
    "cats": false
  },
  "options": {
    "can_walk": true,
    "can_talk": false
  },
  "date_added": "2024-10-15"
}

To create an Iceberg table for the example message, use a statement like the following:

CREATE OR REPLACE ICEBERG TABLE my_iceberg_table (
  kinesisMetadata OBJECT(
    stream STRING,
    shardId STRING,
    approximateArrival STRING,
    partitionKey STRING,
    sequenceNumber STRING,
    subSequenceNumber INTEGER,
    shardedSequenceNumber STRING
  ),
  id INT,
  name string,
  body_temperature float,
  approved_coffee_types array(string),
  animals_possessed variant,
  date_added date,
  options object(can_walk boolean, can_talk boolean)
)
EXTERNAL_VOLUME = 'my_volume'
CATALOG = 'SNOWFLAKE'
BASE_LOCATION = 'my_location/my_iceberg_table'
ICEBERG_VERSION = 3;

Using the connector with Interactive Tables

Interactive tables are a special type of Snowflake table optimized for low-latency, high-concurrency queries. You can find out more about interactive tables in the interactive tables documentation.

  1. Create an interactive table:

    CREATE INTERACTIVE TABLE REALTIME_METRICS (
      metric_name VARCHAR,
      metric_value NUMBER,
      source_stream VARCHAR,
      timestamp TIMESTAMP_NTZ
    ) CLUSTER BY (metric_name)
    AS (SELECT
      $1:M_NAME::VARCHAR,
      $1:M_VALUE::NUMBER,
      $1:kinesisMetadata.stream::VARCHAR,
      $1:kinesisMetadata.approximateArrival::TIMESTAMP_NTZ
    FROM TABLE(DATA_SOURCE(TYPE => 'STREAMING')));
    

Important considerations:

  • Interactive tables have specific limitations and query restrictions. Review the interactive tables documentation before using them with the connector.

  • For interactive tables, any required transformations must be handled in the table definition.

  • Interactive warehouses are required to query interactive tables efficiently.

Using the connector with a customer-defined schema for the destination table

The connector treats each Kinesis record as a row to be inserted into a Snowflake table. For example, suppose you have a Kinesis stream whose messages are structured like the following JSON:

{
  "order_id": 12345,
  "customer_name": "John",
  "order_total": 100.00,
  "isPaid": true
}

By default, you don't have to declare every JSON field as a column, because schema evolution adds the missing columns. However, if you prefer a static schema, you can create it by running:

CREATE TABLE ORDERS (
  kinesisMetadata OBJECT,
  order_id NUMBER,
  customer_name VARCHAR,
  order_total FLOAT,
  ispaid BOOLEAN
);
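With a static schema, the matching rule described earlier in this topic applies: first-level keys are matched to columns by name without regard to case, and keys with no matching column are ignored. The Python sketch below is a simplified model of that rule, not the connector's implementation. The function name `map_record_to_row` and the extra `coupon` key are illustrative.

```python
from typing import Any, Dict, Iterable


def map_record_to_row(record: Dict[str, Any], table_columns: Iterable[str]) -> Dict[str, Any]:
    """Match first-level keys to columns by name, ignoring case; drop unmatched keys."""
    columns_by_upper = {column.upper(): column for column in table_columns}
    row = {}
    for key, value in record.items():
        column = columns_by_upper.get(key.upper())
        if column is not None:
            row[column] = value
    return row


if __name__ == "__main__":
    orders_columns = ["KINESISMETADATA", "ORDER_ID", "CUSTOMER_NAME", "ORDER_TOTAL", "ISPAID"]
    message = {
        "order_id": 12345,
        "customer_name": "John",
        "order_total": 100.00,
        "isPaid": True,
        "coupon": "SPRING",  # no matching column, so it is ignored
    }
    print(map_record_to_row(message, orders_columns))
```

In this model, the `isPaid` key lands in the `ISPAID` column because the comparison ignores case, while `coupon` is dropped because the ORDERS table has no such column.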

Using the connector with a customer-defined PIPE

If you choose to create your own pipe, you can define the data transformation logic in the pipe’s COPY INTO statement. You can rename columns as required and cast the data types as needed. For example:

CREATE TABLE ORDERS (
  order_id VARCHAR,
  customer_name VARCHAR,
  order_total VARCHAR,
  ispaid VARCHAR
);

CREATE PIPE ORDERS AS
COPY INTO ORDERS
FROM (
  SELECT
    $1:order_id::STRING,
    $1:customer_name,
    $1:order_total::STRING,
    $1:isPaid::STRING
  FROM TABLE(DATA_SOURCE(TYPE => 'STREAMING'))
);

When you define your own pipe, your destination table columns don't have to match the JSON keys. You can rename the columns and cast the data types as required.
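If you maintain several pipes, the projection can be generated from a list of JSON keys and optional casts. The following Python sketch produces a statement with the same shape as the example above. The function name `build_pipe_sql` is hypothetical, and the sketch does no identifier quoting or validation, so treat it as a starting point only.

```python
from typing import List, Optional, Tuple


def build_pipe_sql(pipe: str, table: str,
                   projection: List[Tuple[str, Optional[str]]]) -> str:
    """Build a CREATE PIPE statement from (json_key, cast_type) pairs.

    The order of the pairs must follow the column order of the destination
    table, because the SELECT expressions are listed positionally.
    """
    select_lines = []
    for json_key, cast_type in projection:
        expression = f"$1:{json_key}"
        if cast_type:
            expression += f"::{cast_type}"
        select_lines.append("    " + expression)
    select_list = ",\n".join(select_lines)
    return (
        f"CREATE PIPE {pipe} AS\n"
        f"COPY INTO {table}\n"
        "FROM (\n"
        "  SELECT\n"
        f"{select_list}\n"
        "  FROM TABLE(DATA_SOURCE(TYPE => 'STREAMING'))\n"
        ");"
    )


if __name__ == "__main__":
    print(build_pipe_sql("ORDERS", "ORDERS", [
        ("order_id", "STRING"),
        ("customer_name", None),
        ("order_total", "STRING"),
        ("isPaid", "STRING"),
    ]))
```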

To adjust the connector to work with a custom pipe, perform the following tasks:

  1. Right-click on the PublishSnowpipeStreaming processor used in your Kinesis ingestion flow in the Openflow canvas.

  2. Select Configure from the context menu.

  3. Navigate to the Properties tab.

  4. In the Destination type field, pick Pipe.

  5. In the Pipe field, type the name of your pipe.

  6. Select Apply to save the configuration.

Customizing error handling

Error handling is split between Openflow-side failures and server-side failures within the Snowpipe Streaming service.

  • Openflow Errors (Client-Side Failures): Errors such as unparseable payloads or custom transformation failures occur before records reach Snowflake. By default, these records are discarded. To process these errors in Openflow, use the FlowFiles from the parse failure relationship of the ConsumeKinesis processor.

  • Snowpipe Streaming Errors (Server-Side Failures): Errors for records that successfully reach Snowflake but are incompatible with the destination table’s schema (for example, type mismatches) are captured by the Snowflake infrastructure. When error logging is enabled on the destination table (error_logging = true), these failed rows are automatically ingested into the destination Error table.
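The client-side half of this split can be pictured with a small Python model. It is only an illustration of the idea that unparseable payloads are separated out before anything reaches Snowflake; it is not the ConsumeKinesis processor's code, and the function name `split_parse_failures` is hypothetical.

```python
import json
from typing import Any, List, Tuple


def split_parse_failures(payloads: List[bytes]) -> Tuple[List[Any], List[bytes]]:
    """Separate payloads that parse as JSON from those that do not."""
    parsed, failures = [], []
    for payload in payloads:
        try:
            parsed.append(json.loads(payload))
        except ValueError:
            # json.JSONDecodeError and UnicodeDecodeError are both ValueError subclasses.
            failures.append(payload)
    return parsed, failures


if __name__ == "__main__":
    records, parse_failures = split_parse_failures([
        b'{"order_id": 12345, "isPaid": true}',
        b"not json at all",
    ])
    print(len(records), "parsed;", len(parse_failures), "routed to parse failure handling")
```

Records in the first list would continue to Snowflake, where schema-related problems are handled server-side; payloads in the second list correspond to what the parse failure relationship carries.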

Next steps