Bonnes pratiques pour Snowpipe Streaming avec architecture hautes performances¶
Ce guide présente les bonnes pratiques clés pour concevoir et mettre en œuvre des pipelines d’ingestion de données performants à l’aide de Snowpipe Streaming avec architecture hautes performances. En suivant ces bonnes pratiques, vous vous assurez que vos pipelines sont durables, fiables et dotés d’un traitement des erreurs efficace.
Gérer les canaux de manière stratégique¶
Appliquez les stratégies de gestion des canaux suivantes pour garantir les performances et la stabilité à long terme :
Utilisation de canaux de longue durée : Pour minimiser la charge, ouvrez un canal une seule fois, puis laissez-le actif pendant toute la durée de la tâche d’ingestion. Évitez d’ouvrir et de fermer des canaux de manière répétée.
Utilisation de noms de canaux déterministes : Appliquez une convention de dénomination cohérente et prévisible (par exemple,
source-env-region-client-id) pour simplifier le dépannage et faciliter les processus de récupération automatisés.Mise à l’échelle avec plusieurs canaux : Pour augmenter le débit, ouvrez plusieurs canaux. Ces canaux peuvent pointer vers un seul canal cible ou vers plusieurs canaux, en fonction des limites de service et de vos exigences de débit.
Surveillance de l’état des canaux : Utilisez régulièrement la méthode
getChannelStatuspour surveiller la santé de vos canaux d’ingestion.Suivez le
last_committed_offset_tokenpour vérifier que les données sont ingérées correctement et que le pipeline progresse.Surveillez le
row_error_countpour détecter rapidement les enregistrements incorrects ou d’autres problèmes d’ingestion.
Valider le schéma de manière cohérente¶
Assurez-vous que les données entrantes sont conformes au schéma de table attendu afin d’éviter des échecs d’ingestion et de maintenir l’intégrité des données :
Validation côté client : Implémentez la validation des schémas côté client pour obtenir un retour immédiat et réduire les erreurs côté serveur. Bien que la validation complète ligne par ligne offre une sécurité maximale, une méthode plus performante peut impliquer une validation sélective, par exemple aux limites des lots ou par échantillonnage des lignes.
Validation côté serveur : L’architecture hautes performances peut décharger la validation des schémas sur le serveur. Les erreurs et leur nombre sont signalés par le biais de
getChannelStatussi des discordances de schéma se produisent lors de l’ingestion dans le canal et la table cibles.
Ajouter des colonnes de métadonnées côté client¶
Pour permettre une détection des erreurs et une récupération efficaces, vous devez conserver les métadonnées d’ingestion dans la charge utile de lignes. Cela nécessite de planifier à l’avance la structure de vos données et la définition PIPE.
Ajoutez les colonnes suivantes à votre charge utile de lignes avant l’ingestion :
CHANNEL_ID(Par exemple, un INTEGER compact.)STREAM_OFFSET(UnBIGINTqui augmente de manière monotone par canal, comme un décalage de partition Kafka).
Ensemble, ces colonnes identifient de manière unique les enregistrements par canal et vous permettent de tracer l’origine des données.
En option : Ajoutez une colonne PIPE_ID si plusieurs canaux intègrent la même table cible. En procédant ainsi, vous pouvez facilement retracer les lignes dans leur pipeline d’ingestion. Vous pouvez stocker des noms de canaux descriptifs dans une table de recherche distincte, en les mappant en entiers compacts afin de réduire les coûts de stockage.
Détecter les erreurs et y remédier à l’aide des décalages de métadonnées¶
Combinez la surveillance des canaux avec vos colonnes de métadonnées pour détecter les problèmes et y remédier :
Surveillance de l’état : Vérifiez régulièrement
getChannelStatus. Unrow_error_countcroissant est un indicateur fort d’un problème potentiel.Détection des enregistrements manquants : Si des erreurs sont détectées, utilisez une requête SQL pour identifier les enregistrements manquants ou obsolètes en vérifiant les écarts dans votre séquence
STREAM_OFFSET.
SELECT
PIPE_ID,
CHANNEL_ID,
STREAM_OFFSET,
LAG(STREAM_OFFSET) OVER (
PARTITION BY PIPE_ID, CHANNEL_ID
ORDER BY STREAM_OFFSET
) AS previous_offset,
(LAG(STREAM_OFFSET) OVER (
PARTITION BY PIPE_ID, CHANNEL_ID
ORDER BY STREAM_OFFSET
) + 1) AS expected_next
FROM my_table
QUALIFY STREAM_OFFSET != previous_offset + 1;
Optimiser les performances et les coûts d’ingestion avec MATCH_BY_COLUMN_NAME¶
Configurez votre canal pour mapper les colonnes nécessaires à partir de vos données sources au lieu d’ingérer toutes les données dans une seule colonne VARIANT. Pour ce faire, utilisez MATCH_BY_COLUMN_NAME = CASE_SENSITIVE ou appliquez des transformations dans la définition de votre canal. Cette bonne pratique optimise non seulement vos coûts d’ingestion, mais elle améliore également les performances globales de votre pipeline de données de flux.
Cette bonne pratique présente les avantages importants suivants :
En utilisant
MATCH_BY_COLUMN_NAME = CASE_SENSITIVE, vous n’êtes facturé que pour les valeurs de données qui sont ingérées dans votre table cible. En revanche, l’ingestion de données dans une seule colonne VARIANT vous facture toutes les octets JSON, y compris les clés et les valeurs. Pour les données avec des clés JSON verbeuses ou nombreuses, cela peut entraîner une augmentation importante et inutile de vos coûts d’ingestion.Le moteur de traitement de Snowflake est plus efficace sur le plan des calculs. Au lieu d’analyser l’intégralité de l’objet JSON dans une VARIANT, puis d’extraire les colonnes requises, cette méthode extrait directement les valeurs nécessaires.
Get Prometheus metrics¶
To get performance metrics from the Snowpipe Streaming high-performance client, you must enable the built-in Prometheus metrics server and configure your Prometheus service to scrape the endpoint.
Enable the metrics server by setting the environment variable SS_ENABLE_METRICS to true before running your application.
Scrape the metrics endpoint on the host that is running your Snowpipe Streaming ingest process. The default path is /metrics on the host and port defined by SS_METRICS_IP and SS_METRICS_PORT.
Example: Verifying the metrics endpoint (local process/dev box)¶
# Enable Prometheus metrics
export SS_ENABLE_METRICS=true
# Run your application (the server starts on 127.0.0.1:50000 by default)
# Curl the endpoint to verify the metrics are exposed
curl http://127.0.0.1:50000/metrics
Example: Prometheus scrape configuration¶
Point your Prometheus service at the host running the Snowpipe Streaming client.
scrape_configs:
- job_name: snowpipe_streaming_hp
metrics_path: /metrics # default is /metrics
static_configs:
- targets: ['127.0.0.1:50000']