Configure o Openflow Connector for SQL Server

Nota

This connector is subject to the Snowflake Connector Terms.

This topic describes how to set up the Openflow Connector for SQL Server.

Para obter informações sobre o processo de carregamento incremental, consulte Replicação incremental.

Pré-requisitos

Antes de configurar o conector, você precisa concluir os seguintes pré-requisitos:

  1. Certifique-se de ter revisado Sobre a Openflow Connector for SQL Server.

  2. Certifique-se de ter revisado Versões compatíveis do servidor SQL.

  3. Certifique-se de ter configurado a implantação do seu tempo de execução. Para obter mais informações, consulte os seguintes tópicos:

  4. If you use Openflow - Snowflake Deployments, ensure that you have reviewed configuring required domains and have granted access to the required domains for the SQL Server connector.

Set up your SQL Server instance

Antes de configurar o conector, execute as seguintes tarefas no ambiente do servidor SQL:

Nota

Você deve executá-las como administrador de banco de dados.

  1. Enable change tracking on the databases and tables that you plan to replicate, as shown in the following SQL Server example:

    ALTER DATABASE <database>
      SET CHANGE_TRACKING = ON
      (CHANGE_RETENTION = 2 DAYS, AUTO_CLEANUP = ON);
    
    ALTER TABLE <schema>.<table>
      ENABLE CHANGE_TRACKING
      WITH (TRACK_COLUMNS_UPDATED = ON);
    
    Copy

    Nota

    Execute esses comandos para cada banco de dados e tabela que você planeja replicar.

    O conector exige que o rastreamento de alterações esteja habilitado nos bancos de dados e nas tabelas antes do início da replicação. Certifique-se de que cada tabela que você planeja replicar tenha habilitado o rastreamento de alterações. Você também pode habilitar o rastreamento de alterações em tabelas adicionais enquanto o conector está em execução.

  2. Crie um login para a instância do servidor SQL:

    CREATE LOGIN <user_name> WITH PASSWORD = '<password>';
    
    Copy

    Esse login é usado para criar usuários para os bancos de dados que você planeja replicar.

  3. Crie um usuário para cada banco de dados que você está replicando executando o seguinte comando do servidor SQL em cada banco de dados:

    USE <source_database>;
    CREATE USER <user_name> FOR LOGIN <user_name>;
    
    Copy
  4. Conceda as permissões SELECT e VIEW CHANGE TRACKING ao usuário para cada banco de dados que você está replicando:

    GRANT SELECT ON <database>.<schema>.<table> TO <user_name>;
    GRANT VIEW CHANGE TRACKING ON <database>.<schema>.<table> TO <user_name>;
    
    Copy

    Execute esses comandos em cada banco de dados para cada tabela que você planeja replicar. Essas permissões devem ser concedidas ao usuário de cada banco de dados que você criou em uma etapa anterior.

  5. (Optional) Configure SSL connection.

    If you use an SSL connection to connect SQL Server, create the root certificate for your database server. This is required when configuring the connector.

Configure seu ambiente Snowflake

As a Snowflake administrator, perform the following tasks:

  1. Crie um banco de dados de destino no Snowflake para armazenar os dados replicados:

    CREATE DATABASE <destination_database>;
    
    Copy
  2. Crie um usuário de serviço do Snowflake:

    CREATE USER <openflow_user>
      TYPE = SERVICE
      COMMENT='Service user for automated access of Openflow';
    
    Copy
  3. Crie uma função do Snowflake para o conector e conceda os privilégios necessários:

    CREATE ROLE <openflow_role>;
    GRANT ROLE <openflow_role> TO USER <openflow_user>;
    GRANT USAGE ON DATABASE <destination_database> TO ROLE <openflow_role>;
    GRANT CREATE SCHEMA ON DATABASE <destination_database> TO ROLE <openflow_role>;
    
    Copy

    Use essa função para gerenciar o acesso do conector ao banco de dados Snowflake.

    Para criar objetos no banco de dados de destino, você deve conceder os privilégios USAGE e CREATE SCHEMA no banco de dados para a função usada para gerenciar o acesso.

  4. Crie um warehouse Snowflake para o conector e conceda os privilégios necessários:

    CREATE WAREHOUSE <openflow_warehouse> WITH
      WAREHOUSE_SIZE = 'MEDIUM'
      AUTO_SUSPEND = 300
      AUTO_RESUME = TRUE;
    GRANT USAGE, OPERATE ON WAREHOUSE <openflow_warehouse> TO ROLE <openflow_role>;
    
    Copy

    Snowflake recommends starting with a MEDIUM warehouse size, then experimenting with size depending on the number of tables being replicated and the amount of data transferred. Large numbers of tables typically scale better with multi-cluster warehouses, rather than a larger warehouse size. For more information, see multi-cluster warehouses.

  5. Set up the public and private keys for key pair authentication:

    1. Crie um par de chaves seguras (pública e privada).

    2. Store the private key for the user in a file to supply to the connector’s configuration.

    3. Atribua a chave pública ao usuário do serviço Snowflake:

      ALTER USER <openflow_user> SET RSA_PUBLIC_KEY = 'thekey';
      
      Copy

      Para obter mais informações, consulte Autenticação de pares de chaves e rotação de pares de chaves.

Configuração do conector

As a data engineer, install and configure the connector using the following sections.

Instalação do conector

  1. Navigate to the Openflow overview page. In the Featured connectors section, select View more connectors.

  2. Na página de conectores do Openflow, localize o conector e selecione Add to runtime.

  3. In the Select runtime dialog, select your runtime from the Available runtimes drop-down list and click Add.

    Nota

    Antes de instalar o conector, verifique se você criou um banco de dados e um esquema no Snowflake para que o conector armazene os dados ingeridos.

  4. Autentique-se na implementação com as credenciais de sua conta Snowflake e selecione Allow quando solicitado para permitir que o aplicativo de tempo de execução acesse sua conta Snowflake. O processo de instalação do conector leva alguns minutos para ser concluído.

  5. Autentique-se no tempo de execução com as credenciais de sua conta Snowflake.

A tela do Openflow é exibida com o grupo de processos do conector adicionado a ela.

Configuração do conector

To configure the connector, perform the following steps:

  1. Clique com o botão direito do mouse no grupo de processos importado e selecione Parameters.

  2. Populate the required parameter values as described in Parâmetros de fluxo.

Parâmetros de fluxo

Start by setting the parameters of the SQLServer Source Parameters context, then the SQLServer Destination Parameters context. After you complete this, enable the connector. The connector connects to both SQLServer and Snowflake and starts running. However, the connector does not replicate any data until any tables to be replicated are explicitly added to its configuration.

Para configurar tabelas específicas para replicação, edite o contexto Parâmetros de ingestão do SQLServer. Depois que você aplicar as alterações ao contexto Parâmetros de ingestão do SQLServer, a configuração será captada pelo conector e o ciclo de vida da replicação será iniciado para cada tabela.

Contexto dos parâmetros de origem do SQLServer

Parâmetro

Descrição

URL de conexão do SQL Server

O URL completo do JDBC para o banco de dados de origem.

Exemplo:

  • jdbc:sqlserver://example.com:1433;encrypt=false;

Driver JDBC do SQL Server

Select the Reference asset checkbox to upload the SQL Server JDBC driver.

Nome de usuário do servidor SQL

The user name for the connector.

Senha do servidor SQL

A senha do conector.

Contexto dos parâmetros de destino do SQLServer

Parâmetro

Descrição

Obrigatório

Banco de dados de destino

The database where data is persisted. It must already exist in Snowflake. The name is case-sensitive. For unquoted identifiers, provide the name in uppercase.

Sim

Estratégia de autenticação Snowflake

Ao utilizar:

  • Implantação do Snowflake OpenFlow ou BYOC: Use SNOWFLAKE_SESSION_TOKEN. O Snowflake gerencia este token automaticamente. As implantações BYOC já devem ter configurado as funções de tempo de execução para usar o SNOWFLAKE_SESSION_TOKEN.

  • BYOC: Alternatively BYOC can use KEY_PAIR as the value for authentication strategy.

Sim

Identificador de conta Snowflake

Ao utilizar:

  • Session Token Authentication Strategy: deve ficar em branco.

  • KEY_PAIR: Snowflake account name formatted as [organization-name]-[account-name] where data is persisted.

Sim

Snowflake Connection Strategy

Ao usar KEY_PAIR, especifique a estratégia para conexão com o Snowflake:

  • STANDARD (padrão): conecte-se aos serviços Snowflake usando o roteamento público padrão.

  • PRIVATE_CONNECTIVITY: conecte-se usando endereços privados associados à plataforma de nuvem de suporte, como AWS PrivateLink.

Necessário somente para BYOC com KEY_PAIR; caso contrário, será ignorado.

Snowflake Object Identifier Resolution

Especifica como os identificadores de objetos de origem, como esquemas, tabelas e nomes de colunas, são armazenados e consultados no Snowflake. Essa configuração determina se você precisará usar aspas duplas nas consultas SQL.

Opção 1: padrão, sem distinção entre maiúsculas e minúsculas (recomendado).

  • Transformação: Todos os identificadores são convertidos em maiúsculas. Por exemplo, My_Table torna-se MY_TABLE.

  • Consultas: as consultas SQL não diferenciam maiúsculas de minúsculas e não exigem as aspas duplas doSQL.

    Por exemplo SELECT * FROM my_table; retorna os mesmos resultados que SELECT * FROM MY_TABLE;.

Nota

A Snowflake recomenda usar esta opção se não for esperado que os objetos de banco de dados tenham nomes que misturem letras maiúsculas e minúsculas.

Importante

Não altere esta configuração após o início da ingestão do conector. Se esta configuração for alterada após o início da ingestão, ela será interrompida. Se você precisar alterar essa configuração, crie uma nova instância do conector.

Opção 2: com distinção entre maiúsculas e minúsculas.

  • Transformação: As letras maiúsculas/minúsculas são preservadas. Por exemplo, My_Table continua sendo My_Table.

  • Consultas: consultas SQL devem usar aspas duplas para corresponder exatamente ao uso de maiúsculas/minúsculas dos objetos de banco de dados. Por exemplo, SELECT * FROM "My_Table";.

Nota

A Snowflake recomenda usar essa opção se for necessário preservar o uso de maiúsculas/minúsculas na origem por razões herdadas ou de compatibilidade. Por exemplo, se o banco de dados de origem inclui nomes de tabelas que são diferentes apenas no uso de maiúsculas e minúsculas, como MY_TABLE e my_table, isso vai gerar um conflito de nomes ao usar comparações que não diferenciam maiúsculas de minúsculas.

Sim

Chave privada Snowflake

Ao utilizar:

  • Session Token Authentication Strategy: deve ficar em branco.

  • KEY_PAIR: deve ser a chave privada RSA utilizada para a autenticação.

    The RSA key must be formatted according to PKCS8 standards and have standard PEM headers and footers. Note that either a Snowflake Private Key File or a Snowflake Private Key must be defined.

Não

Arquivo de chave privada Snowflake

Ao utilizar:

  • Session token authentication strategy: The private key file must be blank.

  • KEY_PAIR: carregue o arquivo que contém a chave privada RSA usada para autenticação no Snowflake, formatado de acordo com os padrões PKCS8 e incluindo cabeçalhos e rodapés PEM padrão. A linha do cabeçalho começa com -----BEGIN PRIVATE. Para carregar o arquivo de chave privada, marque a caixa de seleção Reference asset.

Não

Senha de chave privada Snowflake

Ao utilizar:

  • Session Token Authentication Strategy: deve ficar em branco.

  • KEY_PAIR: forneça a senha associada ao arquivo de chave privada do Snowflake.

Não

Função Snowflake

Ao utilizar:

  • Estratégia de autenticação de tokens de sessão: use sua função de tempo de execução. Você pode encontrar sua função de tempo de execução na UI do Openflow, navegando para View Details no seu tempo de execução.

  • Estratégia de autenticação de KEY_PAIR: use uma função válida configurada para o usuário do seu serviço.

Sim

Nome de usuário do Snowflake

Ao utilizar:

  • Session Token Authentication Strategy: deve ficar em branco.

  • KEY_PAIR: forneça o nome de usuário usado para se conectar à instância do Snowflake.

Sim

Warehouse Snowflake

Warehouse Snowflake usado para executar consultas.

Sim

Contexto dos parâmetros de ingestão do SQLServer

Parâmetro

Descrição

Nomes de tabela inclusos

A comma-separated list of source table paths, including their databases and schemas, for example:

database_1.public.table_1, database_2.schema_2.table_2

Regex de tabela inclusa

A regular expression to match against table paths, including database and schema names. Every path matching the expression is replicated, and new tables matching the pattern that are created later are also included automatically, for example:

database_name\.public\.auto_.*

Filtrar JSON

A JSON containing a list of fully-qualified table names and a regex pattern for column names that should be included into replication.

O exemplo a seguir inclui todas as colunas que terminam com name na table1 do esquema público no banco de dados my_db:

[ {"database":"my_db", "schema":"public", "table":"table1", "includedPattern":".*name"} ]

CRON do cronograma de tarefas de fusão

A expressão CRON que define os períodos em que as operações de fusão do diário para a Tabela de destino serão acionadas. Defina-o como * * * * * ? se quiser ter uma fusão contínua ou um cronograma de tempo para limitar o tempo de execução do warehouse.

Por exemplo:

  • A cadeia de caracteres * 0 * * * ? indica que você deseja agendar fusões de hora em hora por um minuto

  • A cadeia de caracteres * 20 14 ? * MON-FRI indica que você deseja agendar fusões às 2:20 PM de segunda a sexta-feira.

Para obter mais informações e exemplos, consulte o tutorial de acionadores do cron na documentação do Quartz

Remova e adicione novamente uma tabela à replicação

To remove a table from replication, remove it from the Included Table Names or Included Table Regex parameters in the Replication Parameters context.

To re-add the table to replication later, first delete the corresponding destination table in Snowflake. Afterward, add the table back to the Included Table Names or Included Table Regex parameters. This ensures that the replication process starts fresh for the table.

Essa abordagem também pode ser usada para se recuperar de um cenário de replicação de tabela com falha.

Replique um subconjunto de colunas em uma tabela

The connector filters the data replicated per table to a subset of configured columns.

Para aplicar filtros a colunas, modifique a propriedade Filtro de coluna no contexto Parâmetros de replicação, adicionando uma matriz de configurações – uma entrada para cada tabela à qual deseja aplicar um filtro.

Include or exclude columns by name or pattern. You can apply a single condition per table, or combine multiple conditions, with exclusions always taking precedence over inclusions.

O exemplo a seguir mostra os campos disponíveis. Os campos schema e table são obrigatórios. É necessário um ou mais de included, excluded, includedPattern e excludedPattern.

[
    {
        "schema": "<source table schema>",
        "table" : "<source table name>",
        "included": ["<column name>", "<column name>"],
        "excluded": ["<column name>", "<column name>"],
        "includedPattern": "<regular expression>",
        "excludedPattern": "<regular expression>",
    }
]
Copy

Monitore as alterações de dados em tabelas

The connector replicates the current state of data from the source tables, as well as every state of every row from every changeset. This data is stored in journal tables created in the same schema as the destination table.

Os nomes das tabelas de diário são formatados como: é um número inteiro que aumenta a cada alteração de esquema na tabela de origem. Como resultado, as tabelas de origem que passam por alterações de esquema terão vários tabelas de diário.

When you remove a table from replication, then add it back, the <timestamp> value changes, and <schema generation> starts again from 1.

Importante

Snowflake recommends not altering the structure of journal tables in any way. The connector uses them to update the destination table as part of the replication process.

The connector never drops journal tables, but uses the latest journal for every replicated source table, only reading append-only streams on top of journals. To reclaim the storage, you can:

  • Truncar todas as tabelas de diário a qualquer momento.

  • Descartar as tabelas de diário relacionadas às tabelas de origem que foram removidas da replicação.

  • Descartar todas as tabelas de diário, exceto as de última geração, para tabelas replicadas ativamente.

Por exemplo, se o conector estiver definido para replicar ativamente a tabela de origem orders e você tiver removido anteriormente a tabela customers da replicação, poderá ter as seguintes tabelas de diário. Nesse caso, você pode descartar todas elas exceto orders_5678_2.

customers_1234_1
customers_1234_2
orders_5678_1
orders_5678_2

Configure o agendamento de tarefas de fusão

O conector usa um warehouse para mesclar informações de captura de dados de alteração (CDC) nas tabelas de destino. Essa operação é acionada pelo processador MergeSnowflakeJournalTable. Se não houver novas alterações ou se não houver novos FlowFiles aguardando na fila MergeSnowflakeJournalTable, nenhuma fusão será acionada e o warehouse será suspenso automaticamente.

Use the CRON expression in the Merge task Schedule CRON parameter to limit the warehouse cost and limit merges to only scheduled time. It throttles the flow files coming to the MergeSnowflakeJournalTable processor and merges are triggered only in a dedicated period of time. For more information about scheduling, see Scheduling strategy.

Execute o fluxo

  1. Clique com o botão direito do mouse no plano e selecione Enable all Controller Services.

  2. Clique com o botão direito do mouse no grupo de processos importado e selecione Start. O conector inicia a ingestão de dados.