Set up Openflow Connector for Kinesis for JSON data format¶
참고
This connector is subject to the Snowflake Connector Terms.
이 항목에서는 JSON 데이터 형식에 Openflow Connector for Kinesis 를 설정하는 단계에 대해 설명합니다. 이는 스키마 진화 기능을 사용하여 기본 메시지 수집에 최적화된 간소화된 커넥터입니다.
JSON 데이터 형식을 위한 Openflow Connector for Kinesis 는 Kinesis 스트림에서 Snowflake 테이블로의 간단한 JSON 메시지 수집을 위해 설계되었습니다.
전제 조건¶
Openflow Connector for Kinesis 정보 을 검토했는지 확인합니다.
Ensure that you have Openflow - BYOC 설정 or Set up Openflow - Snowflake Deployments.
Openflow - Snowflake Deployments 를 사용하는 경우 필수 도메인 구성</user-guide/data-integration/openflow/setup-openflow-spcs-sf-allow-list>`을 검토했는지와 :ref:`label-openflow_domains_used_by_openflow_connectors_kinesis 커넥터의 필수 도메인에 대한 액세스 권한을 부여했는지 확인합니다.
참고
DLQ와 같은 다른 데이터 형식 또는 기능에 대한 지원이 필요한 경우 Snowflake 담당자에게 문의하세요.
Kinesis Stream 설정하기¶
AWS 계정 관리자는 AWS 계정에서 다음 작업을 수행합니다.
Ensure that you have an AWS User with IAM permissions to access Kinesis Streams and DynamoDB.
Ensure that the AWS User has configured Access Key credentials.
Snowflake 계정 설정하기¶
Snowflake 계정 관리자는 다음 작업을 수행합니다.
새 역할을 생성하거나 기존 역할을 사용하여 데이터베이스 권한 권한을 부여합니다.
데이터를 저장할 대상 데이터베이스와 대상 테이블을 생성하는 데 사용할 대상 스키마를 생성합니다.
대상 테이블이 없는 경우 커넥터의 기능을 사용하여 대상 테이블을 자동으로 생성할 계획이라면 사용자에게 Snowflake 오브젝트를 생성하고 관리하는 데 필요한 권한이 있는지 확인하십시오.
오브젝트
권한
참고
데이터베이스
USAGE
스키마
USAGE . CREATE TABLE .
스키마 수준 오브젝트가 생성된 후 CREATE
object권한이 취소될 수 있습니다.테이블
OWNERSHIP
Kinesis 커넥터를 사용하여 기존 테이블로 데이터를 수집할 때만 필수입니다. . 커넥터가 Kinesis 스트림의 레코드에 대한 새 대상 테이블을 생성하는 경우 구성에 지정된 사용자의 기본 역할이 테이블 소유자가 됩니다.
다음 스크립트를 사용하여 사용자 지정 역할을 생성하고 구성할 수 있습니다(SECURITYADMIN 또는 이와 동등한 요구 사항 필요):
USE ROLE SECURITYADMIN; CREATE ROLE kinesis_connector_role; GRANT USAGE ON DATABASE kinesis_db TO ROLE kinesis_connector_role; GRANT USAGE ON SCHEMA kinesis_schema TO ROLE kinesis_connector_role; GRANT CREATE TABLE ON SCHEMA kinesis_schema TO ROLE kinesis_connector_role; -- Only for existing tables. GRANT OWNERSHIP ON TABLE existing_table TO ROLE kinesis_connector_role;
유형이 SERVICE 인 새 Snowflake 서비스 사용자를 생성합니다.
Snowflake 서비스 사용자에게 이전 단계에서 생성한 역할을 부여합니다.
GRANT ROLE kinesis_connector_role TO USER kinesis_connector_user; ALTER USER kinesis_connector_user SET DEFAULT_ROLE = kinesis_connector_role;
3단계의 Snowflake SERVICE 사용자에 대해 키 페어 인증 으로 구성합니다.
Snowflake는 이 단계를 강력히 권장합니다. Openflow에서 지원하는 시크릿 관리자(예: AWS, Azure, Hashicorp)를 구성하고 공개 및 개인 키를 시크릿 스토어에 저장합니다.
참고
어떤 이유로든 시크릿 관리자를 사용하지 않으려면 조직의 보안 정책에 따라 키 페어 인증에 사용되는 공개 키와 개인 키 파일을 보호할 책임이 있습니다.
시크릿 관리자가 구성되면 인증 방법을 결정합니다. AWS 에서는 다른 시크릿을 유지할 필요가 없으므로 Openflow와 연결된 EC2 인스턴스 역할을 사용하는 것이 좋습니다.
Openflow에서 오른쪽 상단의 햄버거 메뉴에서 이 시크릿 관리자와 연결된 매개 변수 공급자를 구성합니다. Controller Settings » Parameter Provider 로 이동한 다음 매개 변수 값을 가져옵니다.
이 시점에서 모든 자격 증명은 연결된 매개 변수 경로로 참조할 수 있으며 민감한 값은 Openflow 내에서 유지될 필요가 없습니다.
If any other Snowflake users require access to the ingested data and created tables (for example, for custom processing in Snowflake), then grant those users the role created in step 2.
커넥터 설정하기¶
데이터 엔지니어는 다음 작업을 수행하여 커넥터를 설치하고 구성합니다.
커넥터 설치하기¶
Navigate to the Openflow overview page. In the Featured connectors section, select View more connectors.
Openflow Connector 페이지에서 커넥터를 찾아 Add to runtime 을 선택합니다.
In the Select runtime dialog, select your runtime from the Available runtimes drop-down list and click Add.
참고
커넥터를 설치하기 전에 커넥터가 수집한 데이터를 저장할 수 있도록 Snowflake에서 데이터베이스와 스키마를 생성했는지 확인하십시오.
Snowflake 계정 자격 증명으로 배포를 인증하고 런타임 애플리케이션이 Snowflake 계정에 액세스할 수 있도록 허용할지 묻는 메시지가 표시되면 Allow 를 선택합니다. 커넥터 설치 프로세스를 완료하는 데 몇 분 정도 걸립니다.
Snowflake 계정 자격 증명으로 런타임에 인증합니다.
커넥터 프로세스 그룹이 추가된 Openflow 캔버스가 표시됩니다.
커넥터 구성¶
가져온 프로세스 그룹을 마우스 오른쪽 버튼으로 클릭하고 Parameters 를 선택합니다.
Populate the required parameter values as described in Parameters section below.
Parameters¶
이 섹션에서는 JSON 데이터 형식을 위한 Openflow Connector for Kinesis 의 모든 매개 변수에 대해 설명합니다.
커넥터는 여러 모듈로 구성됩니다. 세트를 보려면 커넥터 프로세스 그룹을 두 번 클릭합니다. 모듈의 매개 변수 컨텍스트에서 각 모듈의 매개 변수를 설정할 수 있습니다.
Snowflake destination parameters¶
매개 변수 |
설명 |
필수 |
|---|---|---|
대상 데이터베이스 |
데이터가 유지될 데이터베이스입니다. Snowflake에 이미 존재해야 합니다. 이름은 대소문자를 구분합니다. 따옴표로 묶지 않은 식별자의 경우 이름을 대문자로 입력합니다. |
예 |
대상 스키마 |
데이터가 유지될 스키마로, Snowflake에 이미 존재해야 합니다. 이름은 대소문자를 구분합니다. 따옴표로 묶지 않은 식별자의 경우 이름을 대문자로 입력합니다. 다음 예제를 참조하세요.
|
예 |
Iceberg 활성화됨 |
테이블 작업에 Iceberg를 활성화할지 여부입니다. |
예 |
Schema Evolution Enabled |
커넥터 수준에서 스키마 진화를 활성화하거나 비활성화합니다. 활성화하면 테이블에 대한 자동 스키마 변경이 허용됩니다. 스키마 진화는 테이블별 매개 변수를 통해 개별 테이블 수준에서 제어할 수도 있습니다. |
예 |
Schema Evolution For New Tables Enabled |
새 테이블을 생성할 때 스키마 진화를 활성화할지 여부를 제어합니다. ‘true’로 설정하면 ENABLE_SCHEMA_EVOLUTION = TRUE 매개 변수를 사용하여 새 테이블이 생성됩니다. ‘false’로 설정하면 ENABLE_SCHEMA_EVOLUTION = FALSE 매개 변수를 사용하여 새 테이블이 생성됩니다. Iceberg 테이블은 자동으로 생성되지 않으므로 해당 사항이 없습니다. 이 설정은 기존 테이블이 아닌 테이블 생성에만 영향을 줍니다. |
예 |
Snowflake 계정 식별자 |
사용하는 경우:
|
예 |
Snowflake Authentication Strategy |
사용하는 경우:
|
예 |
Snowflake 개인 키 |
사용하는 경우:
|
아니요 |
Snowflake 개인 키 파일 |
사용하는 경우:
|
아니요 |
Snowflake 개인 키 비밀번호 |
사용하는 경우
|
아니요 |
Snowflake 역할 |
사용하는 경우
|
예 |
Snowflake 사용자 이름 |
사용하는 경우
|
예 |
Kinesis JSON Source Parameters¶
매개 변수 |
설명 |
필수 |
|---|---|---|
AWS 리전 코드 |
Kinesis Stream이 위치한 AWS 리전(예: |
예 |
AWS 액세스 키 ID |
The AWS Access Key ID to connect to your Kinesis Stream, DynamoDB, and, optionally, CloudWatch. |
예 |
AWS 시크릿 액세스 키 |
The AWS Secret Access Key to connect to your Kinesis Stream, DynamoDB, and, optionally, CloudWatch. |
예 |
Kinesis 애플리케이션 이름 |
Kinesis Stream 소비에 대한 애플리케이션의 진행 상황을 추적하기 위한 DynamoDB 테이블 이름에 사용되는 이름입니다. |
예 |
Kinesis 초기 스트림 위치 |
데이터가 복제를 시작하는 초기 스트림 위치입니다.
|
예 |
Kinesis Stream 이름 |
AWS 데이터를 소비할 Kinesis Stream 이름입니다. |
예 |
메트릭 게시 |
Kinesis Client Library 메트릭이 게시되는 위치를 지정합니다. 가능한 값은 |
예 |
플로우 실행¶
평면을 마우스 오른쪽 버튼으로 클릭하고 Enable all Controller Services 를 선택합니다.
Right-click on the connector’s process group and select Start.
커넥터가 데이터 수집을 시작합니다.
Table Schema¶
The Snowflake table loaded by the connector contains columns named by the keys of your Kinesis messages.
The connector also adds a KINESISMETADATA column which stores metadata about the record.
다음은 커넥터가 로드하는 Snowflake 테이블의 예제입니다.
행 |
ACCOUNT |
SYMBOL |
SIDE |
QUANTITY |
KINESISMETADATA |
|---|---|---|---|---|---|
1 |
ABC123 |
ZTEST |
BUY |
3572 |
{ … KINESISMETADATA object … } |
2 |
XYZ789 |
ZABZX |
SELL |
3024 |
{ … KINESISMETADATA object … } |
3 |
XYZ789 |
ZTEST |
SELL |
799 |
{ … KINESISMETADATA object … } |
4 |
ABC123 |
ZABZX |
BUY |
2033 |
{ … KINESISMETADATA object … } |
KINESISMETADATA 열에는 다음 필드가 있는 오브젝트가 포함되어 있습니다.
필드 이름 |
필드 유형 |
Example Value |
설명 |
|---|---|---|---|
|
String |
|
레코드의 출처인 Kinesis 스트림의 이름입니다. |
|
String |
|
레코드의 출처인 스트림에 있는 shard의 식별자입니다. |
|
String |
|
레코드가 스트림에 삽입된 대략적인 시간(ISO 8601 형식)입니다. |
|
String |
|
레코드에 대해 데이터 생산자가 지정한 파티션 키입니다. |
|
String |
|
Kinesis Data Streams가 shard의 레코드에 할당한 고유한 시퀀스 번호입니다. |
|
숫자 |
|
레코드의 하위 시퀀스 번호입니다(동일한 시퀀스 번호를 가진 집계된 레코드에 사용됨). |
|
String |
|
레코드의 시퀀스 번호와 하위 시퀀스 번호의 조합입니다. |
스키마 진화¶
이 커넥터는 자동 스키마 감지 및 진화를 지원합니다. Snowflake의 테이블 구조는 커넥터가 로딩하는 새로운 데이터의 구조를 지원하기 위해 자동으로 정의되고 진화합니다.
Snowflake는 수신 데이터의 스키마를 감지하고 사용자 정의 스키마와 일치하는 테이블에 데이터를 로드합니다. Snowflake를 통해 새 열을 추가하거나 새로 수신되는 레코드에 누락된 열에서 NOT NULL 제약 조건을 삭제할 수도 있습니다.
커넥터를 사용한 스키마 감지는 제공된 JSON 데이터를 기반으로 데이터 타입을 추론합니다.
커넥터가 대상 테이블을 생성하면 스키마 진화는 기본적으로 활성화됩니다.
If you want to enable or disable schema evolution on an existing table,
use the ALTER TABLE command to set the ENABLE_SCHEMA_EVOLUTION parameter.
You must also use a role that has the OWNERSHIP privilege on the table. For more information, see 테이블 스키마 진화.
However, if schema evolution is disabled for an existing table, then the connector will try to send the rows with mismatched schemas to the configured failure output port.
Iceberg table support¶
Openflow Connector for Kinesis can ingest data into a Snowflake-managed Apache Iceberg™ table when Iceberg Enabled is set to true.
요구 사항 및 제한 사항¶
Before you configure the Openflow Connector for Kinesis for Iceberg table ingestion, note the following requirements and limitations:
커넥터를 실행하기 전에 Iceberg 테이블을 생성해야 합니다.
사용자가 생성된 테이블에 데이터를 삽입할 수 있는 액세스 권한이 있는지 확인합니다.
구성 및 설정¶
To configure the Openflow Connector for Kinesis for Iceberg table ingestion, follow the steps in Set up Openflow Connector for Kinesis for JSON data format with a few differences noted in the following sections.
Iceberg 테이블로 수집 활성화¶
Iceberg 테이블로 수집을 사용하려면 Iceberg Enabled 매개 변수를 true 로 설정해야 합니다.
수집을 위한 Iceberg 테이블 만들기¶
커넥터를 실행하기 전에 Iceberg 테이블을 생성해야 합니다. 초기 테이블 스키마는 커넥터 Schema Evolution Enabled 속성 설정에 따라 달라집니다.
스키마 진화가 활성화된 상태에서 이름이 kinesisMetadata``인 열이 있는 테이블을 생성해야 합니다. 커넥터는 메시지 필드에 대한 열을 자동으로 생성하고 ``kinesisMetadata 열 스키마를 변경합니다.
CREATE OR REPLACE ICEBERG TABLE my_iceberg_table (
kinesisMetadata OBJECT()
)
EXTERNAL_VOLUME = 'my_volume'
CATALOG = 'SNOWFLAKE'
BASE_LOCATION = 'my_location/my_iceberg_table'
ENABLE_SCHEMA_EVOLUTION = true;
If schema evolution is disabled, you must create the table with all fields the Kinesis message contains. When you create an Iceberg table, you can use Iceberg data types or compatible Snowflake types. The semi-structured VARIANT type isn’t supported. Instead, use a structured OBJECT or MAP.
예를 들어, 다음 메시지를 생각해 보십시오.
{
"id": 1,
"name": "Steve",
"body_temperature": 36.6,
"approved_coffee_types": ["Espresso", "Doppio", "Ristretto", "Lungo"],
"animals_possessed":
{
"dogs": true,
"cats": false
},
"date_added": "2024-10-15"
}
다음 문은 Kinesis 메시지에 포함된 모든 필드가 있는 테이블을 생성합니다.
CREATE OR REPLACE ICEBERG TABLE my_iceberg_table (
kinesisMetadata OBJECT(
stream STRING,
shardId STRING,
approximateArrival STRING,
partitionKey STRING,
sequenceNumber STRING,
subSequenceNumber INTEGER,
shardedSequenceNumber STRING
),
id INT,
body_temperature FLOAT,
name STRING,
approved_coffee_types ARRAY(STRING),
animals_possessed OBJECT(dogs BOOLEAN, cats BOOLEAN),
date_added DATE
)
EXTERNAL_VOLUME = 'my_volume'
CATALOG = 'SNOWFLAKE'
BASE_LOCATION = 'my_location/my_iceberg_table';
참고
kinesisMetadata must always be created. Field names inside nested structures such as dogs or cats are case sensitive.