Openflow Connector for Kinesis 설정¶
참고
커넥터에는 커넥터 약관 이 적용됩니다.
이 항목에서는 Openflow Connector for Kinesis 를 설정하는 단계에 대해 설명합니다.
전제 조건¶
Openflow Connector for Kinesis 정보 을 검토했는지 확인합니다.
Openflow를 설정 했는지 확인합니다.
Kinesis Stream 설정하기¶
AWS 계정 관리자는 AWS 계정에서 다음 작업을 수행합니다.
Kinesis Streams 및 DynamoDB 에 액세스할 수 있는 IAM 권한이 있는 AWS 계정 이 있는지 확인하십시오.
선택 사항으로 배달 못한 편지 큐(DLQ) Kinesis Stream을 생성합니다. 성공적으로 구문 분석할 수 없는 메시지는 지정된 DLQ 로 리디렉션될 수 있습니다.
Snowflake 계정 설정하기¶
Snowflake 계정 관리자는 다음 작업을 수행합니다.
새 역할을 생성하거나 기존 역할을 사용하여 데이터베이스 권한 권한을 부여합니다.
데이터를 저장할 대상 데이터베이스와 대상 테이블을 생성하는 데 사용할 대상 스키마를 생성합니다.
대상 테이블이 없는 경우 커넥터의 기능을 사용하여 대상 테이블을 자동으로 생성할 계획이라면 사용자에게 Snowflake 오브젝트를 생성하고 관리하는 데 필요한 권한이 있는지 확인하십시오.
오브젝트
권한
참고
데이터베이스
USAGE
스키마
USAGE . CREATE TABLE .
스키마 수준 오브젝트가 생성된 후 CREATE
object
권한이 취소될 수 있습니다.테이블
OWNERSHIP
Kinesis 커넥터를 사용하여 기존 테이블로 데이터를 수집할 때만 필수입니다. . 커넥터가 Kinesis 스트림의 레코드에 대한 새 대상 테이블을 생성하는 경우 구성에 지정된 사용자의 기본 역할이 테이블 소유자가 됩니다.
다음 스크립트를 사용하여 사용자 지정 역할을 생성하고 구성할 수 있습니다(SECURITYADMIN 또는 이와 동등한 요구 사항 필요):
USE ROLE SECURITYADMIN; CREATE ROLE kinesis_connector_role_1; GRANT USAGE ON DATABASE kinesis_db TO ROLE kinesis_connector_role_1; GRANT USAGE ON SCHEMA kinesis_schema TO ROLE kinesis_connector_role_1; GRANT CREATE TABLE ON SCHEMA kinesis_schema TO ROLE kinesis_connector_role_1; -- Only for existing tables GRANT OWNERSHIP ON TABLE existing_table1 TO ROLE kinesis_connector_role_1;
유형이 SERVICE 인 새 Snowflake 서비스 사용자를 생성합니다.
Snowflake 서비스 사용자에게 이전 단계에서 생성한 역할을 부여합니다.
GRANT ROLE kinesis_connector_role_1 TO USER kinesis_connector_user_1; ALTER USER kinesis_connector_user_1 SET DEFAULT_ROLE = kinesis_connector_role_1;
3단계의 Snowflake SERVICE 사용자에 대해 키 페어 인증 으로 구성합니다.
Snowflake는 이 단계를 강력히 권장합니다. Openflow에서 지원하는 시크릿 관리자(예: AWS, Azure, Hashicorp)를 구성하고 공개 및 개인 키를 시크릿 스토어에 저장합니다.
참고
어떤 이유로든 시크릿 관리자를 사용하지 않으려면 조직의 보안 정책에 따라 키 페어 인증에 사용되는 공개 키와 개인 키 파일을 보호할 책임이 있습니다.
시크릿 관리자가 구성되면 인증 방법을 결정합니다. AWS 에서는 다른 시크릿을 유지할 필요가 없으므로 Openflow와 연결된 EC2 인스턴스 역할을 사용하는 것이 좋습니다.
Openflow에서 오른쪽 상단의 햄버거 메뉴에서 이 시크릿 관리자와 연결된 매개 변수 공급자를 구성합니다. Controller Settings » Parameter Provider 로 이동한 다음 매개 변수 값을 가져옵니다.
이 시점에서 모든 자격 증명은 연결된 매개 변수 경로로 참조할 수 있으며 민감한 값은 Openflow 내에서 유지될 필요가 없습니다.
다른 Snowflake 사용자가 커넥터에서 수집한 원시 수집 문서 및 테이블에 대한 액세스가 필요한 경우(예: Snowflake에서 사용자 정의 처리를 위해), 해당 사용자에게 1단계에서 생성한 역할을 부여하십시오.
커넥터가 사용할 웨어하우스를 지정합니다. 가장 작은 데이터 웨어하우스 크기로 시작한 다음 복제되는 테이블 수와 전송되는 데이터의 양에 따라 크기를 실험해 보십시오. 테이블 수가 많은 경우 일반적으로 더 큰 규모의 웨어하우스보다는 멀티 클러스터 웨어하우스 에서 확장이 더 쉽습니다.
커넥터 설정하기¶
데이터 엔지니어는 다음 작업을 수행하여 커넥터를 설치하고 구성합니다.
커넥터 설치하기¶
Openflow 개요 페이지로 이동합니다. Featured connectors 섹션에서 View more connectors 을 선택합니다.
Openflow Connector 페이지에서 커넥터를 찾아 Add to runtime 을 선택합니다.
Select runtime 대화 상자의 Available runtimes 드롭다운 목록에서 런타임을 선택합니다.
Add 를 선택합니다.
참고
커넥터를 설치하기 전에 커넥터가 수집한 데이터를 저장할 수 있도록 Snowflake에서 데이터베이스와 스키마를 생성했는지 확인하십시오.
Snowflake 계정 자격 증명으로 배포를 인증하고 런타임 애플리케이션이 Snowflake 계정에 액세스할 수 있도록 허용할지 묻는 메시지가 표시되면 Allow 를 선택합니다. 커넥터 설치 프로세스를 완료하는 데 몇 분 정도 걸립니다.
Snowflake 계정 자격 증명으로 런타임에 인증합니다.
커넥터 프로세스 그룹이 추가된 Openflow 캔버스가 표시됩니다.
커넥터 구성¶
가져온 프로세스 그룹을 마우스 오른쪽 버튼으로 클릭하고 Parameters 를 선택합니다.
플로우 매개 변수 에 설명된 대로 필수 매개 변수 값을 채웁니다.
플로우 매개 변수¶
이 섹션에서는 다음 매개 변수 컨텍스트에 따라 구성할 수 있는 플로우 매개 변수에 대해 설명합니다.
Kinesis 소스 매개 변수: Kinesis와의 연결을 설정하는 데 사용됩니다.
Kinesis 대상 매개 변수: Snowflake와 연결을 설정하는 데 사용됩니다.
Kinesis 수집 매개 변수: Kinesis에서 다운로드한 데이터의 구성을 정의하는 데 사용됩니다.
Kinesis 소스 매개 변수¶
매개 변수 |
설명 |
---|---|
AWS 리전 코드 |
Kinesis Stream이 위치한 AWS 리전(예: |
AWS 액세스 키 ID |
AWS 액세스 키 ID 를 사용하여 Kinesis Stream 및 DynamoDB 에 연결합니다. |
AWS 시크릿 액세스 키 |
Kinesis Stream과 DynamoDB 에 연결하기 위한AWS 시크릿 액세스 키입니다. |
Schema Registry URL |
AVRO Schema Registry의 URL . AVRO 스키마 액세스 전략 매개 변수가 |
Schema Registry 인증 유형 |
AVRO Schema Registry에서 사용하는 인증 유형입니다. AVRO 스키마 액세스 전략 매개 변수가
|
Schema Registry 사용자 이름 |
AVRO Schema Registry에 대한 |
Schema Registry 비밀번호 |
AVRO Schema Registry에 대한 |
Kinesis 대상 매개 변수¶
매개 변수 |
설명 |
---|---|
대상 데이터베이스 |
데이터가 유지될 데이터베이스입니다. Snowflake에 이미 존재해야 합니다. |
대상 스키마 |
데이터가 유지될 스키마입니다. Snowflake에 이미 존재해야 합니다. 이 매개 변수는 대/소문자를 구분합니다. |
Snowflake 계정 식별자 |
[organization-name]-[account-name]과 같은 형식의 Snowflake 계정 이름(데이터가 유지되는 곳)입니다. |
Snowflake Authentication Strategy |
Snowflake에 대한 인증 전략. 가능한 값: SPCS 에서 흐름을 실행하는 경우 |
Snowflake 개인 키 |
인증에서 사용되는 RSA 개인 키입니다. RSA 키는 PKCS8 표준에 따라 형식이 지정되어야 하며 표준 PEM 헤더와 푸터가 있어야 합니다. Snowflake 개인 키 파일 또는 Snowflake 개인 키 중 하나를 정의해야 합니다. |
Snowflake 개인 키 파일 |
PKCS8 표준에 따라 형식이 지정되고 표준 PEM 헤더와 푸터가 있는 Snowflake 인증에 사용되는 RSA 개인 키가 포함된 파일입니다. 헤더 라인은 |
Snowflake 개인 키 비밀번호 |
Snowflake 개인 키 파일과 연결된 비밀번호입니다. |
Snowflake 역할 |
쿼리 실행 중에 사용되는 Snowflake 역할입니다. |
Snowflake 사용자 이름 |
Snowflake 인스턴스에 연결하는 데 사용되는 사용자 이름입니다. |
Snowflake 웨어하우스 |
쿼리를 실행하는 데 사용되는 Snowflake 웨어하우스. 이 매개 변수는 대/소문자를 구분합니다. |
Kinesis 수집 매개 변수¶
매개 변수 |
설명 |
---|---|
Kinesis 애플리케이션 이름 |
Kinesis Stream 소비에 대한 애플리케이션의 진행 상황을 추적하기 위한 DynamoDB 테이블 이름에 사용되는 이름입니다. |
Kinesis Stream 이름 |
AWS 데이터를 소비할 Kinesis Stream 이름입니다. |
Kinesis 초기 스트림 위치 |
데이터가 복제를 시작하는 초기 스트림 위치입니다.
|
Kinesis DLQ 스트림 이름 |
처리에 실패한 모든 레코드가 전송되는 스트림 이름입니다. 이 매개 변수를 추가하지 않으면 Openflow 캔버스에서 커넥터의 DLQ 관련 부분에 경고 표시가 나타날 수 있습니다. |
메시지 형식 |
Kinesis의 메시지 형식입니다.
|
AVRO Schema Access Strategy |
AVRO 메시지 형식의 데이터에 액세스하려면 스키마가 필요합니다. 이 매개 변수는 특정 메시지의 AVRO 스키마에 액세스하는 전략을 정의합니다. 메시지 형식 매개 변수가
|
Kinesis Stream 대상 테이블 맵 |
이 선택적 매개 변수를 사용하면 사용자가 어떤 스트림을 어떤 테이블에 매핑할지 지정할 수 있습니다. 각 스트림과 해당 테이블 이름은 콜론으로 구분해야 합니다. 이 테이블 이름은 따옴표로 묶이지 않은 유효한 Snowflake 식별자여야 합니다. 정규식은 모호할 수 없으며 일치하는 스트림은 1개의 대상 테이블과만 일치해야 합니다. 비어 있거나 일치하는 항목이 없는 경우 스트림 이름이 테이블 이름으로 사용됩니다.
|
Iceberg 활성화됨 |
프로세서가 데이터를 Iceberg 테이블로 수집할지 여부를 지정합니다. 이 속성이 실제 테이블 유형과 일치하지 않으면 프로세서가 실패합니다.
|
플로우 실행¶
평면을 마우스 오른쪽 버튼으로 클릭하고 Enable all Controller Services 를 선택합니다.
가져온 프로세스 그룹을 마우스 오른쪽 버튼으로 클릭하고 Start 를 선택합니다.
커넥터가 데이터 수집을 시작합니다.
스키마¶
커넥터에 의해 로딩된 Snowflake 테이블에는 Kinesis 메시지의 키로 이름이 지정된 열이 포함되어 있습니다. 아래는 이러한 테이블의 예입니다.
행 |
ACCOUNT |
SYMBOL |
SIDE |
QUANTITY |
---|---|---|---|---|
1 |
ABC123 |
ZTEST |
BUY |
3572 |
2 |
XYZ789 |
ZABZX |
SELL |
3024 |
3 |
XYZ789 |
ZTEST |
SELL |
799 |
4 |
ABC123 |
ZABZX |
BUY |
2033 |
5 |
ABC123 |
ZTEST |
BUY |
1558 |
스키마 진화¶
현재 Iceberg Enabled
가 false
로 설정되어 있습니다. 커넥터가 대상 테이블을 생성하면 스키마 진화는 기본적으로 활성화됩니다. 기존 테이블에서 스키마 진화를 활성화 또는 비활성화하려면 ALTER TABLE 명령을 사용하여 ENABLE_SCHEMA_EVOLUTION
매개 변수를 설정합니다. 또한 테이블에 대한 OWNERSHIP
권한이 있는 역할을 사용해야 합니다. 자세한 내용은 테이블 스키마 진화 섹션을 참조하십시오.
그러나 기존 테이블에 대해 스키마 진화가 비활성화되어 있으면 커넥터는 스키마가 일치하지 않는 행을 구성된 데드 레터 큐(DLQ)로 보내려고 시도합니다.
Iceberg Enabled
가 true
로 설정된 경우 Apache Iceberg™ 테이블에 대한 스키마 진화 단락을 참조하십시오.
Apache Iceberg™ 테이블과 함께 Openflow Connector for Kinesis 사용하기¶
Openflow Connector for Kinesis 는 데이터를 Snowflake가 관리하는 Apache Iceberg™ 테이블 로 수집할 수 있습니다.
요구 사항 및 제한 사항¶
Iceberg 테이블 수집을 위한 커넥터를 구성하기 전에 다음 요구 사항과 제한 사항에 유의하십시오.
커넥터를 실행하기 전에 Iceberg 테이블을 생성해야 합니다.
사용자가 생성된 테이블에 데이터를 삽입할 수 있는 액세스 권한이 있는지 확인합니다.
Iceberg 테이블에는 스키마 진화가 지원되지 않습니다.
구성 및 설정¶
Iceberg 테이블 수집을 위한 커넥터를 구성하려면 다음 섹션에 설명된 몇 가지 차이점을 제외하고 커넥터 구성 의 지침을 따르십시오.
Iceberg 테이블로 수집 활성화¶
Iceberg 테이블로 수집을 사용하려면 Iceberg Enabled
매개 변수를 true
로 설정해야 합니다.
수집을 위한 Iceberg 테이블 만들기¶
커넥터를 실행하기 전에 Iceberg 테이블을 생성해야 합니다. 스키마 진화는 지원되지 않으므로 Kinesis 메시지에 포함된 모든 필드를 사용하여 테이블을 생성해야 합니다.
Iceberg 테이블을 생성할 때 Iceberg 데이터 타입 또는 호환되는 Snowflake 타입 을 사용할 수 있습니다. 반정형 VARIANT 유형은 지원되지 않습니다. 대신, 정형 OBJECT 또는 MAP 을 사용하십시오.
예를 들어, 다음 메시지를 생각해 보십시오.
{
"id": 1,
"name": "Steve",
"body_temperature": 36.6,
"approved_coffee_types": ["Espresso", "Doppio", "Ristretto", "Lungo"],
"animals_possessed":
{
"dogs": true,
"cats": false
},
"date_added": "2024-10-15"
}
예제 메시지에 대한 Iceberg 테이블을 생성하려면 다음 문을 사용합니다.
CREATE OR REPLACE ICEBERG TABLE my_iceberg_table ( id INT, body_temperature FLOAT, name STRING, approved_coffee_types ARRAY(STRING), animals_possessed OBJECT(dogs BOOLEAN, cats BOOLEAN), date_added DATE ) EXTERNAL_VOLUME = 'my_volume' CATALOG = 'SNOWFLAKE' BASE_LOCATION = 'my_location/my_iceberg_table';
참고
dogs
또는 cats
같이 중첩된 구조 내의 필드 이름은 대소문자를 구분합니다.
Apache Iceberg™ 테이블에 대한 스키마 진화¶
현재 커넥터는 Apache Iceberg™ 테이블에 대한 스키마 진화를 지원하지 않습니다.
알려진 문제¶
커넥터의 프로세스 그룹에는 “업로드 실패”라는 이름의 출력 포트가 하나 있습니다. Snowflake에 성공적으로 업로드되지 않은 FlowFiles 을 처리하는 데 사용할 수 있습니다. 이 포트가 커넥터의 프로세스 그룹 외부에 연결되어 있지 않으면 무시해도 되는 경고 표시가 표시됩니다.
모든 프로세서는 중지된 경우 한 번만 실행하도록 명령할 수 있습니다. ConsumeKinesisStream 프로세서는 내부 아키텍처로 인해 한 번만 실행하도록 명령해도 의미 있는 작업을 수행하지 않습니다. 프로세서가 작동을 시작하려면 프로세서를 시작하고 약 2분 동안 실행해야 합니다.