Datenfluss mit Openflow erstellen¶
Unter diesem Thema wird der Prozess zum Erstellen eines Datenflusses in Openflow beschrieben.
Voraussetzungen¶
Prozedur¶
Sobald Sie Ihre Laufzeitumgebung eingerichtet haben, erstellen wir eine einfache Datenpipeline. Als Beispiel generieren wir Datensätze auf der Grundlage eines bestimmten Schemas, filtern diese Datensätze auf der Grundlage einer SQL-Abfrage und senden die Daten dann an Snowflake.
Eine detaillierte Beschreibung zur Erstellung von Datenflüssen finden Sie in der Dokumentation Apache NiFi.
Öffnen Sie die Openflow-Anwendung. Der große Rasterbereich, der wahrscheinlich leer ist, wird als Canvas bezeichnet und beherbergt die Komponenten, die Sie zur Implementierung Ihres Datenflusses erstellen werden.
Erstellen Sie eine Prozessgruppe. Ziehen Sie das Symbol „Prozessgruppe“ aus der Werkzeugpalette oben auf der Seite auf das Canvas. Sobald Sie den Mauszeiger loslassen, erscheint ein Popup-Fenster Prozessgruppe erstellen.
Geben Sie einen Namen für Ihren Datenfluss ein, z. B. „Flow Example“, und klicken Sie auf Add.
Optional: Klicken Sie mit der rechten Maustaste auf die Prozessgruppe, die Sie gerade erstellt haben, und wählen Sie im Kontextmenü Enter Group `. Optional können Sie auch auf die Prozessgruppe doppelklicken. Dadurch entsteht eine visuelle Abstraktion, die sich vom obersten Ebene des Canvas abhebt.
Fügen Sie einen Prozessor hinzu. Um einen Prozessor hinzuzufügen, wählen Sie das Tool Processor aus, ziehen es auf das Canvas und lassen die Maustaste los.
Das Dialogfenster Add Processor wird geöffnet.
Wählen Sie den Prozessor GenerateRecord ` aus der Liste, und klicken Sie auf Add.
Auf dem Canvas sehen Sie nun einen neu hinzugefügten Prozessor.
Bemerkung
Sie können mehrere Prozessoren hinzufügen.
Fügen Sie die folgenden Prozessoren hinzu. Sie werden in späteren Schritten konfiguriert:
QueryRecord
PutDatabaseRecord
Konfigurieren Sie die Prozessoren.
Doppelklicken Sie auf einen Prozessor. Das Dialogfenster Edit Processor wird geöffnet.
Ändern Sie die folgenden Eigenschaften:
Einstellungen
Zeitplan
Eigenschaften
Beziehungen:
Kommentare
Erstellen Sie Verbindungen zwischen den Prozessoren.
Bewegen Sie den Mauszeiger über den ersten Prozessor. In der Mitte des Prozessors erscheint ein Kreis mit einem Pfeil darin.
Klicken Sie auf den Kreis mit dem Pfeil, und ziehen Sie den Zeiger in Richtung des zweiten Prozessors. Dadurch wird eine rote gepunktete Linie erzeugt, die anzeigt, dass die Verbindung nicht hergestellt werden kann.
Bewegen Sie das Sprite über den zweiten Prozessor.
Die gepunktete Linie wird grün und ein grüner Rahmen erscheint um den Zielprozessor.
Lassen Sie die Maustaste los. Das Popup-Fenster Create Connection erscheint.
Notieren Sie sich die Namen From Processor und To Processor. Wählen Sie den Abschnitt :ui: „Relationships“
, markieren Sie :ui:`Success
.Klicken Sie auf Add. Die neue Verbindung wird erstellt.
Die Verbindung wird durch eine Warteschlange von FlowFiles unterstützt, in der diese gespeichert werden, bis der nächste Prozessor ausgelöst wird und sie verarbeitet.
Fügen Sie den Controller-Dienst SnowflakeConnectionService zum Ablauf hinzu.
Bearbeiten Sie den Controller-Dienst, und füllen Sie die erforderlichen Felder aus.
Melden Sie sich bei Ihrem Snowflake-Konto an und erstellen Sie eine Datenbank.
Erstellen Sie im PUBLIC-Schema der Datenbank eine Standardtabelle.
create table SAMPLE_DATA (
name STRING,
country STRING
)
Führen Sie den Flow in Openflow aus.
Abfragen auf den Daten ausführen