Data Science
January 26, 2023

Einführung
Willkommen zu unserer Serie über die Entwicklung von Streaming-Anwendungen.
In den letzten zehn Jahren wurden zahlreiche neue Anwendungen entwickelt, die Daten in einem Strom statt in Stapeln verarbeiten mussten. Der Grund dafür ist einfach. Auf diese Weise bleiben die Daten immer aktuell. Infolgedessen wurden viele Technologien entwickelt, die sich mit der verteilten und gleichzeitigen Ausführung von Streaming-Operationen befassen.
In dieser Serie lernen Sie einige dieser Technologien anhand eines bekannten Produkts aus der realen Welt kennen, nämlich der Autovermietung. Die Serie ist in die folgenden Tutorials unterteilt:
Simulation eines Netzwerks von Autos, die gebucht und bewegt werden, mit Akka classic
Verwendung eines Message Brokers (Kafka) zur Übertragung von Ereignisdaten an einen Message Consumer
Analysieren Sie die Daten, indem Sie Spark Structured Streaming zur Durchführung von Analysen nutzen.
Den Code, auf den in diesem Artikel verwiesen wird, finden Sie in unserem Github.
1. AKKA
Akka ist ein Toolset für die Entwicklung nebenläufiger und verteilter nachrichtengesteuerter Anwendungen. Im Wesentlichen ermöglicht Akka die einfache Implementierung des Akteur-Musters in Anwendungen. Alles in Akka ist um Akteure herum aufgebaut.
Haftungsausschluss: Bitte beachten Sie, dass wir in diesem Tutorial Akka classic verwenden. Wenn Sie neue Anwendungen entwickeln, sollten Sie die neuere Version namens Akka typed verwenden, die Typsicherheit für die gesendeten Nachrichten bietet. Unter diesem Link finden Sie weitere Informationen über die Unterschiede.
1.1. Wer sind die Akteure?
Akteure sind im Wesentlichen zustandsabhängige Maschinen, die sowohl den Zustand als auch das Verhalten kapseln. Der Zustand und das Verhalten von Akteuren können nur von innerhalb der Akteure geändert werden. Die gesamte Interaktion zwischen den Akteuren erfolgt in Form von unveränderlichen Nachrichten. Um eine Nachricht an einen anderen Akteur zu senden, verwenden die Akteure Akteursreferenzobjekte. Die Nachrichten, die an einen Akteur gesendet werden, kommen im Posteingang dieses Akteurs an. Wenn ein Akteur eine Nachricht erhält, führt er weitere Berechnungen durch und antwortet möglicherweise dem Absender der Nachricht.
In Akka Typed verarbeiten Actors in der Regel verschiedene Nachrichtentypen, die alle direkt oder indirekt eine gemeinsame Eigenschaft erweitern. Actors können nicht nur ihren Zustand, sondern auch ihr Verhalten ändern. In Akka Classic geschieht dies mit Hilfe der Methode become(). In Akka Typed geben Actors am Ende ihrer receive-Methode ein Behavior zurück. In Fällen, in denen sich das Verhalten nicht ändert, wird die Funktion einfach Behaviors.same zurückgeben.
1.2. Wie sind die Akteure organisiert?
Akteure sind in Hierarchien organisiert, die in Akka als Akteurssysteme bezeichnet werden. Jeder Akteur hat einen übergeordneten Akteur (mit Ausnahme des Wurzelakteurs) und jeder Akteur kann mehrere untergeordnete Akteure haben, die im Wesentlichen eine Baumstruktur bilden.
Innerhalb der Hierarchie sollte es immer das Ziel sein, Arbeit von oben nach unten zu delegieren, so dass die Threads der koordinierenden Akteure frei von blockierenden Operationen sind.
Das Akteurssystem in Akka wird auch zur Erkennung von Akteuren verwendet. Sowohl absolute als auch relative Pfade können verwendet werden, um die Referenz eines Akteurs über seinen Pfad in der Hierarchie zu erhalten.
Quelle: https://doc.akka.io/docs/akka/current/general/addressing.html
1.3. Lebenszyklus-Management
Akteure können ausfallen, und es ist wichtig, sich Gedanken darüber zu machen, wie man mit ihrem Ausfall umgeht. Glücklicherweise bietet Akka bereits einige Werkzeuge, um das Verhalten im Falle eines Fehlers zu steuern. In Akka Classic wird der Akteur standardmäßig neu gestartet. Wenn innerhalb eines Akteurs eine Ausnahme auftritt, setzt er sich selbst und alle seine Kinder aus und sendet eine Nachricht an seinen Elternteil (Supervisor). Der Supervisor kann dann entscheiden, wie der Fehler behandelt werden soll, insbesondere was mit dem Zustand des fehlgeschlagenen Akteurs geschehen soll. In Akka Typed hingegen wird der Akteur angehalten, wenn eine Ausnahme auftritt. Außerdem wird SupervisorStrategy.Escalate in Typed nicht unterstützt.
In Akka Classic können Akteure auch aktiv gestoppt werden, indem Sie eine der folgenden Möglichkeiten nutzen:
Verwendung der Stopp-Methode
Versenden einer PoisonPill-Nachricht
Verwendung der Graceful-Stop-Methode
Je nach gewählter Methode werden die Inhalte, die sich im Postfach des Akteurs befinden, verarbeitet oder verworfen. Bei der Stop-Methode wird nur die Nachricht, die gerade verarbeitet wird, beendet. Bei der PoisonPill-Methode werden alle Nachrichten verarbeitet, die sich derzeit im Posteingang befinden. Die Methode graceful-stop wartet darauf, dass der Akteur nach einer bestimmten Zeitspanne beendet wird. Sie kann nützlich sein, wenn Akteure in einer bestimmten Reihenfolge gestoppt werden sollen. Nicht verarbeitete Nachrichten von Akteuren, die beendet wurden, werden in den sogenannten "toten Briefkasten" übertragen.
Der Akteur selbst verfügt über die folgenden Lebenszyklusmethoden:
spark master
spark worker
spark history server
zookeeper
3 Kafka brokers
schema-registry
confluent control center
In Akka Typed hingegen hält der Akteur sich selbst an, indem er Behaviors.stopped zurückgibt. Außerdem enthält Typed nicht die Signale preStart und postRestart. Alles, was in diesen Phasen zu tun ist, kann in Behaviors.setup oder im Konstruktor der Klasse AbstractBehavior erledigt werden, wenn der objektorientierte Stil verwendet wird.
1.4. Interaktionsmuster
Wenn Akteure miteinander kommunizieren, gibt es ein paar vordefinierte Muster für ihre Kommunikation. Zwei der bekanntesten Muster sind "Fire and Forget" und "Request and Response". Fire and Forget bedeutet einfach, dass eine Nachricht an den Akteur gesendet wird und die Verarbeitung einfach weitergeht. Der Absender wartet nicht auf eine Antwort und hat auch keine Möglichkeit zu überprüfen, ob die Nachricht angekommen ist oder verarbeitet wurde. Request and Response hingegen bedeutet, dass der Absender eine Antwort vom Empfänger erwartet und wartet, bis er sie erhält.
Quelle: https://doc.akka.io/docs/akka/current/typed/interaction-patterns.html
Obwohl es einige häufig verwendete Muster gibt, kann die Interaktion zwischen Akteuren beliebig komplex werden. In der Syntax von Akka Classic wird für Nachrichten, die auf fire and forget folgen, der Operator "tell" verwendet, der wie folgt bezeichnet wird: <Empfänger> ! <Nachricht>. Der "ask"-Operator kann verwendet werden, um einen Future zu erstellen, der eine potenzielle Antwort umschließt. Die Syntax sieht so aus: <Empfänger> ? <Nachricht>. Für Akka Typed ist ein ähnlicher ask-Operator auch für die Verwendung außerhalb eines Akteurs verfügbar. Für die Verwendung innerhalb von Akteuren ist die ask-Methode des ActorContext zu verwenden.
1.5. Threads & Blockierungscode
In Akka teilen sich die Akteure standardmäßig einen Thread-Pool, der vom ForkJoinExecutor ausgeführt wird. Der ForkJoinExecutor hat keine feste Poolgröße. Stattdessen erzeugt er neue Threads, wenn neue Aufgaben erstellt werden. Während dies für den meisten Code sehr gut funktioniert, kann es problematisch werden, wenn er auf blockierenden Code trifft, insbesondere wenn dieser blockierende Code sehr oft ausgeführt werden muss. Für jede der blockierenden Aufgaben legt der ForkJoinExecutor einen neuen Thread an, um sie auszuführen. Dies kann dazu führen, dass die blockierende Aufgabe den Thread-Pool in Beschlag nimmt und den anderen Aufgaben die Ausführung verwehrt, was als Thread-Starvation bezeichnet wird. Um diese Situation zu vermeiden, ist es ratsam, laufende Aufgaben an Akteure zu delegieren, die auf einem Thread-Pool laufen, der von einem ThreadPoolExecutor verwaltet wird. Dieser Thread-Pool hat eine feste Größe und stellt sicher, dass die blockierenden Aufgaben die nicht-blockierenden Aufgaben nicht aushungern.
2. Verwendung von Akka zur Simulation eines Produzenten
2.1. Vorbereiten der Infrastruktur
Bevor wir mit der Implementierung beginnen, müssen wir zunächst die dafür erforderliche Infrastruktur einrichten. In unserem Repository haben wir die Docker-Dateien und Skripte für den Anfang bereitgestellt. Natürlich können Sie auch Ihre eigenen Spark- und Kafka-Instanzen bereitstellen. Wir verwenden die folgenden Container:
Funkenmeister
Funkenarbeiter
spark history server
Zoowärter
3 Kafka-Broker
schema-registry
confluent Kontrollzentrum
Die Besonderheiten von Kafka werden in unserem nächsten Artikel besprochen. Für den Moment reicht es, die Container zum Laufen zu bringen.
2.2. Über Staaten nachdenken
Bevor wir uns mit den Details der Implementierung befassen, lassen Sie uns einen Blick auf den Geschäftsfall werfen, den wir simulieren wollen. Stellen wir uns vor, wir sind ein Unternehmen, das eine Flotte von Autos besitzt, die von unseren Kunden gemietet werden können. Diese Autos haben natürlich eine Fahrzeugidentifikationsnummer (vin), mit der sie identifiziert werden können. Sie haben auch eine Position, die gleich bleibt, wenn sie gerade verfügbar sind, und sich ändert, wenn sie gerade gebucht sind. Wenn das Fahrzeug derzeit gebucht ist, bleibt es für eine gewisse Zeit in dieser Position und ist dann wieder verfügbar.
Wie wir von unseren Smartphones wissen, kann es manchmal vorkommen, dass wir kein Signal empfangen können. Das bedeutet, dass das Unternehmen, das die Autos vermietet, keine Informationen über den Zustand der Autos erhält. Wir betrachten ein Auto, das sich in diesem Zustand befindet, als blockiert. Dieses Konzept führt uns zu dem folgenden Zustandsdiagramm.
2.3. Die Implementierung der Akteure
In unserem Beispiel verwenden wir Akka, um das Netzwerk von Autos zu simulieren, die gebucht, gesperrt oder verfügbar sein können. Zu diesem Zweck haben wir einen Master-Actor und einen Worker-Actor implementiert. Der Master Actor läuft auf einer Uhr und entscheidet bei jeder Iteration, welche Änderungen an den Autos vorgenommen werden sollen. Er unterscheidet also zwischen Autos, die gerade in Gebrauch sind, und Autos, die für Buchungen verfügbar sind. Für jedes Auto wird der nächste Status bestimmt und eine entsprechende Nachricht an den Worker-Actor gesendet. Der Worker Actor hat dann die Aufgabe, die Zustände der Autos in ein Kafka-Thema zu übertragen. Der Master Actor wartet eine bestimmte Zeit lang auf Antworten des Worker Actor und schläft dann eine Weile, um eine realistischere Simulation zu ermöglichen. Die folgende Grafik veranschaulicht die Interaktion zwischen den Akteuren.
Wenn wir die Akteure schreiben, müssen wir uns zunächst Gedanken über die Art der Nachrichten machen, die sie erhalten sollen. Im Fall unseres Master-Akteurs besteht sein Hauptzweck darin, in regelmäßigen Abständen Aktionen für die Fahrzeugflotte auszuführen. Wir nennen diese Art von Nachricht Clock. Außerdem werden wir eine Start- und eine Stop-Nachricht haben, um die Uhr zu starten und anzuhalten.
1case object Start2case object Clock3case object Done
Außerdem wird der Master-Akteur der Container für den Zustand der Flotte sein, der in einer Liste von Autoereignissen gespeichert wird. Bei jeder Iteration (einem Ticken der Uhr) iteriert der Master-Akteur über die Liste der Autos und entscheidet auf der Grundlage einiger vordefinierter Regeln und Zufallsgeneratoren über Aktionen. Die auszuführenden Aktionen werden als Nachrichten an den Worker-Akteur gesendet, der sich um die Mutation der Objekte, die Veröffentlichung des Ereignisses im Kafka-Topic und die Rückgabe einer Antwort an den Master kümmert.
1case Clock => {2 clock = clock + 13 log.info("start looping, clock:" + clock)4 val bookeds: List[Future[WorkerActor.Response]] = store5 .filter(_.state == "booked")6 .map(x => {7 val blk = random.nextInt(11);8 if (x.block > 0)9 worker.ask(WorkerActor.Notify(x)).mapTo[WorkerActor.Response]10 else if (blk == 10)11 worker.ask(WorkerActor.Block(x)).mapTo[WorkerActor.Response]12 else13 worker.ask(WorkerActor.Move(x)).mapTo[WorkerActor.Response]14 })15 log.info("$$$ all bookeds sent:" + clock)16 val availables: List[Future[WorkerActor.Response]] = store17 .filter(_.state == "available")18 .map(x => {19 val blk = random.nextInt(11)20 val bk = random.nextInt(5)21 if (x.block > 1)22 worker.ask(WorkerActor.Notify(x)).mapTo[WorkerActor.Response]23 else if (blk == 10)24 worker.ask(WorkerActor.Block(x)).mapTo[WorkerActor.Response]25 else if (bk == 4)26 worker.ask(WorkerActor.Book(x)).mapTo[WorkerActor.Response]27 else28 worker.ask(WorkerActor.Notify(x)).mapTo[WorkerActor.Response]29 })
Der Worker Actor muss in der Lage sein, alle Arten von Nachrichten auszuführen, die wir an ihn senden. Er sollte die entsprechende Methode unseres Car Event Singleton aufrufen, um den Zustand des Car Event Object zu ändern. Dann veröffentlicht er die Daten im Kafka-Topic und antwortet dem Absender mit dem Autoereignis, verpackt in ein Antwortobjekt. Diese beiden Aufgaben werden in einer kleinen Hilfsmethode ausgeführt. Das eigentliche Senden wird auf ein Kafka Client-Objekt abstrahiert, das die entsprechenden Konfigurationsobjekte enthält.
1def receive: Receive = {2 case Move(e) => {3 implicit val event: CarEvent = e4 if (event.expire > 0) {5 val loc = geoStore(random.nextInt(geolocs))6 log.info("### Clock: " + event.clock + ", moved vin:" + event.vin + " to:" + loc)7 send(move(loc), topic)8 } else {9 log.info("### Clock: " + event.clock + ", available vin:" + event.vin)10 send(transition("available", 0), topic)11 }12 }13 case Book(event) => {14 log.info("### Clock: " + event.clock + ", booked vin:" + event.vin)15 send(transition("booked", random.nextInt(50))(event), topic)16 log.info("*** Clock: " + event.clock + ", booked vin:" + event.vin)17 }18 case Notify(e) => {19 implicit val event: CarEvent = e20 if (event.block > 0) {21 sender ! Response(block(event.block - 1))22 } else {23 send(clock, topic)24 }25 }26 case Block(event) => {27 sender ! Response(block(random.nextInt(5))(event))28 }2930 }3132 def send(event: CarEvent, topic: String)(implicit producer: KafkaProducer[String, VehicleData]): Unit = {33 KafkaClient.send(event, topic, producer)34 sender ! Response(event)35 log.info("### Clock: " + event.clock + ", RESPONSE:" + event.vin)36 }
Wenn wir die Simulation ausführen, können wir die Verarbeitung der Akteure anhand der Protokolle, die sie ausgeben, verfolgen.
1simulator | [INFO] [10/16/2020 08:58:17.488] [Simulator-akka.actor.default-dispatcher-4] [akka://Simulator/user/master] $$$ FINISHED BLOCKING:12simulator | [INFO] [10/16/2020 08:58:17.489] [Simulator-akka.actor.default-dispatcher-4] [akka://Simulator/user/master] $$$$$$$$$$ size of store at clock end: 203simulator | [INFO] [10/16/2020 08:58:21.490] [Simulator-akka.actor.default-dispatcher-4] [akka://Simulator/user/master] start looping, clock:24simulator | [INFO] [10/16/2020 08:58:21.514] [Simulator-akka.actor.default-dispatcher-4] [akka://Simulator/user/master] $$$ FINISHED BLOCKING:25simulator | [INFO] [10/16/2020 08:58:21.514] [Simulator-akka.actor.default-dispatcher-4] [akka://Simulator/user/master] $$$$$$$$$$ size of store at clock end: 206simulator | [INFO] [10/16/2020 08:58:25.515] [Simulator-akka.actor.default-dispatcher-4] [akka://Simulator/user/master] start looping, clock:37simulator | [INFO] [10/16/2020 08:58:25.554] [Simulator-akka.actor.default-dispatcher-4] [akka://Simulator/user/master] $$$ FINISHED BLOCKING:38simulator | [INFO] [10/16/2020 08:58:25.554] [Simulator-akka.actor.default-dispatcher-4] [akka://Simulator/user/master] $$$$$$$$$$ size of store at clock end: 209simulator | [INFO] [10/16/2020 08:58:29.555] [Simulator-akka.actor.default-dispatcher-4] [akka://Simulator/user/master] start looping, clock:410simulator | [INFO] [10/16/2020 08:58:29.635] [Simulator-akka.actor.default-dispatcher-4] [akka://Simulator/user/master] $$$ FINISHED BLOCKING:411simulator | [INFO] [10/16/2020 08:58:29.635] [Simulator-akka.actor.default-dispatcher-4] [akka://Simulator/user/master] $$$$$$$$$$ size of store at clock end: 2012simulator | [INFO] [10/16/2020 08:58:33.635] [Simulator-akka.actor.default-dispatcher-4] [akka://Simulator/user/master] start looping, clock:513simulator | [INFO] [10/16/2020 08:58:33.665] [Simulator-akka.actor.default-dispatcher-4] [akka://Simulator/user/master] $$$ FINISHED BLOCKING:514simulator | [INFO] [10/16/2020 08:58:33.668] [Simulator-akka.actor.default-dispatcher-4] [akka://Simulator/user/master] $$$$$$$$$$ size of store at clock end: 2015simulator | [INFO] [10/16/2020 08:58:37.668] [Simulator-akka.actor.default-dispatcher-4] [akka://Simulator/user/master] start looping, clock:616simulator | [INFO] [10/16/2020 08:58:37.719] [Simulator-akka.actor.default-dispatcher-4] [akka://Simulator/user/master] $$$ FINISHED BLOCKING:617simulator | [INFO] [10/16/2020 08:58:37.719] [Simulator-akka.actor.default-dispatcher-4] [akka://Simulator/user/master] $$$$$$$$$$ size of store at clock end: 20
Einpacken
In diesem Tutorial haben wir Ihnen gezeigt, wie Sie Akka nutzen können, um Ereignisse in einem Netzwerk zu simulieren. Wir haben eine Master-Worker-Architektur verwendet, um unseren koordinierenden Akteur frei zu halten, und den Worker-Akteur benutzt, um die Ereignisse an einen Kafka-Broker zu veröffentlichen.
Im nächsten Lernprogramm werden wir uns mit den Besonderheiten von Kafka als Message Broker beschäftigen.
Data Science
Development von Streaming-Anwendungen - Spark Structured Streaming - 3/3
Development von Streaming-Anwendungen - Kafka - 2/3
Abonnieren Sie jetzt die frischen Inhalte