
Track Transportation Assets in Real Time with Apache Kafka and Kafka Streams

Apache Kafka® is a distributed commit log, commonly used as a multi-tenant data hub to connect diverse source and sink systems. Source systems can be systems of record, operational databases, or any producer application, such as an MQTT application. Kafka is heavily used to move ETL jobs from batch mode to near-real-time mode. Apache Kafka is increasingly becoming the de facto multi-tenant, distributed event streaming platform for all types of enterprises across all verticals, democratizing data for both internal and external users and applications.

What problem are we solving today?

Transportation is one of many industries where Kafka is making a huge difference, and asset tracking is a prime example. We will examine how Kafka, Kafka Streams, Elasticsearch, and a visualization tool such as Kibana can be used to track the movement of assets in real time, specifically the trams, buses, and high-speed electric trains of the Helsinki Region Transport (HSL) system. The transportation assets act as edge IoT devices that publish their status, including their position, every second.

A producer application subscribes to an MQTT server, which produces geolocation information about the trams running in the Helsinki area. Data from the MQTT server is written into the Kafka topic. Data is enriched into a format that puts the geolocation data into a nested JSON structure, which makes it easy for Elasticsearch to consume and display on a dashboard in Kibana. This entire workflow can be accomplished with minimal effort on the development side.

Zooming out to see the big picture

Real-time asset tracking

Let’s dig deeper into each of the components involved, starting with the Java client that produces the data stream from the MQTT server.

The MQTT source publishes data to the MQTT server; the Java client subscribes to it and writes the data into a Kafka topic called vehicle-positions. The producer expects the topic vehicle-positions to exist already; alternatively, the broker can create it automatically if auto.create.topics.enable is set to true. The data comes from HSL, whose vehicles publish their status to the MQTT server every second. Each MQTT message has two parts: the topic and the binary payload. For ease of use, the Java client is Dockerized in this example. Here is a sample command to use if the authentication mechanism is PLAIN:

docker container run --network=host --rm -e KAFKA_BROKERS= -e KAFKA_USERNAME= -e KAFKA_PASSWORD= -it proton69/java-producer:paramsV2

With actual values filled in, the command looks like the following:

docker container run --network=host --rm -e KAFKA_BROKERS=ip-10-9-8-13.us-west-2.compute.internal:9092 -e KAFKA_USERNAME=test -e KAFKA_PASSWORD=test123 -it proton69/java-producer:paramsV2
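Conceptually, the producer does little more than subscribe to the MQTT feed and forward every message to Kafka. Below is a minimal sketch of that bridge using the Eclipse Paho MQTT client and the Kafka Java producer. It is not the exact code from the GitHub repository mentioned below: the class name, the HSL broker URL, the topic filter, and the omission of SASL settings are all simplifying assumptions.

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.eclipse.paho.client.mqttv3.MqttClient;

import java.nio.charset.StandardCharsets;
import java.util.Properties;

public class MqttToKafkaBridge {

    public static void main(String[] args) throws Exception {
        // Kafka producer configuration; the real app reads brokers and SASL/PLAIN
        // credentials from the KAFKA_* environment variables shown above.
        Properties props = new Properties();
        props.put("bootstrap.servers", System.getenv("KAFKA_BROKERS"));
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        KafkaProducer<String, String> producer = new KafkaProducer<>(props);

        // Subscribe to the public HSL feed; the broker URL and topic filter are
        // assumptions based on HSL's published MQTT documentation.
        MqttClient mqtt = new MqttClient("tcp://mqtt.hsl.fi:1883", MqttClient.generateClientId());
        mqtt.connect();
        mqtt.subscribe("/hfp/v2/journey/#", (mqttTopic, message) -> {
            // Forward each MQTT payload to Kafka, keyed by the MQTT topic string
            String payload = new String(message.getPayload(), StandardCharsets.UTF_8);
            producer.send(new ProducerRecord<>("vehicle-positions", mqttTopic, payload));
        });
    }
}

In the real producer, the SASL/PLAIN credentials supplied via KAFKA_USERNAME and KAFKA_PASSWORD would also need to be added to the producer properties (security.protocol, sasl.mechanism, and sasl.jaas.config).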

Once the data is in the Kafka topic, you can easily filter it based on vehicle type. For example, separate topics could carry data for buses, ferries, or just trams. The unit of scaling in Kafka is the partition. If the current number of partitions is unable to handle the throughput, you can alter the Kafka topic to increase the number of partitions, but keep in mind that adding partitions changes how keyed data is assigned to partitions, which matters for consumers that rely on per-key ordering. If you want to enhance the producer app, the source code can be found on GitHub.
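If you do need to grow a topic later, the partition count can be increased with the kafka-topics command line tool or programmatically. The sketch below is a hedged illustration using the Kafka AdminClient in Java; the target count of 12 partitions is an arbitrary example, not a recommendation from this setup.

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.NewPartitions;

import java.util.Map;
import java.util.Properties;

public class IncreasePartitions {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        // Reuse the same bootstrap servers as the producer (placeholder)
        props.put("bootstrap.servers", System.getenv("KAFKA_BROKERS"));

        try (AdminClient admin = AdminClient.create(props)) {
            // Grow vehicle-positions to 12 partitions (an arbitrary example value).
            // Existing records stay where they are, so the key-to-partition mapping
            // only changes for newly produced records.
            admin.createPartitions(Map.of("vehicle-positions", NewPartitions.increaseTo(12)))
                 .all()
                 .get();
        }
    }
}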

The stream of per-second vehicle position data is written into the Kafka topic vehicle-positions. However, for this data to be consumed by a map widget in Kibana, the messages need to be massaged and prepared beforehand. This is where Kafka Streams comes in very handy. Kafka Streams makes it easy to perform data enrichment that is simple, scalable, and, if needed, exactly-once. For a more streamlined approach, you can use ksqlDB, which lets you process events with a SQL-like language: consume data from a topic, enrich it, and send it to downstream systems.

The simplest way to run the Kafka Streams app is with java -jar. You can run it on a broker host (or any machine that can reach the cluster), as it needs access to the vehicle-positions topic:

java -jar vehicle-position-reformat-1.0-SNAPSHOT-jar-with-dependencies.jar

The Streams app takes care of this reshaping. A JSON message in vehicle-positions looks like the following:

...
 "route": "1009",
 "occu": 0,
 "lat": 60.177906,
 "long": 24.949954
 }

Using Kafka Streams, we can enrich it fairly easily. After enrichment, the data is written into the topic vehicle-positions-enriched, and the JSON message looks like the following:

...
 "route": "1009",
 "occu": 0,
 "location": {
   "lat": 60.177906,
   "long": 24.949954
 }
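For reference, here is a minimal sketch of what such an enrichment topology can look like in Kafka Streams. It is not the exact code from the GitHub repository linked later; the application ID is a placeholder, and error handling is reduced to passing malformed records through unchanged. It simply reads JSON from vehicle-positions, nests lat and long under a location object, and writes the result to vehicle-positions-enriched.

import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;

import java.util.Properties;

public class VehiclePositionEnricher {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "vehicle-position-enricher"); // placeholder
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, System.getenv("KAFKA_BROKERS"));
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        ObjectMapper mapper = new ObjectMapper();
        StreamsBuilder builder = new StreamsBuilder();

        KStream<String, String> positions = builder.stream("vehicle-positions");
        positions.mapValues(value -> {
            try {
                // Nest the flat lat/long fields under a "location" object that
                // Elasticsearch can later map to geo_point.
                ObjectNode json = (ObjectNode) mapper.readTree(value);
                ObjectNode location = json.putObject("location");
                location.set("lat", json.remove("lat"));
                location.set("long", json.remove("long"));
                return mapper.writeValueAsString(json);
            } catch (Exception e) {
                return value; // leave malformed records untouched
            }
        }).to("vehicle-positions-enriched");

        KafkaStreams streams = new KafkaStreams(builder.build(), props);
        streams.start();
        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
    }
}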

The data is now in the format that we need, so let's push it into Elasticsearch using a sink connector. Setting up the Elasticsearch sink connector is fairly straightforward. Prior to setting up the connector, a dynamic template needs to be set up on the Elasticsearch side so that the geolocation data is recognized. The template below matches the data coming from the Kafka topic and maps the nested location field (containing location.lat and location.long) to the geo_point type, which Kibana can use to plot a real-time map dashboard.

{
  "_doc": {
    "dynamic_templates": [
      {
        "dates": {
          "mapping": {
            "format": "epoch_millis",
            "type": "date"
          },
          "match": "ts"
        }
      },
      {
        "locations": {
          "mapping": {
            "type": "geo_point"
          },
          "match": "*location"
        }
      }
    ]
  }
}
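One way to apply this mapping is to create the index with it before starting the connector. The sketch below does that with Java's built-in HttpClient; it assumes an Elasticsearch version that still accepts the _doc mapping type (consistent with the template above and the connector's type.name setting), and it reuses the Elasticsearch host from this setup.

import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class CreateEnrichedIndex {
    public static void main(String[] args) throws Exception {
        // The dynamic templates from above, wrapped in "mappings" so the index
        // is created with them from the start.
        String body = """
            {
              "mappings": {
                "_doc": {
                  "dynamic_templates": [
                    { "dates":     { "match": "ts",        "mapping": { "type": "date", "format": "epoch_millis" } } },
                    { "locations": { "match": "*location", "mapping": { "type": "geo_point" } } }
                  ]
                }
              }
            }
            """;

        HttpRequest request = HttpRequest.newBuilder()
            .uri(URI.create("http://ip-10-9-8-39.us-west-2.compute.internal:9200/vehicle-positions-enriched"))
            .header("Content-Type", "application/json")
            .PUT(HttpRequest.BodyPublishers.ofString(body))
            .build();

        HttpResponse<String> response = HttpClient.newHttpClient()
            .send(request, HttpResponse.BodyHandlers.ofString());
        System.out.println(response.statusCode() + " " + response.body());
    }
}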

This is the configuration that was used in the setup:

curl -X PUT -H "Content-Type: application/json" --data '
{
  "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
  "type.name": "_doc",
  "topics": "vehicle-positions-enriched",
  "consumer.override.auto.offset.reset": "latest",
  "key.ignore": "true",
  "schema.ignore": "true",
  "name": "vehicle-positions-elastic",
  "value.converter.schemas.enable": "false",
  "connection.url": "http://ip-10-9-8-39.us-west-2.compute.internal:9200",
  "value.converter": "org.apache.kafka.connect.json.JsonConverter",
  "key.converter": "org.apache.kafka.connect.storage.StringConverter"
}' http://ip-10-9-8-13.us-west-2.compute.internal:8083/connectors/vehicle-positions-elastic/config

You can check the connector status as shown below:

curl -X GET http://localhost:8083/connectors/vehicle-positions-elastic/status


The code for the Streams application described above can be found on GitHub.

Data should start flowing from the Kafka topic to the Elasticsearch index. To visualize the data, we will use Kibana.

Go to Kibana > Management > Index Management, where you should see the vehicle-positions-enriched index.

Elastic | Index Management

Shortly after, the docs will start populating in the index, which can be seen in the “Docs count” column.

Kibana has an easy-to-use Maps widget, so we can plot the geolocation data on a map, which can later be added to a Kibana dashboard.

From the Kibana homepage, click Maps > Create Map > Add Layer, select Documents as the data source, choose the index pattern, and click Add Layer.

Under “Layer Settings,” give the layer a name, and in the “Sorting” section, select speed (the field “spd”). Click Save and Close.

You should now have a real-time dashboard running on Kibana.

Elastic | Real-Time Dashboard

Conclusion

The entire data pipeline we’ve just built is real time. The producer writes data into a Kafka topic; the stream processing application runs continuously, processing and enriching the incoming JSON data in real time; then, through the Kafka Connect framework, the Elasticsearch connector sinks the data into Elasticsearch, again in real time, and the data finally reaches the Kibana map application as a real-time dashboard. As a dynamic platform, Kafka makes processing data in real time possible so that companies can be less reactive and more proactive about meeting the demands of tomorrow, starting today.

To start building applications with Kafka Streams, download the Confluent Platform and get started with the leading distribution of Apache Kafka.

Ram Dhakne works as a solutions engineer at Confluent. He has a wide array of experience in NoSQL databases, filesystems, distributed systems, and Apache Kafka. He has supported industry verticals including large financial services, retail, healthcare, telecom, and utilities companies. His current interest is in helping customers adopt event streaming using Kafka. As a part-time hobby, he has written two children’s books.
