Best Practices für Snowpipe Streaming mit leistungsstarker Architektur

Dieser Leitfaden beschreibt die wichtigsten Best Practices für das Design und die Implementierung von robusten Datenaufnahme-Pipelines mit Snowpipe Streaming mit leistungsstarker Architektur. Indem Sie diese Best Practices befolgen, stellen Sie sicher, dass Ihre Pipelines dauerhaft und zuverlässig sind und über eine effiziente Fehlerbehandlung verfügen.

Strategische Kanalverwaltung

Wenden Sie die folgenden Strategien zur Kanalverwaltung an, um Leistung und langfristige Stabilität zu erzielen:

  • Langlebige Kanäle verwenden: Um den Overhead zu minimieren, öffnen Sie einen Kanal einmal, und halten ihn dann für die Dauer der Datenaufnahme-Aufgabe aktiv. Vermeiden Sie das wiederholte Öffnen und Schließen von Kanälen.

  • Deterministische Kanalnamen verwenden: Wenden Sie eine konsistente, vorhersehbare Namenskonvention an, z. B. source-env-region-client-id, um die Problembehandlung zu vereinfachen und automatisierte Wiederherstellungsprozesse zu unterstützen.

  • Mit mehrere Kanälen horizontal skalieren: Um den Durchsatz zu erhöhen, öffnen Sie mehrere Kanäle. Diese Kanäle können auf eine einzelne Ziel-Pipe oder auf mehrere Pipes verweisen, je nach Servicelimits und Ihren Durchsatzanforderungen.

  • Kanalstatus überwachen: Verwenden Sie die getChannelStatus-Methode regelmäßig, um den Zustand Ihrer Datenaufnahmekanäle zu überwachen.

    • Verfolgen Sie das last_committed_offset_token, um zu verifizieren, ob die Daten erfolgreich aufgenommen werden und die Pipeline voranschreitet.

    • Überwachen Sie die row_error_count, um fehlerhafte Datensätze oder andere Datenaufnahmeprobleme frühzeitig zu erkennen.

Schema konsistent validieren

Stellen Sie sicher, dass die eingehenden Daten mit dem erwarteten Tabellenschema übereinstimmen, um Aufnahmefehler zu vermeiden und die Datenintegrität aufrechtzuerhalten:

  • Clientseitige Validierung: Implementieren Sie die Schemavalidierung auf Clientseite, um sofortiges Feedback zu geben und serverseitige Fehler zu reduzieren. Auch wenn die vollständige zeilenweise Validierung maximale Sicherheit bietet, kann eine Methode, die leistungsfähiger ist, auch eine selektive Validierung beinhalten. z. B. bei Batchgrenzen oder durch das Sampling von Zeilen.

  • Serverseitige Validierung: Die leistungsstarke Architektur kann die Schemavalidierung auf den Server auslagern. Fehler sowie deren Anzahl werden über getChannelStatus gemeldet, wenn während der Aufnahme in die Ziel-Pipe und -tabelle Schemakonflikte auftreten.

Clientseitige Metadatenspalten hinzufügen

Um eine zuverlässige Fehlererkennung und Wiederherstellung zu ermöglichen, müssen Sie Datenaufnahme-Metadaten als Teil der Zeilennutzlast mit sich führen. Dies erfordert eine vorherige Planung Ihrer Datenform und PIPE-Definition.

Fügen Sie vor der Datenaufnahme die folgenden Spalten zu Ihrer Zeilennutzlast hinzu:

  • CHANNEL_ID (z. B. ein kompaktes INTEGER.)

  • STREAM_OFFSET (ein BIGINT, das pro Kanal monoton ansteigt, z. B. ein Kafka-Partitions-Offset).

Zusammen identifizieren diese Spalten Datensätze pro Kanal eindeutig, sodass Sie die Herkunft der Daten verfolgen können.

Optional: Fügen Sie eine PIPE_ID-Spalte hinzu, wenn mehrere Pipes Daten in dieselbe Zieltabelle aufnehmen. Sie können dann Zeilen ganz einfach bis zu ihrer Erfassungspipeline zurückverfolgen. Sie können beschreibende Pipe-Namen in einer separaten Suchtabelle speichern und sie kompakten Ganzzahlen zuordnen, um die Speicherkosten zu senken.

Metadaten-Offsets zum Erkennen und Wiederherstellen von Fehlern verwenden

Kombinieren Sie die Kanalüberwachung mit Ihren Metadatenspalten, um Probleme zu erkennen und zu beheben:

  • Status überwachen: Überprüfen Sie regelmäßig den getChannelStatus. Eine steigende row_error_count ist ein starker Indikator für ein potenzielles Problem.

  • Fehlende Datensätze erkennen: Wenn Fehler festgestellt werden, verwenden Sie eine SQL-Abfrage, um fehlende oder fehlerhafte Datensätze zu identifizieren, indem Sie eine Prüfung auf Lücken in Ihrer STREAM_OFFSET-Sequenz durchführen.

SELECT
  PIPE_ID,
  CHANNEL_ID,
  STREAM_OFFSET,
  LAG(STREAM_OFFSET) OVER (
    PARTITION BY PIPE_ID, CHANNEL_ID
    ORDER BY STREAM_OFFSET
  ) AS previous_offset,
  (LAG(STREAM_OFFSET) OVER (
    PARTITION BY PIPE_ID, CHANNEL_ID
    ORDER BY STREAM_OFFSET
  ) + 1) AS expected_next
FROM my_table
QUALIFY STREAM_OFFSET != previous_offset + 1;
Copy

Datenaufnahmeleistung und -kosten mit MATCH_BY_COLUMN_NAME optimieren

Konfigurieren Sie Ihre Pipeline so, dass die erforderlichen Spalten aus Ihren Quelldaten zugeordnet werden, anstatt alle Daten in eine einzige VARIANT-Spalte zu übernehmen. Verwenden Sie dazu MATCH_BY_COLUMN_NAME = CASE_SENSITIVE, oder wenden Sie Transformationen in Ihrer Pipe-Definition an. Diese bewährte Methode optimiert nicht nur Ihre Datenaufnahme-Kosten, sondern verbessert auch die Gesamtleistung Ihrer Streaming-Datenpipeline.

Diese bewährte Methode hat die folgenden wichtigen Vorteile:

  • Bei Verwendung von MATCH_BY_COLUMN_NAME = CASE_SENSITIVE werden Ihnen nur die Datenwerte in Rechnung gestellt, die in Ihre Zieltabelle aufgenommen werden. Werden Daten hingegen in eine einzige VARIANT-Spalte aufgenommen, wird die gesamte JSON-Bytezahl abgerechnet, einschließlich Schlüssel und Werte. Bei Daten mit ausführlichen oder zahlreichen JSON-Schlüsseln kann dies zu einer erheblichen und unnötigen Erhöhung Ihrer Datenaufnahmekosten führen.

  • Die Verarbeitungs-Engine von Snowflake ist recheneffizienter. Anstatt das gesamte JSON-Objekt in eine VARIANT zu parsen und dann die erforderlichen Spalten zu extrahieren, extrahiert diese Methode die erforderlichen Werte direkt.

Get Prometheus metrics

To get performance metrics from the Snowpipe Streaming high-performance client, you must enable the built-in Prometheus metrics server and configure your Prometheus service to scrape the endpoint.

Enable the metrics server by setting the environment variable SS_ENABLE_METRICS to true before running your application.

Scrape the metrics endpoint on the host that is running your Snowpipe Streaming ingest process. The default path is /metrics on the host and port defined by SS_METRICS_IP and SS_METRICS_PORT.

Example: Verifying the metrics endpoint (local process/dev box)

# Enable Prometheus metrics
export SS_ENABLE_METRICS=true
# Run your application (the server starts on 127.0.0.1:50000 by default)

# Curl the endpoint to verify the metrics are exposed
curl http://127.0.0.1:50000/metrics
Copy

Example: Prometheus scrape configuration

Point your Prometheus service at the host running the Snowpipe Streaming client.

scrape_configs:
  - job_name: snowpipe_streaming_hp
    metrics_path: /metrics   # default is /metrics
    static_configs:
      - targets: ['127.0.0.1:50000']
Copy