Introduction

Welcome back to the second part of our tutorial series on developing streaming applications. In the first part, Developing Streaming Applications - Akka, we had a look at Akka and our use case of simulating rental cars. This part of the series will focus on Kafka.

Kafka has emerged as a technology for tackling the challenge of exchanging data between a multitude of source and target systems.

Let us assume a use case where we have four source systems and four target systems, each needing to be integrated with one another.

This would mean implementing 16 integrations, each of which has to deal with:

  • What protocol to use (TCP, HTTP, REST, JDBC)

  • How to parse the data (Binary, CSV, JSON, Avro)

  • The data schema and its evolution (how the data is shaped)

For the code referenced in this article, please check out our GitHub repository.

Complicated messaging system

So how do we solve this?

This is where Apache Kafka comes in. Kafka allows you to decouple the data streams from your systems. The source systems push their data into Kafka and the target systems pull their data from Kafka. Kafka itself will act as an intermediary (broker).

Messaging System with Kafka

Why Apache Kafka?

Kafka was created at LinkedIn and is now an open-source project of the Apache Software Foundation, with Confluent as a major contributor. It comes with multiple advantages. Kafka can be deployed as a distributed system and has a resilient architecture, achieved through partition replication with a leader-follower model, which makes it quite fault-tolerant. Kafka can be scaled horizontally to hundreds of brokers dealing with millions of messages per second. Latencies of less than 10 ms can be achieved, which is generally considered real-time.

1. Topics, partitions, and offsets

Topics are a central concept of Kafka. Each topic represents a stream of data and can be regarded as similar to a table in a database. There is no limit to the number of topics. Each topic is identified by its name and split into partitions. The partitions themselves contain ordered sequences of the messages that have been pushed to their topic. Each message within a partition gets an incremental id, known as its offset. This offset is specific to the partition. The number of partitions has to be specified when creating a new topic.

Kafka Topics, Partitions and Offsets

Although the order of messages is guaranteed inside each partition, there is no such guarantee across partitions. When a producer (source system) pushes a message to a topic, the message is routed to one of the partitions in a round-robin fashion, unless a key is specified. Once the message is written to a partition, it cannot be changed. After the retention period (one week by default), the message is deleted from the partition.
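
To make partitions and offsets a bit more tangible, here is a minimal sketch (not from the tutorial project; broker address and topic name are placeholders) that writes a single message and prints the partition and offset the broker assigned to it:

import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
import org.apache.kafka.common.serialization.StringSerializer

object OffsetDemo extends App {
  val props = new Properties
  props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092") // placeholder broker
  props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer].getName)
  props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer].getName)

  val producer = new KafkaProducer[String, String](props)

  // send() returns a java.util.concurrent.Future[RecordMetadata];
  // get() blocks until the broker has acknowledged the write.
  val metadata = producer.send(new ProducerRecord[String, String]("demo-topic", "hello")).get()
  println(s"written to partition ${metadata.partition()} at offset ${metadata.offset()}")

  producer.close()
}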

2. Brokers

While a topic and its partitions divide messages logically, they also have to be stored somewhere physically. Partitions are stored as simple log files on so-called brokers. A Kafka cluster is usually composed of multiple brokers (servers); a cluster with only one broker is possible but not advisable. Each broker is identified by an integer id and contains some topic partitions. It holds some of the data but not all of it, since Kafka is a distributed system. Connecting to a single bootstrap broker is enough to connect a client to the entire cluster. Three brokers are a good starting point.

2.1. Brokers and Topics

When a topic is created, Kafka automatically distributes its partitions across the brokers using a round-robin assignment, so a topic is spread over multiple brokers. As the following graphic shows, Topic-A has three partitions and Topic-B has two, and all of them are distributed across the available brokers.

Partition Distribution Across Kafka Brokers

2.2. Topic Replication Factor

Distributed systems often rely on replication to be more fault-tolerant: data from one machine is replicated to another machine, so that if one instance fails for some reason, another instance can take over. Each topic in Kafka has its own replication factor, which is usually two or three. This means that for each partition of the topic there is one leader and one or two replicas. In the graphic below, you can see that Topic-A has two partitions and a replication factor of two.

Topic Replication Factor

If broker 102 is lost, broker 101 or 103 can take over and become the new leader for its partitions. There can never be more than one leader per partition, but there can be multiple ISRs (in-sync replicas). Data of a partition can only be written to and read from the partition leader, so there always needs to be one. The other brokers synchronize the data by sending fetch requests at regular intervals; as long as they keep up, Kafka considers them in-sync. How many in-sync replicas have to acknowledge a write is controlled by the producer's acks parameter, which is explained in the next section. A replica that is unresponsive or falls behind for more than ten seconds (the default of replica.lag.time.max.ms in older Kafka versions) is considered out-of-sync and cannot become the leader for the partition.

Partition Leaders and Replications
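
If you want to see leaders and in-sync replicas for yourself, the AdminClient can describe a topic. The following is a small sketch, not part of the tutorial project, assuming a topic named "Topic-A", a local broker, and Scala 2.12-style collection converters:

import java.util.Properties
import org.apache.kafka.clients.admin.{AdminClient, AdminClientConfig}
import scala.collection.JavaConverters._

val config = new Properties
config.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092") // placeholder broker

val admin = AdminClient.create(config)

// Fetch the description of "Topic-A" and print leader, replicas and ISR per partition
val description = admin.describeTopics(List("Topic-A").asJava).all().get().get("Topic-A")
description.partitions().asScala.foreach { p =>
  println(
    s"partition ${p.partition()}: " +
      s"leader=${p.leader().id()}, " +
      s"replicas=${p.replicas().asScala.map(_.id()).mkString(",")}, " +
      s"isr=${p.isr().asScala.map(_.id()).mkString(",")}"
  )
}

admin.close()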

3. Producers

Producers write data to topics. Thanks to the partitioning of each topic, the load is balanced across the brokers. To find a balance between durability guarantees and performance, producers can set the acks parameter, which has three different options (a configuration sketch follows the list):

  1. Acks = 0: The producer simply sends the data to the broker and immediately considers it delivered, without waiting for an acknowledgment.

  2. Acks = 1: The producer sends the data to the broker and waits for a confirmation (ack) from the leader that the data has arrived.

  3. Acks = all: The producer expects the leader to wait for acknowledgments from all in-sync replicas that they have copied the data. Only when all of them have acknowledged the write does the leader respond to the producer with an ack.
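
As a quick configuration sketch (placeholder broker address, not project code): the acks setting is just another producer property, and "all" trades a bit of latency for the strongest durability guarantee.

import java.util.Properties
import org.apache.kafka.clients.producer.ProducerConfig

val props = new Properties
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092") // placeholder broker
props.put(ProducerConfig.ACKS_CONFIG, "all") // alternatives: "0", "1"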

When a producer sends a message to a topic without specifying a key, the message is distributed across the partitions in a round-robin fashion.

Producers and Partitions Balance Across Brokers

3.1. Producers: Message keys

In cases where the producer specifies a key, all messages containing the same key go to the same partition, assuming that the number of partitions does not change. If the key is null, the data is sent round-robin across the brokers (101 – 102 – 103). Keys are therefore useful whenever message ordering is needed for a specific field.
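
A small illustration (topic name, key, and value are placeholders): a record created with a key always lands in the partition that the key hashes to, while a record without a key is spread round-robin.

import org.apache.kafka.clients.producer.ProducerRecord

// Keyed: every record with the key "some-vin" ends up in the same partition,
// so all events of that car stay in order.
val keyed = new ProducerRecord[String, String]("car-events", "some-vin", "position update")

// Keyless: records are distributed round-robin across the partitions.
val keyless = new ProducerRecord[String, String]("car-events", "position update")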

4. Consumers

Consumers read data from a specific topic. Within each partition the data is read in order, but different partitions can be read in parallel. In the case of a broker failure, consumers continue receiving messages from the newly elected partition leader.

Reading Data into Consumers

4.1. Consumers Groups

A consumer group bundles together the consumers that should jointly process all messages of one or more topics. Within a consumer group, each consumer reads and processes the messages of n partitions of the topic, depending on the total number of partitions and the number of consumers. The consumers automatically use a GroupCoordinator and a ConsumerCoordinator to assign partitions to consumers. A partition can only be read by one consumer per consumer group, which means the number of consumers in a group should never exceed the number of partitions of the topic; otherwise, some consumers in the group stay idle. A partition can, however, still be read by multiple consumers belonging to different groups.

Consumer Groups
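
A minimal consumer sketch (not from the project; broker, group id, and topic name are placeholders): starting a second instance of this program with the same group.id makes Kafka split the partitions of the topic between the two consumers.

import java.time.Duration
import java.util.Properties
import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer}
import org.apache.kafka.common.serialization.StringDeserializer
import scala.collection.JavaConverters._

val props = new Properties
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092") // placeholder broker
props.put(ConsumerConfig.GROUP_ID_CONFIG, "car-analytics")           // placeholder group id
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, classOf[StringDeserializer].getName)
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, classOf[StringDeserializer].getName)

val consumer = new KafkaConsumer[String, String](props)
consumer.subscribe(List("car-events").asJava) // placeholder topic

while (true) {
  val records = consumer.poll(Duration.ofMillis(500))
  records.asScala.foreach { r =>
    println(s"partition=${r.partition()} offset=${r.offset()} value=${r.value()}")
  }
}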

4.2. Consumer Offsets

Kafka stores the offsets at which each consumer group has been reading. When exactly the consumer commits its offsets is crucial for the delivery semantics, as the next section explains. If a consumer dies, it can recover and continue where it left off thanks to the committed offsets.

Consumer Offsets

4.3. Delivery semantics for consumers

Consumers can choose when to commit their offsets, which results in different delivery semantics (a sketch of the at-least-once variant follows the list):

  • At-most-once

    Offsets are directly committed as soon as the message is received. If the processing of the message goes wrong, the message will be lost and not read again.

  • At-least-once

    Offsets are committed after a message has been processed. If the processing takes place and the consumer fails before the offset has been committed, the consumer will process the message again after recovering.

  • Exactly-once

    This can be achieved for Kafka => Kafka workflows using Kafka's transactions API or Kafka Streams.
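
To make the at-least-once variant concrete, here is a sketch that reuses the properties and topic name from the consumer-group example above: auto-commit is switched off, records are processed first, and only then are the offsets committed. The `process` function is a placeholder for the actual work.

import java.time.Duration
import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer}
import scala.collection.JavaConverters._

// Same `props` as in the consumer-group sketch above, plus:
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false") // commit offsets explicitly

val consumer = new KafkaConsumer[String, String](props)
consumer.subscribe(List("car-events").asJava) // placeholder topic

def process(value: String): Unit = println(value) // placeholder for the real processing

while (true) {
  val records = consumer.poll(Duration.ofMillis(500))
  records.asScala.foreach(r => process(r.value())) // process first ...
  consumer.commitSync()                            // ... then commit the offsets
}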

5. Kafka Broker Discovery

Every broker in Kafka is also called a bootstrap server: each broker knows about all brokers, topics, and partitions (the cluster metadata). This means that connecting to one broker is effectively the same as connecting to the entire cluster, as the following graphic shows.

Kafka Broker Discovery
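
In practice this means a client only needs a list of one or more bootstrap brokers. A common precaution (sketch with placeholder host names) is to list several of them, so the initial connection still succeeds if one of them happens to be down:

import java.util.Properties
import org.apache.kafka.clients.producer.ProducerConfig

val props = new Properties
// bootstrap.servers takes a comma-separated list; it is only used for the initial
// connection, after which the client learns the full cluster metadata.
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "broker1:9092,broker2:9092,broker3:9092")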

6. Zookeeper

The task of Zookeeper is to manage all brokers in a Kafka cluster. Zookeeper helps in performing leader elections and sends notifications to the cluster in case of changes (a new topic, a broker died, etc.). A Kafka cluster does not work without Zookeeper, so it has to be started first. Zookeeper itself runs on an odd number of servers with one leader and multiple read-only followers.

7. Our use case

After having a look at the theory behind Kafka, let us get back to the use case we started in Part 1, Developing Streaming Applications - Akka, of this series. First, we programmatically create a Kafka topic:


import java.util.Properties
import org.apache.kafka.clients.admin.{AdminClient, AdminClientConfig, CreateTopicsResult, NewTopic}
import scala.collection.JavaConverters._

def createTopic(brokers: String, topicName: String): CreateTopicsResult = {
  val config = new Properties
  config.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, brokers)

  // The AdminClient connects to the cluster via the given bootstrap broker(s)
  val admin = AdminClient.create(config)

  // Three partitions, each replicated to two brokers
  val partitions  = 3
  val replication = 2.toShort

  val topic = new NewTopic(topicName, partitions, replication)
  admin.createTopics(List(topic).asJava)
}

Then we have set up our Worker actor so that it can write messages to the created topic. The messages that get sent to the Kafka Topic are represented in the form of car events. Since these car events have to be sent via the network, we have to determine how we want to serialize them. In our case, we are using JSON to serialize the car events. The events that we are publishing to the Kafka Broker can then be used to visualize the location of the cars or perform analytics on the data.


import java.util.Properties
import java.util.concurrent.Future
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord, RecordMetadata}
import org.apache.kafka.common.serialization.StringSerializer
import io.confluent.kafka.serializers.KafkaJsonSerializer

// VehicleData, CarEvent, and CarEventConvertor are part of the project's domain model
def producer(brokers: String, schemaUrl: String): KafkaProducer[String, VehicleData] = {
  val kprops = new Properties
  kprops.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer].getName)
  kprops.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, classOf[KafkaJsonSerializer[VehicleData]].getName)
  kprops.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers)
  kprops.put("schema.registry.url", schemaUrl)

  new KafkaProducer[String, VehicleData](kprops)
}

def send(event: CarEvent, topic: String, producer: KafkaProducer[String, VehicleData]): Future[RecordMetadata] = {
  val msg: VehicleData = CarEventConvertor.convert(event)
  // Use the vehicle identification number (VIN) as the message key, so all
  // events of one car end up in the same partition and stay ordered
  val data = new ProducerRecord[String, VehicleData](
    topic,
    msg.vin,
    msg
  )
  producer.send(data)
}
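
To tie the pieces together, a hypothetical usage of the helpers above could look as follows; the broker list, schema registry URL, and topic name are placeholders, and `event` stands for a CarEvent coming from the Worker actor.

val brokers   = "localhost:9092"        // placeholder broker list
val schemaUrl = "http://localhost:8081" // placeholder schema registry URL

createTopic(brokers, "car-events")      // placeholder topic name

val carEventProducer = producer(brokers, schemaUrl)
send(event, "car-events", carEventProducer) // `event` is a CarEvent from the Worker actor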

Wrapping it Up

In this tutorial, we have explained the fundamentals of Apache Kafka, a distributed messaging system. We have described its architecture, consisting of Zookeeper, brokers, producers, and consumers. At the heart of Kafka are the partitioned, replicated topics to which the messages are sent.

Lastly, we have followed the car rental example from part one, Developing Streaming Applications - Akka, and described the implementation of the Kafka part - how to programmatically create a topic and how to send messages to this topic.

For the next part of the tutorial please click here!