Openflow Connector for MySQL 설정

참고

커넥터에는 커넥터 약관 이 적용됩니다.

이 항목에서는 Openflow Connector for MySQL 을 설정하는 단계에 대해 설명합니다.

전제 조건

  1. Openflow Connector for MySQL 정보 을 검토했는지 확인합니다.

  2. Snowflake와 데이터를 동기화하려면 MySQL 8 이상 버전이 있어야 합니다.

  3. Openflow를 설정 했는지 확인합니다.

  4. 데이터베이스 관리자는 다음 작업을 수행합니다.

    1. 이진 로그 를 활성화한 다음 저장하고 다음과 같이 형식을 구성합니다.

      log_bin

      on 으로 설정합니다.

      이를 통해 구조 및 데이터 변경 사항을 기록하는 이진 로그를 활성화할 수 있습니다.

      binlog_format

      row 로 설정합니다.

      커넥터는 행 기반 복제만 지원합니다. MySQL 8.x 버전이 이 설정을 지원하는 마지막 버전일 수 있으며, 향후 버전에서는 행 기반 복제만 지원하게 됩니다.

      올바른 값으로 수정된 GCP Cloud SQL 에서는 적용되지 않습니다.

      binlog_row_metadata

      full 로 설정합니다.

      커넥터가 작동하려면 모든 행 메타데이터, 가장 중요한 열 이름과 기본 키 정보가 필요합니다.

      binlog_row_image

      full 로 설정합니다.

      커넥터는 모든 열을 이진 로그에 기록할 것을 요구합니다.

      올바른 값으로 수정된 Amazon Aurora에서는 적용되지 않습니다.

      binlog_row_value_options

      비워 둡니다.

      이 옵션은 JSON 열에 영향을 주며, UPDATE 문에 대해 JSON 문서의 수정된 부분만 포함하도록 설정할 수 있습니다. 커넥터는 전체 문서를 이진 로그에 기록할 것을 요구합니다.

      binlog_expire_logs_seconds

      데이터베이스 에이전트가 장시간 일시 중지 또는 다운타임 후에도 증분 복제를 계속할 수 있도록 최소 몇 시간 이상으로 설정하십시오. 커넥터의 안정적인 작동을 위해 binary log expiration period(binlog_expire_logs_seconds) 을 최소 몇 시간으로 설정할 것을 권장합니다. 이진 로그 만료 기간이 끝나면 이진 로그 파일이 자동으로 제거될 수 있습니다. 예를 들어, 유지 관리 작업 등으로 인해 통합이 장기간 일시 중지되고 이 기간 동안 만료된 이진 로그 파일이 삭제되면 Openflow는 이러한 파일에서 데이터를 복제할 수 없습니다.

      예약 복제를 사용하는 경우 이 값은 구성된 일정보다 길어야 합니다.

      다음 코드를 예시로 참조하십시오.

      log_bin = on
      binlog_format = row
      binlog_row_metadata = full
      binlog_row_image = full
      binlog_row_value_options =
      
      Copy
    2. sort_buffer_size 의 값을 늘립니다.

      sort_buffer_size = 4194304
      
      Copy

      sort_buffer_size 는 ORDER BY 같은 인메모리 정렬 작업을 위해 쿼리 스레드당 할당되는 메모리 양(바이트 단위)을 정의합니다. 값이 너무 작으면 커넥터가 다음과 같은 오류 메시지와 함께 실패할 수 있습니다.

      Out of sort memory, consider increasing server sort buffer size. 이는 sort_buffer_size 를 늘려야 함을 나타냅니다.

    3. Amazon RDS 데이터베이스를 사용 중인 경우 rds_set_configuration 을 사용하여 binlog_expire_logs_seconds 와 관련된 보존 기간을 늘립니다. 예를 들어, 24시간 동안 binlog를 저장하려면 mysql.rds_set_configuration('binlog retention hours', 24) 를 호출합니다.

    4. SSL 을 통해 연결합니다. MySQL 에 SSL 연결을 사용할 계획이라면 데이터베이스 서버에 대한 루트 인증서를 준비하십시오. 이는 구성 중에 필수입니다.

    5. 커넥터에 대한 사용자를 생성합니다. 커넥터가 이진 로그를 읽기 위해서는 REPLICATION_SLAVE 및 REPLICATION_CLIENT 권한이 있는 사용자가 필요합니다. 다음 권한 부여:

      GRANT REPLICATION SLAVE ON *.* TO '<username>'@'%'
      GRANT REPLICATION CLIENT ON *.* TO '<username>'@'%'
      
      Copy
    6. 복제된 모든 테이블에 SELECT 권한을 부여합니다.

      GRANT SELECT ON <schema>.* TO '<username>'@'%'
      GRANT SELECT ON <schema>.<table> TO '<username>'@'%'
      
      Copy

      복제 보안에 대한 자세한 내용은 이진 로그 를 참조하십시오.

  5. Snowflake 계정 관리자는 다음 작업을 수행합니다.

    1. 유형이 SERVICE 인 Snowflake 사용자를 생성합니다. 복제된 데이터를 저장할 데이터베이스를 생성하고 USAGE 및 CREATE SCHEMA 권한 을 부여하여 해당 데이터베이스에 오브젝트를 생성할 수 있도록 Snowflake 사용자에게 권한을 설정합니다.

      CREATE DATABASE <destination_database>;
      CREATE USER <openflow_user> TYPE=SERVICE COMMENT='Service user for automated access of Openflow';
      CREATE ROLE <openflow_role>;
      GRANT ROLE <openflow_role> TO USER <openflow_user>;
      GRANT USAGE ON DATABASE <destination_database> TO ROLE <openflow_role>;
      GRANT CREATE SCHEMA ON DATABASE <destination_database> TO ROLE <openflow_role>;
      CREATE WAREHOUSE <openflow_warehouse>
           WITH
               WAREHOUSE_SIZE = 'MEDIUM'
               AUTO_SUSPEND = 300
               AUTO_RESUME = TRUE;
      GRANT USAGE, OPERATE ON WAREHOUSE <openflow_warehouse> TO ROLE <openflow_role>;
      
      Copy
    2. 한 쌍의 보안 키(공개 및 비공개)를 생성합니다. 사용자의 개인 키를 파일에 저장하여 커넥터의 구성에 제공합니다. Snowflake 서비스 사용자에게 공개 키를 할당합니다.

      ALTER USER <openflow_user> SET RSA_PUBLIC_KEY = 'thekey';
      
      Copy

      자세한 내용은 키 페어 섹션을 참조하십시오.

    3. 커넥터가 사용할 웨어하우스를 지정합니다. MEDIUM 데이터 웨어하우스 크기로 시작한 다음 복제되는 테이블의 수와 전송되는 데이터의 양에 따라 크기를 실험해 보십시오. 일반적으로 테이블 수가 많을수록 웨어하우스 크기보다는 멀티 클러스터 웨어하우스 에서 확장이 더 쉽습니다.

커넥터 설정하기

데이터 엔지니어는 다음 작업을 수행하여 커넥터를 설치하고 구성합니다.

커넥터 설치하기

  1. Openflow 개요 페이지로 이동합니다. Featured connectors 섹션에서 View more connectors 을 선택합니다.

  2. Openflow Connector 페이지에서 커넥터를 찾아 Add to runtime 을 선택합니다.

  3. Select runtime 대화 상자의 Available runtimes 드롭다운 목록에서 런타임을 선택합니다.

  4. Add 를 선택합니다.

    참고

    커넥터를 설치하기 전에 커넥터가 수집한 데이터를 저장할 수 있도록 Snowflake에서 데이터베이스와 스키마를 생성했는지 확인하십시오.

  5. Snowflake 계정 자격 증명으로 배포를 인증하고 런타임 애플리케이션이 Snowflake 계정에 액세스할 수 있도록 허용할지 묻는 메시지가 표시되면 Allow 를 선택합니다. 커넥터 설치 프로세스를 완료하는 데 몇 분 정도 걸립니다.

  6. Snowflake 계정 자격 증명으로 런타임에 인증합니다.

커넥터 프로세스 그룹이 추가된 Openflow 캔버스가 표시됩니다.

커넥터 구성

다음 사용 사례에 맞게 커넥터를 구성할 수 있습니다.

실시간으로 테이블 세트 복제하기

  1. 가져온 프로세스 그룹을 마우스 오른쪽 버튼으로 클릭하고 Parameters 를 선택합니다.

  2. 플로우 매개 변수 에 설명된 대로 필수 매개 변수 값을 채웁니다.

플로우 매개 변수

MySQL 소스 매개 변수 컨텍스트의 매개 변수를 설정한 다음 MySQL 대상 매개 변수 컨텍스트의 매개 변수를 설정합니다. 이 작업이 완료되면 커넥터를 활성화할 수 있습니다. 커넥터는 MySQL 및 Snowflake 모두에 연결하고 실행을 시작해야 합니다. 그러나 커넥터는 복제할 테이블이 구성에 명시적으로 추가될 때까지 데이터를 복제하지 않습니다.

복제를 위해 특정 테이블을 구성하려면 MySQL 수집 매개 변수 컨텍스트를 편집합니다. 복제 매개 변수 컨텍스트에 변경 사항을 적용하면 커넥터에 의해 구성이 선택되고 모든 테이블에 대해 복제 수명 주기가 시작됩니다.

MySQL 소스 매개 변수 컨텍스트

매개 변수

설명

MySQL 연결 URL

소스 데이터베이스에 대한 전체 JDBC URL. 커넥터는 MySQL 과 호환되는 MariaDB 드라이버를 사용하며 URL 에 jdbc:mariadb 접두사가 필요합니다. SSL 이 비활성화되어 있으면 URL 연결에 allowPublicKeyRetrieval 매개 변수가 true 로 설정되어 있어야 합니다.

예제:

  • SSL 사용 설정: jdbc:mariadb://example.com:3306

  • SSL 사용 안 함 설정: jdbc:mariadb://example.com:3306?allowPublicKeyRetrieval=true

MySQL JDBC 드라이버

MariaDB JDBC 드라이버 jar 의 절대 경로. 커넥터는 MySQL 과 호환되는 MariaDB 드라이버를 사용합니다. MariaDB JDBC 드라이버를 업로드하려면 Reference asset 확인란을 선택합니다.

예: /opt/resources/drivers/mariadb-java-client-3.5.2.jar

MySQL 사용자 이름

커넥터의 사용자 이름입니다.

MySQL 비밀번호

커넥터의 비밀번호입니다.

MySQL 대상 매개 변수 컨텍스트

매개 변수

설명

대상 데이터베이스

데이터가 유지될 데이터베이스입니다. Snowflake에 이미 존재해야 합니다.

Snowflake 계정 식별자

Snowflake 계정 이름은 [organization-name]-[account-name] 형식으로 형식이 지정되며, 데이터는 여기에 영구적으로 저장됩니다.

Snowflake Authentication Strategy

Snowflake에 대한 인증 전략. 가능한 값: SPCS 에서 흐름을 실행하는 경우 SNOWFLAKE_SESSION_TOKEN, 개인 키를 사용하여 액세스를 설정하는 경우 KEY_PAIR.

Snowflake 개인 키

인증에서 사용되는 RSA 개인 키입니다. RSA 키는 PKCS8 표준에 따라 형식이 지정되어야 하며 표준 PEM 헤더와 푸터가 있어야 합니다. Snowflake 개인 키 파일 또는 Snowflake 개인 키 중 하나를 정의해야 합니다.

Snowflake 개인 키 파일

PKCS8 표준에 따라 형식이 지정되고 표준 PEM 헤더와 푸터가 있는 Snowflake 인증에 사용되는 RSA 개인 키가 포함된 파일입니다. 헤더 라인은 -----BEGIN PRIVATE 로 시작합니다. Reference asset 확인란을 선택하여 개인 키 파일을 업로드합니다.

Snowflake 개인 키 비밀번호

Snowflake 개인 키 파일과 연결된 비밀번호입니다

Snowflake 역할

쿼리 실행 중에 사용되는 Snowflake 역할

Snowflake 사용자 이름

Snowflake 인스턴스에 연결하는 데 사용되는 사용자 이름입니다

Snowflake 웨어하우스

쿼리 실행에 사용되는 Snowflake 웨어하우스

MySQL 수집 매개 변수 컨텍스트

매개 변수

설명

포함된 테이블 이름

스키마를 포함한 테이블 경로의 쉼표로 구분된 목록입니다. 예: public.my_table, other_schema.other_table

포함된 테이블 Regex

테이블 경로와 일치시킬 정규식입니다. 식과 일치하는 모든 경로가 복제되며, 나중에 생성되는 패턴과 일치하는 새 테이블도 자동으로 포함됩니다. 예: public\.auto_.*

필터 JSON

정규화된 테이블 이름 목록과 복제에 포함해야 하는 열 이름에 대한 정규식 패턴이 포함된 JSON 입니다. 예: [ {"schema":"public", "table":"table1", "includedPattern":".*name"} ]public 스키마의 table1 에서 name 으로 끝나는 모든 열을 포함합니다.

작업 일정 CRON 병합하기

CRON 식은 저널에서 대상 테이블로의 병합 작업이 트리거되는 기간을 정의하는 식입니다. 연속 병합을 원하거나 웨어하우스 실행 시간을 제한하는 시간 예약을 하려면 * * * * * ? 로 설정하십시오. 예를 들어, * 0 * * * ? 문자열은 전체 시간에 1분 동안 병합을 예약하려는 것을 나타냅니다. * 20 14 ? * MON-FRI 문자열은 매주 월요일부터 금요일까지 오후 2:20에 병합을 예약하려는 것을 나타냅니다. 자세한 정보와 예제는 CronTrigger 자습서 를 참조하십시오.

복제에 테이블을 제거했다가 다시 추가하기

복제에서 테이블을 제거하려면 복제 매개 변수 컨텍스트의 포함된 테이블 이름 또는 포함된 테이블 정규식 매개 변수에서 테이블이 제거되었는지 확인합니다.

나중에 복제에 테이블을 다시 추가하려면 먼저 Snowflake에서 해당 대상 테이블을 삭제하십시오. 그런 다음 테이블을 포함된 테이블 이름 또는 포함된 테이블 정규식 매개 변수에 다시 추가합니다. 이렇게 하면 테이블에 대한 복제 프로세스가 새로 시작됩니다.

이 접근법은 실패한 테이블 복제 시나리오에서 복구하는 데에도 사용할 수 있습니다.

테이블에서 열의 하위 집합 복제하기

커넥터는 테이블별로 복제된 데이터를 구성된 열의 하위 집합으로 필터링할 수 있습니다.

열에 필터를 적용하려면 복제 매개 변수 컨텍스트에서 열 필터 속성을 수정하여 필터를 적용하려는 모든 테이블에 대해 1개의 항목씩 구성 배열을 추가합니다.

열은 이름이나 패턴에 따라 포함하거나 제외할 수 있습니다. 테이블당 단일 조건을 적용하거나 여러 조건을 결합할 수 있으며, 제외가 항상 포함보다 우선합니다.

다음 예제는 사용 가능한 필드를 보여줍니다. schematable 필드는 필수 입력 사항입니다. included, excluded, includedPattern, excludedPattern 중 하나 이상이 필요합니다.

[
    {
        "schema": "<source table schema>",
        "table" : "<source table name>",
        "included": ["<column name>", "<column name>"],
        "excluded": ["<column name>", "<column name>"],
        "includedPattern": "<regular expression>",
        "excludedPattern": "<regular expression>",
    }
]
Copy

테이블의 데이터 변경 사항 추적

커넥터는 소스 테이블의 데이터의 현재 상태뿐만 아니라 모든 변경 집합의 모든 행의 모든 상태도 복제합니다. 이 데이터는 대상 테이블과 동일한 스키마로 생성된 저널 테이블에 저장됩니다.

저널 테이블 이름은 다음과 같은 형식입니다. <소스 테이블 이름>_JOURNAL_<타임스탬프>_<스키마 생성>

여기서 <타임스탬프> 는 소스 테이블이 복제에 추가된 epoch 초의 값이고 <스키마 생성> 은 소스 테이블의 스키마가 변경될 때마다 증가하는 정수입니다. 즉, 스키마가 변경되는 소스 테이블에는 여러 개의 저널 테이블이 생깁니다.

복제에서 테이블을 제거했다가 다시 추가하면 <타임스탬프> 값이 변경되고 <스키마 생성>1 에서 다시 시작됩니다.

중요

어떤 방식으로든 저널 테이블이나 그 안의 데이터를 변경하지 않는 것이 좋습니다. 커넥터가 복제 프로세스의 일부로 대상 테이블을 업데이트하는 데 사용됩니다.

커넥터는 저널 테이블을 삭제하지 않고 복제된 모든 소스 테이블에 대해 최신 저널만 적극적으로 사용합니다. 저장소를 다시 확보하려면 복제에서 제거된 소스 테이블과 관련된 저널 테이블과 활성 복제된 테이블의 최신 세대를 제외한 모든 테이블을 안전하게 삭제할 수 있습니다.

예를 들어, 커넥터가 소스 테이블 orders 를 적극적으로 복제하도록 설정되어 있고 이전에 테이블 customers 를 복제에서 제거한 경우 다음과 같은 저널 테이블이 있을 수 있습니다. 이 경우 orders_5678_2제외하고 모두 삭제할 수 있습니다.

customers_1234_1
customers_1234_2
orders_5678_1
orders_5678_2

병합 작업 예약 구성하기

커넥터는 데이터 웨어하우스를 사용하여 변경 데이터 캡처(CDC) 데이터를 대상 테이블에 병합합니다. 이 작업은 MergeSnowflakeJournalTable 프로세서에 의해 트리거됩니다. 새로운 변경 사항이 없거나 MergeSnowflakeJournalTable 큐에 대기 중인 새 플로우 파일이 없는 경우 병합이 트리거되지 않고 웨어하우스가 자동 일시 중단됩니다.

웨어하우스 비용을 제한하고 예약된 시간에만 병합을 제한하려면 병합 작업 일정 CRON 매개 변수에서 CRON 식을 사용하십시오. MergeSnowflakeJournalTable 프로세서로 수신 플로우 파일을 스로틀링하고 병합은 전용 기간에만 트리거됩니다. 예약에 대한 자세한 내용은 예약 전략 섹션을 참조하십시오.

플로우 실행

  1. 평면을 마우스 오른쪽 버튼으로 클릭하고 Enable all Controller Services 를 선택합니다.

  2. 가져온 프로세스 그룹을 마우스 오른쪽 버튼으로 클릭하고 Start 를 선택합니다. 커넥터가 데이터 수집을 시작합니다.