Configure o Openflow Connector for SQL Server

Nota

O conector está sujeito aos Termos do conector Snowflake.

Este tópico descreve como configurar o Openflow Connector for SQL Server.

Para obter informações sobre o processo de carregamento incremental, consulte Replicação incremental.

Pré-requisitos

Antes de configurar o conector, você precisa concluir os seguintes pré-requisitos:

  1. Certifique-se de ter revisado Sobre a Openflow Connector for SQL Server.

  2. Certifique-se de ter revisado Versões compatíveis do servidor SQL.

  3. Certifique-se de ter configurado a implantação do seu tempo de execução. Para obter mais informações, consulte os seguintes tópicos:

  4. Se você usa Openflow - Snowflake Deployments, certifique-se de ter revisado a configuração dos domínios necessários e concedido acesso aos domínios necessários para o conector SQL Server.

Configurar a instância do servidor SQL

Antes de configurar o conector, execute as seguintes tarefas no ambiente do servidor SQL:

Nota

Você deve executá-las como administrador de banco de dados.

  1. Habilite o rastreamento de alterações nos bancos de dados e nas tabelas que você planeja replicar, conforme mostrado no seguinte exemplo de servidor SQL:

    ALTER DATABASE <database>
      SET CHANGE_TRACKING = ON
      (CHANGE_RETENTION = 2 DAYS, AUTO_CLEANUP = ON);
    
    ALTER TABLE <schema>.<table>
      ENABLE CHANGE_TRACKING
      WITH (TRACK_COLUMNS_UPDATED = ON);
    
    Copy

    Nota

    Execute esses comandos para cada banco de dados e tabela que você planeja replicar.

    O conector exige que o rastreamento de alterações esteja habilitado nos bancos de dados e nas tabelas antes do início da replicação. Certifique-se de que cada tabela que você planeja replicar tenha habilitado o rastreamento de alterações. Você também pode habilitar o rastreamento de alterações em tabelas adicionais enquanto o conector está em execução.

  2. Crie um login para a instância do servidor SQL:

    CREATE LOGIN <user_name> WITH PASSWORD = '<password>';
    
    Copy

    Esse login é usado para criar usuários para os bancos de dados que você planeja replicar.

  3. Crie um usuário para cada banco de dados que você está replicando executando o seguinte comando do servidor SQL em cada banco de dados:

    USE <source_database>;
    CREATE USER <user_name> FOR LOGIN <user_name>;
    
    Copy
  4. Conceda as permissões SELECT e VIEW CHANGE TRACKING ao usuário para cada banco de dados que você está replicando:

    GRANT SELECT ON <database>.<schema>.<table> TO <user_name>;
    GRANT VIEW CHANGE TRACKING ON <database>.<schema>.<table> TO <user_name>;
    
    Copy

    Execute esses comandos em cada banco de dados para cada tabela que você planeja replicar. Essas permissões devem ser concedidas ao usuário de cada banco de dados que você criou em uma etapa anterior.

  5. (Opcional) Configure a conexão SSL.

    Se você usa uma conexão SSL para conectar o servidor SQL, crie o certificado raiz para o servidor de banco de dados. Isso é necessário ao configurar o conector.

Configure seu ambiente Snowflake

Como administrador do Snowflake, execute as seguintes tarefas:

  1. Crie um banco de dados de destino no Snowflake para armazenar os dados replicados:

    CREATE DATABASE <destination_database>;
    
    Copy
  2. Crie um usuário de serviço do Snowflake:

    CREATE USER <openflow_user>
      TYPE = SERVICE
      COMMENT='Service user for automated access of Openflow';
    
    Copy
  3. Crie uma função do Snowflake para o conector e conceda os privilégios necessários:

    CREATE ROLE <openflow_role>;
    GRANT ROLE <openflow_role> TO USER <openflow_user>;
    GRANT USAGE ON DATABASE <destination_database> TO ROLE <openflow_role>;
    GRANT CREATE SCHEMA ON DATABASE <destination_database> TO ROLE <openflow_role>;
    
    Copy

    Use essa função para gerenciar o acesso do conector ao banco de dados Snowflake.

    Para criar objetos no banco de dados de destino, você deve conceder os privilégios USAGE e CREATE SCHEMA no banco de dados para a função usada para gerenciar o acesso.

  4. Crie um warehouse Snowflake para o conector e conceda os privilégios necessários:

    CREATE WAREHOUSE <openflow_warehouse> WITH
      WAREHOUSE_SIZE = 'XSMALL'
      AUTO_SUSPEND = 300
      AUTO_RESUME = TRUE;
    GRANT USAGE, OPERATE ON WAREHOUSE <openflow_warehouse> TO ROLE <openflow_role>;
    
    Copy

    Snowflake recommends starting with a XSMALL warehouse size, then experimenting with size depending on the number of tables being replicated and the amount of data transferred. Large numbers of tables typically scale better with multi-cluster warehouses, rather than a larger warehouse size. For more information, see multi-cluster warehouses.

  5. Configure as chaves públicas e privadas para autenticação do par de chaves:

    1. Crie um par de chaves seguras (pública e privada).

    2. Armazene a chave privada do usuário em um arquivo para fornecer à configuração do conector.

    3. Atribua a chave pública ao usuário do serviço Snowflake:

      ALTER USER <openflow_user> SET RSA_PUBLIC_KEY = 'thekey';
      
      Copy

      Para obter mais informações, consulte Autenticação de pares de chaves e rotação de pares de chaves.

Configuração do conector

Como engenheiro de dados, instale e configure o conector seguindo as seções abaixo.

Instalação do conector

  1. Navegue até a página de visão geral do Openflow. Na seção Featured connectors, selecione View more connectors.

  2. Na página de conectores do Openflow, localize o conector e selecione Add to runtime.

  3. Na caixa de diálogo Select runtime, selecione seu tempo de execução na lista suspensa Available runtimes e clique em Add.

    Nota

    Antes de instalar o conector, verifique se você criou um banco de dados e um esquema no Snowflake para que o conector armazene os dados ingeridos.

  4. Autentique-se na implementação com as credenciais de sua conta Snowflake e selecione Allow quando solicitado para permitir que o aplicativo de tempo de execução acesse sua conta Snowflake. O processo de instalação do conector leva alguns minutos para ser concluído.

  5. Autentique-se no tempo de execução com as credenciais de sua conta Snowflake.

A tela do Openflow é exibida com o grupo de processos do conector adicionado a ela.

Configuração do conector

Para configurar o conector, execute as seguintes etapas:

  1. Clique com o botão direito do mouse no grupo de processos importado e selecione Parameters.

  2. Preencha os valores dos parâmetros necessários conforme descrito em Parâmetros de fluxo.

Parâmetros de fluxo

Comece definindo os parâmetros do contexto de parâmetros de origem do SQLServer e, em seguida, do contexto de parâmetros de destino do SQLServer. Depois de concluir esse procedimento, habilite o conector. O conector se conecta a ambos SQLServer e Snowflake e começa a ser executado. No entanto, o conector não replica nenhum dado até que as tabelas a serem replicadas sejam explicitamente adicionadas à sua configuração.

Para configurar tabelas específicas para replicação, edite o contexto Parâmetros de ingestão do SQLServer. Depois que você aplicar as alterações ao contexto Parâmetros de ingestão do SQLServer, a configuração será captada pelo conector e o ciclo de vida da replicação será iniciado para cada tabela.

Contexto dos parâmetros de origem do SQLServer

Parâmetro

Descrição

URL de conexão do SQL Server

O URL completo do JDBC para o banco de dados de origem.

Exemplo:

  • jdbc:sqlserver://example.com:1433;encrypt=false;

Driver JDBC do SQL Server

Marque a caixa de seleção Reference asset para carregar o driver JDBC do servidor SQL.

Nome de usuário do servidor SQL

O nome de usuário do conector.

Senha do servidor SQL

A senha do conector.

Contexto dos parâmetros de destino do SQLServer

Parâmetro

Descrição

Obrigatório

Banco de dados de destino

O banco de dados em que os dados persistem. Ele já deve existir no Snowflake. O nome diferencia maiúsculas de minúsculas. Para identificadores sem aspas, forneça o nome em maiúsculas.

Sim

Estratégia de autenticação Snowflake

Ao utilizar:

  • Snowflake Openflow Deployment or BYOC: Use SNOWFLAKE_MANAGED_TOKEN. This token is managed automatically by Snowflake. BYOC deployments must have previously configured runtime roles to use SNOWFLAKE_MANAGED_TOKEN.

  • BYOC: o BYOC também pode usar KEY_PAIR como valor da estratégia de autenticação.

Sim

Identificador de conta Snowflake

Ao utilizar:

  • Session Token Authentication Strategy: deve ficar em branco.

  • KEY_PAIR: nome da conta Snowflake em que os dados persistem, formatado como [nome-da-organização]-[nome-da-conta].

Sim

Estratégia de conexão com o Snowflake

Ao usar KEY_PAIR, especifique a estratégia para conexão com o Snowflake:

  • STANDARD (padrão): conecte-se aos serviços Snowflake usando o roteamento público padrão.

  • PRIVATE_CONNECTIVITY: conecte-se usando endereços privados associados à plataforma de nuvem de suporte, como AWS PrivateLink.

Necessário somente para BYOC com KEY_PAIR; caso contrário, será ignorado.

Resolução de identificador do objeto Snowflake

Especifica como os identificadores de objetos de origem, como esquemas, tabelas e nomes de colunas, são armazenados e consultados no Snowflake. Essa configuração determina se você precisará usar aspas duplas nas consultas SQL.

Opção 1: padrão, sem distinção entre maiúsculas e minúsculas (recomendado).

  • Transformação: Todos os identificadores são convertidos em maiúsculas. Por exemplo, My_Table torna-se MY_TABLE.

  • Consultas: as consultas SQL não diferenciam maiúsculas de minúsculas e não exigem as aspas duplas doSQL.

    Por exemplo SELECT * FROM my_table; retorna os mesmos resultados que SELECT * FROM MY_TABLE;.

Nota

A Snowflake recomenda usar esta opção se não for esperado que os objetos de banco de dados tenham nomes que misturem letras maiúsculas e minúsculas.

Importante

Não altere esta configuração após o início da ingestão do conector. Se esta configuração for alterada após o início da ingestão, ela será interrompida. Se você precisar alterar essa configuração, crie uma nova instância do conector.

Opção 2: com distinção entre maiúsculas e minúsculas.

  • Transformação: As letras maiúsculas/minúsculas são preservadas. Por exemplo, My_Table continua sendo My_Table.

  • Consultas: consultas SQL devem usar aspas duplas para corresponder exatamente ao uso de maiúsculas/minúsculas dos objetos de banco de dados. Por exemplo, SELECT * FROM "My_Table";.

Nota

A Snowflake recomenda usar essa opção se for necessário preservar o uso de maiúsculas/minúsculas na origem por razões herdadas ou de compatibilidade. Por exemplo, se o banco de dados de origem inclui nomes de tabelas que são diferentes apenas no uso de maiúsculas e minúsculas, como MY_TABLE e my_table, isso vai gerar um conflito de nomes ao usar comparações que não diferenciam maiúsculas de minúsculas.

Sim

Chave privada Snowflake

Ao utilizar:

  • Session Token Authentication Strategy: deve ficar em branco.

  • KEY_PAIR: deve ser a chave privada RSA utilizada para a autenticação.

    A chave RSA deve ser formatada de acordo com os padrões PKCS8 e têm os cabeçalhos e rodapés PEM padrão. Observe que é necessário definir o arquivo de chave privada do Snowflake ou a chave privada do Snowflake.

Não

Arquivo de chave privada Snowflake

Ao utilizar:

  • Estratégia de autenticação de token de sessão: o arquivo de chave privada deve estar em branco.

  • KEY_PAIR: carregue o arquivo que contém a chave privada RSA usada para autenticação no Snowflake, formatado de acordo com os padrões PKCS8 e incluindo cabeçalhos e rodapés PEM padrão. A linha do cabeçalho começa com -----BEGIN PRIVATE. Para carregar o arquivo de chave privada, marque a caixa de seleção Reference asset.

Não

Senha de chave privada Snowflake

Ao utilizar:

  • Session Token Authentication Strategy: deve ficar em branco.

  • KEY_PAIR: forneça a senha associada ao arquivo de chave privada do Snowflake.

Não

Função Snowflake

Ao utilizar:

  • Estratégia de autenticação de tokens de sessão: use sua função de tempo de execução. Você pode encontrar sua função de tempo de execução na UI do Openflow, navegando para View Details no seu tempo de execução.

  • Estratégia de autenticação de KEY_PAIR: use uma função válida configurada para o usuário do seu serviço.

Sim

Nome de usuário do Snowflake

Ao utilizar:

  • Session Token Authentication Strategy: deve ficar em branco.

  • KEY_PAIR: forneça o nome de usuário usado para se conectar à instância do Snowflake.

Sim

Warehouse Snowflake

Warehouse Snowflake usado para executar consultas.

Sim

Contexto dos parâmetros de ingestão do SQLServer

Parâmetro

Descrição

Nomes de tabela inclusos

Uma lista separada por vírgulas de caminhos de tabela de origem, incluindo seus bancos de dados e esquemas, por exemplo:

database_1.public.table_1, database_2.schema_2.table_2

Regex de tabela inclusa

Uma expressão regular para corresponder com os caminhos de tabela, incluindo nomes de bancos de dados e esquemas. Cada caminho correspondente à expressão é replicado, e as novas tabelas correspondentes ao padrão criadas no futuro também são incluídas automaticamente, por exemplo:

database_name\.public\.auto_.*

Filtrar JSON

Um JSON contendo uma lista de nomes de tabela totalmente qualificados e um padrão regex para nomes de coluna que devem ser incluídos na replicação.

O exemplo a seguir inclui todas as colunas que terminam com name na table1 do esquema público no banco de dados my_db:

[ {"database":"my_db", "schema":"public", "table":"table1", "includedPattern":".*name"} ]

CRON do cronograma de tarefas de fusão

A expressão CRON que define os períodos em que as operações de fusão do diário para a Tabela de destino serão acionadas. Defina-o como * * * * * ? se quiser ter uma fusão contínua ou um cronograma de tempo para limitar o tempo de execução do warehouse.

Por exemplo:

  • A cadeia de caracteres * 0 * * * ? indica que você deseja agendar fusões de hora em hora por um minuto

  • A cadeia de caracteres * 20 14 ? * MON-FRI indica que você deseja agendar fusões às 2:20 PM de segunda a sexta-feira.

Para obter mais informações e exemplos, consulte o tutorial de acionadores do cron na documentação do Quartz

Remova e adicione novamente uma tabela à replicação

Para remover uma tabela da replicação, remova-a dos parâmetros Nomes de tabela inclusos ou Regex de tabela inclusa no contexto de parâmetros de replicação.

Para adicionar novamente a tabela à replicação mais tarde, primeiro exclua a tabela de destino correspondente do Snowflake. Após isso, adicione a tabela novamente aos parâmetros Nomes de tabela inclusos ou Regex de tabela inclusa. Isso garante que o processo de replicação comece do zero para a tabela.

Essa abordagem também pode ser usada para se recuperar de um cenário de replicação de tabela com falha.

Replique um subconjunto de colunas em uma tabela

O conector filtra os dados replicados por tabela para um subconjunto de colunas configuradas.

Para aplicar filtros a colunas, modifique a propriedade Filtro de coluna no contexto Parâmetros de replicação, adicionando uma matriz de configurações – uma entrada para cada tabela à qual deseja aplicar um filtro.

Inclua ou exclua colunas por nome ou padrão. Você pode aplicar uma única condição por tabela ou combinar várias condições, com as exclusões sempre tendo precedência sobre as inclusões.

O exemplo a seguir mostra os campos disponíveis. Os campos schema e table são obrigatórios. É necessário um ou mais de included, excluded, includedPattern e excludedPattern.

[
    {
        "schema": "<source table schema>",
        "table" : "<source table name>",
        "included": ["<column name>", "<column name>"],
        "excluded": ["<column name>", "<column name>"],
        "includedPattern": "<regular expression>",
        "excludedPattern": "<regular expression>",
    }
]
Copy

Monitore as alterações de dados em tabelas

O conector replica o estado atual dos dados das tabelas de origem e o estado das linhas de cada conjunto de alterações. Esses dados são armazenados em tabelas de diário criadas no mesmo esquema da tabela de destino.

Os nomes das tabelas de diário são formatados como: é um número inteiro que aumenta a cada alteração de esquema na tabela de origem. Como resultado, as tabelas de origem que passam por alterações de esquema terão vários tabelas de diário.

Quando você remove uma tabela da replicação e depois a adiciona novamente, o <timestamp> value changes, and <schema generation> starts again from 1.

Importante

A Snowflake não recomenda que você altere a estrutura das tabelas de diário de forma alguma. O conector as utiliza para atualizar a tabela de destino como parte do processo de replicação.

O conector nunca descarta tabelas de diário, mas usa o diário mais recente para cada tabela de origem replicada, lendo apenas os fluxos somente para anexação acima dos diários. Para recuperar o armazenamento, você pode:

  • Truncar todas as tabelas de diário a qualquer momento.

  • Descartar as tabelas de diário relacionadas às tabelas de origem que foram removidas da replicação.

  • Descartar todas as tabelas de diário, exceto as de última geração, para tabelas replicadas ativamente.

Por exemplo, se o conector estiver definido para replicar ativamente a tabela de origem orders e você tiver removido anteriormente a tabela customers da replicação, poderá ter as seguintes tabelas de diário. Nesse caso, você pode descartar todas elas exceto orders_5678_2.

customers_1234_1
customers_1234_2
orders_5678_1
orders_5678_2

Configure o agendamento de tarefas de fusão

O conector usa um warehouse para mesclar informações de captura de dados de alteração (CDC) nas tabelas de destino. Essa operação é acionada pelo processador MergeSnowflakeJournalTable. Se não houver novas alterações ou se não houver novos FlowFiles aguardando na fila MergeSnowflakeJournalTable, nenhuma fusão será acionada e o warehouse será suspenso automaticamente.

Use a expressão CRON no parâmetro CRON do cronograma da tarefa de mesclagem para limitar o custo do warehouse e as mesclagens somente ao horário agendado. Ele controla o fluxo dos MergeSnowflakeJournalTable que chegam ao processador e as mesclagens são acionadas somente em um período de tempo específico. Para obter mais informações sobre agendamento, consulte <https://nifi.apache.org/docs/nifi-docs/html/user-guide.html#scheduling-strategy>Estratégia de agendamento _.

Execute o fluxo

  1. Clique com o botão direito do mouse no plano e selecione Enable all Controller Services.

  2. Clique com o botão direito do mouse no grupo de processos importado e selecione Start. O conector inicia a ingestão de dados.