Introduction

Welcome to our series on the development of streaming applications.

Over the last decade, many applications have emerged that need to process data as a stream rather than in batches. The reason is simple: with streaming, the data always stays current. As a consequence, many technologies have been developed for performing streaming operations in a distributed and concurrent fashion.

This series will teach you some of these technologies using a well-known real-world business case: renting cars. The series is split into the following tutorials:

  • Simulating a network of cars being booked and moved using Akka Classic

  • Using a message broker (Kafka) to transport event data to a message consumer

  • Analyzing the data with Spark Structured Streaming

For the code referenced in this article, please check out our GitHub.

1. AKKA

Akka is a toolset for building concurrent and distributed message-driven applications. Essentially, Akka enables easy implementation of the actor pattern in applications. Everything in Akka is built around actors.

Disclaimer: Please be aware that we are using Akka Classic in this tutorial. When developing new applications, it is recommended to use the newer API called Akka Typed, which provides type safety for the messages being sent. See the Akka documentation for more information on the differences.

1.1. What are actors?

Actors are essentially state machines that encapsulate both state and behavior. The state and behavior of an actor can only be changed from inside that actor. All interaction between actors happens in the form of immutable messages. To send a message to another actor, an actor uses an actor reference object. Messages sent to an actor arrive in its mailbox. When an actor receives a message, it performs the corresponding computations and may reply to the sender of the message.

In Akka Typed, actors usually handle several message types that all directly or indirectly extend a common trait. Actors can change not only their state but also their behavior. In Akka Classic, this is done with the become() method. In Akka Typed, an actor returns a Behavior at the end of its message handler. In cases where the behavior does not change, the handler simply returns Behaviors.same.
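
As a minimal sketch of changing behavior in Akka Classic, the following actor flips between two receive functions with context.become. The Switch actor and its Toggle and Query messages are hypothetical names invented for this illustration, and the snippet assumes that the akka-actor library is on the classpath.

```scala
import akka.actor.{Actor, ActorSystem, Props}
import akka.pattern.ask
import akka.util.Timeout
import scala.concurrent.Await
import scala.concurrent.duration._

// Hypothetical messages, invented for this sketch.
case object Toggle
case object Query

class Switch extends Actor {
  // Initial behavior: the switch starts out "off".
  def receive: Receive = off

  private def off: Receive = {
    case Toggle => context.become(on) // swap the message handler
    case Query  => sender() ! "off"
  }

  private def on: Receive = {
    case Toggle => context.become(off)
    case Query  => sender() ! "on"
  }
}

object BecomeDemo {
  // Toggles the switch once and returns the state it reports afterwards.
  def run(): String = {
    val system = ActorSystem("BecomeDemo")
    implicit val timeout: Timeout = Timeout(3.seconds)
    try {
      val switch = system.actorOf(Props(new Switch), "switch")
      switch ! Toggle
      Await.result((switch ? Query).mapTo[String], 3.seconds)
    } finally {
      Await.result(system.terminate(), 5.seconds)
    }
  }
}
```

Because the two handlers are ordinary partial functions, the actor needs no mutable flag: the current behavior itself is the state.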

1.2. How are actors organized?

Actors are organized in hierarchies, which are called actor systems in Akka. Each actor has a parent (except for the root actor) and each actor can have multiple children, essentially forming a tree structure.

Inside the hierarchy, the goal should always be to delegate work top-down, so that the threads of coordinating actors stay free of blocking operations.

The actor system in Akka is also used for actor discovery. Both absolute and relative paths can be used to get the reference of an actor by its path in the hierarchy.

Figure: Actor Hierarchy

Source: https://doc.akka.io/docs/akka/current/general/addressing.html
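
To make path-based discovery concrete, here is a small sketch that creates a top-level actor and then looks it up again by its absolute path. The Noop actor and the system name PathDemo are assumptions made for this example, and the akka-actor library is assumed to be on the classpath.

```scala
import akka.actor.{Actor, ActorRef, ActorSystem, Props}
import scala.concurrent.Await
import scala.concurrent.duration._

// Hypothetical actor that ignores every message; only its path matters here.
class Noop extends Actor {
  def receive: Receive = { case _ => () }
}

object PathDemo {
  // Creates an actor named "master" and resolves it again via its absolute path.
  def run(): String = {
    val system = ActorSystem("PathDemo")
    try {
      system.actorOf(Props(new Noop), "master")
      // Actors created with system.actorOf live below the /user guardian.
      val selection = system.actorSelection("/user/master")
      val found: ActorRef = Await.result(selection.resolveOne(3.seconds), 5.seconds)
      found.path.toString
    } finally {
      Await.result(system.terminate(), 5.seconds)
    }
  }
}
```

Inside an actor, context.actorSelection works the same way and also accepts relative paths such as "../worker" to address a sibling.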

1.3. Lifecycle Management

Actors can fail, and it is important to think about how to handle their failure. Fortunately, Akka provides tools to control the behavior in case of failure. In Akka Classic, when an exception occurs inside an actor, the actor suspends itself and all of its children and sends a message to its parent (its supervisor). The supervisor then decides how to handle the failure, in particular what happens to the state of the failed actor; by default, the actor is restarted. In Akka Typed, on the other hand, the actor is stopped by default if an exception occurs. Also, SupervisorStrategy.Escalate is not supported in Typed.

In Akka Classic, actors can also be stopped actively in one of the following ways:
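
The effect of a supervisor's decision on the child's state can be sketched as follows in Akka Classic. The Counter and Supervisor actors, the string messages, and the choice of exceptions are all hypothetical and only serve the illustration; the akka-actor library is assumed to be on the classpath.

```scala
import akka.actor.{Actor, ActorRef, ActorSystem, OneForOneStrategy, Props, SupervisorStrategy}
import akka.actor.SupervisorStrategy.{Restart, Resume}
import akka.pattern.ask
import akka.util.Timeout
import scala.concurrent.Await
import scala.concurrent.duration._

// Hypothetical child that keeps a counter and can be told to fail.
class Counter extends Actor {
  private var count = 0
  def receive: Receive = {
    case "inc"    => count += 1
    case "get"    => sender() ! count
    case "resume" => throw new ArithmeticException("recoverable")
    case "crash"  => throw new IllegalStateException("fatal")
  }
}

// Hypothetical parent that supervises the counter and forwards all messages to it.
class Supervisor extends Actor {
  override val supervisorStrategy: SupervisorStrategy =
    OneForOneStrategy(maxNrOfRetries = 3, withinTimeRange = 1.minute) {
      case _: ArithmeticException => Resume  // keep the child's state
      case _: Exception           => Restart // fresh instance, state is reset
    }

  private val child: ActorRef = context.actorOf(Props(new Counter), "counter")

  def receive: Receive = { case msg => child.forward(msg) }
}

object SupervisionDemo {
  // Returns the counter value after a resumed failure and after a restart.
  def run(): (Int, Int) = {
    val system = ActorSystem("SupervisionDemo")
    implicit val timeout: Timeout = Timeout(3.seconds)
    def count(ref: ActorRef): Int = Await.result((ref ? "get").mapTo[Int], 3.seconds)
    try {
      val parent = system.actorOf(Props(new Supervisor), "parent")
      parent ! "inc"
      parent ! "inc"
      parent ! "resume"
      val afterResume = count(parent)
      parent ! "crash"
      val afterRestart = count(parent)
      (afterResume, afterRestart)
    } finally {
      Await.result(system.terminate(), 5.seconds)
    }
  }
}
```

In this sketch the counter survives the resumed failure but starts from zero again after the restart, because a restart replaces the actor instance.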

  • Using the stop method

  • Sending a PoisonPill message

  • Using the gracefulStop method

Depending on the chosen method, the messages in the actor's mailbox will be processed or dropped. With the stop method, only the message that is currently being processed will be finished. With a PoisonPill message, all messages that were in the mailbox before the PoisonPill will be processed first. The gracefulStop method waits for the actor to terminate within a specified period of time, which is useful when actors should be stopped in a specific order. Unprocessed messages of terminated actors are transferred to the so-called "dead letter mailbox".
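
The following sketch illustrates the PoisonPill ordering and the gracefulStop helper in Akka Classic. The Recorder actor and the shared queue are hypothetical constructs used only to observe what the actor processed; the akka-actor library is assumed to be on the classpath.

```scala
import akka.actor.{Actor, ActorSystem, PoisonPill, Props}
import akka.pattern.gracefulStop
import java.util.concurrent.ConcurrentLinkedQueue
import scala.concurrent.Await
import scala.concurrent.duration._

// Hypothetical actor that records every string it processes in a shared queue.
class Recorder(log: ConcurrentLinkedQueue[String]) extends Actor {
  def receive: Receive = {
    case s: String => log.add(s); ()
  }
  // Lifecycle hook: runs once the actor has stopped.
  override def postStop(): Unit = { log.add("postStop"); () }
}

object StopDemo {
  // Returns everything the actor recorded before it terminated.
  def run(): List[String] = {
    val system = ActorSystem("StopDemo")
    val log = new ConcurrentLinkedQueue[String]()
    try {
      val recorder = system.actorOf(Props(new Recorder(log)), "recorder")
      recorder ! "a"
      recorder ! "b"
      recorder ! PoisonPill // queued behind "a" and "b"
      recorder ! "c"        // arrives after the pill and ends up in dead letters
      // gracefulStop sends a PoisonPill by default and completes on termination.
      Await.result(gracefulStop(recorder, 3.seconds), 5.seconds)
      log.toArray(new Array[String](0)).toList
    } finally {
      Await.result(system.terminate(), 5.seconds)
    }
  }
}
```

An immediate stop would instead be requested with system.stop(recorder) from outside, or context.stop(self) from inside the actor.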

The actor itself has the following lifecycle methods:

  • preStart

  • postStop

  • preRestart

  • postRestart

In Akka Typed, on the other hand, an actor stops itself by returning Behaviors.stopped. Furthermore, Typed does not include the preStart and postRestart signals. Any work that belongs to these stages can be done in Behaviors.setup or, if the object-oriented style is used, in the constructor of the AbstractBehavior class.

1.4. Interaction Patterns

When actors communicate with one another, they often follow one of a few predefined patterns. Two of the best-known patterns are "fire and forget" and "request and response". Fire and forget means that a message is sent to an actor and the sender simply continues processing. The sender does not wait for a response and has no means to check whether the message has arrived or was processed. Request and response, on the other hand, means that the sender expects a response from the recipient and will wait until it arrives.

Figure: Fire and Forget

Source: https://doc.akka.io/docs/akka/current/typed/interaction-patterns.html

Figure: Request and Response

Although there are some commonly used patterns, the interaction between actors can become arbitrarily complex. In Akka Classic, fire-and-forget messages are sent with the "tell" operator, written as <recipient> ! <message>. The "ask" operator creates a Future that wraps a potential response and is written as <recipient> ? <message>. In Akka Typed, a similar ask operator is still available for use outside of an actor. Inside an actor, the ask method of the ActorContext should be used instead.
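
Both operators can be seen side by side in the following Akka Classic sketch. The Memory actor and the Remember and Recall messages are hypothetical names invented for this example; the akka-actor library is assumed to be on the classpath.

```scala
import akka.actor.{Actor, ActorSystem, Props}
import akka.pattern.ask
import akka.util.Timeout
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

// Hypothetical messages, invented for this sketch.
final case class Remember(word: String)
case object Recall

class Memory extends Actor {
  private var last = ""
  def receive: Receive = {
    case Remember(word) => last = word       // fire and forget: no reply
    case Recall         => sender() ! last   // request and response
  }
}

object AskDemo {
  // Stores a word with tell and reads it back with ask.
  def run(): String = {
    val system = ActorSystem("AskDemo")
    // The ask operator needs an implicit timeout for the returned Future.
    implicit val timeout: Timeout = Timeout(3.seconds)
    try {
      val memory = system.actorOf(Props(new Memory), "memory")
      memory ! Remember("kafka")
      val reply: Future[String] = (memory ? Recall).mapTo[String]
      Await.result(reply, 3.seconds)
    } finally {
      Await.result(system.terminate(), 5.seconds)
    }
  }
}
```

Blocking on the Future with Await is only acceptable here because the caller is not an actor; inside an actor the result would rather be piped back as a message.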

1.5. Threads & Blocking Code

In Akka, actors share a thread pool by default, which is backed by a fork-join executor. Its parallelism is bounded by configuration and derived from the number of available cores, which works very well for short, non-blocking tasks. It becomes problematic with blocking code, especially when that code is executed many times: each blocking call occupies a thread of the shared pool until it returns. Blocking tasks can thereby hijack the pool and keep other actors from being scheduled, which is called thread starvation. To avoid this situation, it is advisable to delegate blocking tasks to actors that run on a separate dispatcher backed by a ThreadPoolExecutor. That thread pool has a fixed size, so the blocking tasks cannot starve the non-blocking tasks on the default dispatcher.
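
The idea of isolating blocking work on a fixed-size pool can be sketched with the Scala standard library alone, without Akka. The object name, the pool size, and the Thread.sleep call standing in for blocking I/O are assumptions made for this illustration; in Akka the same effect is typically achieved by configuring a dedicated dispatcher.

```scala
import java.util.concurrent.Executors
import scala.concurrent.{Await, ExecutionContext, ExecutionContextExecutorService, Future}
import scala.concurrent.duration._

object BlockingPoolDemo {
  // Runs `tasks` blocking jobs on a dedicated pool of `poolSize` threads and
  // returns the name of the thread that executed each job.
  def run(tasks: Int, poolSize: Int): Seq[String] = {
    implicit val blockingEc: ExecutionContextExecutorService =
      ExecutionContext.fromExecutorService(Executors.newFixedThreadPool(poolSize))
    try {
      val futures = (1 to tasks).map { _ =>
        Future {
          Thread.sleep(50) // stand-in for a blocking call
          Thread.currentThread().getName
        }
      }
      Await.result(Future.sequence(futures), 10.seconds)
    } finally {
      blockingEc.shutdown()
    }
  }
}
```

However many blocking jobs are submitted, they queue up behind the fixed number of threads, so any other pool in the application stays untouched.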

2. Using Akka to Simulate a Producer

2.1. Preparing the Infrastructure

Before we get started with the implementation, we first have to set up the required infrastructure. In our repository, we have provided the Docker files and scripts to get started. Of course, you can also provide your own Spark and Kafka instances. We are running the following containers:

  • spark master

  • spark worker

  • spark history server

  • zookeeper

  • 3 Kafka brokers

  • schema-registry

  • confluent control center

The specifics of Kafka will be discussed in our next article; for now, it is sufficient to get the containers running.

2.2. Thinking about States

Before we get into the details of the implementation, let us have a look at the business case that we want to simulate. Imagine we are a company with a fleet of cars that can be rented by our customers. Each car has a vehicle identification number (VIN) that identifies it. Each car also has a position, which stays the same while the car is available and changes while it is booked. A booked car stays booked for some time and then becomes available again.

As we know from our smartphones, a signal is not always available. When a car has no signal, the company renting out the cars does not receive any updates about its state. We will consider a car in this situation to be blocked. This concept leads us to the following state diagram.

Figure: State Diagram
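
One possible way to express these states in code is a small pure function, shown below as a sketch. The type and signal names are hypothetical, and the assumption that a blocked car returns to the state it had before losing its signal is ours; the simulator itself stores the state as a string on the car event.

```scala
object CarStates {
  sealed trait CarState
  case object Available extends CarState
  case object Booked extends CarState
  // Assumption: a blocked car remembers the state it was in when the signal dropped.
  final case class Blocked(previous: CarState) extends CarState

  // Hypothetical triggers for the transitions.
  sealed trait Signal
  case object Rented extends Signal
  case object Returned extends Signal
  case object SignalLost extends Signal
  case object SignalRestored extends Signal

  // Returns the next state; combinations without a transition leave the state unchanged.
  def next(state: CarState, signal: Signal): CarState = (state, signal) match {
    case (Available, Rented)                 => Booked
    case (Booked, Returned)                  => Available
    case (Available | Booked, SignalLost)    => Blocked(state)
    case (Blocked(previous), SignalRestored) => previous
    case _                                   => state
  }
}
```

Keeping the transitions in one total function makes it easy to check that no combination of state and trigger is forgotten.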

2.3. Implementing the Actors

In our example, we are using Akka to simulate the network of cars that can be booked, blocked, or available. To do so, we have implemented a Master Actor and a Worker Actor. The Master Actor runs on a clock and decides in every iteration which changes should be made to the cars. To do so, it distinguishes between cars that are currently in use and cars that are available for booking. For each car, the next state is decided and an appropriate message is sent to the Worker Actor. It is then the job of the Worker Actor to push the states of the cars to a Kafka topic. The Master Actor waits for answers from the Worker Actor for a specified time and then sleeps for a while to make the simulation more realistic. The following graphic models the interaction between the actors.

Figure: Akka Simulator

When writing the actors, we first have to think about the kinds of messages they will receive. The main purpose of our Master actor is to perform actions on the fleet of cars at a regular time interval. We will call the message that triggers this Clock. Further, we will have Start and Stop messages to start and stop the clock.

case object Start
case object Clock
case object Done

Further, the Master actor is the container for the state of the fleet, which is stored in a list of Car Events. In each iteration (a tick of the clock), the Master actor walks over the list of cars and decides on actions based on a few predefined rules and random generators. The actions to perform are sent as messages to the Worker actor, which takes care of mutating the objects, publishing the event to the Kafka topic, and returning a response to the Master.


case Clock => {
      // One tick of the simulation clock.
      clock = clock + 1
      log.info("start looping, clock:" + clock)
      // Booked cars: count down an active block, get blocked (1 in 11), or keep moving.
      val bookeds: List[Future[WorkerActor.Response]] = store
        .filter(_.state == "booked")
        .map(x => {
          val blk = random.nextInt(11)
          if (x.block > 0)
            worker.ask(WorkerActor.Notify(x)).mapTo[WorkerActor.Response]
          else if (blk == 10)
            worker.ask(WorkerActor.Block(x)).mapTo[WorkerActor.Response]
          else
            worker.ask(WorkerActor.Move(x)).mapTo[WorkerActor.Response]
        })
      log.info("$$$ all bookeds sent:" + clock)
      // Available cars: count down an active block, get blocked (1 in 11),
      // get booked (1 in 5), or simply report their current state.
      val availables: List[Future[WorkerActor.Response]] = store
        .filter(_.state == "available")
        .map(x => {
          val blk = random.nextInt(11)
          val bk  = random.nextInt(5)
          if (x.block > 1)
            worker.ask(WorkerActor.Notify(x)).mapTo[WorkerActor.Response]
          else if (blk == 10)
            worker.ask(WorkerActor.Block(x)).mapTo[WorkerActor.Response]
          else if (bk == 4)
            worker.ask(WorkerActor.Book(x)).mapTo[WorkerActor.Response]
          else
            worker.ask(WorkerActor.Notify(x)).mapTo[WorkerActor.Response]
        })
      // Excerpt: the handler goes on to wait for these futures before the next tick.
}
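
The branching for available cars can also be isolated as a pure function, which makes the rule easy to test without actors or random numbers. The following is only a sketch: the MasterRules object and its Action type are hypothetical and not part of the simulator, and the two draws are passed in as parameters instead of being generated inside the function.

```scala
object MasterRules {
  // Hypothetical stand-ins for the messages sent to the worker.
  sealed trait Action
  case object Notify extends Action
  case object Block extends Action
  case object Book extends Action

  // Mirrors the branch order for available cars.
  // blk is a draw from 0..10 and bk a draw from 0..4.
  def decideAvailable(block: Int, blk: Int, bk: Int): Action =
    if (block > 1) Notify       // car is still blocked
    else if (blk == 10) Block   // 1 in 11 draws
    else if (bk == 4) Book      // 1 in 5 of the remaining draws
    else Notify
}
```

With uniform draws, a car that is not blocked is therefore blocked with probability 1/11 and booked with probability 10/11 × 1/5 = 2/11 per tick.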

The Worker Actor needs to be able to handle every type of message that we send to it. It calls the appropriate method of our Car Event singleton to mutate the state of the Car Event object. Then it publishes the data to the Kafka topic and responds to the sender with the car event, wrapped in a response object. Both of these tasks are performed in a small helper method. The actual sending is abstracted into a Kafka client object, which contains the relevant configuration objects.


def receive: Receive = {
    // A booked car keeps moving until its booking expires, then becomes available.
    case Move(e) => {
      implicit val event: CarEvent = e
      if (event.expire > 0) {
        val loc = geoStore(random.nextInt(geolocs))
        log.info("### Clock: " + event.clock + ", moved vin:" + event.vin + " to:" + loc)
        send(move(loc), topic)
      } else {
        log.info("### Clock: " + event.clock + ", available vin:" + event.vin)
        send(transition("available", 0), topic)
      }
    }
    // An available car gets booked for a random duration.
    case Book(event) => {
      log.info("### Clock: " + event.clock + ", booked vin:" + event.vin)
      send(transition("booked", random.nextInt(50))(event), topic)
      log.info("*** Clock: " + event.clock + ", booked vin:" + event.vin)
    }
    // A blocked car has no signal: count the block down without publishing.
    case Notify(e) => {
      implicit val event: CarEvent = e
      if (event.block > 0) {
        sender() ! Response(block(event.block - 1))
      } else {
        send(clock, topic)
      }
    }
    // A car loses its signal for a random number of ticks; nothing is published.
    case Block(event) => {
      sender() ! Response(block(random.nextInt(5))(event))
    }

  }

  // Publishes the event to Kafka and answers the sender with the same event.
  def send(event: CarEvent, topic: String)(implicit producer: KafkaProducer[String, VehicleData]): Unit = {
    KafkaClient.send(event, topic, producer)
    sender() ! Response(event)
    log.info("### Clock: " + event.clock + ", RESPONSE:" + event.vin)
  }

When we run the simulation, we can follow what the actors are doing through the logs they print.

simulator           | [INFO] [10/16/2020 08:58:17.488] [Simulator-akka.actor.default-dispatcher-4] [akka://Simulator/user/master] $$$ FINISHED BLOCKING:1
simulator           | [INFO] [10/16/2020 08:58:17.489] [Simulator-akka.actor.default-dispatcher-4] [akka://Simulator/user/master] $$$$$$$$$$ size of store at clock end: 20
simulator           | [INFO] [10/16/2020 08:58:21.490] [Simulator-akka.actor.default-dispatcher-4] [akka://Simulator/user/master] start looping, clock:2
simulator           | [INFO] [10/16/2020 08:58:21.514] [Simulator-akka.actor.default-dispatcher-4] [akka://Simulator/user/master] $$$ FINISHED BLOCKING:2
simulator           | [INFO] [10/16/2020 08:58:21.514] [Simulator-akka.actor.default-dispatcher-4] [akka://Simulator/user/master] $$$$$$$$$$ size of store at clock end: 20
simulator           | [INFO] [10/16/2020 08:58:25.515] [Simulator-akka.actor.default-dispatcher-4] [akka://Simulator/user/master] start looping, clock:3
simulator           | [INFO] [10/16/2020 08:58:25.554] [Simulator-akka.actor.default-dispatcher-4] [akka://Simulator/user/master] $$$ FINISHED BLOCKING:3
simulator           | [INFO] [10/16/2020 08:58:25.554] [Simulator-akka.actor.default-dispatcher-4] [akka://Simulator/user/master] $$$$$$$$$$ size of store at clock end: 20
simulator           | [INFO] [10/16/2020 08:58:29.555] [Simulator-akka.actor.default-dispatcher-4] [akka://Simulator/user/master] start looping, clock:4
simulator           | [INFO] [10/16/2020 08:58:29.635] [Simulator-akka.actor.default-dispatcher-4] [akka://Simulator/user/master] $$$ FINISHED BLOCKING:4
simulator           | [INFO] [10/16/2020 08:58:29.635] [Simulator-akka.actor.default-dispatcher-4] [akka://Simulator/user/master] $$$$$$$$$$ size of store at clock end: 20
simulator           | [INFO] [10/16/2020 08:58:33.635] [Simulator-akka.actor.default-dispatcher-4] [akka://Simulator/user/master] start looping, clock:5
simulator           | [INFO] [10/16/2020 08:58:33.665] [Simulator-akka.actor.default-dispatcher-4] [akka://Simulator/user/master] $$$ FINISHED BLOCKING:5
simulator           | [INFO] [10/16/2020 08:58:33.668] [Simulator-akka.actor.default-dispatcher-4] [akka://Simulator/user/master] $$$$$$$$$$ size of store at clock end: 20
simulator           | [INFO] [10/16/2020 08:58:37.668] [Simulator-akka.actor.default-dispatcher-4] [akka://Simulator/user/master] start looping, clock:6
simulator           | [INFO] [10/16/2020 08:58:37.719] [Simulator-akka.actor.default-dispatcher-4] [akka://Simulator/user/master] $$$ FINISHED BLOCKING:6
simulator           | [INFO] [10/16/2020 08:58:37.719] [Simulator-akka.actor.default-dispatcher-4] [akka://Simulator/user/master] $$$$$$$$$$ size of store at clock end: 20

Wrapping it Up

In this tutorial, we have shown you how you can leverage Akka to simulate events happening in a network. We have used a master-worker architecture to keep our coordinating actor free of blocking work and used the worker actor to publish the events to a Kafka broker.

In the next tutorial, we will focus on the specifics of Kafka as a message broker.
