Tutorial: Modelo de conector Java Snowflake Native SDK for Connectors

Introdução

Bem-vindo ao nosso tutorial sobre como usar um modelo de conector utilizando Snowflake Native SDK for Connectors. Este guia ajudará você a configurar um Connector Native Application.

Neste tutorial você aprenderá a:

  • Implantar um Connector Native Application

  • Configurar um conector de modelo para ingerir dados

  • Personalizar um conector de modelo de acordo com suas próprias necessidades

O modelo contém vários comentários úteis no código para facilitar a localização de arquivos específicos que precisam ser modificados. Procure os comentários com as seguintes palavras-chave, elas irão orientá-lo e ajudar a implementar seu próprio conector:

  • TODO

  • TODO: HINT

  • TODO: IMPLEMENT ME

Antes de começar este tutorial, você deve se preparar revisando o seguinte conteúdo recomendado:

Pré-requisitos

Antes de começar, certifique-se de que você atende aos seguintes requisitos:

Prepare seu ambiente local

Antes de prosseguir, você precisa se certificar de que todo o software necessário está instalado em sua máquina e clonar o modelo de conector.

Instalação do Java

Snowflake Native SDK for Connectors requer o Java LTS (Long-Term Support) versão 11 ou superior. Se a versão mínima exigida do Java não estiver instalada em seu computador, você deverá instalar o Oracle Java ou o OpenJDK.

Oracle Java

A versão mais recente do JDK do LTS é gratuita para download e uso, sem custo, sob o Oracle NFTC. Para obter instruções de download e instalação, acesse a página Oracle.

OpenJDK

OpenJDK é uma implementação de código aberto do Java. Para obter instruções de download e instalação, acesse openjdk.org e jdk.java.net.

Você também pode usar uma versão OpenJDK de terceiros, como Eclipse Temurin ou Amazon Corretto.

Configuração do Snowflake CLI

A ferramenta Snowflake CLI é necessária para criar, implantar e instalar o conector. Se você não tiver o Snowflake CLI em seu computador, instale-o conforme as instruções disponíveis aqui.

Após a instalação da ferramenta, você precisa configurar uma conexão com o Snowflake no arquivo de configuração.

Se você não tiver nenhuma conexão configurada, crie uma nova conexão chamada native_sdk_connection. Você pode encontrar um exemplo de conexão no arquivo deployment/snowflake.toml.

Se você já tiver uma conexão configurada e quiser usá-la com o conector, use o nome dela em vez de native_sdk_connection sempre que essa conexão for usada neste tutorial.

Clonagem de modelos

Para clonar o modelo de conector, use o seguinte comando:

snow init <project_dir> \
  --template-source https://github.com/snowflakedb/connectors-native-sdk \
  --template templates/connectors-native-sdk-template

No lugar de <project_dir>, digite o nome do diretório (ele não deve existir) no qual o projeto Java do seu conector será criado.

Depois de executar o comando, você será solicitado a fornecer informações adicionais para a configuração da instância do aplicativo e do nome do estágio. Você pode fornecer quaisquer nomes, desde que sejam identificadores Snowflake válidos e não citados, ou clicar em enter para usar os valores padrão, que são mostrados entre colchetes.

Um exemplo de execução de comando, fornecendo nomes personalizados de aplicativos e estágios:

$ snow init my_connector \
    --template-source https://github.com/snowflakedb/connectors-native-sdk \
    --template templates/connectors-native-sdk-template

Name of the application instance which will be created in Snowflake [connectors-native-sdk-template]: MY_CONNECTOR
Name of the schema in which the connector files stage will be created [TEST_SCHEMA]:
Name of the stage used to store connector files in the application package [TEST_STAGE]: CUSTOM_STAGE_NAME
Initialized the new project in my_connector

Criação, implementação e limpeza de conectores

O modelo pode ser implementado imediatamente, mesmo antes de qualquer modificação. As seções a seguir mostrarão como criar, implantar e instalar o conector.

Desenvolvimento do conector

O desenvolvimento de um conector criado usando Snowflake Native SDK for Connectors é um pouco diferente do desenvolvimento de um aplicativo Java típico. Há algumas coisas que precisam ser feitas além de apenas criar os arquivos .jar a partir dos códigos-fonte. O desenvolvimento do aplicativo consiste nas seguintes etapas:

  1. Cópia de componentes internos personalizados para o diretório de desenvolvimento

  2. Cópia dos componentes do SDK para o diretório de desenvolvimento

Cópia dos componentes internos

Essa etapa cria o arquivo .jar do conector e o copia (juntamente com os arquivos UI, de manifesto e configuração) para o diretório sf_build.

Para executar essa etapa, execute o comando: ./gradlew copyInternalComponents.

Cópia dos componentes SDK

Esta etapa copia o arquivo SDK .jar (adicionado como uma dependência ao módulo Gradle do conector) para o diretório sf_build e extrai os arquivos .sql agrupados do arquivo .jar.

Esses arquivos .sql permitem a personalização de quais objetos fornecidos serão criados durante a instalação do aplicativo. Para os usuários iniciantes, a personalização não é recomendada, pois a omissão de objetos pode fazer com que alguns recursos falhem se forem feitos incorretamente. O aplicativo do conector de modelos usa o arquivo all.sql, que cria todos os objetos SDK recomendados.

Para executar essa etapa, execute o comando: ./gradlew copySdkComponents.

Implantação do conector

Para implantar um Native App, um pacote de aplicativo precisa ser criado dentro do Snowflake. Depois disso, todos os arquivos do diretório sf_build precisam ser carregados no Snowflake.

Observe que, para fins de desenvolvimento, a criação de versões é opcional; uma instância de aplicativo pode ser criada diretamente a partir de arquivos preparados. Essa abordagem permite que você veja as alterações na maioria dos arquivos do conector sem recriar a versão e a instância do aplicativo.

As seguintes operações serão realizadas:

  1. Criação de um novo pacote de aplicativo, se ele ainda não existir

  2. Criação de um esquema e um estágio de arquivo dentro do pacote

  3. Carregamento dos arquivos do diretório sf_build para o estágio (essa etapa pode levar algum tempo)

Para implantar o conector, execute o comando: snow app deploy --connection=native_sdk_connection.

Para obter mais informações sobre o comando snow app deploy, consulte snow app deploy.

O pacote de aplicativo criado agora estará visível na guia App packages, na categoria Data products, na Snowflake UI de sua conta.

Instalação do conector

A instalação do aplicativo é a última etapa do processo. Ele cria um aplicativo a partir do pacote de aplicativo criado anteriormente.

Para instalar o conector, execute o comando: snow app run --connection=native_sdk_connection.

Para obter mais informações sobre o comando snow app run, consulte snow app run.

O aplicativo instalado agora estará visível na guia Installed apps, na categoria Data products, no Snowflake UI de sua conta.

Atualização dos arquivos do conector

Se, em algum momento, você desejar modificar algum dos arquivos do conector, poderá carregar facilmente os arquivos modificados no estágio do pacote do aplicativo. O comando de upload depende de quais arquivos foram atualizados.

Antes de executar qualquer um dos comandos de atualização, você deverá copiar os novos arquivos do seu conector para o diretório sf_build executando: ./gradlew copyInternalComponents

Arquivos UI .py ou arquivos do conector .java

Use o comando snow app deploy --connection=native_sdk_connection, a instância atual do aplicativo usará os novos arquivos sem reinstalação.

Arquivos setup.sql ou manifest.yml

Use o comando snow app run --connection=native_sdk_connection. A instância atual do aplicativo será reinstalada depois que os novos arquivos forem carregados no estágio.

Limpeza

Depois que o tutorial for concluído ou se, por qualquer motivo, você quiser remover o aplicativo e o respectivo pacote, poderá removê-los completamente da sua conta usando o comando:

snow app teardown --connection=native_sdk_connection --cascade --force

A opção --cascade é necessária para remover o banco de dados de destino sem transferir a propriedade para o administrador da conta. Em conectores reais, o banco de dados não deve ser removido para preservar os dados ingeridos; ele deve ser de propriedade do administrador da conta ou a propriedade deve ser transferida antes da desinstalação.

Observe que o conector consumirá créditos até ser pausado ou removido, mesmo que nenhuma ingestão tenha sido configurada!

Etapa de pré-requisitos

Logo após a instalação, o Connector está em sua fase de assistente. Esta fase consiste em algumas etapas que orientam o usuário final por todas as configurações necessárias.

O primeiro passo é a etapa de pré-requisitos. É opcional e pode não ser necessário para todos os conectores. Os pré-requisitos geralmente são ações exigidas do usuário fora do aplicativo, por exemplo, executar consultas na planilha SQL, fazer a configuração no lado do sistema de origem etc.

Saiba mais sobre os pré-requisitos: Pré-requisitos

O conteúdo de cada pré-requisito é recuperado diretamente da tabela STATE.PREREQUISITES, localizada dentro do conector. Eles podem ser personalizados através do script setup.sql. No entanto, tenha em mente que o script setup.sql é executado em cada instalação, atualização e downgrade do aplicativo. As inserções devem ser idempotentes, por isso é recomendável usar uma consulta de mesclagem, como no exemplo abaixo:

MERGE INTO STATE.PREREQUISITES AS dest
USING (SELECT * FROM VALUES
           ('1',
            'Sample prerequisite',
            'Prerequisites can be used to notice the end user of the connector about external configurations. Read more in the SDK documentation below. This content can be modified inside `setup.sql` script',
            'https://docs.snowflake.com/developer-guide/native-apps/connector-sdk/flow/prerequisites',
            NULL,
            NULL,
            1
           )
) AS src (id, title, description, documentation_url, learnmore_url, guide_url, position)
ON dest.id = src.id
WHEN NOT MATCHED THEN
    INSERT (id, title, description, documentation_url, learnmore_url, guide_url, position)
    VALUES (src.id, src.title, src.description, src.documentation_url, src.learnmore_url, src.guide_url, src.position);
Copy

Etapa de configuração do Connector

A próxima etapa da fase do Assistente é a etapa de configuração do conector. Durante esta etapa, você pode configurar objetos de banco de dados e permissões necessárias para o conector. Esta etapa permite que as seguintes propriedades de configuração sejam especificadas:

  • warehouse

  • operational_warehouse

  • cortex_warehouse

  • destination_database

  • destination_schema

  • global_schedule

  • data_owner_role

  • cortex_user_role

  • agent_username

  • agent_role

Se você precisar de outras propriedades personalizadas, elas poderão ser configuradas em uma das próximas etapas da fase do Assistente. Para mais informações sobre cada uma das propriedades, consulte: Configuração do conector

Além disso, o componente Streamlit (streamlit/wizard/connector_config.py) fornecido no modelo mostra como acionar o Native Apps Permission SDK e solicita concessões de privilégios do usuário final. Desde que as propriedades disponíveis satisfaçam as necessidades do conector, não há necessidade de substituir nenhuma das classes de back-end, embora isso ainda seja possível da mesma forma que para os componentes nas etapas posteriores da configuração.

Para mais informações sobre procedimentos internos e objetos Java, consulte: Referência de configuração do conector

O exemplo de Streamlit fornecido permite solicitar privilégios de nível de conta configurados no arquivo manifest.yml - CREATE DATABASE e EXECUTE TASKS. Ele também permite que o usuário especifique uma referência de warehouse por meio do popup Permission SDK.

No modelo, o usuário é solicitado a fornecer apenas o destination_database e destination_schema. No entanto, um comentário TODO em streamlit/wizard/connector_configuration.py contém código comentado que pode ser reutilizado para exibir mais campos de entrada na Streamlit UI.

# TODO: Here you can add additional fields in connector configuration.
# For example:
st.subheader("Operational warehouse")
input_col, _ = st.columns([2, 1])
with input_col:
    st.text_input("", key="operational_warehouse", label_visibility="collapsed")
st.caption("Name of the operational warehouse to be used")
Copy

Etapa de configuração da conexão

A próxima etapa da fase do Assistente é a etapa de configuração da conexão. Esta etapa permite que o usuário final configure parâmetros de conectividade externa para o conector. Esta configuração pode incluir identificadores de objetos como segredos, integrações etc.

Como essas informações variam de acordo com o sistema de origem dos dados ingeridos pelo conector, esse é o primeiro lugar em que é necessário fazer personalizações maiores no código-fonte.

Para obter mais informações sobre a configuração da conexão, consulte:

Começando pelo lado da Streamlit UI (streamlit/wizard/connection_config.py), você precisa adicionar entradas de texto para todos os parâmetros necessários. Um exemplo de entrada de texto foi implementado para você e, se pesquisar o código nesse arquivo, poderá encontrar um TODO com código comentado para um novo campo.

# TODO: Additional configuration properties can be added to the UI like this:
st.subheader("Additional connection parameter")
input_col, _ = st.columns([2, 1])
with input_col:
    st.text_input("", key="additional_connection_property", label_visibility="collapsed")
st.caption("Some description of the additional property")
Copy

Depois que as propriedades são adicionadas ao formulário, elas precisam ser passadas para a camada de backend do conector. Para fazer isso, dois locais adicionais devem ser modificados nos arquivos Streamlit. O primeiro é a função finish_config no arquivo streamlit/wizard/connection_config.py. O estado das entradas de texto recém-adicionadas deve ser lido aqui. Além disso, ele pode ser validado se necessário e, em seguida, passado para a função set_connection_configuration.

Por exemplo se additional_connection_property fosse adicionado, ficaria assim após as edições:

def finish_config():
try:
    # TODO: If some additional properties were specified they need to be passed to the set_connection_configuration function.
    # The properties can also be validated, for example, check whether they are not blank strings etc.
    response = set_connection_configuration(
        custom_connection_property=st.session_state["custom_connection_property"],
        additional_connection_property=st.session_state["additional_connection_property"],
    )

# rest of the method without changes
Copy

Depois a função set_connection_configuration deve ser editada, ela pode ser encontrada no arquivo streamlit/native_sdk_api/connection_config.py. Esta função é um proxy entre a UI do Streamlit e o procedimento SQL subjacente, que é um ponto de entrada para o backend do conector.

def set_connection_configuration(custom_connection_property: str, additional_connection_property: str):
    # TODO: this part of the code sends the config to the backend so all custom properties need to be added here
    config = {
        "custom_connection_property": escape_identifier(custom_connection_property),
        "additional_connection_property": escape_identifier(additional_connection_property),
    }

    return call_procedure(
        "PUBLIC.SET_CONNECTION_CONFIGURATION",
        [variant_argument(config)]
    )
Copy

Depois de fazer isso, a nova propriedade é salva na tabela do conector interno, que contém a configuração. No entanto, esse não é o fim das personalizações possíveis. Alguns componentes de backend também podem ser personalizados. Procure os seguintes comentários no código para encontrá-los:

  • TODO: IMPLEMENT ME connection configuration validate

  • TODO: IMPLEMENT ME connection callback

  • TODO: IMPLEMENT ME test connection

A parte de validação permite qualquer validação adicional nos dados recebidos da UI. Ele também pode transformar os dados, por exemplo, alterar a letra maiúscula ou minúscula dos caracteres, cortar os dados fornecidos ou verificar se os objetos com os nomes fornecidos realmente existem no Snowflake.

O retorno de chamada de conexão é uma parte que permite que você execute qualquer operação adicional com base na configuração, por exemplo, alterar procedimentos que precisam usar integrações de acesso externo, usando uma solução descrita em Referência de configuração de integração externa.

O teste de conexão é o componente final da configuração de conexão e verifica se a conexão pode ser estabelecida entre o conector e o sistema de origem.

Para mais informações sobre esses componentes internos, consulte:

Exemplos de implementações podem ser assim:

public class TemplateConfigurationInputValidator implements ConnectionConfigurationInputValidator {

    private static final String ERROR_CODE = "INVALID_CONNECTION_CONFIGURATION";

    @Override
    public ConnectorResponse validate(Variant config) {
      // TODO: IMPLEMENT ME connection configuration validate: If the connection configuration input
      // requires some additional validation this is the place to implement this logic.
      // See more in docs:
      // https://docs.snowflake.com/developer-guide/native-apps/connector-sdk/reference/connection_configuration_reference
      // https://docs.snowflake.com/developer-guide/native-apps/connector-sdk/flow/connection_configuration
      var integrationCheck = checkParameter(config, INTEGRATION_PARAM, false);
      if (!integrationCheck.isOk()) {
        return integrationCheck;
      }

      var secretCheck = checkParameter(config, SECRET_PARAM, true);
      if (!secretCheck.isOk()) {
        return ConnectorResponse.error(ERROR_CODE);
      }

      return ConnectorResponse.success();
    }
}
Copy
public class TemplateConnectionConfigurationCallback implements ConnectionConfigurationCallback {

    private static final String[] EXTERNAL_SOURCE_PROCEDURE_SIGNATURES = {
        asVarchar(format("%s.%s()", PUBLIC_SCHEMA, TEST_CONNECTION_PROCEDURE)),
        asVarchar(format("%s.%s(VARIANT)", PUBLIC_SCHEMA, FINALIZE_CONNECTOR_CONFIGURATION_PROCEDURE)),
        asVarchar(format("%s.%s(NUMBER, STRING)", PUBLIC_SCHEMA, WORKER_PROCEDURE))
      };

    private final Session session;

    public TemplateConnectionConfigurationCallback(Session session) {
      this.session = session;
    }

    @Override
    public ConnectorResponse execute(Variant config) {
      // TODO: If you need to alter some procedures with external access you can use
      // configureProcedure method or implement a similar method on your own.
      // TODO: IMPLEMENT ME connection callback: Implement the custom logic of changes in application
      // to be done after connection configuration, like altering procedures with external access.
      // See more in docs:
      // https://docs.snowflake.com/developer-guide/native-apps/connector-sdk/reference/connection_configuration_reference
      // https://docs.snowflake.com/developer-guide/native-apps/connector-sdk/flow/connection_configuration
      var response = configureProceduresWithReferences();
      if (response.isNotOk()) {
         return response;
      }
      return ConnectorResponse.success();
    }

    private ConnectorResponse configureProceduresWithReferences() {
      return callProcedure(
        session,
        PUBLIC_SCHEMA,
        SETUP_EXTERNAL_INTEGRATION_WITH_NAMES_PROCEDURE,
        EXTERNAL_SOURCE_PROCEDURE_SIGNATURES);
    }
}
Copy
public class TemplateConnectionValidator {

    private static final String ERROR_CODE = "TEST_CONNECTION_FAILED";

    public static Variant testConnection(Session session) {
      // TODO: IMPLEMENT ME test connection: Implement the custom logic of testing the connection to
      // the source system here. This usually requires connection to some webservice or other external
      // system. It is suggested to perform only the basic connectivity validation here.
      // If that's the case then this procedure must be altered in TemplateConnectionConfigurationCallback first.
      // See more in docs:
      // https://docs.snowflake.com/developer-guide/native-apps/connector-sdk/reference/connection_configuration_reference
      // https://docs.snowflake.com/developer-guide/native-apps/connector-sdk/flow/connection_configuration
      return test().toVariant();
    }

    private static ConnectorResponse test() {
      try {
        var response = SourceSystemHttpHelper.testEndpoint();

        if (isSuccessful(response.statusCode())) {
          return ConnectorResponse.success();
        } else {
          return ConnectorResponse.error(ERROR_CODE, "Connection to source system failed");
        }
      } catch (Exception exception) {
        return ConnectorResponse.error(ERROR_CODE, "Test connection failed");
      }
    }
}
Copy

Etapa de finalização de configuração

A etapa de finalização da configuração do conector é a etapa final da fase do assistente. Esta etapa tem múltiplas responsabilidades:

  1. Permite que o usuário especifique qualquer configuração adicional necessária para o conector

  2. Cria o banco de dados do coletor, o esquema e as tabelas e exibições adicionais para os dados ingeridos, se necessário

  3. Inicializa os componentes internos, como o agendador e o reator de tarefas

Para obter mais informações sobre a finalização da configuração, consulte:

Para obter mais informações sobre o reator de tarefas e a programação, consulte:

Da mesma forma que a etapa de configuração da conexão, a personalização pode ser iniciada com o Streamlit UI. O arquivo streamlit/wizard/finalize_config.py contém um formulário com um exemplo de propriedade. Mais propriedades podem ser adicionadas de acordo com as necessidades do conector. Para adicionar outra propriedade, procure por um comentário TODO, que contém código de exemplo de adição de uma nova propriedade no arquivo mencionado.

# TODO: Here you can add additional fields in finalize connector configuration.
# For example:
st.subheader("Some additional property")
input_col, _ = st.columns([2, 1])
with input_col:
    st.text_input("", key="some_additional_property", label_visibility="collapsed")
st.caption("Description of some new additional property")
Copy

Depois de adicionar a entrada de texto para uma nova propriedade, ela precisa ser passada para o backend. Para fazer isso, modifique a função finalize_configuration no mesmo arquivo:

def finalize_configuration():
    try:
        st.session_state["show_main_error"] = False
        # TODO: If some additional properties were introduced, they need to be passed to the finalize_connector_configuration function.
        response = finalize_connector_configuration(
            st.session_state.get("custom_property"),
            st.session_state.get("some_additional_property")
        )
Copy

Em seguida, abra o arquivo streamlit/native_sdk_api/finalize_config.py e adicione a nova propriedade à seguinte função:

def finalize_connector_configuration(custom_property: str, some_additional_property: str):
    # TODO: If some custom properties were configured, then they need to be specified here and passed to the FINALIZE_CONNECTOR_CONFIGURATION procedure.
    config = {
        "custom_property": custom_property,
        "some_additional_property": some_additional_property,
    }
    return call_procedure(
        "PUBLIC.FINALIZE_CONNECTOR_CONFIGURATION",
        [variant_argument(config)]
    )
Copy

Novamente, de forma semelhante à etapa de configuração da conexão, essa etapa também permite a personalização de vários componentes de backend, que podem ser encontrados usando os seguintes comentários no código-fonte:

  • TODO: IMPLEMENT ME validate source

  • TODO: IMPLEMENT ME finalize internal

A parte de validação da fonte é responsável por executar validações mais sofisticadas nos sistemas de origem. Se o teste de conexão anterior verificou apenas se uma conexão pode ser estabelecida, a fonte de validação poderia verificar o acesso a dados específicos no sistema, por exemplo, extraindo um único registro de dados.

A finalização interna é um procedimento interno responsável por inicializar o reator e o agendador de tarefas, criando um banco de dados de recebimento e os objetos aninhados necessários. Ele também pode ser usado para salvar a configuração fornecida durante a etapa de finalização (essa configuração não é salva por padrão).

Mais informações sobre os componentes internos podem ser encontradas em:

Além disso, a entrada pode ser validada usando a interface FinalizeConnectorInputValidator e fornecendo-a ao manipulador de finalização - verifique o arquivo TemplateFinalizeConnectorConfigurationCustomHandler. Mais informações sobre o uso de construtores podem ser encontradas em: Customização de procedimentos armazenados e manipuladores.

Um exemplo de implementação da fonte de validação pode ser parecido com este:

public class SourceSystemAccessValidator implements SourceValidator {

    @Override
    public ConnectorResponse validate(Variant variant) {
      // TODO: IMPLEMENT ME validate source: Implement the custom logic of validating the source
      // system. In some cases this can be the same validation that happened in
      // TemplateConnectionValidator.
      // However, it is suggested to perform more complex validations, like specific access rights to
      // some specific resources here.
      // See more in docs:
      // https://docs.snowflake.com/developer-guide/native-apps/connector-sdk/reference/finalize_configuration_reference
      // https://docs.snowflake.com/developer-guide/native-apps/connector-sdk/flow/finalize_configuration
      var finalizeProperties = Configuration.fromCustomConfig(variant);

      var httpResponse = SourceSystemHttpHelper.validateSource(finalizeProperties.get("custom_property"));
      return prepareConnectorResponse(httpResponse.statusCode());
    }

    private ConnectorResponse prepareConnectorResponse(int statusCode) {
      switch (statusCode) {
        case 200:
          return ConnectorResponse.success();
        case 401:
          return ConnectorResponse.error("Unauthorized error");
        case 404:
          return ConnectorResponse.error("Not found error");
        default:
          return ConnectorResponse.error("Unknown error");
      }
    }
}
Copy

Criação de recursos

Após a conclusão da fase do assistente, o conector estará pronto para começar a ingerir dados. Mas, primeiro, os recursos devem ser implementados e configurados. Um recurso é uma abstração que descreve um conjunto específico de dados no sistema de origem, por exemplo, uma tabela, um ponto de extremidade, um arquivo etc.

Sistemas de origem diferentes podem precisar de informações diferentes sobre um recurso. Por esse motivo, uma definição de recurso precisa ser personalizada de acordo com as necessidades específicas. Para isso, vá até o arquivo streamlit/daily_use/data_sync_page.py. Lá você encontra um comentário TODO sobre a adição de entradas de texto para parâmetros de recursos. Os parâmetros de recursos devem permitir a identificação e recuperação de dados do sistema de origem. Esses parâmetros podem ser extraídos durante a ingestão.

# TODO: specify all the properties needed to define a resource in the source system. A subset of those properties should allow for a identification of a single resource, be it a table, endpoint, repository or some other data storage abstraction
st.text_input(
    "Resource name",
    key="resource_name",
)
st.text_input(
    "Some resource parameter",
    key="some_resource_parameter"
)
Copy

Depois que todas as propriedades necessárias forem adicionadas ao formulário, elas poderão ser passadas para o backend. Primeiro, o estado dos campos de texto deve ser extraído e passado para o método queue_resource do nível de API no arquivo streamlit/daily_use/data_sync_page.py:

def queue_resource():
    # TODO: add additional properties here and pass them to create_resource function
    resource_name = st.session_state.get("resource_name")
    some_resource_parameter = st.session_state.get("some_resource_parameter")

    if not resource_name:
        st.error("Resource name cannot be empty")
        return

    result = create_resource(resource_name, some_resource_parameter)
    if result.is_ok():
        st.success("Resource created")
    else:
        st.error(result.get_message())
Copy

Em seguida, a função create_resource do arquivo streamlit/native_sdk_api/resource_management.py precisa ser atualizada:

def create_resource(resource_name, some_resource_parameter):
    ingestion_config = [{
        "id": "ingestionConfig",
        "ingestionStrategy": "INCREMENTAL",
        # TODO: HINT: scheduleType and scheduleDefinition are currently not supported out of the box, due to globalSchedule being used. However, a custom implementation of the scheduler can use those fields. They need to be provided becuase they are mandatory in the resourceDefinition.
        "scheduleType": "INTERVAL",
        "scheduleDefinition": "60m"
    }]
    # TODO: HINT: resource_id should allow identification of a table, endpoint etc. in the source system. It should be unique.
    resource_id = {
        "resource_name": resource_name,
    }
    id = f"{resource_name}_{random_suffix()}"

    # TODO: if you specified some additional resource parameters then you need to put them inside resource metadata:
    resource_metadata = {
        "some_resource_parameter": some_resource_parameter
    }

    return call_procedure("PUBLIC.CREATE_RESOURCE",
                          [
                              varchar_argument(id),
                              variant_argument(resource_id),
                              variant_list_argument(ingestion_config),
                              varchar_argument(id),
                              "true",
                              variant_argument(resource_metadata)
                          ])
Copy

Personalização da lógica do procedimento CREATE_RESOURCE()

O procedimento PUBLIC.CREATE_RESOURCE() permite que o desenvolvedor personalize sua execução implementando lógica personalizada que é conectada a vários locais do fluxo de execução principal. O SDK permite que o desenvolvedor:

  1. Valide o recurso antes de criá-lo. A lógica deve ser implementada no procedimento PUBLIC.CREATE_RESOURCE_VALIDATE().

  2. Realize operações personalizadas antes de o recurso ser criado. A lógica deve ser implementada no procedimento PUBLIC.PRE_CREATE_RESOURCE().

  3. Realize operações personalizadas depois que o recurso for criado. A lógica deve ser implementada no procedimento PUBLIC.POST_CREATE_RESOURCE().

Mais informações sobre a personalização do procedimento PUBLIC.CREATE_RESOURCE() podem ser encontradas aqui:

TemplateCreateResourceHandler.java

Esta classe é um manipulador para o procedimento PUBLIC.CREATE_RESOURCE(). Aqui, é possível injetar as implementações Java de manipuladores para os procedimentos de chamada de retorno mencionados anteriormente. Por padrão, o modelo fornece implementações Java simuladas de manipuladores de retorno de chamada para evitar a chamada de procedimentos SQL, o que aumentaria o tempo de execução do procedimento - as implementações Java tornam a execução mais rápida. Essas implementações simuladas não fazem nada além de retornar uma resposta de sucesso. É possível fornecer a implementação personalizada para as classes de retorno de chamada preparadas pelo modelo ou criar esses retornos de chamada do zero e injetá-los no fluxo de execução do procedimento principal no construtor do manipulador.

Para implementar a lógica personalizada dos métodos de retorno de chamada que são chamados por padrão, procure os seguintes comentários no código:

  • TODO: IMPLEMENT ME create resource validate

  • TODO: IMPLEMENT ME pre create resource callback

  • TODO: IMPLEMENT ME post create resource callback

Ingestão

Para realizar a ingestão de dados, você precisa implementar uma classe que manipulará a conexão com o sistema de origem e recuperará os dados, com base na configuração do recurso. Os módulos Scheduler e Task Reactor cuidarão do acionamento e enfileiramento de tarefas de ingestão.

A lógica de ingestão é invocada a partir da classe TemplateIngestion. Procure o comentário TODO: IMPLEMENT ME ingestion no código e substitua a geração de dados aleatórios pela recuperação de dados do sistema de origem. Se você adicionou propriedades personalizadas à definição do recurso, elas podem ser obtidas nas tabelas de conectores internos usando o ResourceIngestionDefinitionRepository e as propriedades disponíveis no TemplateWorkItem:

  • resourceIngestionDefinitionId

  • ingestionConfigurationId

O exemplo de recuperação de dados de um serviço da Web pode ter a seguinte aparência:

public final class SourceSystemHttpHelper {

  private static final String DATA_URL = "https://source_system.com/data/%s";
  private static final SourceSystemHttpClient sourceSystemClient = new SourceSystemHttpClient();
  private static final ObjectMapper objectMapper = new ObjectMapper();

  private static List<Variant> fetchData(String resourceId) {
    var response = sourceSystemClient.get(String.format(url, resourceId));
    var body = response.body();

    try {
        return Arrays.stream(objectMapper.readValue(body, Map[].class))
              .map(Variant::new)
              .collect(Collectors.toList());
    } catch (JsonProcessingException e) {
      throw new RuntimeException("Cannot parse json", e);
    }
  }
}
Copy
public class SourceSystemHttpClient {

  private static final Duration REQUEST_TIMEOUT = Duration.ofSeconds(15);

  private final HttpClient client;
  private final String secret;

  public SourceSystemHttpClient() {
    this.client = HttpClient.newHttpClient();
    this.secret =
        SnowflakeSecrets.newInstance()
            .getGenericSecretString(ConnectionConfiguration.TOKEN_NAME);
  }

  public HttpResponse<String> get(String url) {
    var request =
        HttpRequest.newBuilder()
            .uri(URI.create(url))
            .GET()
            .header("Authorization", format("Bearer %s", secret))
            .header("Content-Type", "application/json")
            .timeout(REQUEST_TIMEOUT)
            .build();

    try {
      return client.send(request, HttpResponse.BodyHandlers.ofString());
    } catch (IOException | InterruptedException ex) {
      throw new RuntimeException(format("HttpRequest failed: %s", ex.getMessage()), ex);
    }
  }
}
Copy

Gerenciamento do ciclo de vida dos recursos

Uma vez implementada a lógica de criação de recursos e sua ingestão, você pode gerenciar seu ciclo de vida chamando os procedimentos a seguir:

  1. PUBLIC.ENABLE_RESOURCE() habilita um determinado recurso, o que significa que ele será agendado para ingestão

  2. PUBLIC.DISABLE_RESOURCE() desativa um determinado recurso, o que significa que sua programação de ingestão será interrompida

  3. PUBLIC.UPDATE_RESOURCE() permite que você atualize as configurações de ingestão de um determinado recurso. Ele não é implementado na UI do Streamlit por padrão porque às vezes pode ser indesejado pelo desenvolvedor permitir que o usuário do conector personalize a configuração de ingestão (revogue as concessões neste procedimento para a função de aplicativo ADMIN para proibir seu uso completamente).

Todos esses procedimentos têm manipuladores Java e são estendidos com retornos de chamada que permitem personalizar sua execução. Você pode injetar implementações personalizadas de retornos de chamada usando os construtores para esses manipuladores. Por padrão, o modelo fornece implementações Java simuladas de manipuladores de callback. Essas implementações simuladas não fazem nada além de retornar uma resposta de sucesso. É possível fornecer a implementação personalizada para as classes de retorno de chamada preparadas pelo modelo ou criar esses retornos de chamada do zero e injetá-los no fluxo de execução do procedimento principal nos construtores de manipulador.

TemplateEnableResourceHandler.java

Esta classe é um manipulador para o procedimento PUBLIC.ENABLE_RESOURCE(), que pode ser estendido com os retornos de chamada dedicados a:

  1. Validar o recurso antes de habilitá-lo. Procure o comentário TODO: IMPLEMENT ME enable resource validate no código para fornecer a implementação personalizada.

  2. Executar operações personalizadas antes que o recurso seja ativado. Procure o comentário TODO: IMPLEMENT ME pre enable resource no código para fornecer a implementação personalizada.

  3. Realizar operações personalizadas depois que o recurso for ativado. Procure o comentário TODO: IMPLEMENT ME post enable resource no código para fornecer a implementação personalizada.

Saiba mais na documentação detalhada do procedimento PUBLIC.ENABLE_RESOURCE():

TemplateDisableResourceHandler.java

Esta classe é um manipulador para o procedimento PUBLIC.DISABLE_RESOURCE(), que pode ser estendido com os retornos de chamada dedicados a:

  1. Validar o recurso antes que ele seja desabilitado. Procure o comentário TODO: IMPLEMENT ME disable resource validate no código para fornecer a implementação personalizada.

  2. Realizar operações personalizadas antes que o recurso seja desativado. Procure o comentário TODO: IMPLEMENT ME pre disable resource no código para fornecer a implementação personalizada.

Saiba mais na documentação detalhada do procedimento PUBLIC.DISABLE_RESOURCE():

TemplateUpdateResourceHandler.java

Esta classe é um manipulador para o procedimento PUBLIC.UPDATE_RESOURCE(), que pode ser estendido com os retornos de chamada dedicados a:

  1. Validar o recurso antes de atualizá-lo. Procure o comentário TODO: IMPLEMENT ME update resource validate no código para fornecer a implementação personalizada.

  2. Executar operações personalizadas antes que o recurso seja atualizado. Procure o comentário TODO: IMPLEMENT ME pre update resource no código para fornecer a implementação personalizada.

  3. Executar operações personalizadas depois que o recurso for atualizado. Procure o comentário TODO: IMPLEMENT ME post update resource no código para fornecer a implementação personalizada.

Saiba mais na documentação detalhada do procedimento PUBLIC.UPDATE_RESOURCE():

Configurações

O modelo contém uma aba de configurações que permite visualizar todas as configurações feitas anteriormente. No entanto, se as propriedades de configuração foram personalizadas, essa exibição também precisará de algumas personalizações. O código da aba Configurações pode ser encontrado no arquivo streamlit/daily_use/settings_page.py.

Para personalizá-lo, basta extrair os valores da configuração das chaves que foram adicionadas nas respectivas configurações. Por exemplo, se o additional_connection_property tiver sido adicionado anteriormente na etapa de configuração da conexão, ele poderá ser adicionado na visualização de configurações da seguinte forma:

def connection_config_page():
    current_config = get_connection_configuration()

    # TODO: implement the display for all the custom properties defined in the connection configuration step
    custom_property = current_config.get("custom_connection_property", "")
    additional_connection_property = current_config.get("additional_connection_property", "")


    st.header("Connector configuration")
    st.caption("Here you can see the connector connection configuration saved during the connection configuration step "
               "of the Wizard. If some new property was introduced it has to be added here to display.")
    st.divider()

    st.text_input(
        "Custom connection property:",
        value=custom_property,
        disabled=True
    )
    st.text_input(
        "Additional connection property:",
        value=additional_connection_property,
        disabled=True
    )
    st.divider()
Copy