자습서: Native SDK for Connectors Java 템플릿¶
소개¶
Snowflake Native SDK for Connectors 을 활용한 커넥터 템플릿 사용 자습서에 오신 것을 환영합니다. 이 가이드는 단순한 Connector Native Application을 설정하는 데 도움이 됩니다.
이 자습서에서는 다음에 대해 설명합니다.
Connector Native Application 배포
데이터를 수집하기 위한 템플릿 커넥터 구성
사용자의 필요에 맞게 템플릿 커넥터 사용자 지정
템플릿에는 수정이 필요한 특정 파일을 쉽게 찾을 수 있도록 코드에 다양한 유용한 설명이 포함되어 있습니다. 다음 키워드가 포함된 설명을 찾아보십시오. 이러한 설명은 여러분이 커넥터를 직접 구현하는 데 도움이 될 것입니다.
TODO
TODO: HINT
TODO: IMPLEMENT ME
이 자습서를 시작하기 전에 다음 권장 콘텐츠를 검토하여 준비해야 합니다.
전제 조건¶
시작하기 전에 다음 요구 사항을 충족하는지 확인합니다.
Java 11 설치됨
역할이
ACCOUNTADMIN
인 Snowflake 계정에 액세스로컬 머신에
variable_substitution
및exit_on_error
를 사용하여 구성된 SnowSQL(CLI 클라이언트) 도구이 설명서 페이지 커넥터용 Snowflake Native SDK 를 검토하고 온라인에서 열어 두거나 브라우저에서 인쇄하십시오. 빠른 시작 커넥터 네이티브 Java SDK (선택 사항이지만 권장)를 검토하십시오. 이 빠른 시작은 템플릿 기반의 커넥터 예제를 사용하며, 이를 참조하여 다양한 구성 요소의 예제 구현을 확인할 수 있습니다.
초기화 및 배포¶
프로젝트를 초기화하려면 GitHub에서 Native SDK for Connectors 리포지토리 를 복제하고 /templates/native-sdk-connectors-java-template
디렉터리를 원하는 프로젝트 위치로 복사합니다. 이 템플릿에는 동작하는 Connector Native Application을 배포하는 데 필요한 모든 코드가 포함되어 있습니다. 이 작업이 완료되면 템플릿을 배포할 준비가 됩니다.
배포¶
이 템플릿은 바로 배포할 수 있는 상태로 제공되며, 전체 프로세스를 처리하는 편리한 스크립트가 제공됩니다. 커넥터를 배포하기 전에 snowsql
연결을 지정해야 합니다. 이를 위해서는 Makefile
을 열고 연결 이름을 CONNECTION
환경 변수에 입력합니다.
애플리케이션을 빠르게 배포하려면 템플릿의 기본 디렉터리로 이동하여 다음 명령을 실행합니다.
make reinstall_application_from_version_dir
이 명령은 다음과 같은 작업을 수행합니다.
기존에 있던
APPLICATION
및APPLICATION PACKAGE
를 Snowflake 계정에서 제거합니다.SDK jar에서 추출한 jar 및 sql 파일을 대상
sf_build
디렉터리로 복사합니다.애플리케이션의 사용자 지정 Streamlit 및 Java 구성 요소를
sf_build
디렉터리로 복사합니다.Snowflake 계정 내
sf_build
디렉터리에 있는 파일에서 새APPLICATION PACKAGE
를 생성합니다.Snowflake 계정 내에 새
APPLICATION
인스턴스를 생성합니다.
이 과정을 완료하는 데 약 2~3분이 걸립니다. 완료된 후에 Snowflake 내부의 Data Products
-> Apps
탭으로 이동하면 Connector가 표시됩니다. 애플리케이션이 많아 찾기 어려운 경우 검색창에 NATIVE_SDK_CONNECTOR_TEMPLATE
을 입력하거나 사용자 지정 APPLICATION
이름의 경우 사용자 지정 이름을 대신 사용합니다. 이 커넥터를 구성할 준비가 되었습니다. 다음 단계에서는 프로세스를 안내하고 그 과정에서 각 단계를 사용자 지정하는 방법을 설명합니다.
이 자습서의 어느 단계에서든 커넥터를 다시 배포해야 하는 경우(예: 변경 사항을 테스트하기 위해) 위의 명령을 다시 실행하기만 하면 됩니다.
필수 조건 단계¶
배포 직후 커넥터는 마법사 단계입니다. 이 단계는 최종 사용자에게 필요한 모든 구성을 안내하는 몇 가지 단계로 구성됩니다. 첫 번째 단계는 필수 조건 단계입니다. 이는 선택 사항이며 일부 커넥터에 필요하지 않을 수 있습니다. 필수 조건은 일반적으로 워크시트에서 쿼리를 실행하거나 소스 시스템 측에서 일부 구성을 수행하는 등 애플리케이션 외부에서 사용자에게 요구되는 작업입니다.
필수 조건에 대해 자세히 알아보십시오.
각 필수 조건의 내용은 커넥터 내부의 내부 테이블(STATE.PREREQUISITES
)에서 직접 검색할 수 있습니다. 이는 setup.sql
스크립트를 통해 사용자 지정할 수 있습니다. 그러나 setup.sql
스크립트는 애플리케이션을 설치, 업그레이드 및 다운그레이드할 때마다 실행된다는 점에 유의하십시오. 삽입은 멱등적이어야 하므로 아래 예제와 같이 병합 쿼리를 사용하는 것이 좋습니다.
MERGE INTO STATE.PREREQUISITES AS dest
USING (SELECT * FROM VALUES
('1',
'Sample prerequisite',
'Prerequisites can be used to notice the end user of the connector about external configurations. Read more in the SDK documentation below. This content can be modified inside `setup.sql` script',
'https://docs.snowflake.com/developer-guide/native-apps/connector-sdk/flow/prerequisites',
NULL,
NULL,
1
)
) AS src (id, title, description, documentation_url, learnmore_url, guide_url, position)
ON dest.id = src.id
WHEN NOT MATCHED THEN
INSERT (id, title, description, documentation_url, learnmore_url, guide_url, position)
VALUES (src.id, src.title, src.description, src.documentation_url, src.learnmore_url, src.guide_url, src.position);
커넥터 구성 단계¶
마법사 단계의 다음 단계는 커넥터 구성 단계입니다. 이 단계에서는 커넥터에 필요한 데이터베이스 오브젝트와 권한을 구성할 수 있습니다. 이 단계에서는 다음 구성 속성을 지정할 수 있습니다.
warehouse
destination_database
destination_schema
operational_warehouse
global_schedule
data_owner_role
agent_username
agent_role
다른 사용자 지정 속성이 필요한 경우 마법사 단계의 다음 단계 중 하나에서 구성할 수 있습니다. 각 속성에 대한 자세한 내용은 다음을 참조하십시오.
또한 템플릿에 제공된 Streamlit 구성 요소(streamlit/wizard/connector_config.py
)는 permissions-sdk
를 트리거하고 최종 사용자에게 일부 권한 부여를 요청하는 방법을 보여줍니다. 사용 가능한 속성이 커넥터의 요구 사항을 충족하는 한 백엔드 클래스를 덮어쓸 필요는 없지만, 구성의 추가 단계에서 구성 요소와 동일한 방식으로 가능합니다.
내부 프로시저 및 Java 오브젝트에 대한 자세한 내용은 다음을 참조하십시오.
제공된 Streamlit 예제를 통해 create database
및 execute tasks
와 같은 계정 수준 권한 부여를 요청할 수 있습니다. 또한 사용자는 permissions-sdk
팝업을 통해 웨어하우스 참조를 지정할 수 있습니다.
템플릿에서 사용자는 destination_database
및 destination_schema
만 제공하도록 요청을 받습니다. 그러나 streamlit/wizard/connector_configuration.py
의 TODO
설명에는 Streamlit UI에서 더 많은 입력 상자를 표시하는 데 재사용할 수 있는 주석이 달린 코드가 포함되어 있습니다.
# TODO: Here you can add additional fields in connector configuration. Supported values are the following: warehouse, operational_warehouse, data_owner_role, agent_role, agent_username
# For example:
st.subheader("Operational warehouse")
input_col, _ = st.columns([2, 1])
with input_col:
st.text_input("", key="operational_warehouse", label_visibility="collapsed")
st.caption("Name of the operational warehouse to be used")
연결 구성 단계¶
마법사 단계의 다음 단계는 연결 구성 단계입니다. 이 단계에서는 최종 사용자가 커넥터에 대한 외부 연결 매개 변수를 구성할 수 있습니다. 이 구성에는 시크릿, 통합 등과 같은 오브젝트 식별자가 포함될 수 있습니다. 이는 커넥터가 수집하는 데이터의 소스 시스템에 따라 달라지기 때문에 원본 코드에서 더 큰 사용자 지정을 해야 하는 첫 번째 부분입니다.
연결 구성에 대한 자세한 내용은 다음을 참조하십시오.
Streamlit UI 쪽(streamlit/wizard/connection_config.py
파일)부터 시작하여 필요한 모든 매개 변수에 대한 텍스트 상자를 추가해야 합니다. 예제 텍스트 상자가 구현되어 있으며 이 파일에서 코드를 검색하면 새 필드에 대한 설명 코드가 있는 TODO
를 찾을 수 있습니다.
# TODO: Additional configuration properties can be added to the UI like this:
st.subheader("Additional connection parameter")
input_col, _ = st.columns([2, 1])
with input_col:
st.text_input("", key="additional_connection_property", label_visibility="collapsed")
st.caption("Some description of the additional property")
속성을 양식에 추가한 후에는 커넥터의 백엔드 계층으로 전달해야 합니다. 이를 위해서는 streamlit 파일에서 두 개의 추가 위치를 수정해야 합니다. 첫 번째는 streamlit/wizard/connection_config.py
파일에 있는 finish_config
함수입니다. 새로 추가된 텍스트 상자의 상태는 여기에서 확인해야 합니다. 또한 필요한 경우 유효성을 검사한 다음 set_connection_configuration
함수에 전달할 수 있습니다. 예를 들어, additional_connection_property
를 추가한 경우 편집 후에는 다음과 같이 표시됩니다.
def finish_config():
try:
# TODO: If some additional properties were specified they need to be passed to the set_connection_configuration function.
# The properties can also be validated, for example, check whether they are not blank strings etc.
response = set_connection_configuration(
custom_connection_property=st.session_state["custom_connection_property"],
additional_connection_property=st.session_state["additional_connection_property"],
)
# rest of the method without changes
그런 다음 set_connection_configuration
함수를 편집해야 하며 이 함수는 streamlit/native_sdk_api/connection_config.py
파일에서 찾을 수 있습니다. 이 함수는 Streamlit UI와 커넥터의 백엔드 진입점인 기본 SQL 프로시저 사이의 프록시입니다.
def set_connection_configuration(custom_connection_property: str, additional_connection_property: str):
# TODO: this part of the code sends the config to the backend so all custom properties need to be added here
config = {
"custom_connection_property": escape_identifier(custom_connection_property),
"additional_connection_property": escape_identifier(additional_connection_property),
}
return call_procedure(
"PUBLIC.SET_CONNECTION_CONFIGURATION",
[variant_argument(config)]
)
이렇게 하면 새 속성이 구성이 포함된 내부 커넥터 테이블에 저장됩니다. 하지만 이것이 가능한 맞춤 설정의 끝은 아닙니다. 일부 백엔드 구성 요소도 사용자 지정이 가능합니다. 해당 구성 요소를 찾으려면 코드에서 다음 설명을 찾아보십시오.
TODO: IMPLEMENT ME connection configuration validate
TODO: IMPLEMENT ME connection callback
TODO: IMPLEMENT ME test connection
유효성 검사 부분에서는 UI에서 받은 데이터에 대한 추가 유효성 검사를 수행할 수 있습니다. 또한 데이터를 소문자로 만들거나, 트리밍하거나, 공급자가 지정한 이름의 오브젝트가 실제로 Snowflake 내에 존재하는지 확인하는 등 데이터를 변환할 수도 있습니다.
연결 콜백은 구성에 따라 추가 작업을 수행할 수 있도록 해주는 부분으로, 예를 들어 외부 액세스 통합을 사용해야 하는 프로시저를 변경하는 등의 작업을 수행할 수 있습니다.
테스트 연결은 연결 구성의 마지막 구성 요소로, 커넥터와 소스 시스템 간에 연결을 설정할 수 있는지 확인합니다.
내부 구성 요소에 대한 자세한 내용은 다음을 참조하십시오.
구현 예제는 다음과 같습니다.
public class TemplateConfigurationInputValidator implements ConnectionConfigurationInputValidator {
private static final String ERROR_CODE = "INVALID_CONNECTION_CONFIGURATION";
@Override
public ConnectorResponse validate(Variant config) {
// TODO: IMPLEMENT ME connection configuration validate: If the connection configuration input
// requires some additional validation this is the place to implement this logic.
// See more in docs:
// https://docs.snowflake.com/developer-guide/native-apps/connector-sdk/reference/connection_configuration_reference
// https://docs.snowflake.com/developer-guide/native-apps/connector-sdk/flow/connection_configuration
var integrationCheck = checkParameter(config, INTEGRATION_PARAM, false);
if (!integrationCheck.isOk()) {
return integrationCheck;
}
var secretCheck = checkParameter(config, SECRET_PARAM, true);
if (!secretCheck.isOk()) {
return ConnectorResponse.error(ERROR_CODE);
}
return ConnectorResponse.success();
}
}
public class TemplateConnectionConfigurationCallback implements ConnectionConfigurationCallback {
private final Session session;
public TemplateConnectionConfigurationCallback(Session session) {
this.session = session;
}
@Override
public ConnectorResponse execute(Variant config) {
// TODO: If you need to alter some procedures with external access you can use
// configureProcedure method or implement a similar method on your own.
// TODO: IMPLEMENT ME connection callback: Implement the custom logic of changes in application
// to be done after connection configuration, like altering procedures with external access.
// See more in docs:
// https://docs.snowflake.com/developer-guide/native-apps/connector-sdk/reference/connection_configuration_reference
// https://docs.snowflake.com/developer-guide/native-apps/connector-sdk/flow/connection_configuration
configureProcedure(format("PUBLIC.TEST_CONNECTION()"), config);
return ConnectorResponse.success();
}
}
public class TemplateConnectionValidator {
private static final String ERROR_CODE = "TEST_CONNECTION_FAILED";
public static Variant testConnection(Session session) {
// TODO: IMPLEMENT ME test connection: Implement the custom logic of testing the connection to
// the source system here. This usually requires connection to some webservice or other external
// system. It is suggested to perform only the basic connectivity validation here.
// If that's the case then this procedure must be altered in TemplateConnectionConfigurationCallback first.
// See more in docs:
// https://docs.snowflake.com/developer-guide/native-apps/connector-sdk/reference/connection_configuration_reference
// https://docs.snowflake.com/developer-guide/native-apps/connector-sdk/flow/connection_configuration
return test().toVariant();
}
private static ConnectorResponse test() {
try {
var response = SourceSystemHttpHelper.testEndpoint();
if (isSuccessful(response.statusCode())) {
return ConnectorResponse.success();
} else {
return ConnectorResponse.error(ERROR_CODE, "Connection to source system failed");
}
} catch (Exception exception) {
return ConnectorResponse.error(ERROR_CODE, "Test connection failed");
}
}
}
최종 구성 단계¶
최종 커넥터 구성 단계는 마법사 단계의 마지막 단계입니다. 이 단계에서는 여러 작업을 수행합니다. 첫째, 사용자는 커넥터에 필요한 추가 구성을 지정할 수 있습니다. 두 번째로, 수집된 데이터에 대한 싱크 데이터베이스, 스키마, 그리고 필요한 경우 일부 테이블과 뷰를 생성합니다. 마지막으로 스케줄러와 Task Reactor와 같은 내부 구성 요소를 초기화합니다.
구성 마무리에 대한 자세한 내용은 다음을 참조하십시오.
Task Reactor와 스케줄링에 대한 자세한 내용은 다음을 참조하십시오.
연결 구성 단계와 마찬가지로 Streamlit UI로 사용자 지정을 시작할 수 있습니다. streamlit/wizard/finalize_config.py
에는 예제 속성이 있는 양식이 포함되어 있습니다. 커넥터 요구 사항에 따라 더 많은 속성을 추가할 수 있습니다. 다른 속성을 추가하려면 언급된 파일에서 새 속성을 추가하는 예제 코드가 포함된 TODO
설명을 찾아보십시오.
# TODO: Here you can add additional fields in finalize connector configuration.
# For example:
st.subheader("Some additional property")
input_col, _ = st.columns([2, 1])
with input_col:
st.text_input("", key="some_additional_property", label_visibility="collapsed")
st.caption("Description of some new additional property")
새로운 속성에 대한 텍스트 상자를 추가한 후에는 백엔드로 전달해야 합니다. 이를 위해서는 같은 파일에서 finalize_configuration
함수를 수정합니다.
def finalize_configuration():
try:
st.session_state["show_main_error"] = False
# TODO: If some additional properties were introduced, they need to be passed to the finalize_connector_configuration function.
response = finalize_connector_configuration(
st.session_state.get("custom_property"),
st.session_state.get("some_additional_property")
)
그런 다음 streamlit/native_sdk_api/finalize_config.py
를 열고 다음 함수에 추가합니다.
def finalize_connector_configuration(custom_property: str, some_additional_property: str):
# TODO: If some custom properties were configured, then they need to be specified here and passed to the FINALIZE_CONNECTOR_CONFIGURATION procedure.
config = {
"custom_property": custom_property,
"some_additional_property": some_additional_property,
}
return call_procedure(
"PUBLIC.FINALIZE_CONNECTOR_CONFIGURATION",
[variant_argument(config)]
)
다시 말해, 연결 구성 단계와 유사하게 이 단계에서도 다양한 백엔드 구성 요소를 사용자 지정할 수 있으며, 이는 코드에서 다음 구문을 사용하여 찾을 수 있습니다.
TODO: IMPLEMENT ME validate source
TODO: IMPLEMENT ME finalize internal
소스 검증 부분은 소스 시스템에서 더욱 정교한 검증을 수행하는 역할을 합니다. 이전 테스트 연결에서 연결이 설정될 수 있는지 여부만 확인한 경우, 검증 소스에서는 시스템의 특정 데이터에 대한 액세스(예: 단일 데이터 레코드 추출)를 확인할 수 있습니다.
최종 내부는 Task Reactor와 스케줄러를 초기화하고 싱크 데이터베이스와 필요한 중첩 오브젝트를 생성하는 내부 프로시저입니다. 또한 최종 단계에서 제공된 구성을 저장하기 위해서도 사용할 수 있습니다(이 구성은 기본적으로 저장되지 않음).
내부 구성 요소에 대한 자세한 내용은 다음에서 확인할 수 있습니다.
또한 FinalizeConnectorInputValidator
인터페이스를 사용하여 입력의 유효성을 검사하고 이를 종료자 핸들러에 제공할 수 있습니다(TemplateFinalizeConnectorConfigurationCustomHandler
확인). 빌더 사용에 대한 자세한 내용은 다음에서 확인할 수 있습니다.
검증 소스의 구현 예는 다음과 같습니다.
public class SourceSystemAccessValidator implements SourceValidator {
@Override
public ConnectorResponse validate(Variant variant) {
// TODO: IMPLEMENT ME validate source: Implement the custom logic of validating the source
// system. In some cases this can be the same validation that happened in
// TemplateConnectionValidator.
// However, it is suggested to perform more complex validations, like specific access rights to
// some specific resources here.
// See more in docs:
// https://docs.snowflake.com/developer-guide/native-apps/connector-sdk/reference/finalize_configuration_reference
// https://docs.snowflake.com/developer-guide/native-apps/connector-sdk/flow/finalize_configuration
var finalizeProperties = Configuration.fromCustomConfig(variant);
var httpResponse = SourceSystemHttpHelper.validateSource(finalizeProperties.get("custom_property"));
return prepareConnectorResponse(httpResponse.statusCode());
}
private ConnectorResponse prepareConnectorResponse(int statusCode) {
switch (statusCode) {
case 200:
return ConnectorResponse.success();
case 401:
return ConnectorResponse.error("Unauthorized error");
case 404:
return ConnectorResponse.error("Not found error");
default:
return ConnectorResponse.error("Unknown error");
}
}
}
리소스 만들기¶
마법사 단계가 완료되면 커넥터가 데이터 수집을 시작할 준비가 됩니다. 하지만 먼저 리소스를 구현하고 구성해야 합니다. 리소스는 소스 시스템의 특정 데이터 세트(예: 테이블, 엔드포인트, 파일 등)를 설명하는 추상화입니다.
다양한 소스 시스템에는 리소스에 대한 다양한 정보가 필요할 수 있으므로 리소스 정의는 특정 요구 사항에 맞게 사용자 지정되어야 합니다. 이를 위해서는 streamlit/daily_use/data_sync_page.py
파일로 이동합니다. 이 파일에서는 리소스 매개 변수에 대한 텍스트 상자를 추가하는 방법에 대한 TODO
를 찾을 수 있습니다. 리소스 매개 변수는 소스 시스템의 데이터를 식별하고 검색할 수 있도록 해야 합니다. 그런 다음 수집 중에 이러한 매개 변수를 추출할 수 있습니다.
# TODO: specify all the properties needed to define a resource in the source system. A subset of those properties should allow for a identification of a single resource, be it a table, endpoint, repository or some other data storage abstraction
st.text_input(
"Resource name",
key="resource_name",
)
st.text_input(
"Some resource parameter",
key="some_resource_parameter"
)
모든 필수 속성이 양식에 추가되면 백엔드로 전달할 수 있습니다. 먼저 텍스트 필드의 상태를 추출하여 streamlit/daily_use/data_sync_page.py
의 API 수준 queue_resource
메서드로 전달해야 합니다.
def queue_resource():
# TODO: add additional properties here and pass them to create_resource function
resource_name = st.session_state.get("resource_name")
some_resource_parameter = st.session_state.get("some_resource_parameter)
if not resource_name:
st.error("Resource name cannot be empty")
return
result = create_resource(resource_name, some_resource_parameter)
if result.is_ok():
st.success("Resource created")
else:
st.error(result.get_message())
그런 다음 streamlit/native_sdk_api/resource_management.py
의 create_resource
함수를 업데이트해야 합니다.
def create_resource(resource_name, some_resource_parameter):
ingestion_config = [{
"id": "ingestionConfig",
"ingestionStrategy": "INCREMENTAL",
# TODO: HINT: scheduleType and scheduleDefinition are currently not supported out of the box, due to globalSchedule being used. However, a custom implementation of the scheduler can use those fields. They need to be provided becuase they are mandatory in the resourceDefinition.
"scheduleType": "INTERVAL",
"scheduleDefinition": "60m"
}]
# TODO: HINT: resource_id should allow identification of a table, endpoint etc. in the source system. It should be unique.
resource_id = {
"resource_name": resource_name,
}
id = f"{resource_name}_{random_suffix()}"
# TODO: if you specified some additional resource parameters then you need to put them inside resource metadata:
resource_metadata = {
"some_resource_parameter": some_resource_parameter
}
return call_procedure("PUBLIC.CREATE_RESOURCE",
[
varchar_argument(id),
variant_argument(resource_id),
variant_list_argument(ingestion_config),
varchar_argument(id),
"true",
variant_argument(resource_metadata)
])
CREATE_RESOURCE() 프로시저 논리 사용자 지정하기¶
PUBLIC.CREATE_RESOURCE()
프로시저를 통해 개발자는 메인 실행 흐름의 여러 곳에 연결되는 자체 논리를 구현하여 실행을 사용자 지정할 수 있습니다. SDK를 사용하면 개발자는:
리소스를 생성하기 전에 유효성 검사를 수행할 수 있습니다. 논리는
PUBLIC.CREATE_RESOURCE_VALIDATE()
프로시저에서 구현해야 합니다.리소스가 생성되기 전에 일부 사용자 지정 작업을 수행할 수 있습니다. 논리는
PUBLIC.PRE_CREATE_RESOURCE()
프로시저에서 구현해야 합니다.리소스가 생성된 후 일부 사용자 지정 작업을 수행할 수 있습니다. 논리는
PUBLIC.POST_CREATE_RESOURCE()
프로시저에서 구현해야 합니다.
PUBLIC.CREATE_RESOURCE()
프로시저 사용자 지정에 대한 자세한 내용은 여기에서 확인할 수 있습니다.
TemplateCreateResourceHandler.java¶
이 클래스는 PUBLIC.CREATE_RESOURCE()
프로시저의 처리기입니다. 여기에서는 앞서 설명한 콜백 프로시저에 대한 처리기의 Java 구현을 삽입할 수 있습니다. 기본적으로 템플릿은 전체 프로시저 실행 시간을 연장하는 SQL 프로시저 호출을 없애기 위해 콜백 처리기의 모의 Java 구현을 제공합니다. Java 구현으로 실행이 더 빨라집니다. 이러한 모의 구현은 성공 응답을 반환하는 것 외에는 아무 작업도 수행하지 않습니다. 템플릿에서 준비한 콜백 클래스에 사용자 지정 구현을 제공하거나 이러한 콜백을 처음부터 생성하여 처리기 빌더의 기본 프로시저 실행 흐름에 삽입할 수 있습니다.
기본적으로 호출되는 콜백 메서드에 사용자 지정 논리를 구현하려면 코드에서 다음 구문을 찾으십시오.
TODO: IMPLEMENT ME create resource validate
TODO: IMPLEMENT ME pre create resource callback
TODO: IMPLEMENT ME post create resource callback
수집¶
데이터 수집을 수행하려면 소스 시스템과의 연결을 처리하고 리소스 구성에 따라 데이터를 검색하는 클래스를 구현해야 합니다. Scheduler 모듈과 Task Reactor 모듈은 수집 작업의 트리거와 큐에 추가를 처리합니다.
수집 논리는 TemplateIngestion
클래스에서 호출되고, 코드에서 TODO: IMPLEMENT ME ingestion
을 찾아 무작위 데이터 생성을 원본 시스템에서 데이터 검색으로 대체합니다. 리소스 정의에 일부 사용자 지정 속성을 추가한 경우 내부 커넥터 테이블에서 ResourceIngestionDefinitionRepository
및 TemplateWorkItem
에서 사용 가능한 속성을 사용하여 가져올 수 있습니다.
resourceIngestionDefinitionId
ingestionConfigurationId
예를 들어, 일부 웹서비스에서 데이터를 검색하는 예제는 다음과 같습니다.
public final class SourceSystemHttpHelper {
private static final String DATA_URL = "https://source_system.com/data/%s";
private static final SourceSystemHttpClient sourceSystemClient = new SourceSystemHttpClient();
private static final ObjectMapper objectMapper = new ObjectMapper();
private static List<Variant> fetchData(String resourceId) {
var response = sourceSystemClient.get(String.format(url, resourceId));
var body = response.body();
try {
return Arrays.stream(objectMapper.readValue(body, Map[].class))
.map(Variant::new)
.collect(Collectors.toList());
} catch (JsonProcessingException e) {
throw new RuntimeException("Cannot parse json", e);
}
}
}
public class SourceSystemHttpClient {
private static final Duration REQUEST_TIMEOUT = Duration.ofSeconds(15);
private final HttpClient client;
private final String secret;
public SourceSystemHttpClient() {
this.client = HttpClient.newHttpClient();
this.secret =
SnowflakeSecrets.newInstance()
.getGenericSecretString(ConnectionConfiguration.TOKEN_NAME);
}
public HttpResponse<String> get(String url) {
var request =
HttpRequest.newBuilder()
.uri(URI.create(url))
.GET()
.header("Authorization", format("Bearer %s", secret))
.header("Content-Type", "application/json")
.timeout(REQUEST_TIMEOUT)
.build();
try {
return client.send(request, HttpResponse.BodyHandlers.ofString());
} catch (IOException | InterruptedException ex) {
throw new RuntimeException(format("HttpRequest failed: %s", ex.getMessage()), ex);
}
}
}
리소스 수명 주기 관리¶
리소스 생성 및 수집 논리가 구현되면 다음 프로시저에 따라 리소스의 수명 주기를 관리할 수 있습니다.
PUBLIC.ENABLE_RESOURCE()
- 이 프로시저는 특정 리소스를 활성화하여 수집이 예약되도록 합니다.PUBLIC.DISABLE_RESOURCE()
- 이 프로시저는 특정 리소스를 비활성화하여 해당 리소스의 수집 일정이 중지됨을 의미합니다.PUBLIC.UPDATE_RESOURCE()
- 이 프로시저를 통해 특정 리소스의 수집 구성을 업데이트할 수 있습니다. 개발자가 커넥터 사용자가 수집 구성을 사용자 지정하도록 허용하는 것이 바람직하지 않을 수 있기 때문에 Streamlit UI에서는 기본적으로 구현되지 않습니다(이 프로시저의 사용을 완전히 허용하지 않으려면 애플리케이션 역할ACCOUNTADMIN
에 대한 이 프로시저의 권한 부여를 취소).
이러한 모든 프로시저에는 Java 처리기가 있으며, 실행을 사용자 지정할 수 있는 콜백으로 확장됩니다. 이러한 처리기의 빌더를 사용하여 콜백의 사용자 지정 구현을 삽입할 수 있습니다. 기본적으로 템플릿은 설명된 프로시저의 전체 실행 시간을 연장하는 SQL 프로시저 호출을 없애기 위해 콜백 처리기의 모의 Java 구현을 제공합니다. 이러한 모의 구현은 성공 응답을 반환하는 것 외에는 아무 작업도 수행하지 않습니다. 템플릿에서 준비한 콜백 클래스에 사용자 지정 구현을 제공하거나 이러한 콜백을 처음부터 생성하여 처리기 빌더의 기본 프로시저 실행 흐름에 삽입할 수 있습니다.
TemplateEnableResourceHandler.java¶
이 클래스는 전용 콜백으로 확장할 수 있는 PUBLIC.ENABLE_RESOURCE()
프로시저의 처리기입니다.
리소스를 활성화하기 전에 유효성 검사를 수행합니다. 사용자 지정 구현을 제공하려면 코드에서
TODO: IMPLEMENT ME enable resource validate
구문을 찾습니다.리소스가 활성화되기 전에 일부 사용자 지정 작업을 수행합니다. 사용자 지정 구현을 제공하려면 코드에서
TODO: IMPLEMENT ME pre enable resource
구문을 찾습니다.리소스가 활성화된 후 일부 사용자 지정 작업을 수행합니다. 사용자 지정 구현을 제공하려면 코드에서
TODO: IMPLEMENT ME post enable resource
구문을 찾습니다.
PUBLIC.ENABLE_RESOURCE()
프로시저 상세 설명서에 대해 자세히 알아보십시오.
TemplateDisableResourceHandler.java¶
이 클래스는 전용 콜백으로 확장할 수 있는 PUBLIC.DISABLE_RESOURCE()
프로시저의 처리기입니다.
리소스를 비활성화하기 전에 유효성 검사를 수행합니다. 사용자 지정 구현을 제공하려면 코드에서
TODO: IMPLEMENT ME disable resource validate
구문을 찾습니다.리소스가 비활성화되기 전에 일부 사용자 지정 작업을 수행합니다. 사용자 지정 구현을 제공하려면 코드에서
TODO: IMPLEMENT ME pre disable resource
구문을 찾습니다.
PUBLIC.DISABLE_RESOURCE()
프로시저 상세 설명서에 대해 자세히 알아보십시오.
TemplateUpdateResourceHandler.java¶
이 클래스는 전용 콜백으로 확장할 수 있는 PUBLIC.UPDATE_RESOURCE()
프로시저의 처리기입니다.
리소스를 업데이트하기 전에 유효성 검사를 수행합니다. 사용자 지정 구현을 제공하려면 코드에서
TODO: IMPLEMENT ME update resource validate
구문을 찾습니다.리소스가 업데이트되기 전에 일부 사용자 지정 작업을 수행합니다. 사용자 지정 구현을 제공하려면 코드에서
TODO: IMPLEMENT ME pre update resource
구문을 찾습니다.리소스가 업데이트된 후 일부 사용자 지정 작업을 수행합니다. 사용자 지정 구현을 제공하려면 코드에서
TODO: IMPLEMENT ME post update resource
구문을 찾습니다.
PUBLIC.UPDATE_RESOURCE()
프로시저 상세 설명서에 대해 자세히 알아보십시오.
설정¶
템플릿에는 이전에 설정한 모든 구성을 볼 수 있는 설정 탭이 포함되어 있습니다. 그러나 구성 속성이 사용자 지정된 경우 이 뷰에도 일부 사용자 지정이 필요합니다. 설정 탭 코드는 streamlit/daily_use/settings_page.py
파일에서 찾을 수 있습니다. 이를 사용자 지정하려면 해당 구성에 추가된 키에 대한 구성에서 값을 추출하기만 하면 됩니다.
예를 들어, 앞서 연결 설정 단계에서 additional_connection_property
를 추가했다면 다음과 같이 추가되었을 수 있습니다.
def connection_config_page():
current_config = get_connection_configuration()
# TODO: implement the display for all the custom properties defined in the connection configuration step
custom_property = current_config.get("custom_connection_property", "")
additional_connection_property = current_config.get("additional_connection_property", "")
st.header("Connector configuration")
st.caption("Here you can see the connector connection configuration saved during the connection configuration step "
"of the Wizard. If some new property was introduced it has to be added here to display.")
st.divider()
st.text_input(
"Custom connection property:",
value=custom_property,
disabled=True
)
st.text_input(
"Additional connection property:",
value=additional_connection_property,
disabled=True
)
st.divider()