Google Cloud Storage용 Snowpipe 자동화하기¶
이 항목에서는 Google Cloud Storage(GCS) 이벤트에 대한 Google Cloud Pub/Sub 메시지를 사용하여 Snowpipe 데이터 로드를 자동으로 트리거하는 지침을 제공합니다.
OBJECT_FINALIZE
이벤트에서만 Snowpipe가 파일을 로드하도록 트리거한다는 점에 유의하십시오. 비용, 이벤트 노이즈, 지연 시간을 줄이기 위해 Snowpipe에 대해 지원되는 이벤트만 보내는 것이 좋습니다.
이 항목의 내용:
클라우드 플랫폼 지원¶
GCS Pub/Sub 이벤트 메시지를 사용하여 자동화된 Snowpipe 데이터 로드를 트리거하는 기능은 지원되는 모든 클라우드 플랫폼 에서 호스팅되는 Snowflake 계정에서 지원됩니다.
클라우드 저장소에 대한 보안 액세스 구성하기¶
참고
데이터 파일을 저장하는 GCS 버킷에 대한 보안 액세스를 이미 구성한 경우에는 이 섹션을 건너뛸 수 있습니다.
이 섹션에서는 Snowflake ID 및 액세스 관리(IAM) 엔터티에 클라우드 저장소에 대한 인증 책임을 위임하도록 Snowflake 저장소 통합 오브젝트를 구성하는 방법을 설명합니다.
이 섹션에서는 Snowflake가 외부(즉, 클라우드 저장소) 스테이지에서 참조하는 Google Cloud Storage 버킷에서 읽고 쓸 수 있도록 저장소 통합을 사용하는 방법을 설명합니다. 통합은 시크릿 키 또는 액세스 토큰과 같은 명시적 클라우드 공급자 자격 증명을 전달할 필요가 없는 명명된 일급 Snowflake 오브젝트입니다. 대신에, 통합 오브젝트는 클라우드 저장소 서비스 계정을 참조합니다. 조직의 관리자가 클라우드 저장소 계정에서 서비스 계정 권한을 부여합니다.
또한, 관리자는 통합을 사용하는 외부 스테이지에서 액세스하는 특정 클라우드 저장소 버킷 세트(및 선택적 경로)으로 사용자를 제한할 수 있습니다.
참고
이 섹션의 지침을 완료하려면 클라우드 저장소 프로젝트에 프로젝트 편집자로 액세스해야 합니다. 프로젝트 편집자가 아닌 경우 클라우드 저장소 관리자에게 이러한 작업을 수행하도록 요청하십시오.
다음 다이어그램은 클라우드 저장소 스테이지의 통합 흐름을 보여줍니다.
외부(즉, 클라우드 저장소) 스테이지는 정의에서 저장소 통합 오브젝트를 참조합니다.
Snowflake는 저장소 통합을 계정에 대해 생성된 클라우드 저장소 서비스 계정과 자동으로 연결합니다. Snowflake는 Snowflake 계정의 모든 GCS 저장소 통합에서 참조하는 단일 서비스 계정을 생성합니다.
클라우드 저장소 프로젝트의 프로젝트 편집자는 스테이지 정의에서 참조된 버킷에 액세스할 수 있는 권한을 서비스 계정에 부여합니다. 많은 외부 스테이지 오브젝트가 다른 버킷과 경로를 참조하고 인증에서 동일한 통합을 사용할 수 있습니다.
사용자가 스테이지에서 데이터를 로드하거나 언로드할 때 Snowflake는 액세스를 허용 또는 거부하기 전 버킷의 서비스 계정에 부여된 권한을 확인합니다.
이 섹션의 내용:
1단계: Snowflake에서 클라우드 저장소 통합 만들기¶
CREATE STORAGE INTEGRATION 명령을 사용하여 통합을 생성합니다. 통합은 외부 클라우드 저장소에 대한 인증 책임을 Snowflake 생성 엔터티(예: 클라우드 서비스 계정)에 위임하는 Snowflake 오브젝트입니다. 클라우드 저장소 버킷에 액세스하기 위해 Snowflake는 데이터 파일을 저장하는 버킷에 액세스할 수 있는 권한을 부여할 수 있는 서비스 계정을 생성합니다.
단일 저장소 통합은 여러 외부(즉, GCS) 스테이지를 지원할 수 있습니다. 스테이지 정의의 URL은 STORAGE_ALLOWED_LOCATIONS 매개 변수에 대해 지정된 Azure 컨테이너GCS 버킷(및 선택적 경로)와 일치해야 합니다.
참고
계정 관리자(ACCOUNTADMIN 역할의 사용자) 또는 전역 CREATE INTEGRATION 권한이 있는 역할만 이 SQL 명령을 실행할 수 있습니다.
CREATE STORAGE INTEGRATION <integration_name>
TYPE = EXTERNAL_STAGE
STORAGE_PROVIDER = 'GCS'
ENABLED = TRUE
STORAGE_ALLOWED_LOCATIONS = ('gcs://<bucket>/<path>/', 'gcs://<bucket>/<path>/')
[ STORAGE_BLOCKED_LOCATIONS = ('gcs://<bucket>/<path>/', 'gcs://<bucket>/<path>/') ]
여기서
integration_name
은 새 통합의 이름입니다.bucket
은 데이터 파일을 저장하는 클라우드 저장소 버킷의 이름입니다(예:mybucket
). 필수 STORAGE_ALLOWED_LOCATIONS 매개 변수와 선택적 STORAGE_BLOCKED_LOCATIONS 매개 변수는 이 통합을 참조하는 스테이지가 생성되거나 수정될 때 이러한 버킷에 대한 액세스를 각각 제한하거나 차단합니다.path
는 버킷의 오브젝트를 세부적으로 제어하기 위해 사용할 수 있는 선택적 경로입니다.
다음은 통합을 사용하여 두 버킷과 경로 중 하나를 참조하는 외부 스테이지를 명시적으로 제한하는 통합을 만드는 예입니다. 이후 단계에서 이러한 버킷과 경로 중 하나를 참조하는 외부 스테이지를 생성합니다.
이 통합도 사용하는 추가 외부 스테이지는 허용되는 버킷과 경로를 참조할 수 있습니다.
CREATE STORAGE INTEGRATION gcs_int TYPE = EXTERNAL_STAGE STORAGE_PROVIDER = 'GCS' ENABLED = TRUE STORAGE_ALLOWED_LOCATIONS = ('gcs://mybucket1/path1/', 'gcs://mybucket2/path2/') STORAGE_BLOCKED_LOCATIONS = ('gcs://mybucket1/path1/sensitivedata/', 'gcs://mybucket2/path2/sensitivedata/');
2단계: Snowflake 계정에 대한 클라우드 저장소 서비스 계정 검색¶
DESCRIBE INTEGRATION 명령을 실행하여 Snowflake 계정에 대해 자동으로 생성된 클라우드 저장소 서비스 계정에 대한 ID를 검색합니다.
DESC STORAGE INTEGRATION <integration_name>;
여기서
integration_name
은 이 항목의 1단계: Snowflake에서 클라우드 저장소 통합 만들기 에서 생성한 통합의 이름입니다.
예:
DESC STORAGE INTEGRATION gcs_int; +-----------------------------+---------------+-----------------------------------------------------------------------------+------------------+ | property | property_type | property_value | property_default | +-----------------------------+---------------+-----------------------------------------------------------------------------+------------------| | ENABLED | Boolean | true | false | | STORAGE_ALLOWED_LOCATIONS | List | gcs://mybucket1/path1/,gcs://mybucket2/path2/ | [] | | STORAGE_BLOCKED_LOCATIONS | List | gcs://mybucket1/path1/sensitivedata/,gcs://mybucket2/path2/sensitivedata/ | [] | | STORAGE_GCP_SERVICE_ACCOUNT | String | service-account-id@project1-123456.iam.gserviceaccount.com | | +-----------------------------+---------------+-----------------------------------------------------------------------------+------------------+
출력의 STORAGE_GCP_SERVICE_ACCOUNT 속성은 Snowflake 계정에 대해 생성된 클라우드 저장소 서비스 계정을 보여줍니다(예: service-account-id@project1-123456.iam.gserviceaccount.com
). 전체 Snowflake 계정에 대해 단일 클라우드 저장소 서비스 계정을 프로비저닝합니다. 모든 클라우드 저장소 통합은 해당 서비스 계정을 사용합니다.
3단계: 서비스 계정에 버킷 오브젝트 액세스 권한 부여¶
다음 단계별 지침에서는 클라우드 저장소 버킷을 사용하여 데이터를 로드 및 언로드할 수 있도록 Google Cloud Platform 콘솔에서 Snowflake에 대한 IAM 액세스 허가를 구성하는 방법을 설명합니다.
사용자 지정 IAM 역할 만들기¶
버킷에 액세스하고 오브젝트를 가져오기 위해 필요한 권한이 있는 사용자 지정 역할을 생성합니다.
Google Cloud Platform Console에 프로젝트 편집자로 로그인합니다.
홈 대시보드에서 IAM & admin » Roles 를 선택합니다.
Create Role 을 클릭합니다.
사용자 지정 역할의 이름과 설명을 입력합니다.
Add Permissions 를 클릭합니다.
권한 목록을 필터링하고 목록에 다음을 추가합니다.
작업
필수 권한
데이터 로딩 전용
storage.buckets.get
storage.objects.get
storage.objects.list
제거 옵션을 사용한 데이터 로딩으로 스테이지에서 REMOVE 명령 실행
storage.buckets.get
storage.objects.delete
storage.objects.get
storage.objects.list
데이터 로딩 및 언로딩
storage.buckets.get
(데이터 전송 비용 계산용)storage.objects.create
storage.objects.delete
storage.objects.get
storage.objects.list
데이터 언로딩 전용
storage.buckets.get
storage.objects.create
storage.objects.delete
storage.objects.list
Create 를 클릭합니다.
클라우드 저장소 서비스 계정에 사용자 지정 역할 할당하기¶
Google Cloud Platform Console에 프로젝트 편집자로 로그인합니다.
홈 대시보드에서 Cloud Storage » Browser 를 선택합니다.
액세스를 구성할 버킷을 선택합니다.
오른쪽 상단 모서리에서 SHOW INFO PANEL 를 클릭합니다. 버킷에 대한 정보 패널이 표시됩니다.
ADD PRINCIPAL 버튼을 클릭합니다.
New principals 필드에서 이 항목의 2단계: Snowflake 계정에 대한 클라우드 저장소 서비스 계정 검색 의 DESCRIBE INTEGRATION 출력에서 서비스 계정 이름을 검색합니다.
Select a role 드롭다운에서 Custom »
<역할>
을 선택합니다. 여기서<역할>
은 이 항목의 사용자 지정 IAM 역할 만들기 에서 생성한 사용자 지정 클라우드 저장소 역할입니다.Save 버튼을 클릭합니다. 서비스 계정 이름이 정보 패널의 Storage Object Viewer 역할 드롭다운에 추가됩니다.
클라우드 키 관리 서비스 암호화 키에 클라우드 저장소 서비스 계정 권한 부여하기¶
참고
이 단계는 Google Cloud Key Management Service(클라우드 KMS)에 저장된 키를 사용하여 GCS 버킷을 암호화하는 경우에만 필요합니다.
Google Cloud Platform Console에 프로젝트 편집자로 로그인합니다.
홈 대시보드에서 Security » Cryptographic keys 를 선택합니다.
GCS 버킷에 할당된 키 링을 선택합니다.
오른쪽 상단 모서리에서 SHOW INFO PANEL 를 클릭합니다. 키 링에 대한 정보 패널이 표시됩니다.
ADD PRINCIPAL 버튼을 클릭합니다.
New principals 필드에서 이 항목의 2단계: Snowflake 계정에 대한 클라우드 저장소 서비스 계정 검색 의 DESCRIBE INTEGRATION 출력에서 서비스 계정 이름을 검색합니다.
Select a role 드롭다운에서
Cloud KMS CrytoKey Encryptor/Decryptor
역할을 선택합니다.Save 버튼을 클릭합니다. 서비스 계정 이름이 정보 패널의 Cloud KMS CrytoKey Encryptor/Decryptor 역할 드롭다운에 추가됩니다.
GCS Pub/Sub를 사용하여 자동화 구성하기¶
전제 조건¶
이 항목의 지침에서는 다음 항목이 생성 및 구성된 것으로 가정합니다.
- GCP 계정:
GCS 버킷에서 이벤트 메시지를 수신하는 Pub/Sub 항목. 자세한 내용은 이 항목의 Pub/Sub 항목 만들기 섹션을 참조하십시오.
Pub/Sub 항목에서 이벤트 메시지를 수신하는 구독. 자세한 내용은 이 항목의 Pub/Sub 구독 만들기 섹션을 참조하십시오.
자세한 지침은 Pub/Sub 설명서 를 참조하십시오.
- Snowflake:
데이터를 로드하려는 Snowflake 데이터베이스의 대상 테이블.
Pub/Sub 항목 만들기¶
Cloud Shell 또는 클라우드 SDK 를 사용하여 Pub/Sub 항목을 생성합니다.
다음 명령을 실행하여 항목을 생성하고 지정된 GCS 버킷의 활동을 수신하도록 활성화합니다.
$ gsutil notification create -t <topic> -f json gs://<bucket-name> -e OBJECT_FINALIZE
여기서
<topic>
은 항목의 이름입니다.<bucket-name>
은 GCS 버킷의 이름입니다.
항목이 이미 있는 경우 명령에서는 해당 항목을 사용하고, 그렇지 않으면 이 명령으로 새 항목이 생성됩니다.
자세한 내용은 Pub/Sub 설명서의 클라우드 저장소용 Pub/Sub 알림 사용하기 를 참조하십시오.
Pub/Sub 구독 만들기¶
클라우드 콘솔, gcloud
명령줄 도구 또는 클라우드 Pub/Sub API를 사용하여 Pub/Sub 항목에 대한 끌어오기 배달을 사용하는 구독을 생성합니다. 자세한 지침은 Pub/Sub 설명서의 항목 및 구독 관리하기 를 참조하십시오.
참고
Snowflake에서는 기본 끌어오기 배달을 사용하는 Pub/Sub 구독만 지원됩니다. 밀어넣기 배달은 지원되지 않습니다.
Pub/Sub 구독 ID 검색하기¶
Pub/Sub 항목 구독 ID는 이 지침에서 Snowflake가 이벤트 메시지에 액세스할 수 있도록 하기 위해 사용됩니다.
Google Cloud Platform Console에 프로젝트 편집자로 로그인합니다.
홈 대시보드에서 Big Data » Pub/Sub » Subscriptions 을 선택합니다.
항목 구독의 Subscription ID 열에서 ID를 복사합니다.
1단계: Snowflake에서 알림 통합 만들기¶
CREATE NOTIFICATION INTEGRATION 명령을 사용하여 알림 통합을 생성합니다. 알림 통합은 Pub/Sub 구독을 참조합니다. Snowflake는 계정에 대해 생성된 GCS 서비스 계정과 알림 통합을 연결합니다. Snowflake는 Snowflake 계정의 모든 GCS 알림 통합에서 참조하는 단일 서비스 계정을 생성합니다.
참고
계정 관리자(ACCOUNTADMIN 역할의 사용자) 또는 전역 CREATE INTEGRATION 권한이 있는 역할만 이 SQL 명령을 실행할 수 있습니다.
알림 통합을 위한 GCS 서비스 계정은 저장소 통합을 위해 생성된 서비스 계정과 다릅니다.
단일 알림 통합은 단일 Google Cloud Pub/Sub 구독을 지원합니다. 이벤트 알림은 알림 통합 사이에서 분할되므로 여러 알림 통합에서 동일한 Pub/Sub 구독을 참조하면 대상 테이블에서 데이터가 누락될 수 있습니다. 따라서 파이프가 기존 파이프와 동일한 Pub/Sub 구독을 참조하는 경우 파이프 생성이 차단됩니다.
CREATE NOTIFICATION INTEGRATION <integration_name>
TYPE = QUEUE
NOTIFICATION_PROVIDER = GCP_PUBSUB
ENABLED = true
GCP_PUBSUB_SUBSCRIPTION_NAME = '<subscription_id>';
여기서
integration_name
은 새 통합의 이름입니다.subscription_id
는 Pub/Sub 구독 ID 검색하기 에서 기록한 구독의 이름입니다.
예:
CREATE NOTIFICATION INTEGRATION my_notification_int
TYPE = QUEUE
NOTIFICATION_PROVIDER = GCP_PUBSUB
ENABLED = true
GCP_PUBSUB_SUBSCRIPTION_NAME = 'projects/project-1234/subscriptions/sub2';
2단계: Pub/Sub 구독에 Snowflake 액세스 권한 부여¶
DESCRIBE INTEGRATION 명령을 사용하여 Snowflake 계정 ID를 검색합니다.
DESC NOTIFICATION INTEGRATION <integration_name>;
여기서
integration_name
은 1단계: Snowflake에서 알림 통합 만들기 에서 생성한 통합의 이름입니다.
예:
DESC NOTIFICATION INTEGRATION my_notification_int;
GCP_PUBSUB_SERVICE_ACCOUNT 열의 서비스 계정 이름을 기록하고 해당 형식은 다음과 같습니다.
<service_account>@<project_id>.iam.gserviceaccount.com
Google Cloud Platform Console에 프로젝트 편집자로 로그인합니다.
홈 대시보드에서 Big Data » Pub/Sub » Subscriptions 을 선택합니다.
액세스를 구성할 구독을 선택합니다.
오른쪽 상단 모서리에서 SHOW INFO PANEL 를 클릭합니다. 구독에 대한 정보 패널이 표시됩니다.
ADD PRINCIPAL 버튼을 클릭합니다.
New principals 드롭다운에서 기록한 서비스 계정 이름을 검색합니다.
Select a role 드롭다운에서 Pub/Sub Subscriber 를 선택합니다.
Save 버튼을 클릭합니다. 서비스 계정 이름이 정보 패널의 Pub/Sub Subscriber 역할 드롭다운에 추가됩니다.
클라우드 콘솔의 Dashboard 페이지로 이동하고 드롭다운 목록에서 프로젝트를 선택합니다.
ADD PEOPLE TO THIS PROJECT 버튼을 클릭합니다.
기록한 서비스 계정 이름을 추가합니다.
Select a role 드롭다운에서 Monitoring Viewer 를 선택합니다.
Save 버튼을 클릭합니다. 서비스 계정 이름이 Monitoring Viewer 역할에 추가됩니다.
3단계: 스테이지 만들기(필요한 경우)¶
CREATE STAGE 명령을 사용하여 GCS 버킷을 참조하는 외부 스테이지를 생성합니다. Snowflake는 스테이징된 데이터 파일을 외부 테이블 메타데이터로 읽습니다. 아니면 기존 외부 스테이지를 사용할 수 있습니다.
참고
클라우드 저장소 위치에 대한 보안 액세스를 구성하려면 이 항목의 클라우드 저장소에 대한 보안 액세스 구성하기 를 참조하십시오.
CREATE STAGE 문에서 저장소 통합을 참조하려면 역할에 저장소 통합 오브젝트에 대한 USAGE 권한이 있어야 합니다.
다음 예에서는 사용자 세션에 대한 활성 스키마에 이름이 mystage
인 스테이지를 생성합니다. 클라우드 저장소 URL에는 files
경로가 포함됩니다. 스테이지는 my_storage_int
저장소 통합을 참조합니다.
USE SCHEMA mydb.public; CREATE STAGE mystage URL='gcs://load/files/' STORAGE_INTEGRATION = my_storage_int;
4단계: 자동 수집이 활성화된 파이프 만들기¶
CREATE PIPE 명령을 사용하여 파이프를 생성합니다. 파이프는 수집 큐에서 대상 테이블로 데이터를 로드하기 위해 Snowpipe에서 사용하는 COPY INTO <테이블> 문을 정의합니다.
CREATE PIPE <pipe_name>
AUTO_INGEST = true
INTEGRATION = '<notification_integration_name>'
AS
<copy_statement>;
여기서
<파이프_이름>
파이프에 대한 식별자로, 파이프가 생성된 스키마에 대해 고유한 식별자여야 합니다.
식별자는 알파벳 문자로 시작해야 하며 전체 식별자 문자열을 큰따옴표(예:
"My object"
)로 묶지 않는 한 공백이나 특수 문자를 포함할 수 없습니다. 큰따옴표로 묶인 식별자도 대/소문자를 구분합니다.INTEGRATION = '<알림_통합_이름>'
Google Cloud Pub/Sub 알림을 사용하여 디렉터리 테이블 메타데이터를 자동으로 새로 고치기 위해 사용되는 알림 통합의 이름입니다. 알림 통합은 Snowflake와 서드 파티 클라우드 메시지 큐 서비스 간의 인터페이스를 제공하는 Snowflake 오브젝트입니다.
copy_statement
큐에 대기 중인 파일에서 Snowflake 테이블로 데이터를 로딩하는 데 사용되는 COPY INTO <테이블> 문입니다. 이 문은 파이프에 대한 텍스트/정의의 역할을 하며 SHOW PIPES 출력에 표시됩니다.
예를 들어, 이름이 mystage
인 외부(GCS) 스테이지에서 스테이징된 파일에서 이름이 mytable
인 대상 테이블로 데이터를 로드하는 파이프를 snowpipe_db.public
스키마에 생성합니다.
CREATE PIPE snowpipe_db.public.mypipe
AUTO_INGEST = true
INTEGRATION = 'MY_NOTIFICATION_INT'
AS
COPY INTO snowpipe_db.public.mytable
FROM @snowpipe_db.public.mystage/path2;
INTEGRATION 매개 변수는 1단계: Snowflake에서 알림 통합 만들기 에서 생성한 my_notification_int
알림 통합을 참조합니다. 통합 이름은 모두 대문자로 입력해야 합니다.
중요
COPY INTO <테이블> 문의 저장 위치 참조가 계정의 기존 파이프 참조와 중첩되지 않는지 확인합니다. 그렇지 않으면 여러 파이프가 동일한 데이터 파일 세트를 대상 테이블로 로드할 수 있습니다. 예를 들어, 여러 파이프 정의가 <저장소_위치>/path1/
및 <저장소_위치>/path1/path2/
과 같이 세분화 수준이 다른 동일한 저장소 위치를 참조할 때 이러한 상황이 발생할 수 있습니다. 이 예에서, 파일이 <저장소_위치>/path1/path2/
에서 스테이징된 경우 두 파이프 모두 파일의 복사본을 로드합니다.
SHOW PIPES 를 실행하거나 Account Usage의 PIPES 뷰 또는 Information Schema의 PIPES 뷰를 쿼리하여 계정의 모든 파이프 정의에서 COPY INTO <테이블> 문을 확인합니다.
이제 자동 수집 기능이 포함된 Snowpipe가 구성되었습니다!
새 데이터 파일이 GCS 버킷에 추가되면 이벤트 메시지는 Snowpipe에 파이프에 정의된 대상 테이블에 로드하도록 알립니다.
5단계: 기록 파일 로드¶
Pub/Sub 메시지 전에 외부 스테이지에 있는 데이터 파일의 백로그를 로드하려면, ALTER PIPE … REFRESH 문을 실행합니다.
6단계: 스테이징된 파일 삭제¶
데이터를 로드하고 더 이상 파일이 필요하지 않으면 스테이징된 파일을 삭제합니다. 자세한 지침은 Snowpipe가 데이터를 로드한 후 스테이징된 파일 삭제하기 섹션을 참조하십시오.
SYSTEM$PIPE_STATUS 출력¶
SYSTEM$PIPE_STATUS 함수는 파이프의 현재 상태에 대한 JSON 표현을 검색합니다.
AUTO_INGEST가 TRUE로 설정된 파이프의 경우 이 함수는 다음 이름/값 페어를 포함하는 JSON 오브젝트를 반환합니다(현재 파이프 상태에 해당하는 경우).
{"executionState":"<value>","oldestFileTimestamp":<value>,"pendingFileCount":<value>,"notificationChannelName":"<value>","numOutstandingMessagesOnChannel":<value>,"lastReceivedMessageTimestamp":"<value>","lastForwardedMessageTimestamp":"<value>","error":<value>,"fault":<value>}
출력 값에 대한 설명은 SQL 함수에 대한 참조 항목을 참조하십시오.