Bonnes pratiques pour Snowpipe Streaming avec architecture hautes performances

Ce guide présente les bonnes pratiques clés pour concevoir et mettre en œuvre des pipelines d’ingestion de données performants à l’aide de Snowpipe Streaming avec architecture hautes performances. En suivant ces bonnes pratiques, vous vous assurez que vos pipelines sont durables, fiables et dotés d’un traitement des erreurs efficace.

Gérer les canaux de manière stratégique

Appliquez les stratégies de gestion des canaux suivantes pour garantir les performances et la stabilité à long terme :

  • Utilisation de canaux de longue durée : Pour minimiser la charge, ouvrez un canal une seule fois, puis laissez-le actif pendant toute la durée de la tâche d’ingestion. Évitez d’ouvrir et de fermer des canaux de manière répétée.

  • Utilisation de noms de canaux déterministes : Appliquez une convention de dénomination cohérente et prévisible (par exemple, source-env-region-client-id) pour simplifier le dépannage et faciliter les processus de récupération automatisés.

  • Mise à l’échelle avec plusieurs canaux : Pour augmenter le débit, ouvrez plusieurs canaux. Ces canaux peuvent pointer vers un seul canal cible ou vers plusieurs canaux, en fonction des limites de service et de vos exigences de débit.

  • Surveillance de l’état des canaux : Utilisez régulièrement la méthode getChannelStatus pour surveiller la santé de vos canaux d’ingestion.

    • Suivez le last_committed_offset_token pour vérifier que les données sont ingérées correctement et que le pipeline progresse.

    • Surveillez le row_error_count pour détecter rapidement les enregistrements incorrects ou d’autres problèmes d’ingestion.

Valider le schéma de manière cohérente

Assurez-vous que les données entrantes sont conformes au schéma de table attendu afin d’éviter des échecs d’ingestion et de maintenir l’intégrité des données :

  • Validation côté client : Implémentez la validation des schémas côté client pour obtenir un retour immédiat et réduire les erreurs côté serveur. Bien que la validation complète ligne par ligne offre une sécurité maximale, une méthode plus performante peut impliquer une validation sélective, par exemple aux limites des lots ou par échantillonnage des lignes.

  • Validation côté serveur : L’architecture hautes performances peut décharger la validation des schémas sur le serveur. Les erreurs et leur nombre sont signalés par le biais de getChannelStatus si des discordances de schéma se produisent lors de l’ingestion dans le canal et la table cibles.

Maintenir l’état pour une récupération fiable

Pour éviter toute perte ou duplication de données, votre application doit maintenir son état pour gérer correctement les redémarrages et les échecs :

  • Persistance du jeton de décalage : Après chaque appel d’API réussi, conservez le last_committed_offset_token dans un stockage durable.

  • Reprise depuis le dernier point : Lors du redémarrage de l’application, récupérez le dernier jeton validé de Snowflake et reprenez l’ingestion à partir de ce point précis. Cela permet de garantir un traitement unique et d’assurer la continuité.

Ajouter des colonnes de métadonnées côté client

Pour permettre une détection des erreurs et une récupération efficaces, vous devez conserver les métadonnées d’ingestion dans la charge utile de lignes. Cela nécessite de planifier à l’avance la structure de vos données et la définition PIPE.

Ajoutez les colonnes suivantes à votre charge utile de lignes avant l’ingestion :

  • CHANNEL_ID (Par exemple, un INTEGER compact.)

  • STREAM_OFFSET (Un BIGINT qui augmente de manière monotone par canal, comme un décalage de partition Kafka).

Ensemble, ces colonnes identifient de manière unique les enregistrements par canal et vous permettent de tracer l’origine des données.

En option : Ajoutez une colonne PIPE_ID si plusieurs canaux intègrent la même table cible. En procédant ainsi, vous pouvez facilement retracer les lignes dans leur pipeline d’ingestion. Vous pouvez stocker des noms de canaux descriptifs dans une table de recherche distincte, en les mappant en entiers compacts afin de réduire les coûts de stockage.

Détecter les erreurs et y remédier à l’aide des décalages de métadonnées

Combinez la surveillance des canaux avec vos colonnes de métadonnées pour détecter les problèmes et y remédier :

  • Surveillance de l’état : Vérifiez régulièrement getChannelStatus. Un row_error_count croissant est un indicateur fort d’un problème potentiel.

  • Détection des enregistrements manquants : Si des erreurs sont détectées, utilisez une requête SQL pour identifier les enregistrements manquants ou obsolètes en vérifiant les écarts dans votre séquence STREAM_OFFSET.

SELECT
  PIPE_ID,
  CHANNEL_ID,
  STREAM_OFFSET,
  LAG(STREAM_OFFSET) OVER (
    PARTITION BY PIPE_ID, CHANNEL_ID
    ORDER BY STREAM_OFFSET
  ) AS previous_offset,
  (LAG(STREAM_OFFSET) OVER (
    PARTITION BY PIPE_ID, CHANNEL_ID
    ORDER BY STREAM_OFFSET
  ) + 1) AS expected_next
FROM my_table
QUALIFY STREAM_OFFSET != previous_offset + 1;
Copy

Optimiser les performances et les coûts d’ingestion avec MATCH_BY_COLUMN_NAME

Configurez votre canal pour mapper les colonnes nécessaires à partir de vos données sources au lieu d’ingérer toutes les données dans une seule colonne VARIANT. Pour ce faire, utilisez MATCH_BY_COLUMN_NAME = CASE_SENSITIVE ou appliquez des transformations dans la définition de votre canal. Cette bonne pratique optimise non seulement vos coûts d’ingestion, mais elle améliore également les performances globales de votre pipeline de données de flux.

Cette bonne pratique présente les avantages importants suivants :

  • En utilisant MATCH_BY_COLUMN_NAME = CASE_SENSITIVE, vous n’êtes facturé que pour les valeurs de données qui sont ingérées dans votre table cible. En revanche, l’ingestion de données dans une seule colonne VARIANT vous facture toutes les octets JSON, y compris les clés et les valeurs. Pour les données avec des clés JSON verbeuses ou nombreuses, cela peut entraîner une augmentation importante et inutile de vos coûts d’ingestion.

  • Le moteur de traitement de Snowflake est plus efficace sur le plan des calculs. Au lieu d’analyser l’intégralité de l’objet JSON dans une VARIANT, puis d’extraire les colonnes requises, cette méthode extrait directement les valeurs nécessaires.