Bonnes pratiques pour Snowpipe Streaming avec architecture hautes performances¶
Ce guide présente les bonnes pratiques clés pour concevoir et mettre en œuvre des pipelines d’ingestion de données performants à l’aide de Snowpipe Streaming avec architecture hautes performances. En suivant ces bonnes pratiques, vous vous assurez que vos pipelines sont durables, fiables et dotés d’un traitement des erreurs efficace.
Gérer les canaux de manière stratégique¶
Appliquez les stratégies de gestion des canaux suivantes pour garantir les performances et la stabilité à long terme :
Utilisation de canaux de longue durée : Pour minimiser la charge, ouvrez un canal une seule fois, puis laissez-le actif pendant toute la durée de la tâche d’ingestion. Évitez d’ouvrir et de fermer des canaux de manière répétée.
Utilisation de noms de canaux déterministes : Appliquez une convention de dénomination cohérente et prévisible (par exemple,
source-env-region-client-id) pour simplifier le dépannage et faciliter les processus de récupération automatisés.Mise à l’échelle avec plusieurs canaux : Pour augmenter le débit, ouvrez plusieurs canaux. Ces canaux peuvent pointer vers un seul canal cible ou vers plusieurs canaux, en fonction des limites de service et de vos exigences de débit.
Surveillance de l’état des canaux : Utilisez régulièrement la méthode
getChannelStatuspour surveiller la santé de vos canaux d’ingestion.Suivez le
last_committed_offset_tokenpour vérifier que les données sont ingérées correctement et que le pipeline progresse.Surveillez le
row_error_countpour détecter rapidement les enregistrements incorrects ou d’autres problèmes d’ingestion.
Valider le schéma de manière cohérente¶
Assurez-vous que les données entrantes sont conformes au schéma de table attendu afin d’éviter des échecs d’ingestion et de maintenir l’intégrité des données :
Validation côté client : Implémentez la validation des schémas côté client pour obtenir un retour immédiat et réduire les erreurs côté serveur. Bien que la validation complète ligne par ligne offre une sécurité maximale, une méthode plus performante peut impliquer une validation sélective, par exemple aux limites des lots ou par échantillonnage des lignes.
Validation côté serveur : L’architecture hautes performances peut décharger la validation des schémas sur le serveur. Les erreurs et leur nombre sont signalés par le biais de
getChannelStatussi des discordances de schéma se produisent lors de l’ingestion dans le canal et la table cibles.
Maintenir l’état pour une récupération fiable¶
Pour éviter toute perte ou duplication de données, votre application doit maintenir son état pour gérer correctement les redémarrages et les échecs :
Persistance du jeton de décalage : Après chaque appel d’API réussi, conservez le
last_committed_offset_tokendans un stockage durable.Reprise depuis le dernier point : Lors du redémarrage de l’application, récupérez le dernier jeton validé de Snowflake et reprenez l’ingestion à partir de ce point précis. Cela permet de garantir un traitement unique et d’assurer la continuité.
Ajouter des colonnes de métadonnées côté client¶
Pour permettre une détection des erreurs et une récupération efficaces, vous devez conserver les métadonnées d’ingestion dans la charge utile de lignes. Cela nécessite de planifier à l’avance la structure de vos données et la définition PIPE.
Ajoutez les colonnes suivantes à votre charge utile de lignes avant l’ingestion :
CHANNEL_ID(Par exemple, un INTEGER compact.)STREAM_OFFSET(UnBIGINTqui augmente de manière monotone par canal, comme un décalage de partition Kafka).
Ensemble, ces colonnes identifient de manière unique les enregistrements par canal et vous permettent de tracer l’origine des données.
En option : Ajoutez une colonne PIPE_ID si plusieurs canaux intègrent la même table cible. En procédant ainsi, vous pouvez facilement retracer les lignes dans leur pipeline d’ingestion. Vous pouvez stocker des noms de canaux descriptifs dans une table de recherche distincte, en les mappant en entiers compacts afin de réduire les coûts de stockage.
Détecter les erreurs et y remédier à l’aide des décalages de métadonnées¶
Combinez la surveillance des canaux avec vos colonnes de métadonnées pour détecter les problèmes et y remédier :
Surveillance de l’état : Vérifiez régulièrement
getChannelStatus. Unrow_error_countcroissant est un indicateur fort d’un problème potentiel.Détection des enregistrements manquants : Si des erreurs sont détectées, utilisez une requête SQL pour identifier les enregistrements manquants ou obsolètes en vérifiant les écarts dans votre séquence
STREAM_OFFSET.
SELECT
PIPE_ID,
CHANNEL_ID,
STREAM_OFFSET,
LAG(STREAM_OFFSET) OVER (
PARTITION BY PIPE_ID, CHANNEL_ID
ORDER BY STREAM_OFFSET
) AS previous_offset,
(LAG(STREAM_OFFSET) OVER (
PARTITION BY PIPE_ID, CHANNEL_ID
ORDER BY STREAM_OFFSET
) + 1) AS expected_next
FROM my_table
QUALIFY STREAM_OFFSET != previous_offset + 1;
Optimiser les performances et les coûts d’ingestion avec MATCH_BY_COLUMN_NAME¶
Configurez votre canal pour mapper les colonnes nécessaires à partir de vos données sources au lieu d’ingérer toutes les données dans une seule colonne VARIANT. Pour ce faire, utilisez MATCH_BY_COLUMN_NAME = CASE_SENSITIVE ou appliquez des transformations dans la définition de votre canal. Cette bonne pratique optimise non seulement vos coûts d’ingestion, mais elle améliore également les performances globales de votre pipeline de données de flux.
Cette bonne pratique présente les avantages importants suivants :
En utilisant
MATCH_BY_COLUMN_NAME = CASE_SENSITIVE, vous n’êtes facturé que pour les valeurs de données qui sont ingérées dans votre table cible. En revanche, l’ingestion de données dans une seule colonne VARIANT vous facture toutes les octets JSON, y compris les clés et les valeurs. Pour les données avec des clés JSON verbeuses ou nombreuses, cela peut entraîner une augmentation importante et inutile de vos coûts d’ingestion.Le moteur de traitement de Snowflake est plus efficace sur le plan des calculs. Au lieu d’analyser l’intégralité de l’objet JSON dans une VARIANT, puis d’extraire les colonnes requises, cette méthode extrait directement les valeurs nécessaires.