Set up the Openflow Connector for Kafka

Note

This connector is subject to the `Snowflake Connector Terms <https://www.snowflake.com/legal/snowflake-connector-terms/>`_.

Prerequisites

  1. Ensure that you have reviewed Snowflake Openflow Connector for Kafka.

  2. Ensure that you have set up Openflow - BYOC or :doc:`set up Openflow - Snowflake Deployments </user-guide/data-integration/openflow/setup-openflow-spcs>`.

  3. If using Openflow - Snowflake Deployments, ensure that you’ve reviewed configuring required domains and have granted access to the required domains for the Kafka connector. The connector must be able to connect to all Kafka brokers in the cluster.

Set up your Snowflake account

As a Snowflake account administrator, perform the following tasks:

  1. Create a new Snowflake service user with the type SERVICE.

  2. Create a new role or use an existing role and grant the Database privileges.

    The connector requires a user to create the destination table. Make sure the user has the required privileges for managing Snowflake objects:

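    ========  =========  =======================================================
    Object    Privilege  Notes
    ========  =========  =======================================================
    Database  USAGE
    Schema    USAGE
    Table     OWNERSHIP  Required for the connector to ingest data into a table.
    ========  =========  =======================================================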

    Snowflake recommends creating a separate user and role for each Kafka Cluster for better access control.

    You can use the following script to create and configure a custom role (requires SECURITYADMIN or equivalent):

    USE ROLE securityadmin;
    CREATE ROLE openflow_kafka_connector_role_1;
    
    GRANT USAGE ON DATABASE kafka_db TO ROLE openflow_kafka_connector_role_1;
    GRANT USAGE ON SCHEMA kafka_schema TO ROLE openflow_kafka_connector_role_1;
    

    Note

    Privileges must be granted directly to the connector role and cannot be inherited.

  3. Configure the destination table

    Snowflake highly recommends using server-side schema evolution for schema changes and an error table for DML error logging. The following example shows how to create a table and add proper OWNERSHIP permissions.

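    -- Create the destination table as the connector role so that the role owns it.
    USE ROLE openflow_kafka_connector_role_1;

    CREATE TABLE kafka_db.kafka_schema.<DESTINATION_TABLE_NAME> (
      kafkaMetadata variant
    )
    ENABLE_SCHEMA_EVOLUTION = TRUE
    ERROR_LOGGING = TRUE;

    -- For a table that already exists, transfer ownership to the connector role.
    USE ROLE securityadmin;
    GRANT OWNERSHIP ON TABLE existing_table1 TO ROLE openflow_kafka_connector_role_1;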
    

    The connector supports automatic schema detection and evolution. The structure of tables in Snowflake is defined and evolved automatically to support the structure of new data loaded by the connector. It automatically maps the record content’s first-level keys to table columns matching by name (case-insensitive).

    With Schema evolution enabled, Snowflake can automatically expand the destination table by adding new columns that are detected in the incoming stream and dropping NOT NULL constraints to accommodate new data patterns. For more information, see Table schema evolution.

    If ENABLE_SCHEMA_EVOLUTION isn’t enabled, you must create the schema manually by extending the table definition. The connector tries to match the record content’s first-level keys to the table columns by name. If keys from the JSON don’t match the table columns, the connector ignores the keys.

  4. (Optional) Configure a secrets manager

    Snowflake strongly recommends this step. Configure a secrets manager supported by Openflow (for example, AWS, Azure, or Hashicorp) and store the public and private keys in the secret store.

    1. Determine how you’ll authenticate to the secrets manager after it’s configured. On AWS, Snowflake recommends using the EC2 instance role associated with Openflow so no other secrets need to be persisted.

    2. Configure a Parameter Provider associated with this Secrets Manager in Openflow from the hamburger menu in the upper right. Navigate to Controller Settings > Parameter Provider and fetch your parameter values.

    3. Reference all credentials with the associated parameter paths so no sensitive values need to be persisted within Openflow.

  5. Grant access to users

    For any other Snowflake users who require access to the raw data ingested by the connector (for example, for custom processing in Snowflake), grant those users the role created in step 2.
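The user-related steps in this procedure (creating the service user in step 1 and granting access to other users in step 5) might look like the following sketch. The user names openflow_kafka_user_1 and analyst_user are hypothetical, and authentication settings such as key-pair configuration are omitted, so adapt the statements to your account.

```sql
-- Hypothetical names: openflow_kafka_user_1 (new service user),
-- analyst_user (an existing user who needs to read the ingested data).
USE ROLE securityadmin;

-- Step 1: create the service user for the connector.
CREATE USER IF NOT EXISTS openflow_kafka_user_1
  TYPE = SERVICE
  DEFAULT_ROLE = openflow_kafka_connector_role_1
  COMMENT = 'Service user for the Openflow Connector for Kafka';

-- Let the service user assume the connector role.
GRANT ROLE openflow_kafka_connector_role_1 TO USER openflow_kafka_user_1;

-- Step 5: give another user access to the raw ingested data.
GRANT ROLE openflow_kafka_connector_role_1 TO USER analyst_user;
```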

Set up the connector

As a data engineer, perform the following tasks to install and configure the connector:

Install the connector

To install the connector, do the following:

  1. Navigate to the Openflow overview page. In the Featured connectors section, select View more connectors.

  2. On the Openflow connectors page, find the connector and select Add to runtime.

  3. In the Select runtime dialog, select your runtime from the Available runtimes drop-down list and select Add.

    Note

    Before you install the connector, ensure that you have created a database, schema, and a table in Snowflake for the connector to store ingested data.

  4. Authenticate to the deployment with your Snowflake account credentials and select Allow when prompted to allow the runtime application to access your Snowflake account. The connector installation process takes a few minutes to complete.

  5. Authenticate to the runtime with your Snowflake account credentials.

    The Openflow canvas appears with the connector process group added to it.

Configure the connector

  1. If needed, customize the connector configuration before configuring the built-in parameters.

  2. Populate the process group parameters.

    1. Right-click the imported process group and select Parameters.

    2. Fill out the required parameter values.

Parameters

The following table describes the parameters for the Openflow Connector for Kafka:

.. list-table::
   :header-rows: 1

   * - Parameter
     - Description
     - Required
   * - Kafka Auto Offset Reset
     - Automatic offset configuration applied when no previous consumer offset is found. Corresponds to the Kafka auto.offset.reset property.

       Possible values:

       - earliest: automatically reset the offset to the earliest offset.
       - latest: automatically reset the offset to the latest offset.
       - none: throw an exception to the consumer if no previous offset is found for the consumer group.

       Default: latest
     -
   * - Kafka Bootstrap Servers
     - A comma-separated list of Kafka bootstrap servers. Each entry should include a port, for example kafka-broker:9092.
     -
   * - Kafka Consumer Group ID
     - The ID of the consumer group used by the connector. It can be arbitrary but must be unique.
     -
   * - Kafka SASL Password
     - Password for the configured username when using the SCRAM-SHA-512 SASL mechanism.
     -
   * - Kafka SASL Username
     - Username for the configured password when using the SCRAM-SHA-512 SASL mechanism.
     -
   * - Kafka Topic Format
     - One of: names / pattern. Specifies whether the “Kafka Topics” provided are a comma-separated list of names or a single regular expression.
     -
   * - Kafka Topics
     - A comma-separated list of Kafka topics or a regular expression.
     -
   * - Snowflake Destination Database
     - The database where data is persisted. It must already exist in Snowflake. The name is case-sensitive. For unquoted identifiers, provide the name in uppercase.
     -
   * - Snowflake Destination Schema
     - The schema where data is persisted. It must already exist in Snowflake. The name is case-sensitive. For unquoted identifiers, provide the name in uppercase.

       See the following examples:

       - CREATE SCHEMA SCHEMA_NAME or CREATE SCHEMA schema_name: use SCHEMA_NAME.
       - CREATE SCHEMA "schema_name" or CREATE SCHEMA "SCHEMA_NAME": use schema_name or SCHEMA_NAME, respectively.
     -
   * - Snowflake Destination Table
     - The table where data is persisted. It must already exist in Snowflake. The name is case-sensitive. For unquoted identifiers, provide the name in uppercase.
     -
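To illustrate the identifier rule for the destination parameters, the following sketch creates one unquoted and one quoted schema and then lists the names as Snowflake stores them. The schema names are hypothetical. The stored value is what you would enter in the parameter.

```sql
-- Hypothetical schema names, for illustration only.
CREATE SCHEMA IF NOT EXISTS kafka_db.kafka_schema;  -- unquoted: stored as KAFKA_SCHEMA
CREATE SCHEMA IF NOT EXISTS kafka_db."kafka_raw";   -- quoted: stored as kafka_raw

-- SCHEMA_NAME returns the exact, case-sensitive name to use in the
-- Snowflake Destination Schema parameter.
SELECT schema_name
FROM kafka_db.information_schema.schemata
WHERE schema_name IN ('KAFKA_SCHEMA', 'kafka_raw');
```

The same check works for tables by querying kafka_db.information_schema.tables and reading the TABLE_NAME column.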

Start the connector

  1. Right-click on the plane and select Enable all Controller Services.

  2. Right-click on the plane and select Start. The connector starts data ingestion.

Understanding the KAFKAMETADATA column

The connector populates the KAFKAMETADATA structure with metadata about the Kafka record. The structure contains the following information:

=========  =========  ===========
Field      Data Type  Description
=========  =========  ===========
topic      String     The name of the Kafka topic that the record came from.
partition  Number     The number of the partition within the topic. (Note that this is the Kafka partition, not the Snowflake micro-partition.)
offset     Number     The offset in that partition.
timestamp  Number     Timestamp when the record was added to Kafka.
key        String     If the message is a Kafka KeyedMessage, this is the key for that message. For the connector to store the key in KAFKAMETADATA, the key.converter parameter in the Kafka configuration properties must be set to org.apache.kafka.connect.storage.StringConverter; otherwise, the connector ignores keys.
headers    Object     A header is a user-defined key-value pair associated with the record. Each record can have 0, 1, or multiple headers.
=========  =========  ===========
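As a sketch of how these fields can be used, the following query reports the highest ingested offset and the row count per topic and partition. The table name my_destination_table is hypothetical, and the query assumes the kafkaMetadata column was created as in the earlier example. Bracket notation is used so that field names such as partition and offset are treated as plain keys.

```sql
-- Hypothetical table name; field names come from the table above.
SELECT
  kafkaMetadata['topic']::STRING        AS topic,
  kafkaMetadata['partition']::NUMBER    AS kafka_partition,
  MAX(kafkaMetadata['offset']::NUMBER)  AS last_ingested_offset,
  COUNT(*)                              AS row_count
FROM kafka_db.kafka_schema.my_destination_table
GROUP BY 1, 2
ORDER BY 1, 2;
```

Comparing last_ingested_offset with the latest offset reported by Kafka for the same partition gives a rough view of how far ingestion is behind.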

Measuring ingestion latency

For change tracking, incremental processing, and time-travel queries based on row modification time, the ROW_TIMESTAMP feature can be used.

Enable it by running the following command on your destination table:

ALTER TABLE <DESTINATION_TABLE> SET ROW_TIMESTAMP = TRUE;

After row timestamps are enabled, tables expose the METADATA$ROW_LAST_COMMIT_TIME column, which returns the timestamp when each row was last modified.

For more information, see METADATA$ROW_LAST_COMMIT_TIME.

Note

Row timestamp isn’t available for interactive tables. For more information, see Snowflake interactive tables and interactive warehouses.
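Once row timestamps are enabled, a query along the following lines places the Kafka record timestamp next to the Snowflake commit time, which is the comparison needed to estimate latency. The table name is hypothetical. The source does not state how the Kafka timestamp is encoded (Kafka commonly uses epoch milliseconds), so confirm the unit and time zone before subtracting one value from the other.

```sql
-- Hypothetical table name; shows the 10 most recently committed rows.
SELECT
  kafkaMetadata['topic']::STRING      AS topic,
  kafkaMetadata['timestamp']::NUMBER  AS kafka_timestamp,       -- when the record was added to Kafka
  METADATA$ROW_LAST_COMMIT_TIME       AS snowflake_commit_time  -- when the row was last modified in Snowflake
FROM kafka_db.kafka_schema.my_destination_table
ORDER BY snowflake_commit_time DESC
LIMIT 10;
```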

Using the connector with Apache Iceberg™ tables

The connector can ingest data into Snowflake-managed Apache Iceberg™ tables, provided that the following requirements are met:

  • You must have been granted the USAGE privilege on the external volume associated with your Apache Iceberg™ table.

  • You must create an Apache Iceberg™ table before running the connector.

Grant usage on an external volume

For example, if your Iceberg table uses the kafka_external_volume external volume and the connector uses the role openflow_kafka_connector_role, run the following statement:

USE ROLE ACCOUNTADMIN;
GRANT USAGE ON EXTERNAL VOLUME kafka_external_volume TO ROLE openflow_kafka_connector_role;
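To confirm that the grant took effect, one option is to list the privileges held by the connector role, as in this sketch that reuses the names from the example above:

```sql
USE ROLE ACCOUNTADMIN;

-- In the output, look for a row whose "privilege" is USAGE and whose
-- "name" is KAFKA_EXTERNAL_VOLUME.
SHOW GRANTS TO ROLE openflow_kafka_connector_role;
```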

Create an Apache Iceberg™ table for ingestion

The connector doesn’t create Iceberg tables automatically and doesn’t support schema evolution. Before you run the connector, you must create an Iceberg table manually.

When you create an Iceberg table, you can use Iceberg data types (including VARIANT) or compatible Snowflake types.

For example, consider the following message:

{
  "id": 1,
  "name": "Steve",
  "body_temperature": 36.6,
  "approved_coffee_types": ["Espresso", "Doppio", "Ristretto", "Lungo"],
  "animals_possessed": {
    "dogs": true,
    "cats": false
  },
  "options": {
    "can_walk": true,
    "can_talk": false
  },
  "date_added": "2024-10-15"
}

To create an Iceberg table for the example message, use a statement such as the following:

CREATE OR REPLACE ICEBERG TABLE my_iceberg_table (
  kafkaMetadata OBJECT(
    topic STRING,
    partition INTEGER,
    offset INTEGER,
    key STRING,
    headers variant,
    timestamp INTEGER
  ),
  id INT,
  name string,
  body_temperature float,
  approved_coffee_types array(string),
  animals_possessed variant,
  date_added date,
  options object(can_walk boolean, can_talk boolean)
)
EXTERNAL_VOLUME = 'my_volume'
CATALOG = 'SNOWFLAKE'
BASE_LOCATION = 'my_location/my_iceberg_table'
ICEBERG_VERSION = 3;
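After data is ingested, the nested columns of this table can be read with bracket notation. The following is a minimal sketch against the example table above. The filter date is arbitrary, and the exact access syntax for structured types may need adjusting for your account.

```sql
SELECT
  id,
  name,
  approved_coffee_types[0]            AS first_coffee_type,  -- first array element
  options['can_walk']                 AS can_walk,           -- field of the structured OBJECT
  animals_possessed['dogs']::BOOLEAN  AS has_dogs,           -- field of the VARIANT column
  kafkaMetadata['topic']              AS topic
FROM my_iceberg_table
WHERE date_added >= '2024-10-01';
```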

Using the connector with Interactive Tables

Interactive tables are a special type of Snowflake table optimized for low-latency, high-concurrency queries. For more information, see Snowflake interactive tables and interactive warehouses.

  1. Create an interactive table:

    CREATE INTERACTIVE TABLE REALTIME_METRICS (
      metric_name VARCHAR,
      metric_value NUMBER,
      source_topic VARCHAR,
      timestamp TIMESTAMP_NTZ
    ) AS (SELECT
          $1:M_NAME::VARCHAR,
          $1:M_VALUE::NUMBER,
          $1:RECORD_METADATA.topic::VARCHAR,
          $1:RECORD_METADATA.timestamp::TIMESTAMP_NTZ
          from TABLE(DATA_SOURCE(TYPE => 'STREAMING')));
    

Important considerations:

  • Interactive tables have specific limitations and query restrictions. Review Snowflake interactive tables and interactive warehouses before using them with the connector.

  • For interactive tables, any required transformations must be handled in the table definition.

  • Interactive warehouses are required to query interactive tables efficiently.
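A selective lookup such as the following sketch is the kind of query interactive tables are designed for. It assumes that the session already uses an interactive warehouse and that a metric named cpu_usage exists. Both are illustrative assumptions, and the query restrictions mentioned above still apply.

```sql
-- Hypothetical filter value; run from a session that uses an interactive warehouse.
SELECT metric_name, metric_value, source_topic, timestamp
FROM REALTIME_METRICS
WHERE metric_name = 'cpu_usage'
ORDER BY timestamp DESC
LIMIT 100;
```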

Using the connector with a customer-defined schema for the destination table

The connector treats each Kafka record as a row to be inserted into a Snowflake table. For example, if you have a Kafka topic with the content of the message structured like the following JSON:

{
  "order_id": 12345,
  "customer_name": "John",
  "order_total": 100.00,
  "isPaid": true
}

By default you don’t have to specify all fields from the JSON thanks to the ENABLE_SCHEMA_EVOLUTION = TRUE feature. However, if you prefer a static schema, it can be created by running:

CREATE TABLE ORDERS (
  kafkaMetadata OBJECT,
  order_id NUMBER,
  customer_name VARCHAR,
  order_total FLOAT,
  ispaid BOOLEAN
);
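With a static schema, keys that have no matching column are ignored, so a new field in the Kafka messages has to be added to the table manually. As a sketch, assuming the producer starts sending a hypothetical shipping_city key:

```sql
-- shipping_city is a hypothetical new first-level key in the incoming JSON.
ALTER TABLE ORDERS ADD COLUMN shipping_city VARCHAR;

-- Confirm the resulting column list.
DESC TABLE ORDERS;
```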

Using the connector with a customer-defined PIPE

If you choose to create your own pipe, you can define the data transformation logic in the pipe’s COPY INTO statement. You can rename columns as required and cast the data types as needed. For example:

CREATE TABLE ORDERS (
  order_id VARCHAR,
  customer_name VARCHAR,
  order_total VARCHAR,
  ispaid VARCHAR
);

CREATE PIPE ORDERS AS
COPY INTO ORDERS
FROM (
  -- $1 is the incoming record; cast each key to the column type.
  SELECT
    $1:order_id::STRING,
    $1:customer_name,
    $1:order_total::STRING,
    $1:isPaid::STRING
  FROM TABLE(DATA_SOURCE(TYPE => 'STREAMING'))
);

When you define your own pipe, your destination table columns don’t need to match the JSON keys. You can rename the columns to your desired names and cast the data types if required.

To adjust the connector to work with a custom pipe, perform the following tasks:

  1. Right-click on the PublishSnowpipeStreaming processor used in your Kafka ingestion flow in the Openflow canvas.

  2. Select Configure from the context menu.

  3. Navigate to the Properties tab.

  4. In the Destination type field, pick Pipe.

  5. In the Pipe field, type the name of your PIPE.

  6. Select Apply to save the configuration.
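Before starting the flow, it can help to confirm that the pipe exists under the name entered in the Pipe field. A minimal check, using the ORDERS pipe from the example above:

```sql
-- Lists matching pipes in the current schema, including their definitions.
SHOW PIPES LIKE 'ORDERS';

-- Shows the details of the pipe, including its COPY INTO definition.
DESC PIPE ORDERS;
```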

Customizing error handling

Error handling is split between Openflow-side failures and server-side failures within the Snowpipe Streaming service.

  • Openflow Errors (Client-Side Failures): Errors such as unparseable payloads or custom transformation failures occur before records reach Snowflake. By default, these records are discarded. To process them in Openflow instead, use the FlowFiles routed to the parse failure relationship of the ConsumeKafka processor.

  • Snowpipe Streaming Errors (Server-Side Failures): Errors for records that successfully reach Snowflake but are incompatible with the destination table’s schema (for example, type mismatches) are captured by the Snowflake infrastructure. When error logging is enabled on the destination table (error_logging = true), these failed rows are automatically ingested into the destination Error table.

Performance tuning

See Performance tuning for the Openflow Connector for Kafka.