Openflow Connector for PostgreSQL 설정

참고

커넥터에는 커넥터 약관 이 적용됩니다.

이 항목에서는 Openflow Connector for PostgreSQL 을 설정하는 단계에 대해 설명합니다.

전제 조건

  1. Openflow Connector for PostgreSQL 정보 을 검토했는지 확인합니다.

  2. 지원되는 PostgreSQL 버전 을 검토했는지 확인합니다.

  3. 권장: 런타임당 하나의 커넥터 인스턴스만 추가해야 합니다.

  4. Openflow - BYOC 설정 또는 :doc:`/user-guide/data-integration/openflow/setup-openflow-spcs`을 완료했는지 확인합니다.

  5. 데이터베이스 관리자는 다음 작업을 수행합니다.

    1. wal_level 구성하기

    2. 발행물 생성하기

    3. PostgreSQL 서버에 WAL 을 위한 충분한 디스크 공간이 있는지 확인합니다. 이는 복제 슬롯이 생성되면 커넥터가 해당 위치를 확인하고 전진할 때까지 복제 슬롯이 보유한 위치의 WAL 데이터가 PostgreSQL 에 유지되기 때문입니다.

    4. 복제가 활성화된 모든 테이블에 기본 키가 있는지 확인합니다. 키는 단일 열 또는 복합 열일 수 있습니다.

    5. 테이블의 REPLICA IDENTITYDEFAULT 로 설정합니다. 이렇게 하면 기본 키가 WAL 에 표시되고 커넥터가 이를 읽을 수 있습니다.

    6. 커넥터에 대한 사용자를 생성합니다. 커넥터는 복제된 모든 테이블에서 REPLICATION 특성과 SELECT 권한이 있는 사용자를 필요로 합니다. 커넥터의 구성에 입력할 비밀번호를 사용하여 해당 사용자를 생성합니다. 복제 보안에 대한 자세한 내용은 보안 섹션을 참조하십시오.

  6. Snowflake 계정 관리자는 다음 작업을 수행합니다.

    1. 유형이 SERVICE 인 Snowflake 사용자를 생성합니다. 복제된 데이터를 저장할 데이터베이스를 생성하고 USAGE 및 CREATE SCHEMA 권한 을 부여하여 해당 데이터베이스에 오브젝트를 생성할 수 있도록 Snowflake 사용자에게 권한을 설정합니다.

      CREATE DATABASE <destination_database>;
      CREATE USER <openflow_user> TYPE=SERVICE COMMENT='Service user for automated access of Openflow';
      CREATE ROLE <openflow_role>;
      GRANT ROLE <openflow_role> TO USER <openflow_user>;
      GRANT USAGE ON DATABASE <destination_database> TO ROLE <openflow_role>;
      GRANT CREATE SCHEMA ON DATABASE <destination_database> TO ROLE <openflow_role>;
      CREATE WAREHOUSE <openflow_warehouse>
        WITH
          WAREHOUSE_SIZE = 'MEDIUM'
          AUTO_SUSPEND = 300
          AUTO_RESUME = TRUE;
      GRANT USAGE, OPERATE ON WAREHOUSE <openflow_warehouse> TO ROLE <openflow_role>;
      
      Copy
    2. 한 쌍의 보안 키(공개 및 비공개)를 생성합니다. 커넥터를 구성하는 동안 사용하기 위해 사용자의 개인 키를 파일에 저장합니다. 공개 키를 Snowflake 서비스 사용자에게 할당합니다.

      ALTER USER <openflow_user> SET RSA_PUBLIC_KEY = 'thekey';
      
      Copy

      자세한 내용은 키 페어 인증 섹션을 참조하세요.

    3. 커넥터가 사용할 웨어하우스를 지정합니다. MEDIUM 데이터 웨어하우스 크기로 시작한 다음 복제되는 테이블의 수와 전송되는 데이터의 양에 따라 크기를 실험해 봅니다. 일반적으로 테이블 수가 많을수록 웨어하우스 크기보다는 :doc:`멀티 클러스터 웨어하우스</user-guide/warehouses-multicluster>`로 확장이 더 쉽습니다.

wal_level 구성하기

Openflow Connector for PostgreSQL 에서는 wal_levellogical 로 설정해야 합니다.

PostgreSQL 서버가 호스팅되는 위치에 따라 다음과 같이 wal_level을 구성할 수 있습니다.

온 프레미스

ALTER SYSTEM 권한이 있는 슈퍼 사용자 또는 사용자로 다음 쿼리를 실행합니다.

ALTER SYSTEM SET wal_level = logical;
Copy

RDS

에이전트가 사용하는 사용자에게는 rds_superuser 또는 rds_replication 역할을 할당해야 합니다.

또한 다음과 같이 설정해야 합니다.

  • rds.logical_replication 정적 매개 변수를 1로 설정합니다.

  • 데이터베이스 및 복제 설정에 따라 max_replication_slots, max_connectionsmax_wal_senders 매개 변수를 설정합니다.

AWS Aurora

rds.logical_replication 정적 매개 변수를 1로 설정합니다.

GCP

다음 플래그를 설정합니다.

  • cloudsql.logical_decoding=on.

  • cloudsql.enable_pglogical=on.

자세한 내용은 Google Cloud 설명서 섹션을 참조하십시오.

Azure

복제 지원을 Logical 로 설정합니다. 자세한 내용은 Azure 설명서 섹션을 참조하십시오.

발행물 생성하기

Openflow Connector for PostgreSQL 에서는 복제를 시작하기 전에 발행물 을 만들고 PostgreSQL 에 구성해야 합니다. 전체 또는 테이블의 하위 집합은 물론 지정된 열만 있는 특정 테이블에 대해서도 생성할 수 있습니다. 복제할 계획인 모든 테이블과 열이 발행물에 포함되어 있는지 확인합니다. 커넥터가 실행되는 동안 나중에 게시를 수정할 수도 있습니다. 발행물을 생성하고 구성하려면 다음과 같이 하십시오.

  1. 데이터베이스에서 CREATE 권한이 있는 사용자로 로그인한 후 다음 쿼리를 실행합니다.

    CREATE PUBLICATION <publication name>;
    
    Copy
  2. 데이터베이스 에이전트가 사용할 수 있는 테이블을 정의합니다.

ALTER PUBLICATION <publication name> ADD TABLE <table name>;
Copy

중요

PostgreSQL 15 이상 에서는 지정된 테이블 열의 하위 집합에 대한 발행물 구성을 지원합니다. 커넥터가 이를 올바르게 지원하려면 열 필터링 설정 을 사용하여 발행물에 설정된 것과 동일한 열을 포함해야 합니다.

이 설정이 없으면 커넥터는 다음과 같이 작동합니다.

  • 대상 테이블에서 필터에 포함되지 않은 열에는 __DELETED 접미사가 추가됩니다. 스냅샷 단계 동안 복제된 모든 데이터는 유지됩니다.

  • 간행물에 새 열을 추가한 후에는 테이블이 영구적으로 실패하게 되며, 해당 복제를 다시 시작해야 합니다.

자세한 내용은 ALTER PUBLICATION 섹션을 참조하십시오.

커넥터 설치하기

  1. Openflow 개요 페이지로 이동합니다. Featured connectors 섹션에서 View more connectors 을 선택합니다.

  2. Openflow Connector 페이지에서 커넥터를 찾아 Add to runtime 을 선택합니다.

  3. Select runtime 대화 상자의 Available runtimes 드롭다운 목록에서 런타임을 선택합니다.

  4. Add 를 선택합니다.

    참고

    커넥터를 설치하기 전에 커넥터가 수집한 데이터를 저장할 수 있도록 Snowflake에서 데이터베이스와 스키마를 생성했는지 확인하십시오.

  5. Snowflake 계정 자격 증명으로 배포를 인증하고 런타임 애플리케이션이 Snowflake 계정에 액세스할 수 있도록 허용할지 묻는 메시지가 표시되면 Allow 를 선택합니다. 커넥터 설치 프로세스를 완료하는 데 몇 분 정도 걸립니다.

  6. Snowflake 계정 자격 증명으로 런타임에 인증합니다.

커넥터 프로세스 그룹이 추가된 Openflow 캔버스가 표시됩니다.

커넥터 구성

다음 사용 사례에 맞게 커넥터를 구성할 수 있습니다.

실시간으로 테이블 세트 복제하기

  1. 가져온 프로세스 그룹을 마우스 오른쪽 버튼으로 클릭하고 :ui:`Parameters`를 선택합니다.

  2. 플로우 매개 변수 에 설명된 대로 필수 매개 변수 값을 채웁니다.

플로우 매개 변수

PostgreSQL 소스 매개 변수 컨텍스트의 매개 변수를 설정한 다음 PostgreSQL 대상 매개 변수 컨텍스트의 매개 변수를 설정합니다. 이 작업이 완료되면 커넥터를 활성화할 수 있으며 PostgreSQL 및 Snowflake에 모두 연결되고 실행이 시작됩니다. 그러나 테이블이 구성에 명시적으로 추가될 때까지는 데이터를 복제하지 않습니다.

복제를 위해 특정 테이블을 구성하려면 PostgreSQL 수집 매개 변수 컨텍스트를 편집합니다. 복제 매개 변수 컨텍스트에 변경 사항을 적용하면 곧바로 커넥터에서 구성을 선택하고 모든 테이블에 대해 복제 수명 주기가 시작됩니다.

PostgreSQL 소스 매개 변수 컨텍스트

매개 변수

설명

Postgres 연결 URL

소스 데이터베이스에 대한 전체 JDBC URL. 예: jdbc:postgresql://example.com:5432/public

PostgreSQL 복제본 서버에 연결하는 경우 PostgreSQL 복제본 서버에서 테이블 복제 섹션을 참조하세요.

Postgres JDBC 드라이버

PostgreSQL JDBC 드라이버 jar 의 경로. 웹사이트에서 jar을 다운로드한 다음 Reference asset 확인란을 선택하여 업로드하고 첨부합니다.

Postgres SSL 모드

SSL 연결을 활성화 또는 비활성화합니다.

Postgres 루트 SSL 인증서

데이터베이스에 대한 루트 인증서의 전체 내용입니다. SSL 이 비활성화되어 있는 경우 선택 사항입니다.

Postgres 사용자 이름

커넥터의 사용자 이름입니다.

Postgres 비밀번호

커넥터의 비밀번호입니다.

PostgreSQL 대상 매개 변수 컨텍스트

매개 변수

설명

필수

대상 데이터베이스

데이터가 유지될 데이터베이스입니다. Snowflake에 이미 존재해야 합니다. 이름은 대소문자를 구분합니다. 따옴표로 묶지 않은 식별자의 경우 이름을 대문자로 입력합니다.

대상 스키마

데이터가 유지될 스키마로, Snowflake에 이미 존재해야 합니다. 이름은 대소문자를 구분합니다. 따옴표로 묶지 않은 식별자의 경우 이름을 대문자로 입력합니다.

다음 예제를 참조하세요.

  • CREATE SCHEMA SCHEMA_NAME 또는 CREATE SCHEMA schema_name: SCHEMA_NAME 사용

  • CREATE SCHEMA "schema_name" 또는 CREATE SCHEMA "SCHEMA_NAME": 각각 schema_name 또는 SCHEMA_NAME 사용

Snowflake 계정 식별자

사용하는 경우:

  • Session Token Authentication Strategy: 비워 두어야 합니다.

  • KEY_PAIR: 데이터가 유지될 [organization-name]-[account-name] 형식의 Snowflake 계정 이름입니다.

Snowflake Authentication Strategy

사용하는 경우:

  • Snowflake Openflow Deployment: SNOWFLAKE_SESSION_TOKEN 을 사용합니다. 이 토큰은 Snowflake에서 자동으로 관리됩니다.

  • BYOC: KEY_PAIR를 인증 전략의 값으로 사용합니다.

Snowflake 개인 키

사용하는 경우:

  • Session Token Authentication Strategy: 비워 두어야 합니다.

  • KEY_PAIR: 인증에서 사용되는 RSA 개인 키여야 합니다.

    RSA 키는 PKCS8 표준에 따라 형식이 지정되어야 하며 표준 PEM 머리글 및 바닥글이 있어야 합니다. Snowflake Private Key File 또는 Snowflake Private Key를 정의해야 합니다.

아니요

Snowflake 개인 키 파일

사용하는 경우:

  • Session token authentication strategy: 개인 키 파일은 비워 두어야 합니다.

  • KEY_PAIR: PKCS8 표준에 따라 형식이 지정되고 표준 PEM 머리글과 바닥글을 포함하며 Snowflake 인증에 사용되는 RSA 개인 키를 포함하는 파일을 업로드합니다. 머리글 라인은 ``—–BEGIN PRIVATE``으로 시작합니다. 개인 키 파일을 업로드하려면 Reference asset 확인란을 선택합니다.

아니요

Snowflake 개인 키 비밀번호

사용하는 경우

  • Session Token Authentication Strategy: 비워 두어야 합니다.

  • KEY_PAIR: Snowflake 개인 키 파일과 연결된 비밀번호를 입력합니다.

아니요

Snowflake 역할

사용하는 경우

  • Session Token Authentication Strategy: 런타임 역할을 사용합니다. 런타임에 대한 :ui:`View Details`로 이동하여 Openflow UI에서 런타임 역할을 찾을 수 있습니다.

  • KEY_PAIR Authentication Strategy: 서비스 사용자에 대해 구성된 유효한 역할을 사용합니다.

Snowflake 사용자 이름

사용하는 경우

  • Session Token Authentication Strategy: 비워 두어야 합니다.

  • KEY_PAIR: Snowflake 인스턴스에 연결하는 데 사용되는 사용자 이름을 입력합니다.

Snowflake 웨어하우스

쿼리 실행에 사용되는 Snowflake 웨어하우스입니다.

PostgreSQL 수집 매개 변수 컨텍스트

매개 변수

설명

포함된 테이블 이름

스키마를 포함한 테이블 경로의 쉼표로 구분된 목록입니다(예: public.my_table, other_schema.other_table).

이름 또는 Regex로 테이블을 선택합니다. 둘 다 사용하는 경우 두 옵션 중 하나에서 일치하는 모든 테이블이 포함됩니다.

포함된 테이블 Regex

테이블 경로와 일치시킬 정규식입니다. 식과 일치하는 모든 경로가 복제되며, 나중에 생성되는 패턴과 일치하는 새 테이블도 자동으로 포함됩니다. 예: public\.auto_.*

이름 또는 Regex로 테이블을 선택합니다. 둘 다 사용하는 경우 두 옵션 중 하나에서 일치하는 모든 테이블이 포함됩니다.

열 필터 JSON

선택 사항. 정규화된 테이블 이름 목록과 복제에 포함해야 하는 열 이름에 대한 정규식 패턴이 포함된 JSON 입니다. 예를 들어 [ {"schema":"public", "table":"table1", "includedPattern":".*name"} ]``은 ``public 스키마의 ``table1``에서 ``name``으로 끝나는 모든 열을 포함합니다.

작업 일정 CRON 병합하기

CRON 식은 저널에서 대상 테이블로의 병합 작업이 트리거되는 기간을 정의하는 식입니다. 연속 병합을 원하거나 웨어하우스 실행 시간을 제한하는 시간 예약을 하려면 * * * * * ? 로 설정하십시오.

예:

  • * 0 * * * ? 문자열은 1분 동안 전체 시간에 병합을 예약할 것을 나타냅니다

  • * 20 14 ? * MON-FRI 문자열은 매주 월요일부터 금요일까지 오후 2:20에 병합을 예약하려는 것을 나타냅니다.

추가 정보 및 예제는 Quartz 설명서 의 cron 트리거 자습서를 참조하십시오.

PostgreSQL 복제본 서버에서 테이블 복제

커넥터는 `논리적 복제<https://www.postgresql.org/docs/current/logical-replication.html>`_를 사용하여 기본 서버, `핫 대기 복제본<https://www.postgresql.org/docs/current/hot-standby.html>`_ 또는 구독자 서버에서 데이터를 수집할 수 있습니다. PostgreSQL 복제본에 연결하도록 커넥터를 구성하기 전에 기본 노드와 복제본 노드 간의 복제가 올바르게 작동하는지 확인합니다. 커넥터의 데이터 누락 문제를 조사할 때 먼저 커넥터에서 사용하는 복제본 서버에 누락된 행이 있는지 확인합니다.

대기 복제본에 연결할 때의 추가 고려 사항:

  • 핫 대기 복제본에 연결하는 것만 지원됩니다. 웜 대기 복제본은 기본 인스턴스로 승격될 때까지 클라이언트의 연결을 허용할 수 없습니다.

  • 서버의 PostgreSQL 버전은 16 이상이어야 합니다.

  • 커넥터에 필요한 :ref:`게시<label-postgres_connector_create_a_publication>`는 대기 서버가 아닌 기본 서버에 생성해야 합니다. 대기 서버는 읽기 전용이며 게시 생성을 허용하지 않습니다.

핫 대기 인스턴스에 연결했으며 Openflow 게시판에 복제 슬롯 ‘<replication slot>’ 생성 시도 중 시간이 초과되었습니다. 대기 인스턴스에 연결하는 경우 기본 PostgreSQL 인스턴스에 트래픽이 있는지 확인하십시오. 그렇지 않으면 복제 슬롯을 생성하기 위한 호출이 결과를 반환하지 않습니다. 오류가 표시되거나 Read PostgreSQL CDC Stream 프로세서가 시작되지 않는 경우 기본 PostgreSQL 인스턴스에 로그인하고 다음 쿼리를 실행하십시오.

SELECT pg_log_standby_snapshot();
Copy

이 오류는 기본 서버에서 데이터가 변경되지 않을 때 발생합니다. 따라서 복제본 서버에 복제 슬롯을 생성하는 동안 커넥터가 중단될 수 있습니다. 이러한 문제는 복제본 서버가 복제 슬롯을 생성하려면 기본 서버에 실행 중인 트랜잭션에 대한 정보를 요구해야 하기 때문에 발생합니다. 기본 서버는 유휴 상태일 때는 정보를 보내지 않습니다. pg_log_standby_snapshot() 함수는 기본 서버가 실행 중인 트랜잭션에 대한 정보를 강제로 복제본 서버로 보내도록 합니다.

복제에 테이블을 제거했다가 다시 추가하기

복제에서 테이블을 제거하려면 복제 매개 변수 컨텍스트의 포함된 테이블 이름 또는 포함된 테이블 정규식 매개 변수에서 테이블이 제거되었는지 확인합니다.

나중에 복제에 테이블을 다시 추가하려면 먼저 Snowflake에서 해당 대상 테이블을 삭제하십시오. 그런 다음 테이블을 포함된 테이블 이름 또는 포함된 테이블 정규식 매개 변수에 다시 추가합니다. 이렇게 하면 테이블에 대한 복제 프로세스가 새로 시작됩니다.

이 접근법은 실패한 테이블 복제 시나리오에서 복구하는 데에도 사용할 수 있습니다.

테이블에서 열의 하위 집합 복제하기

커넥터는 테이블별로 복제된 데이터를 구성된 열의 하위 집합으로 필터링할 수 있습니다.

열에 필터를 적용하려면 복제 매개 변수 컨텍스트에서 열 필터 속성을 수정하여 필터를 적용하려는 모든 테이블에 대해 1개의 항목씩 구성 배열을 추가합니다.

열은 이름이나 패턴에 따라 포함하거나 제외할 수 있습니다. 테이블당 단일 조건을 적용하거나 여러 조건을 결합할 수 있으며, 제외가 항상 포함보다 우선합니다.

다음 예제는 사용할 수 있는 필드를 보여줍니다. schematable 은 필수이며 included, excluded, includedPattern, excludedPattern 중 1개 이상이 필요합니다.

[
    {
        "schema": "<source table schema>",
        "table" : "<source table name>",
        "included": ["<column name>", "<column name>"],
        "excluded": ["<column name>", "<column name>"],
        "includedPattern": "<regular expression>",
        "excludedPattern": "<regular expression>",
    }
]
Copy

테이블의 데이터 변경 사항 추적

커넥터는 소스 테이블의 데이터의 현재 상태뿐만 아니라 모든 변경 집합의 모든 행의 모든 상태도 복제합니다. 이 데이터는 대상 테이블과 동일한 스키마로 생성된 저널 테이블에 저장됩니다.

저널 테이블 이름은 ``<source table name>_JOURNAL_<timestamp>_<schema generation> 형식입니다. 여기서 <timestamp>는 소스 테이블이 복제본에 추가된 시점의 epoch 초 값이며, <schema generation>``은 소스 테이블에서 스키마가 변경될 때마다 증가하는 정수입니다. 결과적으로, 스키마가 변경되는 원본 테이블에는 여러 저널 테이블이 포함되게 됩니다.

복제에서 테이블을 제거했다가 다시 추가하면 <타임스탬프> 값이 변경되고 <스키마 생성>1 에서 다시 시작됩니다.

중요

어떤 방식으로든 저널 테이블의 구조는 변경하지 않는 것이 좋습니다. 커넥터가 복제 프로세스의 일부로 대상 테이블을 업데이트하는 데 사용됩니다.

커넥터는 저널 테이블을 삭제하지 않으며, 복제된 모든 소스 테이블에 대해 최신 저널을 사용하여 저널 상단의 Append-only 스트림만 읽습니다. 저장소를 회수하려면 다음을 수행할 수 있습니다.

  • 언제든지 모든 저널 테이블을 자를 수 있습니다.

  • 복제에서 제거된 원본 테이블과 관련된 저널 테이블을 삭제할 수 있습니다.

  • 능동적으로 복제된 테이블에 대한 최신 세대 저널 테이블을 제외한 모든 저널 테이블을 삭제할 수 있습니다.

예를 들어, 커넥터가 소스 테이블 orders 를 적극적으로 복제하도록 설정되어 있고 이전에 테이블 customers 를 복제에서 제거한 경우 다음과 같은 저널 테이블이 있을 수 있습니다. 이 경우 orders_5678_2제외하고 모두 삭제할 수 있습니다.

customers_1234_1
customers_1234_2
orders_5678_1
orders_5678_2

병합 작업 예약 구성하기

커넥터는 데이터 웨어하우스를 사용하여 변경 데이터 캡처(CDC) 데이터를 대상 테이블에 병합합니다. 이 작업은 MergeSnowflakeJournalTable 프로세서에 의해 트리거됩니다. 새로운 변경 사항이 없거나 MergeSnowflakeJournalTable 큐에 대기 중인 새 플로우 파일이 없는 경우 병합이 트리거되지 않고 웨어하우스가 자동 일시 중단됩니다.

웨어하우스 비용을 제한하고 예약된 시간에만 병합을 제한하려면 병합 작업 일정 CRON 매개 변수에서 CRON 식을 사용하십시오. MergeSnowflakeJournalTable 프로세서로 수신 플로우 파일을 스로틀링하고 병합은 전용 기간에만 트리거됩니다. 예약에 대한 자세한 내용은 예약 전략 섹션을 참조하십시오.

커넥터 중지 또는 삭제하기

커넥터를 중지하거나 제거할 때는 커넥터가 사용하는 복제 슬롯 을 고려해야 합니다.

커넥터는 snowflake_connector_ 로 시작하는 이름 뒤에 임의의 접미사가 붙은 자체 복제 슬롯을 생성합니다. 커넥터가 복제 스트림을 읽으면 슬롯을 전진시켜 PostgreSQL 에서 WAL 로그를 트리밍하고 디스크 공간을 확보할 수 있도록 합니다.

커넥터가 일시 중지되면 슬롯이 진행되지 않고 소스 데이터베이스의 변경 사항으로 인해 WAL 로그 크기가 계속 증가합니다. 특히 트래픽이 많은 데이터베이스에서는 커넥터를 장시간 일시 중지해서는 안 됩니다.

커넥터를 Openflow 캔버스에서 삭제하거나 전체 Openflow 인스턴스를 삭제하는 등 다른 방법으로 커넥터를 제거해도 복제 슬롯은 그대로 유지되므로 수동으로 삭제해야 합니다.

동일한 PostgreSQL 데이터베이스에서 복제하는 커넥터 인스턴스가 여러 개 있는 경우 각 인스턴스는 고유한 이름의 복제 슬롯을 생성합니다. 복제 슬롯을 수동으로 삭제할 때는 올바른 슬롯인지 확인하십시오. CaptureChangePostgreSQL 프로세서의 상태를 확인하여 특정 커넥터 인스턴스가 어떤 복제 슬롯을 사용하는지 확인할 수 있습니다.

플로우 실행

  1. 평면을 마우스 오른쪽 버튼으로 클릭하고 Enable all Controller Services 를 선택합니다.

  2. 가져온 프로세스 그룹을 마우스 오른쪽 버튼으로 클릭하고 Start 를 선택합니다. 커넥터가 데이터 수집을 시작합니다.