Reactive Streams with Quarkus and Kafka

In this exercise, you will use the Quarkus Kafka extension to build a streaming application using MicroProfile Reactive Streams Messaging and Apache Kafka, a distributed streaming platform. You will also use Red Hat AMQ Streams, which provides an easy way to run an Apache Kafka cluster on Kubernetes and OpenShift using Operators.

What is Apache Kafka?

Apache Kafka is a distributed streaming platform. A streaming platform has three key capabilities:

  • Publish and subscribe to streams of records, similar to a message queue or enterprise messaging system.

  • Store streams of records in a fault-tolerant, durable way.

  • Process streams of records as they occur.

Kafka is generally used for two broad classes of applications:

  • Building real-time streaming data pipelines that reliably get data between systems or applications

  • Building real-time streaming applications that transform or react to the streams of data

What is AMQ Streams?

AMQ Streams simplifies the process of running Apache Kafka in an OpenShift cluster. AMQ Streams provides container images and Operators for running Kafka on OpenShift. AMQ Streams Operators are fundamental to the running of AMQ Streams. The Operators provided with AMQ Streams are purpose-built with specialist operational knowledge to effectively manage Kafka.

Operators simplify the process of:

  • Deploying and running Kafka clusters

  • Deploying and running Kafka components

  • Configuring access to Kafka

  • Securing access to Kafka

  • Upgrading Kafka

  • Managing brokers

  • Creating and managing topics

  • Creating and managing users

The Goal

In this exercise, we are going to generate (random) names in one component. These names are written to a Kafka topic (names). A second component reads from the names Kafka topic and applies some magic conversion to each name (adding an honorific). The result is sent to an in-memory stream consumed by a JAX-RS resource. The data is sent to a browser using server-sent events and displayed there. It will look like this:

[Image: names]

Create Kafka Cluster

The Strimzi operator (the upstream project on which AMQ Streams is based) installs and manages Kafka clusters on Kubernetes. It’s been pre-installed for you, so all you have to do is create a Kafka custom resource inside your namespace.

Create the Kafka and Kafka Topic custom resources (CRs) with:

oc create -f src/main/kubernetes/kafka.yaml

This will cause the Operator to provision a new Kafka cluster. It will take a few minutes to complete.

Back on the Topology View, make sure all the Kafka pods are up and running (with dark blue circles):

[Image: topiccreate]

It may take a few minutes for all of the pods to spin up. You can continue to the next step while the Kafka cluster and topics are created.

Add Quarkus Kafka Extension

With Kafka installing, turn your attention back to the app. Like other exercises, we’ll need another extension to integrate with Kafka. Install it with:

mvn quarkus:add-extension -Dextensions="messaging-kafka"

You should see:

[INFO] [SUCCESS] ✅  Extension io.quarkus:quarkus-smallrye-reactive-messaging-kafka has been installed

This will add the necessary entries in your pom.xml to bring in the Kafka extension.

The Application You Will Build

The app consists of 3 components that pass messages via Kafka and an in-memory stream, then use SSE to push messages to the browser. It looks like:

[Image: kafka]

Create name generator

To start building the app, create a new Java class file in the org.acme.people.stream package called NameGenerator.java. This class will generate random names and publish them to our Kafka topic for further processing. Use this code:

package org.acme.people.stream;

import jakarta.enterprise.context.ApplicationScoped;
import org.acme.people.utils.CuteNameGenerator;
import org.eclipse.microprofile.reactive.messaging.Outgoing;

import io.smallrye.mutiny.Multi;

import java.time.Duration;

@ApplicationScoped
public class NameGenerator {

    @Outgoing("generated-name")        (1)
    public Multi<String> generate() {  (2)
        return Multi.createFrom().ticks().every(Duration.ofSeconds(5))
                .onOverflow().drop()
                .map(tick -> CuteNameGenerator.generate());
    }

}
1 Instructs Reactive Messaging to dispatch the items from the returned stream to generated-name
2 The method returns a Reactive Stream emitting a random name every 5 seconds

The method returns a Reactive Stream, and the generated items are sent to the stream named generated-name. This stream is mapped to Kafka in the application.properties file, which we will configure soon.

Add honorifics

The name converter reads the names from Kafka, and transforms them, adding a random (English) honorific to the beginning of the name.

Create a new Java class file in the same package called NameConverter.java. Use this code:

package org.acme.people.stream;

import jakarta.enterprise.context.ApplicationScoped;
import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.eclipse.microprofile.reactive.messaging.Outgoing;
import io.smallrye.reactive.messaging.annotations.Broadcast;

@ApplicationScoped
public class NameConverter {

    private static final String[] honorifics = {"Mr.", "Mrs.", "Sir", "Madam", "Lord", "Lady", "Dr.", "Professor", "Vice-Chancellor", "Regent", "Provost", "Prefect"};

    @Incoming("names")               (1)
    @Outgoing("my-data-stream")      (2)
    @Broadcast                       (3)
    public String process(String name) {
        String honorific = honorifics[(int)Math.floor(Math.random() * honorifics.length)];
        return honorific + " " + name;
    }
}
1 Indicates that the method consumes the items from the names topic
2 Indicates that the objects returned by the method are sent to the my-data-stream stream
3 Indicates that the items are dispatched to all subscribers

The process method is called for every Kafka record from the names topic (configured in the application configuration). Every result is sent to the my-data-stream in-memory stream.
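If you want to try the per-record transformation outside of Quarkus, the following is a minimal plain-Java sketch of the same idea. The class name HonorificSketch, the method addHonorific, and the sample names are hypothetical and not part of the workshop project. Unlike NameConverter, it takes a java.util.Random as a parameter, which is an assumption made here so that a seeded instance produces repeatable results.

```java
import java.util.Random;

class HonorificSketch {

    // Same honorifics as in NameConverter above.
    static final String[] HONORIFICS = {"Mr.", "Mrs.", "Sir", "Madam", "Lord", "Lady", "Dr.",
            "Professor", "Vice-Chancellor", "Regent", "Provost", "Prefect"};

    // Prefixes the name with an honorific chosen by the supplied Random.
    // Passing the Random in (rather than calling Math.random()) is a choice
    // made for this sketch so that the result can be reproduced with a seed.
    static String addHonorific(String name, Random random) {
        String honorific = HONORIFICS[random.nextInt(HONORIFICS.length)];
        return honorific + " " + name;
    }

    public static void main(String[] args) {
        Random random = new Random();
        // Hypothetical sample names; the real ones come from the names topic.
        for (String name : new String[] {"Fluffy Otter", "Tiny Badger"}) {
            System.out.println(addHonorific(name, random));
        }
    }
}
```

In the real application there is no loop: Reactive Messaging calls process once for each record arriving on the names channel.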

Expose to front end

Finally, let’s bind our stream to a JAX-RS resource. Create a new Java class in the same package called NameResource.java. Use this code:

package org.acme.people.stream;

import org.eclipse.microprofile.reactive.messaging.Channel;
import org.reactivestreams.Publisher;
import jakarta.inject.Inject;
import jakarta.ws.rs.GET;
import jakarta.ws.rs.Path;
import jakarta.ws.rs.Produces;
import jakarta.ws.rs.core.MediaType;

/**
 * A simple resource retrieving the in-memory "my-data-stream" and sending the items as server-sent events.
 */
@Path("/names")
public class NameResource {

    @Inject
    @Channel("my-data-stream") Publisher<String> names; (1)

    @GET
    @Path("/stream")
    @Produces(MediaType.SERVER_SENT_EVENTS)(2)
    public Publisher<String> stream() { (3)
        return names;
    }
}
1 Injects the my-data-stream stream using the @Channel qualifier
2 Indicates that the content is sent using Server Sent Events
3 Returns the stream (Reactive Stream)

There is a pre-created names.html page for you to use (in the src/main/resources/META-INF/resources directory) which will make a request to this /names/stream endpoint using standard JavaScript running in the browser and draw the resulting names using the D3.js library. The JavaScript that makes this call looks like this (do not copy this into anything!):

var source = new EventSource("/names/stream"); (1)

source.onmessage = function (event) { (2)

    console.log("received new name: " + event.data);
    // process new name in event.data
    // ...

    // update the display with the new name
    update(); (3)
};
1 Uses your browser’s support for the EventSource API (part of the W3C SSE standard) to call the endpoint
2 Each time a message is received via SSE, react to it by running this function
3 Refresh the display using the D3.js library
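To make it clearer what the browser's EventSource receives, here is a minimal Java sketch that extracts the data payloads from text in the text/event-stream format: each event carries one or more data: lines and is terminated by a blank line. The class name SseFrameSketch and the sample names are hypothetical, and the sketch deliberately ignores the other SSE fields (event, id, retry).

```java
import java.util.ArrayList;
import java.util.List;

class SseFrameSketch {

    // Returns the data payload of each complete event found in the given stream text.
    static List<String> parseData(String stream) {
        List<String> events = new ArrayList<>();
        StringBuilder data = new StringBuilder();
        boolean hasData = false;
        for (String line : stream.split("\r?\n", -1)) {
            if (line.isEmpty()) {
                // A blank line ends the current event.
                if (hasData) {
                    events.add(data.toString());
                }
                data.setLength(0);
                hasData = false;
            } else if (line.startsWith("data:")) {
                String value = line.substring("data:".length());
                if (value.startsWith(" ")) {
                    value = value.substring(1); // one leading space is not part of the payload
                }
                if (hasData) {
                    data.append('\n'); // several data lines are joined with newlines
                }
                data.append(value);
                hasData = true;
            }
            // Comment lines (starting with ':') and other fields are ignored in this sketch.
        }
        return events;
    }

    public static void main(String[] args) {
        // Hypothetical wire content, similar to what /names/stream might send.
        String wire = "data: Sir Fluffy Otter\n\ndata: Dr. Tiny Badger\n\n";
        parseData(wire).forEach(name -> System.out.println("received new name: " + name));
    }
}
```

In the browser none of this is needed: EventSource does the parsing and hands each payload to onmessage as event.data.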

Configure application

We need to configure the Kafka connector. This is done in the application.properties file (in the src/main/resources directory). The keys are structured as follows:

mp.messaging.[outgoing|incoming].{channel-name}.property=value

The channel-name segment must match the value set in the @Incoming and @Outgoing annotations:

  • generated-name → sink to which we write the names

  • names → source from which we read the names

Add the following values to the app’s src/main/resources/application.properties:

# Configure the Kafka sink (we write to it)
%prod.mp.messaging.outgoing.generated-name.bootstrap.servers=names-cluster-kafka-bootstrap:9092(1)
%prod.mp.messaging.outgoing.generated-name.connector=smallrye-kafka
%prod.mp.messaging.outgoing.generated-name.topic=names
%prod.mp.messaging.outgoing.generated-name.value.serializer=org.apache.kafka.common.serialization.StringSerializer

# Configure the Kafka source (we read from it)
%prod.mp.messaging.incoming.names.bootstrap.servers=names-cluster-kafka-bootstrap:9092(1)
%prod.mp.messaging.incoming.names.connector=smallrye-kafka
%prod.mp.messaging.incoming.names.value.deserializer=org.apache.kafka.common.serialization.StringDeserializer

%prod.quarkus.openshift.route.tls.insecure-edge-termination-policy=Redirect
%prod.quarkus.openshift.route.tls.termination=edge
1 The hostnames you see here are only resolvable via DNS when this app runs in the same Kubernetes namespace as the Kafka cluster you created earlier. That is why this and the other config values above are prefixed with %prod: they apply only to the prod profile, so the app will not try to initialize Kafka when in dev mode.

More details about this configuration are available in the Producer configuration and Consumer configuration sections of the Kafka documentation.
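To see how the key structure described above breaks down, the following sketch splits a property key into its profile, direction, channel name, and property parts. This only illustrates the naming scheme and is not how Quarkus or SmallRye read the configuration. The class ChannelKeySketch and the record ChannelKey are hypothetical, and the sketch assumes channel names contain no dots.

```java
import java.util.Optional;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

class ChannelKeySketch {

    // Hypothetical holder for the parts of a messaging key; profile is null when absent.
    record ChannelKey(String profile, String direction, String channel, String property) {}

    // Optional "%profile." prefix, then mp.messaging.<direction>.<channel>.<property>.
    // Assumption for this sketch: the channel name itself contains no dots.
    private static final Pattern KEY = Pattern.compile(
            "^(?:%([^.]+)\\.)?mp\\.messaging\\.(incoming|outgoing)\\.([^.]+)\\.(.+)$");

    // Returns the parsed parts, or an empty Optional if the key is not a messaging key.
    static Optional<ChannelKey> parse(String key) {
        Matcher m = KEY.matcher(key);
        if (!m.matches()) {
            return Optional.empty();
        }
        return Optional.of(new ChannelKey(m.group(1), m.group(2), m.group(3), m.group(4)));
    }

    public static void main(String[] args) {
        System.out.println(parse("%prod.mp.messaging.outgoing.generated-name.topic"));
        System.out.println(parse("%prod.mp.messaging.incoming.names.value.deserializer"));
        // Not a messaging key, so this one does not match.
        System.out.println(parse("%prod.quarkus.openshift.route.tls.termination"));
    }
}
```

Reading the keys this way shows that generated-name and names are two separate channels that meet only because the outgoing channel's topic property is set to names.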

What about my-data-stream? This is an in-memory stream, not connected to a message broker.
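As a rough analogy for an in-memory stream whose items reach every subscriber, the sketch below uses the JDK's own java.util.concurrent.SubmissionPublisher. This is a stand-in chosen for illustration, not what SmallRye Reactive Messaging uses, and the names InMemoryBroadcastSketch, Collector, and broadcast are hypothetical.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

class InMemoryBroadcastSketch {

    // A subscriber that records everything it receives.
    static final class Collector implements Flow.Subscriber<String> {
        final List<String> received = new CopyOnWriteArrayList<>();
        final CountDownLatch done = new CountDownLatch(1);

        @Override public void onSubscribe(Flow.Subscription s) { s.request(Long.MAX_VALUE); }
        @Override public void onNext(String item) { received.add(item); }
        @Override public void onError(Throwable t) { done.countDown(); }
        @Override public void onComplete() { done.countDown(); }

        void awaitCompletion() {
            try {
                if (!done.await(5, TimeUnit.SECONDS)) {
                    throw new IllegalStateException("stream did not complete in time");
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
    }

    // Publishes the items to two subscribers and returns what each one received.
    static List<List<String>> broadcast(List<String> items) {
        Collector first = new Collector();
        Collector second = new Collector();
        try (SubmissionPublisher<String> publisher = new SubmissionPublisher<>()) {
            publisher.subscribe(first);
            publisher.subscribe(second);
            items.forEach(publisher::submit); // every item goes to all current subscribers
        } // closing the publisher signals onComplete once pending items are delivered
        first.awaitCompletion();
        second.awaitCompletion();
        return List.of(List.copyOf(first.received), List.copyOf(second.received));
    }

    public static void main(String[] args) {
        // Hypothetical sample names.
        System.out.println(broadcast(List.of("Sir Fluffy Otter", "Dr. Tiny Badger")));
    }
}
```

Nothing here touches a broker: the items exist only in the memory of the running process, which is the sense in which my-data-stream is "in-memory".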

Rebuild and redeploy app to OpenShift

Run the following command, which will build and deploy the app using the OpenShift extension:

mvn clean package -DskipTests && \
oc label deployment/people app.kubernetes.io/part-of=people --overwrite && \
oc label dc/postgres-database app.kubernetes.io/part-of=people --overwrite && \
oc annotate deployment/people app.openshift.io/connects-to=postgres-database --overwrite && \
oc rollout status -w deployment/people

The build should take a minute or two to complete.

Test

Our application should be up and generating names within a few seconds of the build completing. To see if it’s working, access the graphical name cloud powered by Quarkus, MicroProfile and Kafka. You should see a cloud of names updating every 5 seconds (it may take a few seconds for it to start!):

It takes a few seconds to establish the connection to Kafka. If you don’t see new names generated every 5 seconds, reload the browser page to re-initialize the SSE stream.

[Image: names]

These are the original names streamed through Kafka, altered to add a random honorific like Sir or Madam, and displayed in a word cloud for you to enjoy!

Congratulations!

This guide has shown how you can interact with Kafka using Quarkus. It utilizes MicroProfile Reactive Messaging to build data streaming applications.

If you want to go further, check the documentation of SmallRye Reactive Messaging, the implementation used in Quarkus.