Set up Openflow Connector for Kinesis for JSON data format

참고

This connector is subject to the Snowflake Connector Terms.

This topic describes the set up steps for the Openflow Connector for Kinesis for JSON data format. This is a simplified connector optimized for basic message ingestion with schema evolution capabilities.

The Openflow Connector for Kinesis for JSON data format is designed for straightforward JSON message ingestion from Kinesis streams to Snowflake tables.

전제 조건

  1. Openflow Connector for Kinesis 정보 을 검토했는지 확인합니다.

  2. Ensure that you have Openflow - BYOC 설정 or Set up Openflow - Snowflake Deployments.

  3. |OFSFSPCS-plural|를 사용하는 경우 필수 도메인 구성</user-guide/data-integration/openflow/setup-openflow-spcs-sf-allow-list>`을 검토했는지와 :ref:`label-openflow_domains_used_by_openflow_connectors_kinesis 커넥터의 필수 도메인에 대한 액세스 권한을 부여했는지 확인합니다.

참고

If you need the support of other data formats or features, such as DLQ, reach out to your Snowflake representative.

Kinesis Stream 설정하기

AWS 계정 관리자는 AWS 계정에서 다음 작업을 수행합니다.

  1. Ensure that you have an AWS User with IAM permissions to access Kinesis Streams and DynamoDB.

  2. Ensure that the AWS User has configured Access Key credentials.

Snowflake 계정 설정하기

Snowflake 계정 관리자는 다음 작업을 수행합니다.

  1. 새 역할을 생성하거나 기존 역할을 사용하여 데이터베이스 권한 권한을 부여합니다.

  2. 데이터를 저장할 대상 데이터베이스와 대상 테이블을 생성하는 데 사용할 대상 스키마를 생성합니다.

    1. 대상 테이블이 없는 경우 커넥터의 기능을 사용하여 대상 테이블을 자동으로 생성할 계획이라면 사용자에게 Snowflake 오브젝트를 생성하고 관리하는 데 필요한 권한이 있는지 확인하십시오.

      오브젝트

      권한

      참고

      데이터베이스

      USAGE

      스키마

      USAGE . CREATE TABLE .

      스키마 수준 오브젝트가 생성된 후 CREATE object 권한이 취소될 수 있습니다.

      테이블

      OWNERSHIP

      Kinesis 커넥터를 사용하여 기존 테이블로 데이터를 수집할 때만 필수입니다. . 커넥터가 Kinesis 스트림의 레코드에 대한 새 대상 테이블을 생성하는 경우 구성에 지정된 사용자의 기본 역할이 테이블 소유자가 됩니다.

      다음 스크립트를 사용하여 사용자 지정 역할을 생성하고 구성할 수 있습니다(SECURITYADMIN 또는 이와 동등한 요구 사항 필요):

      USE ROLE SECURITYADMIN;
      
      CREATE ROLE kinesis_connector_role;
      GRANT USAGE ON DATABASE kinesis_db TO ROLE kinesis_connector_role;
      GRANT USAGE ON SCHEMA kinesis_schema TO ROLE kinesis_connector_role;
      GRANT CREATE TABLE ON SCHEMA kinesis_schema TO ROLE kinesis_connector_role;
      
      -- Only for existing tables.
      GRANT OWNERSHIP ON TABLE existing_table TO ROLE kinesis_connector_role;
      
      Copy
  3. 유형이 SERVICE 인 새 Snowflake 서비스 사용자를 생성합니다.

  4. Snowflake 서비스 사용자에게 이전 단계에서 생성한 역할을 부여합니다.

    GRANT ROLE kinesis_connector_role TO USER kinesis_connector_user;
    ALTER USER kinesis_connector_user SET DEFAULT_ROLE = kinesis_connector_role;
    
    Copy
  5. 3단계의 Snowflake SERVICE 사용자에 대해 키 페어 인증 으로 구성합니다.

  6. Snowflake는 이 단계를 강력히 권장합니다. Openflow에서 지원하는 시크릿 관리자(예: AWS, Azure, Hashicorp)를 구성하고 공개 및 개인 키를 시크릿 스토어에 저장합니다.

    참고

    어떤 이유로든 시크릿 관리자를 사용하지 않으려면 조직의 보안 정책에 따라 키 페어 인증에 사용되는 공개 키와 개인 키 파일을 보호할 책임이 있습니다.

    1. 시크릿 관리자가 구성되면 인증 방법을 결정합니다. AWS 에서는 다른 시크릿을 유지할 필요가 없으므로 Openflow와 연결된 EC2 인스턴스 역할을 사용하는 것이 좋습니다.

    2. Openflow에서 오른쪽 상단의 햄버거 메뉴에서 이 시크릿 관리자와 연결된 매개 변수 공급자를 구성합니다. Controller Settings » Parameter Provider 로 이동한 다음 매개 변수 값을 가져옵니다.

    3. 이 시점에서 모든 자격 증명은 연결된 매개 변수 경로로 참조할 수 있으며 민감한 값은 Openflow 내에서 유지될 필요가 없습니다.

  7. If any other Snowflake users require access to the ingested data and created tables (for example, for custom processing in Snowflake), then grant those users the role created in step 2.

커넥터 설정하기

데이터 엔지니어는 다음 작업을 수행하여 커넥터를 설치하고 구성합니다.

커넥터 설치하기

  1. Navigate to the Openflow overview page. In the Featured connectors section, select View more connectors.

  2. Openflow Connector 페이지에서 커넥터를 찾아 Add to runtime 을 선택합니다.

  3. In the Select runtime dialog, select your runtime from the Available runtimes drop-down list and click Add.

    참고

    커넥터를 설치하기 전에 커넥터가 수집한 데이터를 저장할 수 있도록 Snowflake에서 데이터베이스와 스키마를 생성했는지 확인하십시오.

  4. Snowflake 계정 자격 증명으로 배포를 인증하고 런타임 애플리케이션이 Snowflake 계정에 액세스할 수 있도록 허용할지 묻는 메시지가 표시되면 Allow 를 선택합니다. 커넥터 설치 프로세스를 완료하는 데 몇 분 정도 걸립니다.

  5. Snowflake 계정 자격 증명으로 런타임에 인증합니다.

커넥터 프로세스 그룹이 추가된 Openflow 캔버스가 표시됩니다.

커넥터 구성

  1. 가져온 프로세스 그룹을 마우스 오른쪽 버튼으로 클릭하고 Parameters 를 선택합니다.

  2. Populate the required parameter values as described in Parameters section below.

Parameters

This section describes all parameters for the Openflow Connector for Kinesis for JSON data format.

The connector consists of a several modules. To see the set, double click on the connector process group. You will be able to set the parameters for each module in the module’s parameter context.

Snowflake destination parameters

매개 변수

설명

필수

대상 데이터베이스

데이터가 유지될 데이터베이스입니다. Snowflake에 이미 존재해야 합니다. 이름은 대소문자를 구분합니다. 따옴표로 묶지 않은 식별자의 경우 이름을 대문자로 입력합니다.

대상 스키마

데이터가 유지될 스키마로, Snowflake에 이미 존재해야 합니다. 이름은 대소문자를 구분합니다. 따옴표로 묶지 않은 식별자의 경우 이름을 대문자로 입력합니다.

다음 예제를 참조하세요.

  • CREATE SCHEMA SCHEMA_NAME 또는 CREATE SCHEMA schema_name: SCHEMA_NAME 사용

  • CREATE SCHEMA "schema_name" 또는 CREATE SCHEMA "SCHEMA_NAME": 각각 schema_name 또는 SCHEMA_NAME 사용

Iceberg 활성화됨

Whether Iceberg is enabled for table operations. One of true / false.

Schema Evolution Enabled

Enables or disables schema evolution at the connector level. When enabled, allows automatic schema changes for tables. Note that schema evolution can also be controlled at the individual table level through table-specific parameters. One of: true / false.

Schema Evolution For New Tables Enabled

Controls whether schema evolution is enabled when creating new tables. When set to ‘true’, new tables will be created with ENABLE_SCHEMA_EVOLUTION = TRUE parameter. When set to ‘false’, new tables will be created with ENABLE_SCHEMA_EVOLUTION = FALSE parameter. Not applicable to Iceberg tables as they are not being created automatically. This setting only affects table creation, not existing tables. One of: true / false.

Snowflake 계정 식별자

사용하는 경우:

  • Session Token Authentication Strategy: 비워 두어야 합니다.

  • KEY_PAIR: 데이터가 유지될 [organization-name]-[account-name] 형식의 Snowflake 계정 이름입니다.

Snowflake Authentication Strategy

사용하는 경우:

  • Snowflake Openflow Deployment 또는 BYOC: SNOWFLAKE_SESSION_TOKEN 을 사용합니다. 이 토큰은 Snowflake에서 자동으로 관리됩니다. BYOC 배포에서 SNOWFLAKE_SESSION_TOKEN을 사용하도록 :ref:`런타임 역할<label-deployment_byoc_setup_runtime_role>`을 미리 구성했어야 합니다.

  • BYOC: Alternatively BYOC can use KEY_PAIR as the value for authentication strategy.

Snowflake 개인 키

사용하는 경우:

  • Session Token Authentication Strategy: 비워 두어야 합니다.

  • KEY_PAIR: 인증에서 사용되는 RSA 개인 키여야 합니다.

    The RSA key must be formatted according to PKCS8 standards and have standard PEM headers and footers. Note that either a Snowflake Private Key File or a Snowflake Private Key must be defined.

아니요

Snowflake 개인 키 파일

사용하는 경우:

  • Session token authentication strategy: 개인 키 파일은 비워 두어야 합니다.

  • KEY_PAIR: PKCS8 표준에 따라 형식이 지정되고 표준 PEM 머리글과 바닥글을 포함하며 Snowflake 인증에 사용되는 RSA 개인 키를 포함하는 파일을 업로드합니다. 머리글 라인은 ``—–BEGIN PRIVATE``으로 시작합니다. 개인 키 파일을 업로드하려면 Reference asset 확인란을 선택합니다.

아니요

Snowflake 개인 키 비밀번호

사용하는 경우

  • Session Token Authentication Strategy: 비워 두어야 합니다.

  • KEY_PAIR: Snowflake 개인 키 파일과 연결된 비밀번호를 입력합니다.

아니요

Snowflake 역할

사용하는 경우

  • Session Token Authentication Strategy: Use your Snowflake Role. You can find your Snowflake Role in the Openflow UI, by navigating to View Details for your Runtime.

  • KEY_PAIR Authentication Strategy: 서비스 사용자에 대해 구성된 유효한 역할을 사용합니다.

Snowflake 사용자 이름

사용하는 경우

  • Session Token Authentication Strategy: 비워 두어야 합니다.

  • KEY_PAIR: Snowflake 인스턴스에 연결하는 데 사용되는 사용자 이름을 입력합니다.

Kinesis JSON Source Parameters

매개 변수

설명

필수

AWS 리전 코드

Kinesis Stream이 위치한 AWS 리전(예: us-west-2)입니다.

AWS 액세스 키 ID

The AWS Access Key ID to connect to your Kinesis Stream, DynamoDB, and, optionally, CloudWatch.

AWS 시크릿 액세스 키

The AWS Secret Access Key to connect to your Kinesis Stream, DynamoDB, and, optionally, CloudWatch.

Kinesis 애플리케이션 이름

Kinesis Stream 소비에 대한 애플리케이션의 진행 상황을 추적하기 위한 DynamoDB 테이블 이름에 사용되는 이름입니다.

Kinesis 초기 스트림 위치

데이터가 복제를 시작하는 초기 스트림 위치입니다.

가능한 값은 다음과 같습니다.
  • LATEST: 최근 저장된 레코드

  • TRIM_HORIZON: 가장 먼저 저장된 레코드

Kinesis Stream 이름

AWS 데이터를 소비할 Kinesis Stream 이름입니다.

Metrics Publishing

Specifies where Kinesis Client Library metrics are published to. Possible values: DISABLED, LOGS, CLOUDWATCH.

플로우 실행

  1. 평면을 마우스 오른쪽 버튼으로 클릭하고 Enable all Controller Services 를 선택합니다.

  2. Right-click on the connector’s process group and select Start.

커넥터가 데이터 수집을 시작합니다.

Table Schema

The Snowflake table loaded by the connector contains columns named by the keys of your Kinesis messages. The connector also adds a KINESISMETADATA column which stores metadata about the record.

Below is an example of a Snowflake table loaded by the connector:

ACCOUNT

SYMBOL

SIDE

QUANTITY

KINESISMETADATA

1

ABC123

ZTEST

BUY

3572

{ … KINESISMETADATA object … }

2

XYZ789

ZABZX

SELL

3024

{ … KINESISMETADATA object … }

3

XYZ789

ZTEST

SELL

799

{ … KINESISMETADATA object … }

4

ABC123

ZABZX

BUY

2033

{ … KINESISMETADATA object … }

The KINESISMETADATA column contains an object with the following fields:

Field Name

Field Type

Example Value

설명

stream

String

stream-name

The name of the Kinesis stream the record came from.

shardId

String

shardId-000000000001

The identifier of the shard in the stream the record came from.

approximateArrival

String

2025-11-05T09:12:15.300

The approximate time that the record was inserted into the stream (ISO 8601 format).

partitionKey

String

key-1234

The partition key specified by the data producer for the record.

sequenceNumber

String

123456789

The unique sequence number assigned by Kinesis Data Streams to the record in the shard.

subSequenceNumber

Number

2

The subsequence number for the record (used for aggregated records with the same sequence number).

shardedSequenceNumber

String

12345678900002

A combination of the sequence number and the subsequence number for the record.

스키마 진화

This connector supports automatic schema detection and evolution. The structure of tables in Snowflake is defined and evolved automatically to support the structure of new data loaded by the connector.

Snowflake detects the schema of the incoming data and loads data into tables that match any user-defined schema. Snowflake also allows adding new columns or dropping the NOT NULL constraint from columns missing in new incoming records.

Schema detection with the connector infers data types based on the JSON data provided.

If the connector creates the target table, schema evolution is enabled by default.

If you want to enable or disable schema evolution on an existing table, use the ALTER TABLE command to set the ENABLE_SCHEMA_EVOLUTION parameter. You must also use a role that has the OWNERSHIP privilege on the table. For more information, see 테이블 스키마 진화.

However, if schema evolution is disabled for an existing table, then the connector will try to send the rows with mismatched schemas to the configured failure output port.

Iceberg table support

Openflow Connector for Kinesis can ingest data into a Snowflake-managed Apache Iceberg™ table when Iceberg Enabled is set to true.

요구 사항 및 제한 사항

Before you configure the Openflow Connector for Kinesis for Iceberg table ingestion, note the following requirements and limitations:

  • 커넥터를 실행하기 전에 Iceberg 테이블을 생성해야 합니다.

  • 사용자가 생성된 테이블에 데이터를 삽입할 수 있는 액세스 권한이 있는지 확인합니다.

구성 및 설정

To configure the Openflow Connector for Kinesis for Iceberg table ingestion, follow the steps in Set up Openflow Connector for Kinesis for JSON data format with a few differences noted in the following sections.

Iceberg 테이블로 수집 활성화

Iceberg 테이블로 수집을 사용하려면 Iceberg Enabled 매개 변수를 true 로 설정해야 합니다.

수집을 위한 Iceberg 테이블 만들기

Before you run the connector, you must create an Iceberg table. The initial table schema depends on your connector Schema Evolution Enabled property settings.

With enabled schema evolution, you must create a table with a column named kinesisMetadata. The connector automatically creates the columns for message fields and alters the kinesisMetadata column schema.

CREATE OR REPLACE ICEBERG TABLE my_iceberg_table (
    kinesisMetadata OBJECT()
  )
  EXTERNAL_VOLUME = 'my_volume'
  CATALOG = 'SNOWFLAKE'
  BASE_LOCATION = 'my_location/my_iceberg_table'
  ENABLE_SCHEMA_EVOLUTION = true;
Copy

If schema evolution is disabled, you must create the table with all fields the Kinesis message contains. When you create an Iceberg table, you can use Iceberg data types or compatible Snowflake types. The semi-structured VARIANT type isn’t supported. Instead, use a structured OBJECT or MAP.

예를 들어, 다음 메시지를 생각해 보십시오.

{
    "id": 1,
    "name": "Steve",
    "body_temperature": 36.6,
    "approved_coffee_types": ["Espresso", "Doppio", "Ristretto", "Lungo"],
    "animals_possessed":
    {
        "dogs": true,
        "cats": false
    },
    "date_added": "2024-10-15"
}
Copy

The following statement creates a table with all fields the Kinesis message contains:

CREATE OR REPLACE ICEBERG TABLE my_iceberg_table (
    kinesisMetadata OBJECT(
        stream STRING,
        shardId STRING,
        approximateArrival STRING,
        partitionKey STRING,
        sequenceNumber STRING,
        subSequenceNumber INTEGER,
        shardedSequenceNumber STRING
    ),
    id INT,
    body_temperature FLOAT,
    name STRING,
    approved_coffee_types ARRAY(STRING),
    animals_possessed OBJECT(dogs BOOLEAN, cats BOOLEAN),
    date_added DATE
  )
  EXTERNAL_VOLUME = 'my_volume'
  CATALOG = 'SNOWFLAKE'
  BASE_LOCATION = 'my_location/my_iceberg_table';
Copy

참고

kinesisMetadata must always be created. Field names inside nested structures such as dogs or cats are case sensitive.