qimia

Aufbau einer Azure Data Factory Pipeline für professionelles Data Warehousing - 4/4

Startseite
page
Insights
page
Blogs
page
Aufbau einer Azure...

Data Science

24 min. lesen

January 26, 2023

Aufbau einer Azure Data Factory Pipeline für professionelles Data Warehousing - 4/4

Willkommen zurück zu unserer Serie über Data Engineering auf MS Azure. In diesem Artikel beschreiben wir den Aufbau einer Azure Data Factory Pipeline, die Daten für ein Data Warehouse aufbereitet, das für Business Analytics verwendet werden soll. In den vorangegangenen Blogartikeln haben wir gezeigt, wie Sie die Infrastruktur mit DataEngineeringauf Azure - Die Einrichtung einrichten.

Wie man vorverarbeiten einige Daten mit der Datenfabrik und welche Struktur wir wählen für das Data Warehouse . Dieser Artikel befasst sich mit technischen Aspekten und Best Practices des Data Factory-Datenflusses, z.B. Lesen von (partitionierten) Daten, Hashing, Mapping, Joining, Filterung und Triggering.

1. Der Grundgedanke

Die Grundidee unserer ETL-Pipeline besteht darin, die aktuellen Dimensionstabellen monatlich zu laden, ihnen einen technischen Schlüssel hinzuzufügen, sie in Azure Data Lake Storage (ADLS) zu speichern und sie von ADLS in das DWH zu kopieren.

Dementsprechend erstellen wir auch monatliche Aggregate in Faktentabellen, die über Fremdschlüssel mit Dimensionstabellen verbunden sind. Dann dumpen wir auch diese Dimensionstabellen und kopieren sie in das DWH.

2. Die Pipeline im Überblick

Für die gesamte Datenverarbeitung haben wir eine Hauptpipeline (dwhPL) eingeführt, die einen Datenfluss aufruft, um alle Daten zu aggregieren und sie in ADLS zu dumpen. Die Pipeline ruft anschließend eine andere Pipeline auf, die alle gedumpten Daten in das Synapse DWH kopiert.

Als letzten Schritt löschen wir alle aggregierten und gedumpten Daten aus dem ADLS, um unsere Speicherkapazität zu reduzieren und damit Geld zu sparen. Eine andere Möglichkeit, dies zu tun, wäre die Verwendung von Lebenszyklusverwaltungsrichtlinien für unser Speicherkonto. Die Details dieser Kopier-/Lösch-Pipelines werden hier nicht behandelt.

Wir setzen einen Pipeline-Parameter "date" vom Typ string, den wir mit einem beliebigen Standardwert wie '1997-12-31' initialisieren. Der Zweck dieses Parameters ist die Übergabe eines Trigger-Parameters, nämlich des Zeitstempels der geplanten Ausführung. Dieser Parameter wird bei der Aggregation verwendet, bei der wir nur Daten aus diesem bestimmten Monat lesen. Auf die Einzelheiten der Übergabe des Trigger-Parameters werden wir später in diesem Artikel zurückkommen.

3. Die Aggregation

Die Aggregation der Daten erfolgt in dem zuvor erwähnten Datenfluss. Sie finden den gesamten Datenfluss JSON in unserem Repo. In diesem Abschnitt wird nicht auf jeden einzelnen Schritt des Datenflusses eingegangen, sondern es wird ein Überblick gegeben und es werden einige allgemeine Techniken erwähnt. Eine wichtige Eigenschaft unseres Datenflusses sind die Parameter, die wir zur Steuerung der Datenflussausführung verwenden. Da wir die Daten in monatlichen Stapeln lesen müssen, haben wir die Parameter month_p und year_p als Ganzzahlen hinzugefügt, die aus dem Datumsparameter unserer Pipeline abgeleitet werden.

3.1. Die Dimensionstabellen

Es gibt zwei allgemeine Arten von Tabellen in unserem DWH-Schema: die Dimensionstabellen und die Faktentabellen. In unserem Fall sind die Dimensionstabellen einfacher zu erstellen, da sie nur die eingehenden Informationen zusammen mit einem Schlüssel und einem Zeitstempel enthalten.

Hier verwenden wir die folgenden Elemente zur Erstellung der Dimensionstabellen:

  • Lesen Sie die Daten aus dem Speicher

  • Mappen Sie jede Spalte (mit dem richtigen Typ)

  • Wählen Sie die benötigten Spalten aus

  • Fügen Sie einen technischen Schlüssel und einen Zeitstempel hinzu

  • Teilen Sie den Stream:

a) Wählen Sie ID und Schlüssel aus, um sie später mit Faktentabellen zu verbinden. b) Schreiben Sie sie in den Speicher.

3.1.1. Daten lesen

Unsere Datenquellen sind Parquet-Dateien. Wir geben einen Platzhalterpfad zu unserer Parkettdatei an, da wir alle Monatsdaten aus dem Jahr und dem Monat lesen möchten, die wir im aktuellen Lauf verarbeiten.

Es ist auch möglich, mehr als einen Pfad hinzuzufügen. Nach jeder Parkettquelle fügen wir ein Mapping hinzu. Dies ist eine bewährte Methode, um sicherzustellen, dass ein bestimmtes Schema aufgenommen wird. Sie verlieren damit zwar ein wenig an Flexibilität, aber Sie gewinnen an Kontrolle und Zuverlässigkeit. Data factory bietet ein automatisches Mapping, das Sie verwenden sollten, um nicht jede Spalte manuell eingeben zu müssen. Gehen Sie dazu auf die Registerkarte Datenvorschau und klicken Sie auf den Button "Drifted mappen" (dies ist nur möglich, wenn Sie die Option Schemadrift zulassen wählen).

3.1.2. Spalten auswählen

Es empfiehlt sich, so früh wie möglich im Stream einige Spalten auszuwählen, die Sie benötigen. Darüber hinaus empfiehlt es sich, als letzten Schritt vor dem Absenken Ihrer Daten eine weitere Auswahlphase durchzuführen. Dadurch erhalten Sie mehr Flexibilität für Fälle, in denen die Spaltennamen geändert werden oder genau dieselben Daten mit leicht veränderter Benennung an einer anderen Stelle im Data Flow wiederverwendet werden.

Im Fall unserer Dimensionsverarbeitung haben wir sehr kurze Datenströme. Es würde keinen Sinn machen, eine Auswahl am Anfang und eine am Ende zu haben. Daher haben wir nur eine.

3.1.3. Hinzufügen von Spalten

Es empfiehlt sich, Ihre Daten mit einer Art Zeitstempel zu versehen. Sie können entweder einen "insert_ts" oder einen "created_ts" erstellen. Je nachdem, wofür Sie sich entscheiden, würden Sie den Inhalt dieser Spalte entweder in Ihrem Ziel-DWH beim Einfügen oder in der Data Factory bei der Erstellung der Daten erstellen.

Wir haben uns dafür entschieden, ihn während des ETL-Prozesses zu erstellen. Außerdem empfiehlt es sich, in den Dimensionstabellen einen technischen Schlüssel zu haben, der später für die Verknüpfung von Faktentabellen verwendet werden kann und eine ausgewogenere Verteilung gewährleistet, wenn Sie Ihre Daten im DWH auf diesen Schlüssel verteilen.

In diesem Fall erstellen wir den Schlüssel im Dimensionsstrom und verknüpfen diesen Schlüssel später mit den Faktentabellen. Wir erstellen den Schlüssel mit der Hash-Funktion crc32 und verwenden den currentTimestamp für die created_ts.

3.1.4. Aufspaltung und Versenkung

Hier teilen wir den Stream mit der Option "Neuer Zweig" auf. Einer der Zweige wird verwendet, um nur den zuvor definierten Schlüssel und die ID auszuwählen. Wir verwenden diesen Zweig, um den Schlüssel später mit den Faktentabellen zu verknüpfen. Der zweite Zweig schreibt die Daten als Parkettdateien in den Speicher.

3.2. Die Faktentabellen

Der Zweck der Faktentabellen ist grundsätzlich die Aggregation von Informationen, die für Data Analytics im Data Warehouse verwendet werden. Außerdem benötigt jede Faktentabelle die Schlüssel aus den zugehörigen Dimensionstabellen. Daher benötigen wir neben der allgemeinen Streaming-Struktur zwei weitere Bestandteile:

  • Zusammenführen von aggregierten Werten

  • Zusammenfügen von technischen Schlüsseln für die Referenzierung auf Dimensionen

Der Rest ist ähnlich (Lesen, Mappen, Auswählen, Schlüssel und Zeitstempel hinzufügen, Schreiben).

3.2.1. Technische Schlüssel verknüpfen

Hier fügen wir den technischen Schlüssel aus der Dimensionstabelle in die Faktentabelle ein, indem wir ihn mit der entsprechenden ID verknüpfen. Eine andere Möglichkeit wäre, den Hash mit der gleichen Hash-Funktion neu zu erstellen. Allerdings müsste man dann die gesamten Daten, die für das Hashing benötigt werden, zusammenführen, was zu einer geringeren Leistung führt. Für den Join-Typ verwenden wir den inneren Join. In unserem Fall spielt es keine Rolle, ob es sich um einen Left Join oder einen Inner Join handelt. Allerdings könnte der innere Join die fehlertolerantere Lösung sein. Man könnte sich den Fall vorstellen, dass in einer Dimensionstabelle versehentlich einige Einträge fehlen, was zu späteren Fehlern führen könnte. Diese Fälle würden durch den inneren Join herausgefiltert werden.

3.2.2. Verknüpfung der aggregierten Daten

Diese Schritte bringen die gesamte Geschäftslogik der Faktentabellen ein. Die Verknüpfung selbst ist jedoch trivial. Sie müssen sich nur für einen Left Join oder einen Inner Join entscheiden, was wiederum von Ihren geschäftlichen Anforderungen und der Art der anschließenden Datenverarbeitung abhängt. Wir entscheiden uns für die innere Verknüpfung. Die Aggregation selbst erfolgt in separaten Streams, je nachdem, welche Daten Sie aggregieren möchten.

In der Regel folgen sie den gleichen Prinzipien: Lesen, Mappen, Auswählen, aber der letzte Schritt ist die Aggregation. Wenn Sie Daten für verschiedene Faktentabellen aggregieren müssen, müssen Sie Ihre Daten so oft verzweigen, wie Sie Spalten haben, nach denen Sie gruppieren können. In unserem Fall möchten wir die Details einer Bestellung aggregieren, aber wir brauchen sie für zwei Faktentabellen, nämlich:

  • Wie viele Stücke wurden in einer bestimmten Reihenfolge bestellt (Gruppierung nach Reihenfolge)?

  • Wie viele Stücke wurden für ein bestimmtes Produkt bestellt (Gruppe nach Produkt)?

3.3. Der Mitarbeiterstrom (Mischung aus Dim und Fact)

In unserem Beispiel mag der Datenstrom des Mitarbeiters furchtbar chaotisch erscheinen. Der Grund dafür ist, dass wir daraus vier Tabellen erstellen. Natürlich könnte man auch vier Quellen haben, aber das würde vier Lesevorgänge erfordern und könnte uns Leistung kosten.

Hier erstellen wir die Dimension Employee sowie die Fact Employee, Fact Employee Monthly und Fact Supervisor in einem Stream. Die Prinzipien sind jedoch alle dieselben: Lesen, Mappen, Auswählen, Aufteilen und Hinzufügen von Schlüsseln und Zeitstempeln. Für die Fakten müssen wir auch die aggregierten Daten zusammenführen. Das war's schon. Es gibt nur zwei Spezialitäten:

Fakt Supervisor: Diese Tabelle ist eine Teilstichprobe der Mitarbeiter, weshalb wir sie filtern müssen. Außerdem erstellt diese Faktentabelle ihre eigenen Aggregate. Wir verbinden keine anderen Aggregate, sondern aggregieren über die Spalte "ReportsTo". Aber genau diese Aggregation übernimmt auch den größten Teil der Filterung. Der Filter lässt nur die Nullwerte weg. Sie müssen den "!"-Operator (!isNull()) verwenden, da es keine isNotNull()-Funktion gibt.

2. Fact Employee und Employee Monthly: Hier können wir nicht einfach alle aggregierten Eigenschaften zusammenführen. Stattdessen müssen wir teilweise aggregierte Daten zusammenführen und diese aggregieren. Wir benötigen zum Beispiel die eindeutige Anzahl der ProductIDs pro Mitarbeiter. Solche eindeutigen Zählungen können in ihren Ursprungstabellen nicht aggregiert werden. Außerdem müssen wir das Aggregat der Bestellungen mit einem inneren Join verknüpfen, der den Filtermechanismus dafür schafft, ob es sich nur um die Daten des letzten Monats oder um den gesamten Datensatz handelt. Dies ist jedoch nicht möglich, wenn Sie die monatliche Aggregation nach einem anderen Zeitplan als die vollständige Aggregation ausführen möchten. In diesem Fall bräuchten Sie komplett getrennte Pipelines.

3.4. Die Aggregate der aufgeteilten Aufträge

Der letzte Punkt, den wir behandeln müssen, ist das Lesen von partitionierten Daten. Hier lesen wir die Bestellungen, die nach Jahr, Monat und Tag partitioniert sind. Wir benötigen sie für vollständige Aggregate und monatliche Aggregate.

Daher lesen wir entweder jede Teilung oder nur das jeweilige Jahr und den Monat. Es gibt zwei Möglichkeiten, dies zu tun. Entweder lesen Sie die Daten nur einmal, teilen den Datenstrom auf und filtern einen der Zweige nach dem gewünschten Jahr und Monat. Oder Sie lesen die Daten zweimal, einmal mit allen Daten und einmal mit nur einer Partition. Für die Leistung würde das nicht viel ausmachen, da die Daten ohnehin darauf partitioniert sind.

Hier entscheiden wir uns dafür, Daten in zwei verschiedenen Streams zu lesen. Die Art und Weise, wie Sie partitionierte Parkettdaten behandeln, besteht darin, einen Wildcard-Pfad und einen Partitionsstamm-Pfad einzurichten. In unserem Fall müssen wir nur die Parameter Jahr und Monat zum Platzhalterpfad hinzufügen.

Das ist alles, was für die Aggregation benötigt wird. Es gibt jedoch noch einen letzten Abschnitt über die Auslösung der Pipeline.

4. Der Auslöser

Die Verwendung von Triggern ist eine Möglichkeit, die Ausführung Ihrer Arbeitsabläufe zu verwalten und zu planen. Allerdings ist das Triggern in der Data Factory noch nicht ganz ausgereift. Es stehen drei verschiedene Arten von Triggern zur Auswahl: Ereignis, geplant und Tumbling Window. Der Ereignisauslöser kann ausgelöst werden, wenn ein Blob in einem bestimmten Container erstellt oder gelöscht wird, was wir in unserem Fall nicht wollen.

4.1. Geplanter Auslöser

Der geplante Trigger ist eine gute Wahl, da Sie eine klare Häufigkeit (in Minuten, Stunden, Tagen, Wochen, Monaten) festlegen können, in der Sie Ihre Pipeline ausführen möchten. Sie können die Startzeit des Triggers über die Systemvariable "@trigger().scheduledTime" an die Pipeline übergeben.

Und es ist auch möglich, den Auslöser fein abzustimmen, indem Sie einen bestimmten Tag des Monats oder einen Wochentag festlegen. Es scheint also eine perfekte Wahl zu sein. Allerdings funktioniert der geplante Auslöser nicht für alle Backfilling-Szenarien. Wenn Sie ein Datum eingeben, das in der Vergangenheit liegt, wird die Pipeline nicht ausgeführt. Da unser Spielzeugdatensatz Daten aus den 90er Jahren enthält, können wir diese Daten nicht direkt mit dem geplanten Trigger abrufen. Im Gegensatz dazu ist der Tumbling Window Trigger in der Lage, Backfilling-Szenarien zu verarbeiten.

4.2. Taumelnder Fensterauslöser

Neben der Handhabung von Backfilling-Szenarien verfügt der Tumbling Window-Trigger über verschiedene andere Optionen wie Abhängigkeiten von anderen Triggern (auch Selbstabhängigkeit) oder Wiederholungsrichtlinien.

Sie können die Startzeit des Triggers über die Systemvariable "@trigger().outputs.windowStartTime" an die Pipeline übergeben. Leider ist die Auswahl der Häufigkeit sehr begrenzt, da Sie nur zwischen Minuten und Stunden wählen können. Außerdem ist es nicht möglich, den Trigger an jedem ersten oder letzten Tag eines Monats auszulösen oder ihn an mehreren bestimmten Wochentagen auszuführen. Daher müsste ein solches Szenario mit einer Umgehungslösung gelöst werden, z.B. indem Sie die Funktion alle 24 Stunden auslösen, aber eine If-Condition-Aktivität hinzufügen, die auswertet, ob es der letzte Tag eines Monats ist.

Einpacken

Wir haben eine detaillierte Lösung für ein typisches ETL-Verfahren mit dem Azure-Toolset vorgestellt. Wir begannen mit der Erstellung von Ressourcen und der Vorverarbeitung von Daten. Anschließend haben wir die Details eines bestimmten Data Warehouse-Schemas auf Azure Synapse erläutert.

Als letzte Zutat haben wir ein detailliertes ETL-Verfahren mit dem Datenfluss von Data Factory gezeigt. Wenn Ihnen dieser Blog gefallen hat, bleiben Sie dran. Das nächste Mal werden wir über die Data Analytics mit PowerBI berichten. Prost!

Quellen

109

Anteile

160

Ansichten

Verwandte Blogs

In Kontakt kommen
Erweitern Sie Ihr Geschäft mit Qimia