Configure o Openflow Connector for SQL Server¶
Nota
O conector está sujeito aos Termos do conector Snowflake.
Este tópico descreve como configurar o Openflow Connector for SQL Server.
Para obter informações sobre o processo de carregamento incremental, consulte Replicação incremental.
Pré-requisitos¶
Antes de configurar o conector, você precisa concluir os seguintes pré-requisitos:
Certifique-se de ter revisado Sobre a Openflow Connector for SQL Server.
Certifique-se de ter revisado Versões compatíveis do servidor SQL.
Certifique-se de ter configurado a implantação do seu tempo de execução. Para obter mais informações, consulte os seguintes tópicos:
Se você usa Openflow - Snowflake Deployments, certifique-se de ter revisado a configuração dos domínios necessários e concedido acesso aos domínios necessários para o conector SQL Server.
Configurar a instância do servidor SQL¶
Antes de configurar o conector, execute as seguintes tarefas no ambiente do servidor SQL:
Nota
Você deve executá-las como administrador de banco de dados.
Habilite o rastreamento de alterações nos bancos de dados e nas tabelas que você planeja replicar, conforme mostrado no seguinte exemplo de servidor SQL:
ALTER DATABASE <database> SET CHANGE_TRACKING = ON (CHANGE_RETENTION = 2 DAYS, AUTO_CLEANUP = ON); ALTER TABLE <schema>.<table> ENABLE CHANGE_TRACKING;
Nota
Execute esses comandos para cada banco de dados e tabela que você planeja replicar.
O conector exige que o rastreamento de alterações esteja habilitado nos bancos de dados e nas tabelas antes do início da replicação. Certifique-se de que cada tabela que você planeja replicar tenha habilitado o rastreamento de alterações. Você também pode habilitar o rastreamento de alterações em tabelas adicionais enquanto o conector está em execução.
Crie um login para a instância do servidor SQL:
CREATE LOGIN <user_name> WITH PASSWORD = '<password>';
Esse login é usado para criar usuários para os bancos de dados que você planeja replicar.
Crie um usuário para cada banco de dados que você está replicando executando o seguinte comando do servidor SQL em cada banco de dados:
USE <source_database>; CREATE USER <user_name> FOR LOGIN <user_name>;
Conceda as permissões SELECT e VIEW CHANGE TRACKING ao usuário para cada banco de dados que você está replicando:
GRANT SELECT ON <database>.<schema>.<table> TO <user_name>; GRANT VIEW CHANGE TRACKING ON <database>.<schema>.<table> TO <user_name>;
Execute esses comandos em cada banco de dados para cada tabela que você planeja replicar. Essas permissões devem ser concedidas ao usuário de cada banco de dados que você criou em uma etapa anterior.
(Opcional) Conceda o privilégio VIEW DEFINITION nos tipos de dados definidos pelo usuário (User Defined Data Types, UDDT).
Se suas tabelas contiverem colunas que usam tipos de dados definidos pelo usuário (UDDT), e o UDDT for de propriedade de um usuário diferente daquele do conector, você deverá conceder a permissão VIEW DEFINITION ao usuário do conector, como mostrado no seguinte exemplo do SQL Server:
GRANT VIEW DEFINITION TO <user_name>;
Sem essa permissão, as colunas que usam UDDT são silenciosamente excluídas da replicação.
(Opcional) Configure a conexão SSL.
Se você usa uma conexão SSL para conectar o servidor SQL, crie o certificado raiz para o servidor de banco de dados. Isso é necessário ao configurar o conector.
Configure seu ambiente Snowflake¶
Como administrador do Snowflake, execute as seguintes tarefas:
Crie um banco de dados de destino no Snowflake para armazenar os dados replicados:
CREATE DATABASE <destination_database>;
Crie um usuário de serviço do Snowflake:
CREATE USER <openflow_user> TYPE = SERVICE COMMENT='Service user for automated access of Openflow';
Crie uma função do Snowflake para o conector e conceda os privilégios necessários:
CREATE ROLE <openflow_role>; GRANT ROLE <openflow_role> TO USER <openflow_user>; GRANT USAGE ON DATABASE <destination_database> TO ROLE <openflow_role>; GRANT CREATE SCHEMA ON DATABASE <destination_database> TO ROLE <openflow_role>;
Use essa função para gerenciar o acesso do conector ao banco de dados Snowflake.
Para criar objetos no banco de dados de destino, você deve conceder os privilégios USAGE e CREATE SCHEMA no banco de dados para a função usada para gerenciar o acesso.
Crie um warehouse Snowflake para o conector e conceda os privilégios necessários:
CREATE WAREHOUSE <openflow_warehouse> WITH WAREHOUSE_SIZE = 'XSMALL' AUTO_SUSPEND = 300 AUTO_RESUME = TRUE; GRANT USAGE, OPERATE ON WAREHOUSE <openflow_warehouse> TO ROLE <openflow_role>;
A Snowflake recomenda começar com um tamanho XSMALL do warehouse e, depois, fazer testes com o tamanho dependendo do número de tabelas que estão sendo replicadas e da quantidade de dados transferidos. Normalmente, um grande número de tabelas é mais bem dimensionado com warehouses multicluster, em vez de um warehouse de tamanho maior. Para obter mais informações, consulte warehouses multicluster.
Configure as chaves públicas e privadas para autenticação do par de chaves:
Crie um par de chaves seguras (pública e privada).
Armazene a chave privada do usuário em um arquivo para fornecer à configuração do conector.
Atribua a chave pública ao usuário do serviço Snowflake:
ALTER USER <openflow_user> SET RSA_PUBLIC_KEY = 'thekey';
Para obter mais informações, consulte Autenticação de pares de chaves e rotação de pares de chaves.
Instalação do conector¶
Para instalar o conector, faça o seguinte como engenheiro de dados:
Navegue até a página de visão geral do Openflow. Na seção Featured connectors, selecione View more connectors.
Na página de conectores do Openflow, localize o conector e selecione Add to runtime.
Na caixa de diálogo Select runtime, selecione seu tempo de execução na lista suspensa Available runtimes e clique em Add.
Nota
Antes de instalar o conector, verifique se você criou um banco de dados e um esquema no Snowflake para que o conector armazene os dados ingeridos.
Autentique-se na implementação com as credenciais de sua conta Snowflake e selecione Allow quando solicitado para permitir que o aplicativo de tempo de execução acesse sua conta Snowflake. O processo de instalação do conector leva alguns minutos para ser concluído.
Autentique-se no tempo de execução com as credenciais de sua conta Snowflake.
A tela do Openflow é exibida com o grupo de processos do conector adicionado a ela.
Configuração do conector¶
Para configurar o conector, faça o seguinte como engenheiro de dados:
Clique com o botão direito do mouse no grupo de processos importado e selecione Parameters.
Preencha os valores dos parâmetros necessários conforme descrito em Parâmetros de fluxo.
Parâmetros de fluxo¶
Comece definindo os parâmetros do contexto de parâmetros de origem do SQLServer e, em seguida, do contexto de parâmetros de destino do SQLServer. Depois de concluir esse procedimento, habilite o conector. O conector se conecta a ambos SQLServer e Snowflake e começa a ser executado. No entanto, o conector não replica nenhum dado até que as tabelas a serem replicadas sejam explicitamente adicionadas à sua configuração.
Para configurar tabelas específicas para replicação, edite o contexto Parâmetros de ingestão do SQLServer. Depois que você aplicar as alterações ao contexto Parâmetros de ingestão do SQLServer, a configuração será captada pelo conector e o ciclo de vida da replicação será iniciado para cada tabela.
Contexto dos parâmetros de origem do SQLServer¶
Parâmetro |
Descrição |
|---|---|
SQLServer de conexão do URL |
O URL completo do JDBC para o banco de dados de origem. Exemplo:
|
Driver SQLServer JDBC |
Marque a caixa de seleção Reference asset para carregar o driver JDBC do servidor SQL. |
Nome de usuário SQLServer |
O nome de usuário do conector. |
Senha SQLServer |
A senha do conector. |
Contexto dos parâmetros de destino do SQLServer¶
Parâmetro |
Descrição |
Obrigatório |
|---|---|---|
Banco de dados de destino |
O banco de dados em que os dados persistem. Ele já deve existir no Snowflake. O nome diferencia maiúsculas de minúsculas. Para identificadores sem aspas, forneça o nome em maiúsculas. |
Sim |
Estratégia de autenticação Snowflake |
Ao utilizar:
|
Sim |
Identificador de conta Snowflake |
Ao utilizar:
|
Sim |
Estratégia de conexão com o Snowflake |
Ao usar KEY_PAIR, especifique a estratégia para conexão com o Snowflake:
|
Necessário somente para BYOC com KEY_PAIR; caso contrário, será ignorado. |
Resolução de identificador do objeto Snowflake |
Especifica como os identificadores de objetos de origem, como esquemas, tabelas e nomes de colunas, são armazenados e consultados no Snowflake. Essa configuração determina se você precisará usar aspas duplas nas consultas SQL. Opção 1: padrão, sem distinção entre maiúsculas e minúsculas (recomendado).
Nota A Snowflake recomenda usar esta opção se não for esperado que os objetos de banco de dados tenham nomes que misturem letras maiúsculas e minúsculas. Importante Não altere esta configuração após o início da ingestão do conector. Se esta configuração for alterada após o início da ingestão, ela será interrompida. Se você precisar alterar essa configuração, crie uma nova instância do conector. Opção 2: com distinção entre maiúsculas e minúsculas.
Nota A Snowflake recomenda usar essa opção se for necessário preservar o uso de maiúsculas/minúsculas na origem por razões herdadas ou de compatibilidade. Por exemplo, se o banco de dados de origem inclui nomes de tabelas que são diferentes apenas no uso de maiúsculas e minúsculas, como |
Sim |
Chave privada Snowflake |
Ao utilizar:
|
Não |
Arquivo de chave privada Snowflake |
Ao utilizar:
|
Não |
Senha de chave privada Snowflake |
Ao utilizar:
|
Não |
Função Snowflake |
Ao utilizar:
|
Sim |
Nome de usuário do Snowflake |
Ao utilizar:
|
Sim |
Warehouse Snowflake |
Warehouse Snowflake usado para executar consultas. |
Sim |
Contexto dos parâmetros de ingestão do SQLServer¶
Parâmetro |
Descrição |
|---|---|
Nomes de tabela inclusos |
Uma lista separada por vírgulas de caminhos de tabela de origem, incluindo seus bancos de dados e esquemas, por exemplo:
|
Regex de tabela inclusa |
Uma expressão regular para corresponder com os caminhos de tabela, incluindo nomes de bancos de dados e esquemas. Cada caminho correspondente à expressão é replicado, e as novas tabelas correspondentes ao padrão criadas no futuro também são incluídas automaticamente, por exemplo:
|
Filtrar JSON |
Um JSON contendo uma lista de nomes de tabela totalmente qualificados e um padrão regex para nomes de coluna que devem ser incluídos na replicação. O exemplo a seguir inclui todas as colunas que terminam com
|
CRON do cronograma de tarefas de fusão |
A expressão CRON que define os períodos em que as operações de fusão do diário para a Tabela de destino serão acionadas. Defina-o como Por exemplo:
Para obter mais informações e exemplos, consulte o tutorial de acionadores do cron na documentação do Quartz |
Replicar tabelas de um servidor de réplica do SQL Server¶
O conector pode ingerir dados de um servidor primário, de um servidor de assinante usando uma replicação transacional ou de uma réplica secundária em um grupo de disponibilidade Always On. Antes de configurar o conector para se conectar a uma réplica do SQL Server, certifique-se de que a replicação entre os nós primários e de réplica funcione corretamente. Ao investigar problemas de dados ausentes no conector, primeiro certifique-se de que as linhas ausentes e os eventos de rastreamento de alterações estejam presentes no servidor de réplica usado pelo conector.
Para garantir a continuidade, garanta que o mesmo usuário de conexão esteja disponível nos servidores tanto primário quanto de réplica e tenha acesso aos dados e às tabelas de rastreamento de alterações.
Replicação transacional¶
Replicação transacional é um mecanismo de distribuição de dados que copia alterações de dados de um editor para os assinantes. Para configurar o conector para ler de um servidor de assinante em vez do editor, especifique o URL do servidor de assinante no parâmetro SQLServer Connection URL.
Aviso
Não altere o servidor de banco de dados após o início da replicação. Cada banco de dados mantém o próprio estado de rastreamento de alterações de forma independente, portanto, mudar para um servidor diferente faria com que o conector perdesse o controle de quais alterações já foram processadas, o que pode resultar em perda de dados.
Grupos de disponibilidade Always On¶
Grupos de disponibilidade Always On são uma solução de alta disponibilidade e recuperação de desastres que mantém cópias sincronizadas de bancos de dados para fins de failover. O conector pode ler de uma réplica secundária no grupo de disponibilidade. Para obter a melhor experiência, configure um ouvinte de grupo de disponibilidade e use o nome DNS do ouvinte no parâmetro SQLServer Connection URL.
Reiniciar a replicação da tabela¶
Uma tabela no estado FAILED, por exemplo, devido a uma chave primária ausente ou alteração de esquema incompatível, não é reiniciada automaticamente. Se uma tabela entrar no estado FAILED, ou se você precisar reiniciar a replicação do zero, use o procedimento a seguir para remover e adicionar novamente a tabela à replicação.
Nota
Se a falha foi causada por um problema na tabela de origem, como a falta de uma chave primária, resolva esse problema no banco de dados de origem antes de continuar.
Remova a tabela dos parâmetros de fluxo: no contexto de parâmetros de ingestão, remova a tabela de Included Table Names ou modifique o Included Table Regex para que a tabela não seja mais correspondida.
Verifique se a tabela foi removida:
Na tela do tempo de execução do Openflow, clique com o botão direito do mouse em um grupo de processadores e escolha Controller Services.
Na tabela com a lista de serviços do controlador, localize a linha Table State Store, clique nos três pontos verticais à direita da linha e escolha View State.
Importante
É necessário aguardar até que o estado da tabela seja totalmente removido da lista antes de prosseguir. Não continue até que esta alteração de configuração tenha sido concluída.
Limpe o destino: assim que o estado da tabela for exibido como totalmente removido, execute DROP manualmente da tabela de destino no Snowflake. Observe que o conector não substituirá uma tabela de destino existente durante a fase do instantâneo. Se a tabela ainda existir, a replicação falhará novamente. Opcionalmente, a tabela de diário e o fluxo também podem ser removidos se não forem mais necessários.
Adicione a tabela novamente: atualize os parâmetros Included Table Names ou Included Table Regex para incluir a tabela novamente.
Verifique a reinicialização: consulte Table State Store seguindo as instruções já apresentadas. O estado da tabela deve aparecer com status NEW, depois passar para SNAPSHOT_REPLICATION e, por fim, INCREMENTAL_REPLICATION.
Replique um subconjunto de colunas em uma tabela¶
O conector filtra os dados replicados por tabela para um subconjunto de colunas configuradas.
Para aplicar filtros a colunas, modifique a propriedade Filtro de coluna no contexto Parâmetros de replicação, adicionando uma matriz de configurações – uma entrada para cada tabela à qual deseja aplicar um filtro.
Inclua ou exclua colunas por nome ou padrão. Você pode aplicar uma única condição por tabela ou combinar várias condições, com as exclusões sempre tendo precedência sobre as inclusões.
O exemplo a seguir mostra os campos disponíveis. Os campos schema e table são obrigatórios. É necessário um ou mais de included, excluded, includedPattern e excludedPattern.
[
{
"schema": "<source table schema>",
"table" : "<source table name>",
"included": ["<column name>", "<column name>"],
"excluded": ["<column name>", "<column name>"],
"includedPattern": "<regular expression>",
"excludedPattern": "<regular expression>",
}
]
Monitore as alterações de dados em tabelas¶
O conector replica o estado atual dos dados das tabelas de origem e o estado das linhas de cada conjunto de alterações. Esses dados são armazenados em tabelas de diário criadas no mesmo esquema da tabela de destino.
Os nomes das tabelas de diário são formatados como: <source_table_name>_JOURNAL_<timestamp>_<schema_generation>, em que <timestamp> é o valor de segundos da época quando a tabela de origem foi adicionada à replicação e <schema_generation> é um número inteiro que aumenta a cada alteração de esquema na tabela de origem. Como resultado, as tabelas de origem que passam por alterações de esquema terão vários tabelas de diário.
Quando você remove uma tabela da replicação e depois a adiciona de volta, o valor <timestamp> muda e <schema_generation> começa novamente a partir de 1.
Importante
A Snowflake não recomenda que você altere a estrutura das tabelas de diário de forma alguma. O conector as utiliza para atualizar a tabela de destino como parte do processo de replicação.
O conector nunca descarta tabelas de diário, mas usa o diário mais recente para cada tabela de origem replicada, lendo apenas os fluxos somente para anexação acima dos diários. Para recuperar o armazenamento, você pode:
Truncar todas as tabelas de diário a qualquer momento.
Descartar as tabelas de diário relacionadas às tabelas de origem que foram removidas da replicação.
Descartar todas as tabelas de diário, exceto as de última geração, para tabelas replicadas ativamente.
Por exemplo, se o conector estiver definido para replicar ativamente a tabela de origem orders e você tiver removido anteriormente a tabela customers da replicação, poderá ter as seguintes tabelas de diário. Nesse caso, você pode descartar todas elas exceto orders_5678_2.
customers_1234_1
customers_1234_2
orders_5678_1
orders_5678_2
Configure o agendamento de tarefas de fusão¶
O conector usa um warehouse para mesclar informações de captura de dados de alteração (CDC) nas tabelas de destino. Essa operação é acionada pelo processador MergeSnowflakeJournalTable. Se não houver novas alterações ou se não houver novos FlowFiles aguardando na fila MergeSnowflakeJournalTable, nenhuma fusão será acionada e o warehouse será suspenso automaticamente.
Use a expressão CRON no parâmetro CRON do cronograma da tarefa de mesclagem para limitar o custo do warehouse e as mesclagens somente ao horário agendado. Ele controla o fluxo dos MergeSnowflakeJournalTable que chegam ao processador e as mesclagens são acionadas somente em um período de tempo específico. Para obter mais informações sobre agendamento, consulte <https://nifi.apache.org/docs/nifi-docs/html/user-guide.html#scheduling-strategy>Estratégia de agendamento _.
Execute o fluxo¶
Clique com o botão direito do mouse no plano e selecione Enable all Controller Services.
Clique com o botão direito do mouse no grupo de processos importado e selecione Start. O conector inicia a ingestão de dados.