Apache Kafka, Prometheus, and Grafana in a Java Spring Boot application.

Gabriel Rios Belmiro
6 min read · Jun 26, 2023


In this article, we will explore some of the most commonly used technologies when we need high-performance message processing and service monitoring. These resources are widely used in microservices architecture.

Personal note: to anyone who can follow me and leave a like, I would greatly appreciate it. Enjoy the read! =)

Apache Kafka

Apache Kafka, or simply Kafka, is a distributed data streaming platform commonly referred to as a messaging system. It can publish, store, and process records in real time, handling immense volumes of data while multiple clients publish to and consume from its topics.

Prometheus

Prometheus is a monitoring system for services and infrastructure. It collects metrics from configured targets at defined intervals, evaluates rule expressions, displays the results, and can trigger alerts when specified conditions are observed.

Grafana

Grafana is an interactive open-source data visualization platform developed by Grafana Labs. It allows users to view data through unified tables and charts on a single or multiple dashboards, making interpretation and understanding easier.

Now, given the introductions, let’s see how we can combine all these technologies with our beloved Java using Spring Boot.

1 — Docker Environment Configuration

We will create a Docker environment that will set up containers for all these technologies. Below is the docker-compose.yml file.

version: '3'

networks:
  gb-network:
    driver: bridge

volumes:
  prometheus_data:
  grafana_data:

services:
  zookeeper:
    image: zookeeper:3.4.9
    hostname: zookeeper
    ports:
      - "2181:2181"
    environment:
      ZOO_MY_ID: 1
      ZOO_PORT: 2181
      ZOO_SERVERS: server.1=zookeeper:2888:3888
    volumes:
      - ./data/zookeeper/data:/data
      - ./data/zookeeper/datalog:/datalog

  kafka1:
    image: confluentinc/cp-kafka:5.3.0
    hostname: kafka1
    ports:
      - "9091:9091"
    environment:
      KAFKA_ADVERTISED_LISTENERS: LISTENER_DOCKER_INTERNAL://kafka1:19091,LISTENER_DOCKER_EXTERNAL://${DOCKER_HOST_IP:-127.0.0.1}:9091
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: LISTENER_DOCKER_INTERNAL:PLAINTEXT,LISTENER_DOCKER_EXTERNAL:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: LISTENER_DOCKER_INTERNAL
      KAFKA_ZOOKEEPER_CONNECT: "zookeeper:2181"
      KAFKA_BROKER_ID: 1
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
    volumes:
      - ./data/kafka1/data:/var/lib/kafka/data
    depends_on:
      - zookeeper

  kafdrop:
    image: obsidiandynamics/kafdrop
    restart: "no"
    ports:
      - "9000:9000"
    environment:
      KAFKA_BROKERCONNECT: "kafka1:19091"
    depends_on:
      - kafka1

  prometheus:
    image: prom/prometheus:latest
    volumes:
      - ./config/prometheus.yml:/etc/prometheus/prometheus.yml
      - prometheus_data:/prometheus
    networks:
      - gb-network
    ports:
      - 9090:9090
    depends_on:
      - kafka1

  grafana:
    image: grafana/grafana:latest
    ports:
      - 3000:3000
    networks:
      - gb-network
    depends_on:
      - prometheus

To set up Kafka in Docker, we need two supporting services:

1 — ZooKeeper: Primarily used to maintain shared state and consistency in a distributed cluster of computers. It provides a set of high-level primitives such as locks, semaphores, queues, and directory trees that can be used by applications to perform coordination tasks.

2 — Kafdrop: A lightweight and user-friendly UI for visualizing and interacting with Apache Kafka clusters. It provides a comprehensive view of topics, partitions, producers, and consumers in a Kafka cluster, allowing you to monitor and explore data in an intuitive way.

We also need to configure the export of Prometheus metrics through Docker. Inside the docker/config folder, create the prometheus.yml file, where we define the metric settings and which resources we want to export. With the Actuator dependency, Spring can generate metrics for server resources such as the JVM, CPU, memory, API endpoints, application health, etc. Note that the scrape target host.docker.internal:8080 (used below) points the Prometheus container at the Spring Boot application running on the host machine.

global:
  scrape_interval: 5s

scrape_configs:
  - job_name: "kafka_java"
    metrics_path: "/actuator/prometheus"
    static_configs:
      - targets: ["host.docker.internal:8080"]
        labels:
          application: 'kafka_java'
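Once the stack is up, you can verify the scrape configuration in the Prometheus UI under Status → Targets: the kafka_java job should show as UP as soon as the Spring Boot application is running.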

Now let’s configure our Java service.

We need to configure the properties of our Spring project. I have chosen to use the YAML format. Below is the application.yml file:

spring:
  kafka:
    listener:
      type: batch
    bootstrap-servers: localhost:9091 # Kafka server address
    consumer:
      group-id: meu-consumidor # Consumer group ID
      max-poll-records: 500

# Prometheus configuration to export the service's metrics
management:
  endpoints:
    enabled-by-default: false
    web:
      exposure:
        include: 'health, prometheus'
  endpoint:
    health:
      enabled: true
      show-details: always
    metrics:
      enabled: true
    prometheus:
      enabled: true
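With the prometheus endpoint enabled, Spring Boot's built-in Micrometer metrics (JVM, CPU, HTTP, etc.) are exported automatically. If you also want to track something application-specific, you can register your own meter. A minimal sketch, assuming Micrometer is on the classpath via Actuator; the KafkaMessageMetrics class and the metric name are illustrative, not part of the original project:

package com.gaber.kafkatest.metrics;

import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.MeterRegistry;
import org.springframework.stereotype.Component;

@Component
public class KafkaMessageMetrics {

    private final Counter consumed;

    public KafkaMessageMetrics(MeterRegistry registry) {
        // Exposed at /actuator/prometheus as kafka_messages_consumed_total
        this.consumed = Counter.builder("kafka.messages.consumed")
                .description("Messages consumed from topico_java")
                .register(registry);
    }

    public void increment() {
        consumed.increment();
    }
}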

Let’s now configure the Kafka settings in our service:

1 — Creating a new topic:

In this code snippet, we simply create a Kafka topic for the application to use.

package com.gaber.kafkatest.config;

import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.TopicBuilder;

@Configuration
public class KafkaConfig {

    @Bean
    public NewTopic topicoJava() {
        return TopicBuilder.name("topico_java")
                .build();
    }
}
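With no options, TopicBuilder creates the topic using the broker's default partition and replication settings. If you want parallel consumption later, you can set those explicitly; a hedged variant (the values are illustrative, and with our single broker the replication factor cannot exceed 1):

@Bean
public NewTopic topicoJavaParticionado() {
    return TopicBuilder.name("topico_java")
            .partitions(3) // allows up to 3 concurrent consumers in one group
            .replicas(1)   // single broker, so replication factor 1
            .build();
}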

2 — Kafka Producer Configuration:

Here are the basic settings for producing messages to the created topic. You need to provide the Kafka server address and the message serialization parameters.

package com.gaber.kafkatest.config;

import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;

import java.util.HashMap;
import java.util.Map;

@Configuration
public class KafkaProducerConfig {

    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;

    public Map<String, Object> producerConfig() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        return props;
    }

    @Bean
    public ProducerFactory<String, String> producerFactory() {
        return new DefaultKafkaProducerFactory<>(producerConfig());
    }

    @Bean
    public KafkaTemplate<String, String> kafkaTemplate(
            ProducerFactory<String, String> producerFactory) {
        return new KafkaTemplate<>(producerFactory);
    }
}
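If you need stronger delivery guarantees, producerConfig() can be extended with standard producer settings. These lines are illustrative additions, not part of the original project:

// Inside producerConfig(), before returning props:
props.put(ProducerConfig.ACKS_CONFIG, "all");  // wait for all in-sync replicas to acknowledge
props.put(ProducerConfig.RETRIES_CONFIG, 3);   // retry transient send failures
props.put(ProducerConfig.LINGER_MS_CONFIG, 5); // wait up to 5 ms to fill a batch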

3 — Kafka Consumer Configuration:

Following the same pattern as the producer, let’s configure the Kafka server and add the message deserialization parameters.

package com.gaber.kafkatest.config;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;

import java.util.HashMap;
import java.util.Map;

@Configuration
public class KafkaConsumerConfig {

    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;

    public Map<String, Object> consumerConfig() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        // The consumer side needs deserializers, not serializers
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 500);
        return props;
    }

    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfig());
    }

    // The bean name must match the containerFactory attribute used on @KafkaListener
    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory(
            ConsumerFactory<String, String> consumerFactory) {
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory);
        factory.setBatchListener(true);
        return factory;
    }
}
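Two settings worth knowing about here, both illustrative additions rather than part of the original project: as written, the consumer group comes from the @KafkaListener annotation shown later, and offset behavior for a brand-new group falls back to the broker default (latest).

// Optional additions inside consumerConfig():
props.put(ConsumerConfig.GROUP_ID_CONFIG, "meu-consumidor");    // default group if the listener does not set one
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); // read from the beginning when no committed offset exists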

In the consumer, I'm using batch message consumption: each poll returns a list of up to 500 messages, controlled by MAX_POLL_RECORDS_CONFIG together with setBatchListener(true).

Now let's wire up both message production and consumption. I chose to produce messages at application startup, via a CommandLineRunner bean in the main class.

package com.gaber.kafkatest;

import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.core.KafkaTemplate;

@SpringBootApplication
public class KafkatestApplication {

    public static void main(String[] args) {
        SpringApplication.run(KafkatestApplication.class, args);
    }

    @Bean
    CommandLineRunner commandLineRunner(KafkaTemplate<String, String> kafkaTemplate) {
        return args -> {
            for (int i = 0; i < 10000; i++) { // adjust the loop count as needed
                kafkaTemplate.send("topico_java", "hello_kafka_n_" + i);
            }
        };
    }
}
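The send() calls above are fire-and-forget, so failures vanish silently. In spring-kafka 3.x, KafkaTemplate.send() returns a CompletableFuture, so the loop body could log failures instead; a sketch, assuming spring-kafka 3.x (2.x returns a ListenableFuture with a similar callback API):

kafkaTemplate.send("topico_java", "hello_kafka_n_" + i)
        .whenComplete((result, ex) -> {
            if (ex != null) {
                System.err.println("Send failed: " + ex.getMessage());
            }
        });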

Our consumer uses multithreading to process the messages, sending a POST request to the API for each one so we can exercise the metrics in Grafana.

package com.gaber.kafkatest.consumer;

import lombok.extern.slf4j.Slf4j;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
import org.apache.commons.lang3.time.StopWatch;

import java.io.IOException;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

@Component
@Slf4j
public class KafkaListeners {

    private final HttpClient httpClient;

    public KafkaListeners() {
        this.httpClient = HttpClient.newHttpClient();
    }

    @KafkaListener(
            topics = "topico_java",
            groupId = "groupId",
            containerFactory = "kafkaListenerContainerFactory"
    )
    public void listener(List<String> dados) {
        if (!dados.isEmpty()) {
            ExecutorService executor = Executors.newFixedThreadPool(10);
            List<CompletableFuture<Void>> futures = new ArrayList<>();
            StopWatch processWatch = StopWatch.createStarted();
            for (var dado : dados) {
                CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
                    log.info(dado);
                    sendPostRequest("http://localhost:8080/api/kafka/" + dado);
                }, executor);
                futures.add(future);
            }

            // Wait for every request in the batch to finish before stopping the watch
            futures.forEach(CompletableFuture::join);
            executor.shutdown();

            processWatch.stop();

            log.info("End of message consumption: {}", processWatch);
        }
    }

    private void sendPostRequest(String url) {
        HttpRequest request = HttpRequest.newBuilder()
                .uri(URI.create(url))
                .POST(HttpRequest.BodyPublishers.noBody())
                .build();

        try {
            HttpResponse<String> response = httpClient.send(request, HttpResponse.BodyHandlers.ofString());
            log.info("Response code: {}", response.statusCode());
        } catch (IOException e) {
            log.error("Failed to send POST request: {}", e.getMessage());
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // restore the interrupt flag
            log.error("Interrupted while sending POST request: {}", e.getMessage());
        }
    }
}
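An alternative to the hand-rolled thread pool above is to let spring-kafka run several listener containers in parallel by setting concurrency on the factory from the consumer configuration. A sketch of that variant (the value 3 is illustrative, and it only helps if the topic has at least that many partitions, which is why it pairs well with the partitioned topic variant shown earlier):

@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory(
        ConsumerFactory<String, String> consumerFactory) {
    ConcurrentKafkaListenerContainerFactory<String, String> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory);
    factory.setBatchListener(true);
    factory.setConcurrency(3); // three consumer threads, each bound to distinct partitions
    return factory;
}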

After starting the services, you can access the following resources:

Command to start the services in Docker:

docker-compose up -d 
  • Kafdrop: Access Kafdrop at http://localhost:9000 to visualize and explore Kafka topics and messages.
  • Prometheus: Access Prometheus at http://localhost:9090 to view the metrics scraped from the Spring Boot application.
  • Grafana: Access Grafana at http://localhost:3000 and log in with the default credentials (username: admin, password: admin).

Configure a new dashboard to visualize the data and metrics collected by Prometheus. If you encounter any issues with the code, please access the repository with the complete structure and documentation: https://github.com/GaberRB/SpringKafkaMetrics
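For a quick start, a panel backed by the Prometheus data source can plot the endpoint's request rate with a query such as rate(http_server_requests_seconds_count[1m]); that metric name comes from Spring Boot's default Micrometer instrumentation, so adjust the labels to match your own setup.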

In this graph, we can see the number of requests to the endpoint and get an idea of Kafka's processing performance combined with multithreading and message batching.
