Resiliência multilocal para pipelines de dados

A resiliência multilocal para pipelines de dados ajuda a proteger seus pipelines de dados contra possíveis interrupções de provedores de nuvem em toda a região. Esse recurso garante que, ao migrar para um local secundário, seus pipelines de dados (especificamente aqueles que usam o Snowpipe e COPY INTO) retomem o carregamento de novos dados sem interrupção ou ingestão duplicada.

Esse recurso funciona entre nuvens, permitindo que seus locais de armazenamento primário e de backup abranjam provedores de nuvem totalmente diferentes (por exemplo, ao migrar da AWS para o Azure), bem como regiões diferentes dentro da mesma nuvem.

Esse recurso depende de um modelo de responsabilidade compartilhada:

  • Função do Snowflake: o Snowflake replica nativamente suas tabelas de destino e o histórico de carregamento (estado de ingestão) para sua conta secundária. Durante um failover, o Snowflake usa esse estado para evitar duplicatas e ingerir apenas os arquivos que não foram processados ​​no local primário.

  • Sua função: no caso de interrupção (ou como parte de uma configuração de nuvem de gravação dupla), você deve rotear os novos arquivos recebidos para seu local de armazenamento em nuvem secundário. O Snowflake não replica seus arquivos de armazenamento em nuvem externo.

A resiliência do pipeline é garantida pela configuração de até dois recursos principais:

  • Integração de armazenamento multilocal (multi-location storage integration, MLSI): conecta o Snowflake com segurança a vários locais de armazenamento em nuvem externos em diferentes regiões ou nuvens. A MLSI é necessária quando você deseja resiliência para COPY INTO apenas de áreas de preparação externas ou para seu pipeline do Snowpipe.

  • Integração de notificação de várias filas (multi-queue notification integration, MQNI): conecta o Snowflake a várias filas de mensagens de nuvem de terceiros, garantindo o recebimento contínuo de notificações de novos arquivos. A MQNI só é necessária se você deseja resiliência para seu pipeline do Snowpipe, ou seja, para carregamento contínuo de dados.

Arquitetura de resiliência multilocal para pipelines de dados

Requisitos e considerações

Antes de configurar esse recurso, revise os seguintes pré-requisitos e considerações:

Requisitos

  • Edição: Snowflake Business Critical Edition (ou superior).

  • Métodos de ingestão compatíveis: esse recurso é exclusivamente compatível com o carregamento de dados baseado em arquivos por meio do Snowpipe (ingestão automática) e COPY INTO <table>. Ele não é compatível com o Openflow ou o Snowpipe Streaming.

  • Estruturas de caminho idênticas:** para permitir que seus pipelines localizem novos arquivos após o failover, você deve gravá-los no local de armazenamento secundário usando exatamente a mesma hierarquia, estrutura de pastas e caminho relativo do seu local primário.

Considerações

  • Faturamento:** esse recurso incorre em cobranças de replicação padrão (transferência de dados e recursos de computação), faturadas em sua conta de destino.

  • Tempo de inatividade para modificação de área de preparação: alterar a propriedade RELATIVE_URL em uma área de preparação existente invalidará os objetos dependentes e interromperá a ingestão. Recomendamos a criação de novas áreas de preparação durante a configuração para evitar tempo de inatividade.

  • Integração de notificação de várias filas (MQNI):** o uso da mesma fila ativa nas contas de origem e destino não é compatível. Fazer isso pode resultar em perda de notificações. O Snowflake não verifica se a mesma fila está em uso em todas as contas.

  • Tabela de diretórios: a criação de uma tabela de diretórios em uma área de preparação usando a MLSI não é compatível atualmente.

Comportamento de replicação

  • Replicação assíncrona: o Snowflake replica suas tabelas e o histórico de carga do seu pipeline simultaneamente no mesmo instantâneo. Como eles estão sincronizados, uma interrupção não resultará em dados duplicados. Se o seu banco de dados secundário estiver com quatro horas de atraso, os dados da tabela também estarão com quatro horas de atraso, e o processamento de quatro horas de notificações enfileiradas simplesmente atualiza a tabela.

  • Evitar perda de dados em gravação dupla: seu objetivo de ponto de recuperação (Recovery Point Objective, RPO) é ditado pelo seu intervalo de atualização de replicação. Para evitar perda de dados durante um failover, o período de retenção de mensagens da sua fila de mensagens na nuvem secundária deve ser maior que o seu intervalo de replicação. Se a sua fila descartar mensagens antes que a replicação agendada seja concluída, esses arquivos não serão ingeridos durante o failover.

  • Risco de perda de dados em gravação única: se você usar roteamento de gravação única, todos os arquivos processados ​​no local primário após a última replicação bem-sucedida serão totalmente desconhecidos para o local secundário. Em caso de failover, esses dados ficarão temporariamente ausentes na sua conta de destino.

Aviso

Aviso crítico para failback com gravação única: quando você executa uma atualização para retornar à sua conta primária original, o banco de dados primário é substituído pelo secundário. Se você não reconciliar e carregar manualmente esses arquivos órfãos no banco de dados secundário antes de sincronizar novamente, eles serão apagados permanentemente do banco de dados primário.

Escolhendo a arquitetura correta

Como o Snowflake replica assincronamente suas tabelas de destino e o histórico de carregamento do seu pipeline no mesmo instantâneo, seus pipelines ficam protegidos contra duplicação de dados e carregamentos parciais. Se ocorrer uma interrupção durante a ingestão, a transação é revertida completamente, de modo que não haja arquivos parcialmente carregados.

No entanto, a forma de recuperar arquivos «em trânsito» durante uma interrupção depende inteiramente de se o roteamento do seu armazenamento em nuvem externo está configurado para roteamento de gravação dupla ou única.

2. Roteamento de gravação única

Seu produtor grava apenas no armazenamento em nuvem primário. Em caso de interrupção, você redireciona o produtor para começar a gravar novos arquivos no armazenamento em nuvem secundário.

  • O que acontece no failover: a conta secundária começa imediatamente a processar novos arquivos roteados para o bucket secundário. No entanto, todos os arquivos em trânsito que estejam retidos no local primário afetado são deixados para trás temporariamente.

  • O que acontece no failback: quando o local primário se recupera e você retorna à sua conta Snowflake primária, o Snowpipe processa automaticamente todas as notificações de arquivos que chegaram à fila com sucesso antes da interrupção.

  • Resultado: sem duplicatas. No entanto, os arquivos em que a notificação na nuvem falhou completamente em ser gerada devido à interrupção (ou em que a interrupção durou mais do que a política de retenção de mensagens da sua fila) exigem intervenção manual.

  • Ação necessária: após o failback, compare seu bucket de armazenamento primário com a exibição COPY_HISTORY no Snowflake para identificar quaisquer arquivos ausentes. Execute ALTER PIPE … REFRESH ou um comando manual COPY INTO para carregar esses arquivos específicos que ficaram retidos.

Parte 1: Configuração única

As etapas a seguir são executadas uma única vez para configurar seus pipelines de dados resilientes. Como você configura os locais ativos para ambas as contas durante a configuração, o failover durante uma interrupção real é quase instantâneo.

Etapa 1: Criar uma integração de armazenamento multilocal (MLSI)

Para configurar uma integração de armazenamento multilocal, você segue as etapas padrão para configurar uma integração de armazenamento com algumas diferenças observadas nesta seção.

Em sua conta de origem, crie a MLSI fornecendo valores para cada local na lista STORAGE_LOCATIONS. Você pode combinar provedores de nuvem para configurações entre nuvens.

CREATE STORAGE INTEGRATION my_mlsi
  TYPE = EXTERNAL_STAGE
  STORAGE_LOCATIONS =
  (
    (
      NAME = 'my-s3-us-west-1'
      STORAGE_PROVIDER = 'S3'
      STORAGE_BASE_URL = 's3://myBucketWest'
      STORAGE_AWS_ROLE_ARN = 'arn:aws:iam::12345:role/myrole'
      STORAGE_AWS_EXTERNAL_ID = 'mlsi-external-id'
      ENCRYPTION = ( TYPE = 'AWS_SSE_S3' )
    ),
    (
      NAME = 'my-s3-us-east-1'
      STORAGE_PROVIDER = 'S3'
      STORAGE_BASE_URL = 's3://myBucketEast'
      STORAGE_AWS_ROLE_ARN = 'arn:aws:iam::67890:role/myrole'
      STORAGE_AWS_EXTERNAL_ID = 'mlsi-external-id'
      ENCRYPTION = ( TYPE = 'AWS_SSE_S3' )
    )
  )
  ENABLED = TRUE
  STORAGE_ALLOWED_LOCATIONS = ('*')
  ACTIVE = 'my-s3-us-west-1';

Onde:

Etapa 2: Associar a MLSI a uma área de preparação externa

Recomendamos a criação de uma nova área de preparação em vez de alterar uma existente.

Aviso

WARNING: alterar RELATIVE_URL causa tempo de inatividade

Se você usar ALTER STAGE para alterar o RELATIVE_URL de uma área de preparação existente, todas as tabelas de diretório dependentes serão recriadas e quaisquer canais ou tabelas externas que usem essa área de preparação serão marcados como inválidos e interromperão a ingestão. Prepare-se para passar por tempo de inatividade se você optar por alterar uma área de preparação existente.

Use o comando CREATE STAGE para associar a integração de armazenamento multilocal que você criou a uma ou mais áreas de preparação externas:

CREATE STAGE my_ext_stage
  RELATIVE_URL = '/my_folder/my_sub_folder/'
  STORAGE_INTEGRATION = 'my_mlsi';

Onde:

  • RELATIVE_URL: o caminho relativo para o local da sua área de preparação externa a partir do local de armazenamento definido em sua integração de armazenamento. Para permitir que seus pipelines localizem novos arquivos após um failover, você deve gravá-los no local de armazenamento secundário usando a mesma hierarquia, estrutura de pastas e caminho relativo do seu local primário.

Nota

Esse valor deve ser um caminho literal. A especificação de um padrão ou curinga não é possível. Para especificar o acesso a todos os locais sob o STORAGE_BASE_URL da sua integração de armazenamento, use uma cadeia de caracteres vazia RELATIVE_URL = “”.

  • STORAGE_INTEGRATION: o nome da sua integração de armazenamento multilocal.

Nota

Como alternativa, você pode alterar uma área de preparação externa existente especificando o parâmetro RELATIVE_URL e a sua MLSI. O comando ALTER STAGE também permite reverter essa alteração para que a área de preparação externa não use uma integração de armazenamento multilocal.

Por exemplo:

ALTER STAGE my_ext_stage SET
  RELATIVE_URL = '/my_folder/my_sub_folder/'
  STORAGE_INTEGRATION = 'my_mlsi';

Etapa 3: Configurar uma integração de notificação de várias filas (MQNI)

Se você usa carregamento de dados automatizado por meio de mensagens na nuvem e configurou uma integração de armazenamento multilocal para a sua área de preparação externa, também deve usar uma integração de notificação de várias filas para failover contínuo dos seus pipelines do Snowpipe.

Para cada fila que você definir para a integração de notificações, você deve preparar seu serviço de mensagens seguindo as etapas descritas nos tópicos a seguir:

Nota

Se você não quiser usar o Amazon SNS com o Snowpipe, poderá evitar a criação de uma MQNI, mas deverá executar uma etapa adicional durante o failover. Se você escolher essa opção, associe seu canal à área de preparação e à MLSI criada acima e, em seguida, prossiga para a Etapa 4.

Cenário A: Criar uma nova integração de notificação de várias filas (MQNI)

Para criar uma integração de notificação de várias filas, siga as etapas padrão para criar uma integração de notificação, com algumas diferenças observadas nesta seção.

Na sua conta de origem, crie uma integração de notificação de várias filas, fornecendo valores para cada fila na lista QUEUES:

CREATE NOTIFICATION INTEGRATION my_mqni
  ENABLED = TRUE
  TYPE = MULTI_QUEUE
  DIRECTION = INBOUND
  QUEUES = (
    (
      NAME = 'my-us-west-1'
      NOTIFICATION_PROVIDER = AWS_SNS
      AWS_SNS_TOPIC_ARN = 'arn:aws:sns:us-west-1:12345:my-snowpipe-mlsi-west'
    ),
    (
      NAME = 'my-us-east-1'
      NOTIFICATION_PROVIDER = AWS_SNS
      AWS_SNS_TOPIC_ARN = 'arn:aws:sns:us-west-1:12345:my-snowpipe-mlsi-east'
    )
  )
  ACTIVE = 'my-us-west-1';

Onde:

  • TYPE = MULTI_QUEUE: especifica que esta é uma integração de múltiplas filas entre o Snowflake e um serviço de filas de mensagens na nuvem de terceiros.

  • DIRECTION = INBOUND: especifica que o Snowflake recebe notificações enviadas pelo serviço de mensagens na nuvem.

  • QUEUES: especifica uma lista de uma ou mais filas para a integração de notificações.

  • NAME: cadeia de caracteres que especifica o identificador (nome) da fila.

Para visualizar os parâmetros específicos da fila para cada provedor de nuvem, consulte:

Depois de criar uma MQNI, você pode usá-la para criar um novo canal com o comando CREATE PIPE. O exemplo a seguir cria um canal para carregar dados do Amazon S3 em uma tabela usando uma área de preparação externa (my_ext_stage) que depende de uma Integração de armazenamento multilocal:

CREATE PIPE my_pipe
  AUTO_INGEST = TRUE
  INTEGRATION = my_mqni
  AS COPY INTO my_table FROM @my_ext_stage/my_pipe/;

Cenário B: Migrar uma integração de notificação existente para a MQNI

Se você já tem integrações de notificação existentes que deseja converter para a MQNI em vez de criar uma nova do zero, use a função SYSTEM$CONVERT_PIPES_TO_MULTI_QUEUE.

A função cria uma nova integração de notificação de várias filas usando o nome especificado, define a fila ativa para sua conta de origem como a fila original e migra automaticamente todos os canais na conta de origem para usar a nova MQNI.

Sintaxe:

SYSTEM$CONVERT_PIPES_TO_MULTI_QUEUE(
  '<new_mqni_name>',
  '<original_sns_topic_arn_or_int_name>',
  '<new_sns_topic_arn_or_int_name>'
)

Onde:

  • new_mqni_name: cadeia de caracteres que especifica um identificador (nome) a ser atribuído à nova integração de notificação de várias filas que a função cria.

  • original_sns_topic_arn_or_int_name:

    • Para a AWS, o nome de recurso da amazon (amazon resource name, ARN) do tópico SNS original associado a um ou mais canais.

    • Para o Google Cloud ou o Azure, uma cadeia de caracteres que especifica o identificador da sua integração de notificação de fila única original associada a um ou mais canais.

  • new_sns_topic_arn_or_int_name:

    • Para a AWS, o nome de recurso da amazon (amazon resource name, ARN) de um novo tópico SNS a ser adicionado como uma fila à MQNI.

    • Para o Google Cloud ou o Azure, uma cadeia de caracteres que especifica o identificador da sua nova integração de notificação de fila única para combinar com a integração de notificação original.

Exemplo 1: Adicionar uma nova fila de tópico SNS

SELECT SYSTEM$CONVERT_PIPES_TO_MULTI_QUEUE(
  'my_mqni',
  'arn:aws:sns:us-west-1:12345:my-snowpipe-mlsi-west',
  'arn:aws:sns:us-east-1:67890:my-snowpipe-mlsi-east'
);

Isso resulta em uma MQNI chamada my_mqni com as seguintes filas:

  • MY_MQNI-queue1 (para o tópico SNS original e ativo)

  • MY_MQNI-queue2 (para o novo tópico SNS)

Exemplo 2: Combinar duas integrações de notificação em MQNI

SELECT SYSTEM$CONVERT_PIPES_TO_MULTI_QUEUE(
  'my_azure_mqni',
  'my_azure_ni_1',
  'my_azure_ni_2'
);

Isso resulta em uma MQNI chamada my_azure_mqni com as seguintes filas:

  • my_azure_ni_1 (para a fila original e ativa)

  • my_azure_ni_2 (para a nova fila)

Nota

Se você quiser alterar a fila ativa em sua conta de origem, poderá usar uma instrução ALTER INTEGRATION … SET ACTIVE = “<my_queue>”. Você deve pausar todos os canais que usam a integração de notificação antes de atualizar a fila ativa.

Etapa 4: Replicar suas MLSI e MQNI para a conta de destino.

Nota

Uma operação de atualização descarta quaisquer integrações de armazenamento ou notificação na conta de destino que não sejam réplicas, a menos que os objetos tenham IDs globais.

Para obter mais informações, consulte Replicação e objetos em contas de destino.

1. To replicate your multi-location storage integration and multi-queue notification integration, alter your existing replication or failover group to include STORAGE INTEGRATIONS and NOTIFICATION INTEGRATIONS in the ALLOWED_INTEGRATION_TYPES list.

Por exemplo, use o comando ALTER FAILOVER GROUP:

ALTER FAILOVER GROUP my_fg SET
  OBJECT_TYPES = DATABASES, ROLES, INTEGRATIONS
  ALLOWED_INTEGRATION_TYPES = API INTEGRATIONS, STORAGE INTEGRATIONS,
    NOTIFICATION INTEGRATIONS;
  1. Em seguida, em sua conta de destino, execute uma operação de atualização:

ALTER FAILOVER GROUP my_fg REFRESH;

Etapa 5: Configurar os estados ativos na conta de destino

Após executar uma operação de atualização, para garantir um failover sem problemas durante uma interrupção real, configure o local de armazenamento ativo e a fila (se estiver usando uma integração de notificação) em sua conta de destino.

Em sua conta de destino:

  1. Para o local de armazenamento que você deseja definir como o local ativo na sua conta de destino, use as instruções nos tópicos a seguir para conceder ao Snowflake acesso ao seu armazenamento:

  2. Ative o armazenamento secundário: defina a MLSI para usar seu local de armazenamento de backup secundário na conta de destino.

    ALTER STORAGE INTEGRATION my_mlsi SET ACTIVE = 'my-s3-us-east-1';
    
  3. Se você estiver usando uma Integração de notificação de várias filas, conceda permissão ao Snowflake para acessar seu serviço de mensagens para a fila que você deseja definir como ativa na sua conta de destino. Siga as instruções para seu provedor de nuvem:

  4. Ative a fila secundária (se estiver usando MQNI): defina a fila ativa para seu local secundário na conta de destino.

    ALTER INTEGRATION my_mqni
      SET ACTIVE = 'MY_MQNI-queue2';
    

Parte 2: Etapas de failover

Execute estas etapas durante uma interrupção para redirecionar a ingestão de dados para seu local secundário. Como suas filas e armazenamento ativos foram pré-definidos na configuração, esse processo requer comandos mínimos.

  1. Promova a conta de destino: faça login em sua conta de destino e promova-a para que sirva como a nova conta primária. O carregamento de dados é retomado automaticamente a partir da sua infraestrutura de nuvem secundária.

    ALTER FAILOVER GROUP my_fg PRIMARY;
    
  2. Se não estiver usando o Amazon SNS com o Snowpipe: Se você não estiver usando o SNS com o Snowpipe e estiver contando apenas com o SQS, não será preciso criar uma MQNI. Em vez disso, chame a seguinte função do sistema para reconectar seu canal durante o failover.

    SELECT SYSTEM$INGEST_REBIND_PIPE('my_db.my_schema.my_pipe');
    

Parte 3: Etapas de failback

Assim que a interrupção for resolvida e sua localização primária estiver íntegra, execute estas etapas para mover seus pipelines de volta para a localização primária.

  1. Sincronize dados de volta: antes de promover sua conta original, você deve recuperar todos os dados e alterações de estado que ocorreram durante a interrupção para sua conta original. Faça login em sua conta primária original (atualmente atuando como a conta secundária) e inicie uma atualização manual:

    ALTER FAILOVER GROUP my_fg REFRESH;
    

    Importante

    Aguarde a conclusão completa dessa operação de atualização antes de prosseguir para a próxima etapa. A execução do failover antes da conclusão da sincronização pode resultar em perda de dados.

    Aviso

    Aviso crítico para failback de gravação única: se você usar o roteamento de gravação única, todos os arquivos processados ​​no local primário após a última replicação bem-sucedida serão desconhecidos para o local secundário. Após o failover, esses dados ficarão temporariamente ausentes na sua conta de destino. Quando você executa uma atualização para retornar à sua conta primária original, o banco de dados primário é substituído pelo secundário. Se você não reconciliar e carregar manualmente esses arquivos órfãos no banco de dados secundário antes de sincronizar novamente, eles serão apagados permanentemente do banco de dados primário.

  2. Promova a conta original: após a conclusão da atualização, promova sua conta de origem original de volta para o primário.

    ALTER FAILOVER GROUP my_fg PRIMARY;
    
  3. Se não estiver usando o Amazon SNS com o Snowpipe: Chame a função do sistema para vincular novamente seu canal ao local de origem original.

    SELECT SYSTEM$INGEST_REBIND_PIPE('my_db.my_schema.my_pipe');
    

Parte 4: Monitoramento e validação

Após iniciar um failover ou failback, use os seguintes comandos para verificar se os seus pipelines de dados foram redirecionados com sucesso e retomaram a ingestão.

1. Verificar os estados de integração ativos

Confirme se as suas integrações estão apontando para o armazenamento e as filas corretas, verificando as propriedades delas. Procure a propriedade ACTIVE na saída:

-- Check the active storage location
DESCRIBE STORAGE INTEGRATION my_mlsi;

-- Check the active message queue
DESCRIBE INTEGRATION my_mqni;

2. Verificar o status do canal (somente Snowpipe)

Use a função SYSTEM$PIPE_STATUS para garantir que seu canal esteja em execução e para verificar se ele está enfileirando ativamente novos arquivos do seu local secundário.

SELECT SYSTEM$PIPE_STATUS('my_pipe');

Procure por «executionState»:»RUNNING» e verifique «pendingFileCount» para confirmar se ele está reconhecendo ativamente novos arquivos adicionados ao seu bucket secundário.

3. Validar a ingestão bem-sucedida (histórico de carregamento)

Para garantir que os dados estejam sendo carregados sem erros ou duplicados, consulte a exibição COPY_HISTORY. Ela mostra exatamente quais arquivos foram ingeridos, o caminho de origem deles e quando foram carregados.

SELECT file_name, status, row_count, last_load_time
FROM TABLE(information_schema.copy_history(
  table_name => 'my_table',
  start_time => DATEADD(hours, -1, CURRENT_TIMESTAMP())
));

Verifique se os caminhos de file_name refletem seu local de armazenamento ativo e se o status é exibido como LOADED.