자습서: Snowflake Native SDK for Connectors Java Connector 템플릿¶
소개¶
Snowflake Native SDK for Connectors 을 활용한 커넥터 템플릿 사용 자습서에 오신 것을 환영합니다. 이 가이드는 단순한 Connector Native Application을 설정하는 데 도움이 됩니다.
이 자습서에서는 다음에 대해 설명합니다.
Connector Native Application 배포
데이터를 수집하기 위한 템플릿 커넥터 구성
사용자의 필요에 맞게 템플릿 커넥터 사용자 지정
템플릿에는 수정이 필요한 특정 파일을 쉽게 찾을 수 있도록 코드에 다양한 유용한 설명이 포함되어 있습니다. 다음 키워드가 포함된 설명을 찾아보십시오. 이러한 설명은 여러분이 커넥터를 직접 구현하는 데 도움이 될 것입니다.
TODO
TODO: HINT
TODO: IMPLEMENT ME
이 자습서를 시작하기 전에 다음 권장 콘텐츠를 검토하여 준비해야 합니다.
전제 조건¶
시작하기 전에 다음 요구 사항을 충족하는지 확인합니다.
ACCOUNTADMIN
역할이 있는 Snowflake 계정에 액세스하기커넥터용 Snowflake Native SDK 를 검토하고 이 자습서를 따르는 동안 열어 두십시오.
자습서: Snowflake Native SDK for Connectors Java Connector 예 섹션을 검토하십시오.
이 자습서에서는 이 템플릿을 기반으로 한 예시 커넥터를 사용하며 다양한 구성 요소의 구현 예를 확인하는 데 참조할 수 있습니다.
로컬 환경 준비하기¶
계속 진행하기 전에 컴퓨터에 필요한 모든 소프트웨어가 설치되어 있는지 확인하고 커넥터 템플릿을 복제해야 합니다.
Java 설치¶
Snowflake Native SDK for Connectors 에는 Java LTS (장기 지원) 버전 11 이상이 필요합니다. 컴퓨터에 최소 요구 사항인 Java 버전이 설치되어 있지 않은 경우 Oracle Java 또는 OpenJDK 를 설치해야 합니다.
Oracle Java¶
최신 LTS 릴리스(JDK)는 Oracle NFTC 에서 무료로 다운로드하여 사용할 수 있습니다. 다운로드 및 설치 관리 지침은 Oracle 페이지 에서 확인할 수 있습니다.
OpenJDK¶
OpenJDK 는 Java의 오픈 소스 구현입니다. 다운로드 및 설치 관리 지침은 openjdk.org 및 jdk.java.net 에서 확인할 수 있습니다.
서드 파티 OpenJDK 버전(예: Eclipse Temurin 또는 Amazon Corretto)을 사용할 수도 있습니다.
Snowflake CLI 구성¶
커넥터를 빌드, 배포 및 설치하려면 Snowflake CLI 도구가 필요합니다. 컴퓨터에 Snowflake CLI 가 설치되어 있지 않은 경우 여기 의 지침에 따라 설치하십시오.
도구가 설치된 후 구성 파일 에서 Snowflake에 대한 연결을 구성해야 합니다.
구성된 연결이 없는 경우 native_sdk_connection
이라는 이름의 연결을 새로 만듭니다. deployment/snowflake.toml
파일에서 연결 예시를 찾을 수 있습니다.
이미 연결이 구성되어 있고 커넥터와 함께 사용하려는 경우 이 자습서에서 이 연결을 사용할 때마다 native_sdk_connection
대신 해당 이름을 사용하십시오.
템플릿 복제¶
커넥터 템플릿을 복제하려면 다음 명령을 사용합니다.
snow init <project_dir> \
--template-source https://github.com/snowflakedb/connectors-native-sdk \
--template templates/connectors-native-sdk-template
<project_dir >
대신 커넥터의 Java 프로젝트가 생성될 디렉터리 이름(존재하지 않아야 함)을 입력합니다.
명령을 실행하면 애플리케이션 인스턴스 및 스테이지 이름 구성을 위한 추가 정보를 제공하라는 메시지가 표시됩니다. 따옴표로 묶지 않은 유효한 Snowflake 식별자라면 어떤 이름이라도 입력할 수 있으며, 대괄호 안에 표시된 기본값을 사용하려면 Enter 키를 클릭합니다.
사용자 지정 애플리케이션 및 스테이지 이름을 제공하는 명령 실행 예입니다.
$ snow init my_connector \
--template-source https://github.com/snowflakedb/connectors-native-sdk \
--template templates/connectors-native-sdk-template
Name of the application instance which will be created in Snowflake [connectors-native-sdk-template]: MY_CONNECTOR
Name of the schema in which the connector files stage will be created [TEST_SCHEMA]:
Name of the stage used to store connector files in the application package [TEST_STAGE]: CUSTOM_STAGE_NAME
Initialized the new project in my_connector
커넥터 빌드, 배포 및 정리¶
템플릿은 수정 전에도 즉시 배포할 수 있습니다. 다음 섹션에서는 커넥터를 빌드, 배포 및 설치하는 방법을 설명합니다.
커넥터 빌드하기¶
Snowflake Native SDK for Connectors 를 사용하여 만든 커넥터를 빌드하는 것은 일반적인 Java 애플리케이션을 빌드하는 것과는 조금 다릅니다. 원본에서 .jar 아카이브를 빌드하는 것 외에도 수행해야 할 작업이 몇 가지 있습니다. 애플리케이션 구축은 다음 단계로 구성됩니다.
빌드 디렉터리에 사용자 지정 내부 구성 요소 복사하기
SDK 구성 요소를 빌드 디렉터리에 복사하기
내부 구성 요소 복사하기¶
이 단계에서는 커넥터 .jar 파일을 빌드한 다음 UI, 매니페스트 및 설정 파일과 함께 sf_build
디렉터리로 복사합니다.
이 단계를 실행하려면 ./gradlew copyInternalComponents
명령을 실행합니다.
SDK 구성 요소 복사¶
이 단계에서는 SDK .jar 파일(커넥터 Gradle 모듈에 종속성으로 추가됨)을 sf_build
디렉터리로 복사하고 .jar 아카이브에서 번들된 .sql 파일을 추출합니다.
이러한 .sql 파일을 사용하면 애플리케이션 설치 중에 어떤 공급자 오브젝트가 생성될지 사용자 지정할 수 있습니다. 오브젝트를 생략하면 잘못하면 일부 기능이 작동하지 않을 수 있으므로 처음에는 사용자 지정을 권장하지 않습니다. 템플릿 커넥터 애플리케이션은 all.sql
파일을 사용하여 권장되는 모든 SDK 오브젝트를 생성합니다.
이 단계를 실행하려면 ./gradlew copySdkComponents
명령을 실행합니다.
커넥터 배포하기¶
Native App을 배포하려면 Snowflake 내에 애플리케이션 패키지를 만들어야 합니다. 그런 다음 sf_build
디렉터리에 있는 모든 파일을 Snowflake에 업로드해야 합니다.
참고 - 개발 목적의 버전 생성은 선택 사항이며, 스테이징된 파일에서 애플리케이션 인스턴스를 직접 생성할 수 있습니다. 이 접근 방식을 사용하면 버전과 애플리케이션 인스턴스를 다시 만들지 않고도 대부분의 커넥터 파일에서 변경 사항을 확인할 수 있습니다.
다음 작업이 수행됩니다.
아직 존재하지 않는 경우 새 애플리케이션 패키지 만들기
패키지 내부에 스키마 및 파일 스테이지 만들기
sf_build
디렉터리에서 스테이지로 파일 업로드하기(이 단계는 다소 시간이 걸릴 수 있음)
커넥터를 배포하려면 snow app deploy --connection=native_sdk_connection
명령을 실행합니다.
snow app deploy
명령에 대한 자세한 내용은 snow app deploy 섹션을 참조하십시오.
생성된 애플리케이션 패키지는 이제 계정의 Snowflake UI 에서 Data products
카테고리의 App packages
탭에 표시됩니다.
커넥터 설치하기¶
애플리케이션 설치가 프로세스의 마지막 단계입니다. 이전에 생성한 애플리케이션 패키지에서 애플리케이션을 생성합니다.
커넥터를 설치하려면 snow app run --connection=native_sdk_connection
명령을 실행합니다.
snow app run
명령에 대한 자세한 내용은 snow app run 섹션을 참조하십시오.
설치된 애플리케이션은 이제 계정의 Snowflake UI 에서 Data products
카테고리의 Installed apps
탭에 표시됩니다.
커넥터 파일 업데이트하기¶
커넥터 파일을 수정하려는 경우 언제든지 애플리케이션 패키지 스테이지에 수정된 파일을 쉽게 업로드할 수 있습니다. 업로드 명령은 업데이트된 파일에 따라 달라집니다.
업데이트 명령을 실행하기 전에 ./gradlew copyInternalComponents
를 실행하여 sf_build
디렉터리에 커넥터의 새 파일을 복사해야 합니다.
UI .py 파일 또는 커넥터 .java 파일¶
snow app deploy --connection=native_sdk_connection
명령을 사용하면 현재 애플리케이션 인스턴스가 재설치 없이 새 파일을 사용합니다.
setup.sql 또는 manifest.yml 파일¶
snow app run --connection=native_sdk_connection
명령을 사용하면 새 파일이 스테이지에 업로드된 후 현재 애플리케이션 인스턴스가 다시 설치됩니다.
정리¶
자습서가 완료된 후 또는 어떤 이유로든 애플리케이션과 해당 패키지를 제거하려는 경우 다음 명령을 사용하여 계정에서 완전히 제거할 수 있습니다.
snow app teardown --connection=native_sdk_connection --cascade --force
--cascade
옵션은 계정 관리자에게 소유권을 이전하지 않고 대상 데이터베이스를 제거하려면 필요합니다. 실제 커넥터에서는 수집된 데이터를 보존하기 위해 데이터베이스를 제거해서는 안 되며, 계정 관리자가 소유하거나 제거하기 전에 소유권을 이전해야 합니다.
참고 - 커넥터는 수집이 구성되지 않았더라도 일시 중지되거나 제거될 때까지 크레딧을 소비합니다!
필수 조건 단계¶
설치 직후 커넥터는 마법사 단계에 있습니다. 이 단계는 최종 사용자에게 필요한 모든 구성을 안내하는 몇 가지 단계로 구성됩니다.
첫 번째 단계는 필수 조건 단계입니다. 이는 선택 사항이며 일부 커넥터에 필요하지 않을 수 있습니다. 전제 조건은 일반적으로 애플리케이션 외부에서 사용자에게 요구되는 작업(예: SQL 워크시트에서 쿼리 실행, 원본 시스템 측에서 구성 수행 등)입니다.
필수 조건(전제 조건)에 대해 자세히 알아보십시오.
각 전제 조건의 내용은 커넥터 내부에 위치한 STATE.PREREQUISITES
테이블에서 직접 검색할 수 있습니다. 이는 setup.sql
스크립트를 통해 사용자 지정할 수 있습니다. 그러나 setup.sql
스크립트는 애플리케이션을 설치, 업그레이드 및 다운그레이드할 때마다 실행된다는 점에 유의하십시오. 삽입은 멱등적이어야 하므로 아래 예제와 같이 병합 쿼리를 사용하는 것이 좋습니다.
MERGE INTO STATE.PREREQUISITES AS dest
USING (SELECT * FROM VALUES
('1',
'Sample prerequisite',
'Prerequisites can be used to notice the end user of the connector about external configurations. Read more in the SDK documentation below. This content can be modified inside `setup.sql` script',
'https://docs.snowflake.com/developer-guide/native-apps/connector-sdk/flow/prerequisites',
NULL,
NULL,
1
)
) AS src (id, title, description, documentation_url, learnmore_url, guide_url, position)
ON dest.id = src.id
WHEN NOT MATCHED THEN
INSERT (id, title, description, documentation_url, learnmore_url, guide_url, position)
VALUES (src.id, src.title, src.description, src.documentation_url, src.learnmore_url, src.guide_url, src.position);
커넥터 구성 단계¶
마법사 단계의 다음 단계는 커넥터 구성 단계입니다. 이 단계에서는 커넥터에 필요한 데이터베이스 오브젝트와 권한을 구성할 수 있습니다. 이 단계에서는 다음 구성 속성을 지정할 수 있습니다.
warehouse
operational_warehouse
cortex_warehouse
destination_database
destination_schema
global_schedule
data_owner_role
cortex_user_role
agent_username
agent_role
다른 사용자 지정 속성이 필요한 경우 마법사 단계의 다음 단계 중 하나에서 구성할 수 있습니다. 각 속성에 대한 자세한 내용은 커넥터 구성 섹션을 참조하십시오.
또한 템플릿에 제공된 Streamlit 구성 요소(streamlit/wizard/connector_config.py
)는 Native Apps Permission SDK 를 트리거하고 최종 사용자에게 권한 부여를 요청하는 방법을 보여줍니다. 사용 가능한 속성이 커넥터의 요구 사항을 충족하는 한 백엔드 클래스를 덮어쓸 필요는 없지만, 구성의 추가 단계에서 구성 요소와 동일한 방식으로 가능합니다.
내부 프로시저 및 Java 오브젝트에 대한 자세한 내용은 커넥터 구성 참조 섹션을 참조하십시오.
제공된 Streamlit 예를 통해 manifest.yml
파일(CREATE DATABASE
및 EXECUTE TASKS
)에 구성된 계정 수준 권한을 요청할 수 있습니다. 또한 사용자는 Permission SDK 팝업을 통해 웨어하우스 참조를 지정할 수 있습니다.
템플릿에서 사용자는 destination_database
및 destination_schema
만 제공하도록 요청을 받습니다. 그러나 streamlit/wizard/connector_configuration.py
의 TODO
설명에는 Streamlit UI 에서 더 많은 입력 필드를 표시하는 데 재사용할 수 있는 주석이 달린 코드가 포함되어 있습니다.
# TODO: Here you can add additional fields in connector configuration.
# For example:
st.subheader("Operational warehouse")
input_col, _ = st.columns([2, 1])
with input_col:
st.text_input("", key="operational_warehouse", label_visibility="collapsed")
st.caption("Name of the operational warehouse to be used")
연결 구성 단계¶
마법사 단계의 다음 단계는 연결 구성 단계입니다. 이 단계에서는 최종 사용자가 커넥터에 대한 외부 연결 매개 변수를 구성할 수 있습니다. 이 구성에는 시크릿, 통합 등과 같은 오브젝트 식별자가 포함될 수 있습니다.
이 정보는 커넥터가 수집하는 데이터의 소스 시스템에 따라 달라지기 때문에 원본 코드에서 더 큰 사용자 지정을 해야 하는 첫 번째 부분입니다.
연결 구성에 대한 자세한 내용은 다음을 참조하십시오.
Streamlit UI 쪽(streamlit/wizard/connection_config.py
)부터 시작하여 필요한 모든 매개 변수에 대한 텍스트 입력을 추가해야 합니다. 예제 텍스트 입력이 구현되어 있으며 이 파일에서 코드를 검색하면 새 필드에 대한 설명 코드가 있는 TODO
를 찾을 수 있습니다.
# TODO: Additional configuration properties can be added to the UI like this:
st.subheader("Additional connection parameter")
input_col, _ = st.columns([2, 1])
with input_col:
st.text_input("", key="additional_connection_property", label_visibility="collapsed")
st.caption("Some description of the additional property")
속성을 양식에 추가한 후에는 커넥터의 백엔드 계층으로 전달해야 합니다. 이를 위해서는 Streamlit 파일에서 두 개의 추가 위치를 수정해야 합니다. 첫 번째는 streamlit/wizard/connection_config.py
파일에 있는 finish_config
함수입니다. 새로 추가된 텍스트 입력의 상태는 여기에서 확인해야 합니다. 또한 필요한 경우 유효성을 검사한 다음 set_connection_configuration
함수에 전달할 수 있습니다.
예를 들어, additional_connection_property
를 추가한 경우 편집 후에는 다음과 같이 표시됩니다.
def finish_config():
try:
# TODO: If some additional properties were specified they need to be passed to the set_connection_configuration function.
# The properties can also be validated, for example, check whether they are not blank strings etc.
response = set_connection_configuration(
custom_connection_property=st.session_state["custom_connection_property"],
additional_connection_property=st.session_state["additional_connection_property"],
)
# rest of the method without changes
그런 다음 set_connection_configuration
함수를 편집해야 하며 이 함수는 streamlit/native_sdk_api/connection_config.py
파일에서 찾을 수 있습니다. 이 함수는 Streamlit UI 와 Connector의 백엔드 진입점인 기본 SQL 프로시저 사이의 프록시입니다.
def set_connection_configuration(custom_connection_property: str, additional_connection_property: str):
# TODO: this part of the code sends the config to the backend so all custom properties need to be added here
config = {
"custom_connection_property": escape_identifier(custom_connection_property),
"additional_connection_property": escape_identifier(additional_connection_property),
}
return call_procedure(
"PUBLIC.SET_CONNECTION_CONFIGURATION",
[variant_argument(config)]
)
이렇게 하면 새 속성이 구성이 포함된 내부 커넥터 테이블에 저장됩니다. 하지만 이것이 가능한 맞춤 설정의 끝은 아닙니다. 일부 백엔드 구성 요소도 사용자 지정이 가능합니다. 해당 구성 요소를 찾으려면 코드에서 다음 설명을 찾아보십시오.
TODO: IMPLEMENT ME connection configuration validate
TODO: IMPLEMENT ME connection callback
TODO: IMPLEMENT ME test connection
유효성 검사 부분에서는 UI에서 받은 데이터에 대한 추가 유효성 검사를 수행할 수 있습니다. 또한 문자 대/소문자 변경, 제공된 데이터 트리밍, 제공된 이름의 오브젝트가 실제로 Snowflake 내에 존재하는지 확인하는 등 데이터를 변환할 수도 있습니다.
연결 콜백은 구성에 따라 추가 작업을 수행할 수 있도록 해주는 부분으로, 예를 들어 외부 통합 설정 참조 에 설명된 솔루션을 사용하여 외부 액세스 통합을 사용해야 하는 프로시저를 변경하는 등의 작업을 수행할 수 있습니다.
테스트 연결은 연결 구성의 마지막 구성 요소로, 커넥터와 소스 시스템 간에 연결을 설정할 수 있는지 확인합니다.
내부 구성 요소에 대한 자세한 내용은 다음을 참조하십시오.
구현 예제는 다음과 같습니다.
public class TemplateConfigurationInputValidator implements ConnectionConfigurationInputValidator {
private static final String ERROR_CODE = "INVALID_CONNECTION_CONFIGURATION";
@Override
public ConnectorResponse validate(Variant config) {
// TODO: IMPLEMENT ME connection configuration validate: If the connection configuration input
// requires some additional validation this is the place to implement this logic.
// See more in docs:
// https://docs.snowflake.com/developer-guide/native-apps/connector-sdk/reference/connection_configuration_reference
// https://docs.snowflake.com/developer-guide/native-apps/connector-sdk/flow/connection_configuration
var integrationCheck = checkParameter(config, INTEGRATION_PARAM, false);
if (!integrationCheck.isOk()) {
return integrationCheck;
}
var secretCheck = checkParameter(config, SECRET_PARAM, true);
if (!secretCheck.isOk()) {
return ConnectorResponse.error(ERROR_CODE);
}
return ConnectorResponse.success();
}
}
public class TemplateConnectionConfigurationCallback implements ConnectionConfigurationCallback {
private static final String[] EXTERNAL_SOURCE_PROCEDURE_SIGNATURES = {
asVarchar(format("%s.%s()", PUBLIC_SCHEMA, TEST_CONNECTION_PROCEDURE)),
asVarchar(format("%s.%s(VARIANT)", PUBLIC_SCHEMA, FINALIZE_CONNECTOR_CONFIGURATION_PROCEDURE)),
asVarchar(format("%s.%s(NUMBER, STRING)", PUBLIC_SCHEMA, WORKER_PROCEDURE))
};
private final Session session;
public TemplateConnectionConfigurationCallback(Session session) {
this.session = session;
}
@Override
public ConnectorResponse execute(Variant config) {
// TODO: If you need to alter some procedures with external access you can use
// configureProcedure method or implement a similar method on your own.
// TODO: IMPLEMENT ME connection callback: Implement the custom logic of changes in application
// to be done after connection configuration, like altering procedures with external access.
// See more in docs:
// https://docs.snowflake.com/developer-guide/native-apps/connector-sdk/reference/connection_configuration_reference
// https://docs.snowflake.com/developer-guide/native-apps/connector-sdk/flow/connection_configuration
var response = configureProceduresWithReferences();
if (response.isNotOk()) {
return response;
}
return ConnectorResponse.success();
}
private ConnectorResponse configureProceduresWithReferences() {
return callProcedure(
session,
PUBLIC_SCHEMA,
SETUP_EXTERNAL_INTEGRATION_WITH_NAMES_PROCEDURE,
EXTERNAL_SOURCE_PROCEDURE_SIGNATURES);
}
}
public class TemplateConnectionValidator {
private static final String ERROR_CODE = "TEST_CONNECTION_FAILED";
public static Variant testConnection(Session session) {
// TODO: IMPLEMENT ME test connection: Implement the custom logic of testing the connection to
// the source system here. This usually requires connection to some webservice or other external
// system. It is suggested to perform only the basic connectivity validation here.
// If that's the case then this procedure must be altered in TemplateConnectionConfigurationCallback first.
// See more in docs:
// https://docs.snowflake.com/developer-guide/native-apps/connector-sdk/reference/connection_configuration_reference
// https://docs.snowflake.com/developer-guide/native-apps/connector-sdk/flow/connection_configuration
return test().toVariant();
}
private static ConnectorResponse test() {
try {
var response = SourceSystemHttpHelper.testEndpoint();
if (isSuccessful(response.statusCode())) {
return ConnectorResponse.success();
} else {
return ConnectorResponse.error(ERROR_CODE, "Connection to source system failed");
}
} catch (Exception exception) {
return ConnectorResponse.error(ERROR_CODE, "Test connection failed");
}
}
}
최종 구성 단계¶
최종 커넥터 구성 단계는 마법사 단계의 마지막 단계입니다. 이 단계에서는 여러 작업을 수행합니다.
사용자는 커넥터에 필요한 추가 구성을 지정할 수 있습니다.
필요한 경우 수집된 데이터에 대한 싱크 데이터베이스, 스키마 및 추가 테이블과 뷰를 생성합니다.
스케줄러와 Task Reactor와 같은 내부 구성 요소를 초기화합니다.
구성 완료에 대한 자세한 내용은 구성 완료를 참조하십시오.
Task Reactor와 스케줄링에 대한 자세한 내용은 다음을 참조하십시오.
연결 구성 단계와 유사하게, 사용자 지정은 Streamlit UI 에서 시작할 수 있습니다. streamlit/wizard/finalize_config.py
파일에는 예시 속성이 있는 양식이 포함되어 있습니다. 커넥터 요구 사항에 따라 더 많은 속성을 추가할 수 있습니다. 다른 속성을 추가하려면 언급된 파일에서 새 속성을 추가하는 예제 코드가 포함된 TODO
설명을 찾아보십시오.
# TODO: Here you can add additional fields in finalize connector configuration.
# For example:
st.subheader("Some additional property")
input_col, _ = st.columns([2, 1])
with input_col:
st.text_input("", key="some_additional_property", label_visibility="collapsed")
st.caption("Description of some new additional property")
새로운 속성에 대한 텍스트 입력을 추가한 후에는 백엔드 측으로 전달해야 합니다. 이를 위해서는 같은 파일에서 finalize_configuration
함수를 수정합니다.
def finalize_configuration():
try:
st.session_state["show_main_error"] = False
# TODO: If some additional properties were introduced, they need to be passed to the finalize_connector_configuration function.
response = finalize_connector_configuration(
st.session_state.get("custom_property"),
st.session_state.get("some_additional_property")
)
다음으로, streamlit/native_sdk_api/finalize_config.py
파일을 열고 다음 함수에 새 속성을 추가합니다.
def finalize_connector_configuration(custom_property: str, some_additional_property: str):
# TODO: If some custom properties were configured, then they need to be specified here and passed to the FINALIZE_CONNECTOR_CONFIGURATION procedure.
config = {
"custom_property": custom_property,
"some_additional_property": some_additional_property,
}
return call_procedure(
"PUBLIC.FINALIZE_CONNECTOR_CONFIGURATION",
[variant_argument(config)]
)
다시 말해, 연결 구성 단계와 유사하게 이 단계에서도 다양한 백엔드 구성 요소를 사용자 지정할 수 있으며, 이는 소스 코드에서 다음 주석을 사용하여 찾을 수 있습니다.
TODO: IMPLEMENT ME validate source
TODO: IMPLEMENT ME finalize internal
소스 검증 부분은 소스 시스템에서 더욱 정교한 검증을 수행하는 역할을 합니다. 이전 테스트 연결에서 연결이 설정될 수 있는지 여부만 확인한 경우, 검증 소스에서는 시스템의 특정 데이터에 대한 액세스(예: 단일 데이터 레코드 추출)를 확인할 수 있습니다.
최종 내부는 Task Reactor와 스케줄러를 초기화하고 싱크 데이터베이스와 필요한 모든 중첩 오브젝트를 생성하는 내부 프로시저입니다. 또한 최종 단계에서 제공된 구성을 저장하기 위해서도 사용할 수 있습니다(이 구성은 기본적으로 저장되지 않음).
내부 구성 요소에 대한 자세한 내용은 다음에서 확인할 수 있습니다.
또한 FinalizeConnectorInputValidator
인터페이스를 사용하여 입력을 검증하고 이를 종료자 처리기에 제공할 수 있습니다(TemplateFinalizeConnectorConfigurationCustomHandler
파일 확인). 빌더 사용에 대한 자세한 내용은 저장 프로시저 및 핸들러 사용자 지정 에서 확인할 수 있습니다.
검증 소스의 구현 예는 다음과 같습니다.
public class SourceSystemAccessValidator implements SourceValidator {
@Override
public ConnectorResponse validate(Variant variant) {
// TODO: IMPLEMENT ME validate source: Implement the custom logic of validating the source
// system. In some cases this can be the same validation that happened in
// TemplateConnectionValidator.
// However, it is suggested to perform more complex validations, like specific access rights to
// some specific resources here.
// See more in docs:
// https://docs.snowflake.com/developer-guide/native-apps/connector-sdk/reference/finalize_configuration_reference
// https://docs.snowflake.com/developer-guide/native-apps/connector-sdk/flow/finalize_configuration
var finalizeProperties = Configuration.fromCustomConfig(variant);
var httpResponse = SourceSystemHttpHelper.validateSource(finalizeProperties.get("custom_property"));
return prepareConnectorResponse(httpResponse.statusCode());
}
private ConnectorResponse prepareConnectorResponse(int statusCode) {
switch (statusCode) {
case 200:
return ConnectorResponse.success();
case 401:
return ConnectorResponse.error("Unauthorized error");
case 404:
return ConnectorResponse.error("Not found error");
default:
return ConnectorResponse.error("Unknown error");
}
}
}
리소스 만들기¶
마법사 단계가 완료되면 커넥터가 데이터 수집을 시작할 준비가 됩니다. 하지만 먼저 리소스를 구현하고 구성해야 합니다. 리소스는 소스 시스템의 특정 데이터 세트(예: 테이블, 엔드포인트, 파일 등)를 설명하는 추상화입니다.
다른 소스 시스템에는 리소스에 대한 다른 정보가 필요할 수 있으므로 리소스 정의는 특정 요구 사항에 맞게 사용자 지정되어야 합니다. 이를 위해서는 streamlit/daily_use/data_sync_page.py
파일로 이동합니다. 이 파일에서는 리소스 매개 변수에 대한 텍스트 입력을 추가하는 방법에 대한 TODO
설명을 찾을 수 있습니다. 리소스 매개 변수는 소스 시스템의 데이터를 식별하고 검색할 수 있도록 해야 합니다. 그런 다음 수집 중에 이러한 매개 변수를 추출할 수 있습니다.
# TODO: specify all the properties needed to define a resource in the source system. A subset of those properties should allow for a identification of a single resource, be it a table, endpoint, repository or some other data storage abstraction
st.text_input(
"Resource name",
key="resource_name",
)
st.text_input(
"Some resource parameter",
key="some_resource_parameter"
)
모든 필수 속성이 양식에 추가되면 백엔드 측으로 전달할 수 있습니다. 먼저 텍스트 필드의 상태를 추출하여 streamlit/daily_use/data_sync_page.py
파일의 API 수준 queue_resource
메서드로 전달해야 합니다.
def queue_resource():
# TODO: add additional properties here and pass them to create_resource function
resource_name = st.session_state.get("resource_name")
some_resource_parameter = st.session_state.get("some_resource_parameter")
if not resource_name:
st.error("Resource name cannot be empty")
return
result = create_resource(resource_name, some_resource_parameter)
if result.is_ok():
st.success("Resource created")
else:
st.error(result.get_message())
그런 다음 streamlit/native_sdk_api/resource_management.py
파일의 create_resource
함수를 업데이트해야 합니다.
def create_resource(resource_name, some_resource_parameter):
ingestion_config = [{
"id": "ingestionConfig",
"ingestionStrategy": "INCREMENTAL",
# TODO: HINT: scheduleType and scheduleDefinition are currently not supported out of the box, due to globalSchedule being used. However, a custom implementation of the scheduler can use those fields. They need to be provided becuase they are mandatory in the resourceDefinition.
"scheduleType": "INTERVAL",
"scheduleDefinition": "60m"
}]
# TODO: HINT: resource_id should allow identification of a table, endpoint etc. in the source system. It should be unique.
resource_id = {
"resource_name": resource_name,
}
id = f"{resource_name}_{random_suffix()}"
# TODO: if you specified some additional resource parameters then you need to put them inside resource metadata:
resource_metadata = {
"some_resource_parameter": some_resource_parameter
}
return call_procedure("PUBLIC.CREATE_RESOURCE",
[
varchar_argument(id),
variant_argument(resource_id),
variant_list_argument(ingestion_config),
varchar_argument(id),
"true",
variant_argument(resource_metadata)
])
CREATE_RESOURCE() 프로시저 논리 사용자 지정하기¶
PUBLIC.CREATE_RESOURCE()
프로시저를 통해 개발자는 메인 실행 흐름의 여러 곳에 연결되는 사용자 지정 논리를 구현하여 실행을 사용자 지정할 수 있습니다. SDK를 사용하면 개발자는:
리소스를 생성하기 전에 유효성 검사를 수행할 수 있습니다. 논리는
PUBLIC.CREATE_RESOURCE_VALIDATE()
프로시저에서 구현해야 합니다.리소스가 생성되기 전에 일부 사용자 지정 작업을 수행합니다. 논리는
PUBLIC.PRE_CREATE_RESOURCE()
프로시저에서 구현해야 합니다.리소스가 생성된 후 일부 사용자 지정 작업을 수행합니다. 논리는
PUBLIC.POST_CREATE_RESOURCE()
프로시저에서 구현해야 합니다.
PUBLIC.CREATE_RESOURCE()
프로시저 사용자 지정에 대한 자세한 내용은 여기에서 확인할 수 있습니다.
TemplateCreateResourceHandler.java¶
이 클래스는 PUBLIC.CREATE_RESOURCE()
프로시저의 처리기입니다. 여기에서는 앞서 설명한 콜백 프로시저에 대한 처리기의 Java 구현을 삽입할 수 있습니다. 기본적으로 템플릿은 SQL 프로시저 호출을 없애기 위해 콜백 처리기의 모의 Java 구현을 제공하므로 프로시저 실행 시간이 길어질 수 있으며, Java 구현을 사용하면 실행 속도가 빨라집니다. 이러한 모의 구현은 성공 응답을 반환하는 것 외에는 아무 작업도 수행하지 않습니다. 템플릿에서 준비한 콜백 클래스에 사용자 지정 구현을 제공하거나 이러한 콜백을 처음부터 생성하여 처리기 빌더의 기본 프로시저 실행 흐름에 삽입할 수 있습니다.
기본적으로 호출되는 콜백 메서드에 사용자 지정 논리를 구현하려면 코드에서 다음 설명을 찾으십시오.
TODO: IMPLEMENT ME create resource validate
TODO: IMPLEMENT ME pre create resource callback
TODO: IMPLEMENT ME post create resource callback
수집¶
데이터 수집을 수행하려면 소스 시스템과의 연결을 처리하고 리소스 구성에 따라 데이터를 검색하는 클래스를 구현해야 합니다. Scheduler 모듈과 Task Reactor 모듈은 수집 작업의 트리거와 큐에 추가를 처리합니다.
수집 논리는 TemplateIngestion
클래스에서 호출됩니다. 코드에서 TODO: IMPLEMENT ME ingestion
설명을 찾아 무작위 데이터 생성을 원본 시스템에서 데이터 검색으로 대체하십시오. 리소스 정의에 사용자 지정 속성을 추가한 경우 내부 커넥터 테이블에서 ResourceIngestionDefinitionRepository
및 TemplateWorkItem
에서 사용 가능한 속성을 사용하여 가져올 수 있습니다.
resourceIngestionDefinitionId
ingestionConfigurationId
웹서비스에서 데이터를 검색하는 예는 다음과 같이 보일 수 있습니다.
public final class SourceSystemHttpHelper {
private static final String DATA_URL = "https://source_system.com/data/%s";
private static final SourceSystemHttpClient sourceSystemClient = new SourceSystemHttpClient();
private static final ObjectMapper objectMapper = new ObjectMapper();
private static List<Variant> fetchData(String resourceId) {
var response = sourceSystemClient.get(String.format(url, resourceId));
var body = response.body();
try {
return Arrays.stream(objectMapper.readValue(body, Map[].class))
.map(Variant::new)
.collect(Collectors.toList());
} catch (JsonProcessingException e) {
throw new RuntimeException("Cannot parse json", e);
}
}
}
public class SourceSystemHttpClient {
private static final Duration REQUEST_TIMEOUT = Duration.ofSeconds(15);
private final HttpClient client;
private final String secret;
public SourceSystemHttpClient() {
this.client = HttpClient.newHttpClient();
this.secret =
SnowflakeSecrets.newInstance()
.getGenericSecretString(ConnectionConfiguration.TOKEN_NAME);
}
public HttpResponse<String> get(String url) {
var request =
HttpRequest.newBuilder()
.uri(URI.create(url))
.GET()
.header("Authorization", format("Bearer %s", secret))
.header("Content-Type", "application/json")
.timeout(REQUEST_TIMEOUT)
.build();
try {
return client.send(request, HttpResponse.BodyHandlers.ofString());
} catch (IOException | InterruptedException ex) {
throw new RuntimeException(format("HttpRequest failed: %s", ex.getMessage()), ex);
}
}
}
리소스 수명 주기 관리¶
리소스 생성 및 수집 논리가 구현되면 다음 프로시저를 호출하여 리소스의 수명 주기를 관리할 수 있습니다.
PUBLIC.ENABLE_RESOURCE()
는 특정 리소스를 활성화하여 수집이 예약되도록 합니다.PUBLIC.DISABLE_RESOURCE()
는 특정 리소스를 비활성화하여 해당 리소스의 수집 일정이 중지됨을 의미합니다.PUBLIC.UPDATE_RESOURCE()
를 통해 특정 리소스의 수집 구성을 업데이트할 수 있습니다. 개발자가 커넥터 사용자가 수집 구성을 사용자 지정하도록 허용하는 것이 바람직하지 않을 수 있기 때문에 Streamlit UI 에서는 기본적으로 구현되지 않습니다(이 프로시저의 사용을 완전히 허용하지 않으려면 애플리케이션 역할ADMIN
에 대한 이 프로시저의 권한 부여를 취소).
이러한 모든 프로시저에는 Java 처리기가 있으며, 실행을 사용자 지정할 수 있는 콜백으로 확장됩니다. 이러한 처리기의 빌더를 사용하여 콜백의 사용자 지정 구현을 삽입할 수 있습니다. 기본적으로 템플릿은 콜백 처리기의 모의 Java 구현을 제공합니다. 이러한 모의 구현은 성공 응답을 반환하는 것 외에는 아무 작업도 수행하지 않습니다. 템플릿에서 준비한 콜백 클래스에 사용자 지정 구현을 제공하거나 이러한 콜백을 처음부터 생성하여 처리기 빌더의 기본 프로시저 실행 흐름에 삽입할 수 있습니다.
TemplateEnableResourceHandler.java¶
이 클래스는 전용 콜백으로 확장할 수 있는 PUBLIC.ENABLE_RESOURCE()
프로시저의 처리기입니다.
리소스를 활성화하기 전에 유효성 검사를 수행합니다. 코드에서
TODO: IMPLEMENT ME enable resource validate
설명을 찾아 사용자 지정 구현을 제공하십시오.리소스가 활성화되기 전에 일부 사용자 지정 작업을 수행합니다. 코드에서
TODO: IMPLEMENT ME pre enable resource
설명을 찾아 사용자 지정 구현을 제공하십시오.리소스가 활성화된 후 일부 사용자 지정 작업을 수행합니다. 코드에서
TODO: IMPLEMENT ME post enable resource
설명을 찾아 사용자 지정 구현을 제공하십시오.
PUBLIC.ENABLE_RESOURCE()
프로시저 상세 설명서에 대해 자세히 알아보십시오.
TemplateDisableResourceHandler.java¶
이 클래스는 전용 콜백으로 확장할 수 있는 PUBLIC.DISABLE_RESOURCE()
프로시저의 처리기입니다.
리소스를 비활성화하기 전에 유효성 검사를 수행합니다. 코드에서
TODO: IMPLEMENT ME disable resource validate
설명을 찾아 사용자 지정 구현을 제공하십시오.리소스가 비활성화되기 전에 일부 사용자 지정 작업을 수행합니다. 사용자 지정 구현을 제공하려면 코드에서
TODO: IMPLEMENT ME pre disable resource
설명을 찾으십시오.
PUBLIC.DISABLE_RESOURCE()
프로시저 상세 설명서에 대해 자세히 알아보십시오.
TemplateUpdateResourceHandler.java¶
이 클래스는 전용 콜백으로 확장할 수 있는 PUBLIC.UPDATE_RESOURCE()
프로시저의 처리기입니다.
리소스를 업데이트하기 전에 유효성 검사를 수행합니다. 코드에서
TODO: IMPLEMENT ME update resource validate
설명을 찾아 사용자 지정 구현을 제공하십시오.리소스가 업데이트되기 전에 일부 사용자 지정 작업을 수행합니다. 코드에서
TODO: IMPLEMENT ME pre update resource
설명을 찾아 사용자 지정 구현을 제공하십시오.리소스가 업데이트된 후 일부 사용자 지정 작업을 수행합니다. 코드에서
TODO: IMPLEMENT ME post update resource
설명을 찾아 사용자 지정 구현을 제공하십시오.
PUBLIC.UPDATE_RESOURCE()
프로시저 상세 설명서에 대해 자세히 알아보십시오.
설정¶
템플릿에는 이전에 설정한 모든 구성을 볼 수 있는 설정 탭이 포함되어 있습니다. 그러나 구성 속성이 사용자 지정된 경우 이 뷰에도 일부 사용자 지정이 필요합니다. 설정 탭 코드는 streamlit/daily_use/settings_page.py
파일에서 찾을 수 있습니다.
이를 사용자 지정하려면 해당 구성에 추가된 키에 대한 구성에서 값을 추출하기만 하면 됩니다. 예를 들어, 앞서 연결 설정 단계에서 additional_connection_property
를 추가했다면 다음과 같은 설정 뷰에 추가되었을 수 있습니다.
def connection_config_page():
current_config = get_connection_configuration()
# TODO: implement the display for all the custom properties defined in the connection configuration step
custom_property = current_config.get("custom_connection_property", "")
additional_connection_property = current_config.get("additional_connection_property", "")
st.header("Connector configuration")
st.caption("Here you can see the connector connection configuration saved during the connection configuration step "
"of the Wizard. If some new property was introduced it has to be added here to display.")
st.divider()
st.text_input(
"Custom connection property:",
value=custom_property,
disabled=True
)
st.text_input(
"Additional connection property:",
value=additional_connection_property,
disabled=True
)
st.divider()