Introduction

Welcome to the third part of our series on the development of streaming applications.

In part one, Developing Streaming Applications - Akka, we introduced the concept of streaming, our car rental use case, and the Akka simulator that sets up the car park and simulates customers renting cars and driving them around the city of Munich.

Part two, Developing Streaming Applications - Kafka, was focused on Kafka and explained how the simulator sends messages to a Kafka topic.

In this article, we will look at the basic concepts of Spark Structured Streaming and how we used it to analyze the Kafka messages. Specifically, we created two applications: the first one calculates how many cars didn't send a message within a time window of 10 seconds, and the second one clusters the locations of the cars that sent a message within the last 20 seconds into several different clusters.

For the code referenced in this article, please check out our GitHub repository.

1. Spark Structured Streaming

Spark Structured Streaming

Source: https://www.manning.com/books/spark-in-action-second-edition

Spark is a well-known batch data processing tool, and its Structured Streaming library (the successor of the Spark 1.x streaming API based on discretized streams - DStreams) makes it possible to process streams of data with the same architecture and almost the same set of transformations.

The mechanism is fairly simple: Spark separates the stream of data into micro-batches (see the purple squares in the figure above), which are then processed with the usual Spark tools. It uses checkpointing and write-ahead logs and thus ensures end-to-end exactly-once semantics.

1.1. Hello World

Let's use the powerful concept of learning by example and have a look at some code. First, we need to set up a SparkSession. We are using spark-submit with this spark.conf:

spark.master                                                 spark://spark-master:7077
spark.serializer                                             org.apache.spark.serializer.KryoSerializer
spark.cores.max                                              2
spark.driver.memory                                          1g
spark.executor.memory                                        2g
spark.driver.cores                                           1
spark.executor.cores                                         2
spark.executor.extraJavaOptions                              -XX:+PrintGCDetails -Dlog4j.configuration=file:/spark_configs/log4j.properties
spark.driver.extraJavaOptions                                -Dlog4j.configuration=file:/spark_configs/log4j.properties
spark.eventLog.enabled                                       true
spark.eventLog.dir                                           /spark_logs

and then we can simply get the session as:


val spark = SparkSession
    .builder()
    .getOrCreate()

Then we connect to the topic on our Kafka brokers:


val BOOTSTRAP_SERVERS   = "kafka-1:29090,kafka-2:29090,kafka-3:29090"
val TOPIC_NAME          = "vehicle-data-topic"

val df: DataFrame = spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", BOOTSTRAP_SERVERS)
    .option("subscribe", TOPIC_NAME)
    .option("startingOffsets", "latest")
    .load()

Our use case uses Kafka, but there are of course various other input sources such as files (text, CSV, JSON, ORC), sockets, or a rate source. Note that the socket and rate sources should only be used for testing.
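
For example, a rate source simply generates rows with a timestamp and an increasing counter at a configurable speed, which is handy for quick local experiments. A minimal sketch:


// Sketch: a rate source generating 5 rows per second, useful for local testing only
val testStream: DataFrame = spark.readStream
    .format("rate")
    .option("rowsPerSecond", "5")
    .load()   // produces the columns: timestamp (Timestamp), value (Long)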

1.2. Basic Operations on Streaming DataFrames

Once we have defined our input source, we can start performing various operations on the DataFrame/Dataset. Many of them are the same as in Spark batch processing, but there are a few differences. For example, the operations withColumn, select, or groupBy work as expected. Each row received by Spark from Kafka has these fields: key (binary), value (binary), topic (string), partition (int), offset (long), timestamp (timestamp), timestampType (int), headers (optional, array). As we are only interested in the VIN and the timestamp, we select those two columns. The key also needs to be cast from a byte array into a string.


val selected: Dataset[(String, Timestamp)] = df
    .withColumn("vin", col("key").cast(StringType))
    .select("vin", "timestamp")
    .as[(String, Timestamp)]

Joins are supported both between two streaming DataFrames and between a static and a streaming DataFrame. When doing left or right outer joins on two streaming DataFrames, it is necessary to specify watermarks (explained in the next section) and time constraints (e.g. JOIN ON leftTime BETWEEN rightTime AND rightTime + INTERVAL 1 HOUR or JOIN ON leftTimeWindow = rightTimeWindow). For inner joins, watermarks and time constraints are optional.
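
As a sketch only (not part of our use case), an inner join between our message stream and a hypothetical second streaming DataFrame bookings (with the columns bookingVin and bookingTime) could look roughly like this:


import org.apache.spark.sql.functions.expr

// Sketch: stream-stream inner join with watermarks and an event-time constraint.
// "bookings" is a hypothetical second streaming DataFrame with columns bookingVin and bookingTime.
val messagesWithWatermark = selected.toDF("vin", "msgTime").withWatermark("msgTime", "10 seconds")
val bookingsWithWatermark = bookings.withWatermark("bookingTime", "20 seconds")

// For inner joins the watermarks and the time constraint are optional, for outer joins they are mandatory
val joined = messagesWithWatermark.join(
    bookingsWithWatermark,
    expr("vin = bookingVin AND msgTime BETWEEN bookingTime AND bookingTime + interval 1 hour")
)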

1.3. Window Operations on Event Time

Often we might be interested in working with windows of data. In our use case, for example, we want to analyze how many cars sent or didn't send a message in the last 10 seconds. We would also like to see non-overlapping windows, i.e. print out a new window every 10 seconds. This can be achieved with this groupBy:


val grouped = selected
    .withWatermark("timestamp", "2 seconds")
    .groupBy(
      window(col("timestamp"), "10 seconds", "10 seconds")
    )

By adding a watermark of 2 seconds, we make sure to also wait for data that was created within that 10-second window but got delayed somewhere on the way. Watermarking can only be used with the append or update output modes (more on output modes in Section 1.5).

In the update mode, Spark also processes partially finished windows. With our 10-second slide duration, every 10 seconds Spark would write into the sink the rows of all windows that were updated by new data (ignoring data that arrived too late, i.e. later than the window's upper bound plus the 2-second watermark). For example, at timestamp 14:00:00 it would write rows from the windows 13:59:50-14:00:00 and 14:00:00-14:00:10, at timestamp 14:00:10 from the windows 14:00:00-14:00:10 and 14:00:10-14:00:20, and so on. In the append mode, on the other hand, Spark waits for all rows of the latest window (including the delay) and only then writes the full window into the sink. So at timestamp 14:00:00, it would write only the window 13:59:40-13:59:50. For a more detailed explanation see the official documentation.

To continue with our example, the groupBy defined above is followed by an aggregation and sorting. Since the countDistinct function is not supported in structured streaming, we use approx_count_distinct to count how many cars sent a message during the window.


val aggregated = grouped
    .agg(approx_count_distinct("vin").as("Number of alive cars"))
    .sort("window")

1.4. Unsupported Operations

There are also several unsupported operations in structured streaming:

  • Multiple streaming aggregations,

  • limit, take, and distinct functions,

  • sorting is supported only after an aggregation and only in the complete mode,

  • a few types of outer joins (full outer joins for both combinations and right outer joins for stream-static DataFrames),

  • count - can only be used together with groupBy, foreach - writeStream.foreach() should be used instead, show - replaced by the console sink (see Section 1.5).

1.5. Sinks

Similar to the input options described in Section 1.1, Spark also offers a handful of output options, called sinks:

  • file - only append mode possible, exactly-once semantics (see the sketch after this list),

  • kafka - at-least-once semantics,

  • foreach or foreachBatch - explained in Section 2.2, also at-least-once semantics,

  • console - not fault-tolerant,

  • memory - only append and complete mode possible, not fault-tolerant.
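
To illustrate the file sink, a minimal sketch writing the selected (vin, timestamp) rows as Parquet files is shown below; the paths are hypothetical, and a checkpoint location is mandatory for the fault-tolerance guarantees:


// Sketch: file sink with hypothetical paths; only the append output mode is supported
val fileSinkWriter: DataStreamWriter[(String, Timestamp)] = selected
    .writeStream
    .outputMode(OutputMode.Append())
    .format("parquet")
    .option("path", "/data/vehicle-messages")            // hypothetical output directory
    .option("checkpointLocation", "/data/checkpoints")   // required for exactly-once semantics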

In our demo example, we're using the console sink for monitoring the results.


val writer: DataStreamWriter[Row] = aggregated
    .writeStream
    .outputMode(OutputMode.Complete())
    .format("console")
    .option("truncate", value = false)
    .option("numRows", 1000)

We chose the complete mode so that we can see the results for all windows and compare them with each other. In this mode, the whole table is written to the sink, but Spark keeps an intermediate state and doesn't need to recalculate the aggregations from scratch.

Two other options are the append and update modes. As explained in Section 1.3, in the append mode only new rows are appended to the result table. This also implies that the query cannot change rows that have already been added. That is why, for example, aggregations without a window on the event time are not supported, as the aggregates could change with new data. The update mode lifts this restriction by outputting all updated rows (i.e. in this case the updated aggregates).
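
As a small sketch, the update mode for our example would look as follows; note that the sort has to be dropped, as sorting is only supported in the complete mode:


// Sketch: the same aggregation written in the update mode (without the sort)
val updateWriter: DataStreamWriter[Row] = grouped
    .agg(approx_count_distinct("vin").as("Number of alive cars"))
    .writeStream
    .outputMode(OutputMode.Update())
    .format("console")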

1.6. Triggers

The third important option (after the sink type and the output mode) for writing data is the trigger, which defines the timing of the processing. With the default settings, a micro-batch is generated as soon as the previous micro-batch has been processed. When the trigger is set as in our example below,


val writerWithTrigger: DataStreamWriter[Row] = writer
    .trigger(Trigger.ProcessingTime("10 seconds"))

the micro-batches are kicked off at the given interval. If the processing of the previous micro-batch is not yet finished when the next one is due, the next micro-batch starts only once the previous one is done. Furthermore, if there is no new data, no micro-batch gets kicked off.
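
Besides ProcessingTime, other trigger types exist as well. For example, a one-off run that processes everything currently available in a single micro-batch and then stops could be sketched like this (as an alternative to the processing-time trigger above, not in addition to it):


// Sketch: process all data available at the moment in one micro-batch, then stop
val oneShotWriter: DataStreamWriter[Row] = writer
    .trigger(Trigger.Once())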

1.7. Monitoring and Managing Queries

In this section, we will briefly talk about managing existing queries. Our example query is started as follows:


val query: StreamingQuery = writerWithTrigger.start()

This is a non-blocking call, so afterward we can run various commands on the query such as query.lastProgress or query.status. We can also get more information with query.id, .runId, .name, or .explain(). A running query can be stopped with query.stop(). It is also possible to inspect other queries with spark.streams.active or spark.streams.get(id). In our case, we give the query ten minutes to run with query.awaitTermination(600000). When several queries are running, spark.streams.awaitAnyTermination(600000) might be useful; it blocks until one of them terminates or ten minutes have passed.
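
Putting the calls from this section together, a small sketch (using plain println statements) could look like this:


// Sketch: inspecting and managing the running query
println(query.status)          // is the query active, is new data available, ...
println(query.lastProgress)    // metrics of the last micro-batch (null before the first one)

spark.streams.active.foreach(q => println(s"Active query: ${q.name} (id = ${q.id})"))

query.awaitTermination(600000) // block for at most ten minutes
// query.stop()                // stop the query manually if needed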

1.8. Example Output

As there is a lot of randomness in the whole application, the output will look different with every run, but an example output might look as follows:


lost-connections    | -------------------------------------------
lost-connections    | Batch: 1
lost-connections    | -------------------------------------------
lost-connections    | +------------------------------------------+--------------------+
lost-connections    | |window                                    |Number of alive cars|
lost-connections    | +------------------------------------------+--------------------+
lost-connections    | |[2020-10-16 08:58:30, 2020-10-16 08:58:40]|14                  |
lost-connections    | |[2020-10-16 08:58:40, 2020-10-16 08:58:50]|17                  |
lost-connections    | |[2020-10-16 08:58:50, 2020-10-16 08:59:00]|14                  |
lost-connections    | |[2020-10-16 08:59:00, 2020-10-16 08:59:10]|20                  |
lost-connections    | +------------------------------------------+--------------------+

lost-connections    | -------------------------------------------
lost-connections    | Batch: 2
lost-connections    | -------------------------------------------
lost-connections    | +------------------------------------------+--------------------+
lost-connections    | |window                                    |Number of alive cars|
lost-connections    | +------------------------------------------+--------------------+
lost-connections    | |[2020-10-16 08:58:30, 2020-10-16 08:58:40]|14                  |
lost-connections    | |[2020-10-16 08:58:40, 2020-10-16 08:58:50]|17                  |
lost-connections    | |[2020-10-16 08:58:50, 2020-10-16 08:59:00]|14                  |
lost-connections    | |[2020-10-16 08:59:00, 2020-10-16 08:59:10]|20                  |
lost-connections    | |[2020-10-16 08:59:10, 2020-10-16 08:59:20]|12                  |
lost-connections    | |[2020-10-16 08:59:20, 2020-10-16 08:59:30]|18                  |
lost-connections    | +------------------------------------------+--------------------+

2. Clustering

The goal of our second Spark Streaming application is to cluster all locations from which cars sent a message into five clusters, to see in which parts of the city the cars were mostly located (either parked - state available - or driving - state booked).

Here we need more information than just the Kafka key (the VIN) - we need latitudes and longitudes. For this, we need to deserialize the value.

2.1. Serialization

There are several options for sending information through Kafka. As explained above, for our second use case we need to send more complex objects than plain string messages. Let's therefore have a look at the Avro and JSON formats.

2.1.1. Avro

According to the official Spark Avro documentation, after importing the org.apache.spark:spark-avro package it should be possible to receive Avro objects from a Kafka topic and deserialize them with the from_avro function. Copied from their example:


val avroSchema = new String(Files.readAllBytes(Paths.get("./examples/src/main/resources/user.avsc")))

val parsedFromAvro = df.select(from_avro(col("value"), avroSchema) as "user")

The Avro schema is either read from an Avro schema file (as above) or obtained from the Schema Registry as shown here:


import io.confluent.kafka.schemaregistry.client.rest.RestService

val restService = new RestService(SCHEMA_REGISTRY_URL)
val valueRestResponseSchema = restService.getLatestVersion(TOPIC_NAME + "-value")
val avroSchema = valueRestResponseSchema.getSchema

For this, the following kafka-avro-serializer dependency is needed:

<dependency>
    <groupId>io.confluent</groupId>
    <artifactId>kafka-avro-serializer</artifactId>
    <version>${confluent.version}</version>
</dependency>

However, the Confluent Avro and Spark Avro wire formats (the result of a serializer or the input of a deserializer) are not compatible - the Confluent format contains an extra magic byte (1). It is therefore not easily possible to produce Avro messages with a Kafka producer and consume them with Spark Structured Streaming (2). It might thus be a better choice to use the JSON format instead.
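
For completeness, one possible workaround (which we did not use) is to strip the Confluent header - the magic byte plus the 4-byte schema ID, 5 bytes in total - from the value before handing it to from_avro. A rough sketch:


import org.apache.spark.sql.avro.functions.from_avro   // Spark 3.x import path
import org.apache.spark.sql.functions.{col, expr}

// Sketch of a workaround (not used in our project): drop the 5-byte Confluent header
// (1 magic byte + 4-byte schema id) and deserialize the remaining payload with from_avro
val payload = df.withColumn("avroPayload", expr("substring(value, 6, length(value) - 5)"))
val parsedVehicles = payload.select(from_avro(col("avroPayload"), avroSchema) as "vehicle")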

2.1.2. JSON

To parse the data from JSON, we need to define the Spark schema of our object.


val sparkSchema = StructType(
    Seq(
      StructField("timeStamp", DoubleType),
      StructField("vin", StringType),
      StructField("fleet", StringType),
      StructField(
        "geoData",
        StructType(Seq(StructField("latitude", DoubleType), StructField("longitude", DoubleType)))
      ),
      StructField("carStateData", StructType(Seq(StructField("state", StringType))))
    )
  )

Afterward, we cast the value column into a string and use the from_json function to get the vehicle object. As we then only need the latitude and longitude, we make one more select.


df
    .withColumn("value_string", col("value").cast(StringType))
    .withColumn("vehicle", from_json(col("value_string"), sparkSchema))
    .select(col("vehicle.geoData.latitude").as("latitude"), col("vehicle.geoData.longitude").as("longitude"))

2.2. KMeans

Spark ML unfortunately does not offer a big choice of ML algorithms. We picked the well-known KMeans for our example. First, we need to transform our two features into a vector column; any invalid values are simply skipped.


val numericFeatures: Array[String] = Array("latitude", "longitude")
val assembler: VectorAssembler = new VectorAssembler()
    .setInputCols(numericFeatures)
    .setHandleInvalid("skip")
    .setOutputCol("features")

Second, we define the KMeans model and the ML pipeline.


val kmeans: KMeans = new KMeans().setK(5)
val pipeline: Pipeline = new Pipeline().setStages(Array(assembler, kmeans))

Lastly, we set up and start the query. We use a trigger of 20 seconds, and the append output mode as we are only interested in new messages.


val query: StreamingQuery = df.writeStream
    .outputMode(OutputMode.Append())
    .trigger(Trigger.ProcessingTime("20 seconds"))
    .foreachBatch { (batchDF: DataFrame, batchId: Long) =>
      if (!batchDF.isEmpty) {
        // Train the model
        val pipelineModel: PipelineModel = pipeline.fit(batchDF)
        // Make predictions
        val predictions: DataFrame = pipelineModel.transform(batchDF)

        // Evaluate clustering by computing Silhouette score
        val evaluator: ClusteringEvaluator = new ClusteringEvaluator()

        val silhouette: Double = evaluator.evaluate(predictions)
        logger.info(s"Silhouette with squared euclidean distance = $silhouette")

        // Show the result
        logger.info("Cluster Centers: ")
        pipelineModel.stages.last.asInstanceOf[KMeansModel].clusterCenters.foreach(logger.info)
      }
    }
    .start()

When doing arbitrary operations during streaming, such as training an ML model in our case, the foreach or foreachBatch functions can be used. Instead of writing the results of the transformations into a predefined sink, they give us much more freedom in what to do with the incoming data. We chose foreachBatch, where we can work with the whole batch of rows that arrived since the last trigger (ca. 20 seconds), as opposed to processing each row separately with the foreach function.
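
For comparison, a minimal sketch of the row-by-row foreach variant (here just printing every row) could look like this:


// Sketch: processing each row separately with a ForeachWriter instead of foreachBatch
val rowByRowQuery: StreamingQuery = df.writeStream
    .foreach(new ForeachWriter[Row] {
      override def open(partitionId: Long, epochId: Long): Boolean = true  // open connections/resources here
      override def process(row: Row): Unit = println(row)                  // handle a single row
      override def close(errorOrNull: Throwable): Unit = ()                // clean up
    })
    .start()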

First, the model is trained on all locations sent by the cars in the last 20 seconds. Second, the trained model is used to make predictions, i.e. to assign each location to one of the five clusters. Third, we check how well the model performs based on the silhouette score. This is calculated from the distance to the other instances in the same cluster (should be as low as possible) and the distance to the instances in the next closest cluster (should be as high as possible). The score varies between -1 and +1, where +1 means that each instance is well inside its own cluster and far from other clusters. Based on the silhouette score we could also perform a grid search for the optimal value of k - the number of clusters:


.foreachBatch { (batchDF: DataFrame, batchId: Long) =>
  if (!batchDF.isEmpty) {
    val possibleKs: List[Int] = (2 to 10).toList
    val gridSearchResults: List[(Double, Array[linalg.Vector], Int)] = possibleKs.map(k => {
      val kmeans: KMeans = new KMeans().setK(k)
      val pipeline: Pipeline = new Pipeline().setStages(Array(assembler, kmeans))
      val pipelineModel: PipelineModel = pipeline.fit(batchDF)
      // Make predictions
      val predictions: DataFrame = pipelineModel.transform(batchDF)

      // Evaluate clustering by computing Silhouette score
      val evaluator: ClusteringEvaluator = new ClusteringEvaluator()

      val silhouette: Double = evaluator.evaluate(predictions)
      val clusterCenters: Array[linalg.Vector] = pipelineModel.stages.last.asInstanceOf[KMeansModel].clusterCenters

      (silhouette, clusterCenters, k)
    })

    val bestModel: (Double, Array[linalg.Vector], Int) = gridSearchResults.maxBy(_._1)
    logger.info(s"Silhouette with squared euclidean distance = ${bestModel._1}")

    // Show the result
    logger.info(s"Cluster Centers (k = ${bestModel._3}): ")
    bestModel._2.foreach(logger.info)
  }
}

2.3. Example Output

An example output of the clustering app might look as follows:


clustering          | 2020-10-16 08:59:36 INFO  Clustering$:81 - Silhouette with squared euclidean distance = 0.7429471554475906
clustering          | 2020-10-16 08:59:36 INFO  Clustering$:84 - Cluster Centers (k = 2): 
clustering          | 2020-10-16 08:59:36 INFO  Clustering$:85 - [48.164722352000005,11.459599766000002]
clustering          | 2020-10-16 08:59:36 INFO  Clustering$:85 - [48.10379802666666,11.656961003333333]

clustering          | 2020-10-16 09:00:17 INFO  Clustering$:81 - Silhouette with squared euclidean distance = 0.6236321112334502
clustering          | 2020-10-16 09:00:17 INFO  Clustering$:84 - Cluster Centers (k = 2): 
clustering          | 2020-10-16 09:00:17 INFO  Clustering$:85 - [48.12449423631579,11.66290214168421]
clustering          | 2020-10-16 09:00:17 INFO  Clustering$:85 - [48.15188781349055,11.49520759433962]

clustering          | 2020-10-16 09:00:50 INFO  Clustering$:81 - Silhouette with squared euclidean distance = 0.6038346941295536
clustering          | 2020-10-16 09:00:50 INFO  Clustering$:84 - Cluster Centers (k = 8): 
clustering          | 2020-10-16 09:00:50 INFO  Clustering$:85 - [48.16821079105262,11.733022212631578]
clustering          | 2020-10-16 09:00:50 INFO  Clustering$:85 - [48.16290015862069,11.457130701034485]
clustering          | 2020-10-16 09:00:50 INFO  Clustering$:85 - [48.140153734375,11.568322069375]
clustering          | 2020-10-16 09:00:50 INFO  Clustering$:85 - [48.087482496875005,11.622177886250002]
clustering          | 2020-10-16 09:00:50 INFO  Clustering$:85 - [48.043980579230784,11.539946313846157]
clustering          | 2020-10-16 09:00:50 INFO  Clustering$:85 - [48.05115284,11.699200665714287]
clustering          | 2020-10-16 09:00:50 INFO  Clustering$:85 - [48.2224593468421,11.54288648473684]
clustering          | 2020-10-16 09:00:50 INFO  Clustering$:85 - [48.1988281521739,11.641228679130434]

These cluster coordinates can then be easily displayed on a map, e.g. by rendering a simple HTML file.
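
As a sketch of what such a simple HTML file could look like, the cluster centers could be written into a small Leaflet-based page; the output path, zoom level, and styling below are just placeholders:


import java.nio.file.{Files, Paths}
import org.apache.spark.ml.linalg

// Sketch: render the cluster centers as circle markers on a Leaflet map (hypothetical output path)
def renderMap(centers: Array[linalg.Vector], path: String = "/tmp/cluster-centers.html"): Unit = {
  val markers = centers
    .map(c => s"L.circleMarker([${c(0)}, ${c(1)}], {radius: 10}).addTo(map);")
    .mkString("\n")
  val html =
    s"""<html><head>
       |<link rel="stylesheet" href="https://unpkg.com/leaflet/dist/leaflet.css"/>
       |<script src="https://unpkg.com/leaflet/dist/leaflet.js"></script>
       |</head><body>
       |<div id="map" style="height: 600px"></div>
       |<script>
       |  var map = L.map('map').setView([48.137, 11.575], 11);  // centered on Munich
       |  L.tileLayer('https://{s}.tile.openstreetmap.org/{z}/{x}/{y}.png').addTo(map);
       |  $markers
       |</script></body></html>""".stripMargin
  Files.write(Paths.get(path), html.getBytes)
}

Calling, for example, renderMap(bestModel._2) inside the foreachBatch block would then regenerate the map after every trigger.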

Wrapping it Up

In this tutorial, we have shown how to use Spark Structured Streaming to consume messages from Kafka. We have followed the car rental example from the previous parts, Developing Streaming Applications - Akka and Developing Streaming Applications - Kafka, and implemented two applications analyzing the messages sent to Kafka by the simulator.

The first one calculates how many cars sent a message within a 10-second window and how many of them lost connection. The second app uses a clustering algorithm to analyze from which locations the cars were sending their messages.

Sources

*1. Which is currently always 0.

*2. The function from_confluent_avro in this package seems to do the trick.