NATS is a high-performance messaging system that offers simplicity, speed, and scalability. It is particularly suited for building distributed systems and microservices.
This article demonstrates how to integrate NATS with Java, showcasing the essential steps to set up, connect, and publish/subscribe to messages.
Prerequisites
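Before diving in, we should make sure the following are installed:
- Java Development Kit (JDK) 15 or later (the reply example uses String.formatted, which was added in Java 15).
- Maven or Gradle for dependency management.
- Docker (optional, for running the NATS server in a container).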
Setting Up the Java NATS Client
NATS provides a lightweight Java client library that we can easily include in our project.
Dependencies
For Maven, we need to add the following dependency to our pom.xml:
<dependency>
<groupId>io.nats</groupId>
<artifactId>jnats</artifactId>
<version>2.20.4</version>
</dependency>
For Gradle, we need to include it in our build.gradle:
implementation 'io.nats:jnats:2.20.4'
Running a NATS Server Locally
To run a NATS server locally, we can use Docker:
docker run -d --name nats -p 4222:4222 nats:latest
Within moments our NATS server is up and running, as its log output (docker logs nats) shows:
[1] 2024/11/25 19:39:42.340307 [INF] Starting nats-server
[1] 2024/11/25 19:39:42.340428 [INF] Version: 2.10.22
[1] 2024/11/25 19:39:42.340432 [INF] Git: [240e9a4]
[1] 2024/11/25 19:39:42.340436 [INF] Cluster: my_cluster
[1] 2024/11/25 19:39:42.340438 [INF] Name: NBJECJLKJSP4LYCDE255CCSYZ46P5YUSEOL2OBGLXDJQQHXQZ24SNZ47
[1] 2024/11/25 19:39:42.340445 [INF] ID: NBJECJLKJSP4LYCDE255CCSYZ46P5YUSEOL2OBGLXDJQQHXQZ24SNZ47
[1] 2024/11/25 19:39:42.340459 [INF] Using configuration file: nats-server.conf
[1] 2024/11/25 19:39:42.340985 [INF] Starting http monitor on 0.0.0.0:8222
[1] 2024/11/25 19:39:42.341053 [INF] Listening for client connections on 0.0.0.0:4222
[1] 2024/11/25 19:39:42.341197 [INF] Server is ready
[1] 2024/11/25 19:39:42.341219 [INF] Cluster name is my_cluster
[1] 2024/11/25 19:39:42.341306 [INF] Listening for route connections on 0.0.0.0:6222
Alternatively, we can download the NATS server from https://nats.io/download/ and run it:
nats-server
Connecting to NATS in Java
The NATS client makes connecting to a server straightforward. Below is an example of how to establish a connection:
package com.hascode.tutorial;
import io.nats.client.Connection;
import io.nats.client.Nats;
public class ConnectExample {
public static void main(String[] args) {
try (Connection natsConnection = Nats.connect("nats://localhost:4222")) {
System.out.printf("Connected to NATS! %s %n", natsConnection.getServerInfo());
} catch (Exception e) {
e.printStackTrace();
}
}
}
This example produces the following output:
Connected to NATS! ServerInfo{serverId='NBJECJLKJSP4LYCDE255CCSYZ46P5YUSEOL2OBGLXDJQQHXQZ24SNZ47', serverName='NBJECJLKJSP4LYCDE255CCSYZ46P5YUSEOL2OBGLXDJQQHXQZ24SNZ47', version='2.10.22', go='go1.22.8', host='0.0.0.0', port=4222, headersSupported=true, authRequired=false, tlsRequired=false, tlsAvailable=false, maxPayload=1048576, connectURLs=[], protocolVersion=1, nonce=null, lameDuckMode=false, jetStream=false, clientId=6, clientIp='172.17.0.1', cluster='my_cluster'}
Publishing Messages
Once connected, publishing messages to a subject is easy. Here’s an example:
package com.hascode.tutorial;
import io.nats.client.Connection;
import io.nats.client.Nats;
public class PublishExample {
public static void main(String[] args) {
try (Connection natsConnection = Nats.connect("nats://localhost:4222")) {
String subject = "tasks.greetings";
String message = "Hello, NATS! Visit www.hascode.com!";
natsConnection.publish(subject, message.getBytes());
System.out.printf("Message published to subject: %s and server %s", subject,
natsConnection.getServerInfo());
} catch (Exception e) {
e.printStackTrace();
}
}
}
More information about subject names and routing based on subjects can be found in the NATS Docs: Subject Based Messaging.
Running the Publisher produces the following output:
Message published to subject: tasks.greetings and server ServerInfo{serverId='NBJECJLKJSP4LYCDE255CCSYZ46P5YUSEOL2OBGLXDJQQHXQZ24SNZ47', serverName='NBJECJLKJSP4LYCDE255CCSYZ46P5YUSEOL2OBGLXDJQQHXQZ24SNZ47', version='2.10.22', go='go1.22.8', host='0.0.0.0', port=4222, headersSupported=true, authRequired=false, tlsRequired=false, tlsAvailable=false, maxPayload=1048576, connectURLs=[], protocolVersion=1, nonce=null, lameDuckMode=false, jetStream=false, clientId=8, clientIp='172.17.0.1', cluster='my_cluster'}
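Subjects such as tasks.greetings are dot-separated token lists, and subscribers may use the wildcards * (exactly one token) and > (one or more trailing tokens). The following is a minimal sketch of these matching rules in plain Java, assuming well-formed subjects. The class SubjectMatcher is our own illustration and not part of the NATS client or server.

```java
import java.util.List;

/** Toy matcher for NATS-style subject patterns; an illustration, not the server's implementation. */
final class SubjectMatcher {

    /** Returns true if the dot-separated subject matches the pattern ('*' = one token, '>' = tail). */
    static boolean matches(String pattern, String subject) {
        String[] p = pattern.split("\\.");
        String[] s = subject.split("\\.");
        for (int i = 0; i < p.length; i++) {
            if (p[i].equals(">")) {
                // '>' must be the last pattern token and needs at least one remaining subject token
                return i == p.length - 1 && s.length > i;
            }
            if (i >= s.length) {
                return false; // the pattern has more tokens than the subject
            }
            if (!p[i].equals("*") && !p[i].equals(s[i])) {
                return false; // literal token mismatch
            }
        }
        return p.length == s.length; // without a '>' tail the token counts must agree
    }

    public static void main(String[] args) {
        for (String pattern : List.of("tasks.greetings", "tasks.*", "tasks.>", "*.greetings.urgent")) {
            System.out.println(pattern + " matches tasks.greetings: "
                + matches(pattern, "tasks.greetings"));
        }
    }
}
```

In this sketch the first three patterns match our subject tasks.greetings, while the last one does not because it expects a third token.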
Subscribing to Messages
Subscribing allows us to listen for messages on a specific subject, so let's read the greeting messages we published:
package com.hascode.tutorial;
import io.nats.client.Connection;
import io.nats.client.Dispatcher;
import io.nats.client.Nats;
public class SubscribeExample {
public static void main(String[] args) {
try (Connection natsConnection = Nats.connect("nats://localhost:4222")) {
String subject = "tasks.greetings";
// The dispatcher calls this handler for every message received on the subject
Dispatcher dispatcher = natsConnection.createDispatcher((msg) -> {
String received = new String(msg.getData());
System.out.printf("Received message: %s via connection: %s%n", received,
natsConnection.getServerInfo());
});
dispatcher.subscribe(subject);
System.out.println("Subscribed to subject: " + subject);
// Keep the subscriber running
Thread.sleep(60000); // 1 minute
} catch (Exception e) {
e.printStackTrace();
}
}
}
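Core NATS does not store messages, so the subscriber must already be running when the publisher sends. Starting the Subscriber first and then the Publisher yields the following output: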
Subscribed to subject: tasks.greetings
Received message: Hello, NATS! Visit www.hascode.com! via connection: ServerInfo{serverId='NBJECJLKJSP4LYCDE255CCSYZ46P5YUSEOL2OBGLXDJQQHXQZ24SNZ47', serverName='NBJECJLKJSP4LYCDE255CCSYZ46P5YUSEOL2OBGLXDJQQHXQZ24SNZ47', version='2.10.22', go='go1.22.8', host='0.0.0.0', port=4222, headersSupported=true, authRequired=false, tlsRequired=false, tlsAvailable=false, maxPayload=1048576, connectURLs=[], protocolVersion=1, nonce=null, lameDuckMode=false, jetStream=false, clientId=11, clientIp='172.17.0.1', cluster='my_cluster'}
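NATS payloads are plain byte arrays, so publisher and subscriber have to agree on how text is encoded. The examples above rely on getBytes() and new String(byte[]), which use the JVM's default charset (platform-dependent before Java 18). A small sketch with an explicit charset follows; the helper class PayloadCodec is hypothetical and only wraps standard JDK calls.

```java
import java.nio.charset.StandardCharsets;

/** Encodes and decodes text payloads with an explicit charset. */
final class PayloadCodec {

    /** Turns text into the byte[] payload that is handed to publish(). */
    static byte[] encode(String text) {
        return text.getBytes(StandardCharsets.UTF_8);
    }

    /** Turns the byte[] returned by msg.getData() back into text. */
    static String decode(byte[] payload) {
        return new String(payload, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        String greeting = "Gr\u00fc\u00dfe, NATS!"; // contains non-ASCII characters
        byte[] payload = encode(greeting);
        System.out.println(payload.length + " bytes, decoded: " + decode(payload));
    }
}
```

With such a helper, a publisher would call natsConnection.publish(subject, PayloadCodec.encode(message)) and a handler would call PayloadCodec.decode(msg.getData()).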
Queues and Queue Groups
To parallelize work, multiple subscribers to the same subject may join a common queue group.
Queue groups resemble Kafka’s consumer groups in that each message is delivered to only one subscriber per queue group.
First we need to publish some more messages, so here is our updated publisher:
package com.hascode.tutorial;
import io.nats.client.Connection;
import io.nats.client.Nats;
public class PublishManyGreetingsExample {
public static void main(String[] args) {
try (Connection natsConnection = Nats.connect("nats://localhost:4222")) {
String subject = "tasks.greetings";
for (int i = 0; i < 100; i++) {
String message = "Hello, NATS! Visit www.hascode.com! #" + i;
natsConnection.publish(subject, message.getBytes());
System.out.printf("Message %d published to subject: %s and server %n", i, subject);
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
Running the publisher prints:
Message 0 published to subject: tasks.greetings and server
Message 1 published to subject: tasks.greetings and server
Message 2 published to subject: tasks.greetings and server
[..]
Message 99 published to subject: tasks.greetings and server
In the following example, four workers share a common queue group to process the messages on the subject in parallel.
package com.hascode.tutorial;
import io.nats.client.Connection;
import io.nats.client.Dispatcher;
import io.nats.client.Nats;
public class QueueSubscribersExample {
public static void main(String[] args) {
String subject = "tasks.greetings";
String queueGroup = "task-workers";
for (int i = 1; i <= 4; i++) {
int workerId = i;
new Thread(() -> {
try (Connection natsConnection = Nats.connect("nats://localhost:4222")) {
Dispatcher dispatcher = natsConnection.createDispatcher((msg) -> {
String received = new String(msg.getData());
System.out.println("Subscriber-" + workerId + " received: " + received);
});
dispatcher.subscribe(subject, queueGroup);
System.out.println(
"Subscriber-" + workerId + " subscribed to tasks in queue group: " + queueGroup);
// Keep the subscriber thread running
Thread.sleep(60000); // 1 minute
} catch (Exception e) {
e.printStackTrace();
}
}, "Subscriber-" + i).start();
}
}
}
Running this code and then the publisher produces the following output. As we can see, the work is distributed among the four workers:
Subscriber-2 subscribed to tasks in queue group: task-workers
Subscriber-4 subscribed to tasks in queue group: task-workers
Subscriber-1 subscribed to tasks in queue group: task-workers
Subscriber-3 subscribed to tasks in queue group: task-workers
Subscriber-3 received: Hello, NATS! Visit www.hascode.com! #0
Subscriber-1 received: Hello, NATS! Visit www.hascode.com! #3
Subscriber-3 received: Hello, NATS! Visit www.hascode.com! #1
Subscriber-4 received: Hello, NATS! Visit www.hascode.com! #2
Subscriber-4 received: Hello, NATS! Visit www.hascode.com! #4
Subscriber-4 received: Hello, NATS! Visit www.hascode.com! #5
Subscriber-1 received: Hello, NATS! Visit www.hascode.com! #6
Subscriber-2 received: Hello, NATS! Visit www.hascode.com! #7
Subscriber-2 received: Hello, NATS! Visit www.hascode.com! #8
Subscriber-2 received: Hello, NATS! Visit www.hascode.com! #9
Subscriber-1 received: Hello, NATS! Visit www.hascode.com! #10
Subscriber-1 received: Hello, NATS! Visit www.hascode.com! #11
Subscriber-1 received: Hello, NATS! Visit www.hascode.com! #12
[..]
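The delivery rule behind this output can be modelled in a few lines of plain Java: every subscriber without a queue group receives each message, while each queue group receives it exactly once through one of its members. This is only a toy simulation under the assumption that the member is picked at random; the class QueueGroupModel and the subscriber name audit-log are hypothetical and no NATS API is involved.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;

/** Toy model of queue-group delivery; a simulation, not NATS client code. */
final class QueueGroupModel {
    // queue group name -> names of the subscribers that joined it
    private final Map<String, List<String>> groups = new HashMap<>();
    // subscribers that did not join any queue group
    private final List<String> plainSubscribers = new ArrayList<>();
    private final Random random;

    QueueGroupModel(long seed) {
        this.random = new Random(seed);
    }

    void subscribe(String subscriber) {
        plainSubscribers.add(subscriber);
    }

    void subscribe(String subscriber, String queueGroup) {
        groups.computeIfAbsent(queueGroup, g -> new ArrayList<>()).add(subscriber);
    }

    /** Returns the subscribers that receive one published message. */
    List<String> deliver() {
        // every plain subscriber gets its own copy
        List<String> receivers = new ArrayList<>(plainSubscribers);
        for (List<String> members : groups.values()) {
            // exactly one member per queue group gets the message
            receivers.add(members.get(random.nextInt(members.size())));
        }
        return receivers;
    }

    public static void main(String[] args) {
        QueueGroupModel model = new QueueGroupModel(42L);
        for (int i = 1; i <= 4; i++) {
            model.subscribe("Subscriber-" + i, "task-workers");
        }
        model.subscribe("audit-log"); // hypothetical subscriber outside the queue group
        for (int i = 0; i < 5; i++) {
            System.out.println("Message #" + i + " -> " + model.deliver());
        }
    }
}
```

Each printed line lists audit-log plus exactly one of the four workers, which mirrors how the greetings above were spread across Subscriber-1 to Subscriber-4.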
Request / Reply Communication
NATS also supports the request/reply communication pattern. Let's have a look at the following examples.
The following code sends a request on the subject questions.request and waits for a reply:
package com.hascode.tutorial;
import io.nats.client.Connection;
import io.nats.client.Message;
import io.nats.client.Nats;
import java.time.Duration;
public class RequestExample {
public static void main(String[] args) {
try (Connection natsConnection = Nats.connect("nats://localhost:4222")) {
String subject = "questions.request";
String requestMessage = "What is the strangest tech blog?";
System.out.println("Sending request: " + requestMessage);
// Send the request and wait for the reply (timeout: 2 seconds)
Message reply = natsConnection.request(subject, requestMessage.getBytes(),
Duration.ofSeconds(2));
if (reply != null) {
String response = new String(reply.getData());
System.out.println("Received reply: " + response);
} else {
System.out.println("No reply received within timeout!");
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
This is our responder. It uses the replyTo information from the received message to address its response:
package com.hascode.tutorial;
import io.nats.client.Connection;
import io.nats.client.Dispatcher;
import io.nats.client.Nats;
public class ReplyExample {
public static void main(String[] args) {
try (Connection natsConnection = Nats.connect("nats://localhost:4222")) {
String subject = "questions.request";
Dispatcher dispatcher = natsConnection.createDispatcher((msg) -> {
String request = new String(msg.getData());
System.out.println("Received request: " + request);
// Create a response message
String response = "Processed your request: %s, the strangest tech blog is www.hascode.com!".formatted(request);
System.out.println("Sending reply: " + response);
// Send the response to the reply subject
natsConnection.publish(msg.getReplyTo(), response.getBytes());
});
dispatcher.subscribe(subject);
System.out.println("Responder is listening on subject: " + subject);
// Keep the responder running
Thread.sleep(60000); // 1 minute
} catch (Exception e) {
e.printStackTrace();
}
}
}
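When we run both classes, starting the responder first, we get the following output. The first two lines come from the requester, the remaining three from the responder: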
Sending request: What is the strangest tech blog?
Received reply: Processed your request: What is the strangest tech blog?, the strangest tech blog is www.hascode.com!
Responder is listening on subject: questions.request
Received request: What is the strangest tech blog?
Sending reply: Processed your request: What is the strangest tech blog?, the strangest tech blog is www.hascode.com!
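Request/reply is layered on top of publish/subscribe: the requester listens on a unique reply subject (an inbox), publishes the request with that subject as replyTo, and waits for a message to arrive there. The following in-memory sketch illustrates this mechanism without a server. RequestReplyModel is a hypothetical toy broker with one handler per subject, not the NATS implementation.

```java
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.BiConsumer;

/** Toy in-memory broker showing how request/reply can be built on publish/subscribe. */
final class RequestReplyModel {
    // subject -> handler receiving (payload, replyTo); one handler per subject keeps the sketch short
    private final Map<String, BiConsumer<String, String>> handlers = new ConcurrentHashMap<>();

    void subscribe(String subject, BiConsumer<String, String> handler) {
        handlers.put(subject, handler);
    }

    void publish(String subject, String payload, String replyTo) {
        BiConsumer<String, String> handler = handlers.get(subject);
        if (handler != null) {
            handler.accept(payload, replyTo);
        } // without a subscriber the message is simply dropped
    }

    /** Publishes a request with a unique reply subject and waits for the answer; null on timeout. */
    String request(String subject, String payload, long timeoutMillis) {
        String inbox = "_INBOX." + UUID.randomUUID();
        CompletableFuture<String> reply = new CompletableFuture<>();
        subscribe(inbox, (body, ignoredReplyTo) -> reply.complete(body));
        try {
            publish(subject, payload, inbox);
            return reply.get(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            return null; // mirrors the null check in RequestExample
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            handlers.remove(inbox); // the inbox is only needed for this one exchange
        }
    }

    public static void main(String[] args) {
        RequestReplyModel broker = new RequestReplyModel();
        // Responder: answers on the reply subject carried by the request
        broker.subscribe("questions.request", (body, replyTo) -> {
            if (replyTo != null) {
                broker.publish(replyTo, "Processed your request: " + body, null);
            }
        });
        System.out.println(broker.request("questions.request", "What is the strangest tech blog?", 2000));
        System.out.println(broker.request("nobody.listens", "Anyone there?", 100));
    }
}
```

The second request goes to a subject without a responder and therefore returns null after the timeout, which is the case the else branch in RequestExample handles.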
Streaming with JetStream
JetStream is a feature of NATS that provides streaming capabilities such as message persistence, at-least-once delivery, message replay, and more. It is designed for use cases requiring durable messaging and advanced control over message streams.
A stream can store messages from several related subjects, and consumers can be configured with different modes of operation and acknowledgement modes.
Consumers can be push-based, so that messages are delivered to a specific subject, or pull-based, so that clients request batches of messages on demand.
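At-least-once delivery implies that a message may arrive more than once, for example when an acknowledgement is not received in time and the server redelivers the message. Handlers are therefore often written to be idempotent. The following sketch shows one possible approach that remembers the stream sequence numbers it has already processed. The class IdempotentHandler is hypothetical, and the in-memory set is an assumption that only suits a short-lived demo; with the Java client the sequence would typically be taken from the message metadata.

```java
import java.util.HashSet;
import java.util.Set;

/** Sketch of an idempotent handler for at-least-once delivery; all names are hypothetical. */
final class IdempotentHandler {
    // stream sequences that were already processed (unbounded, fine for a demo only)
    private final Set<Long> processedSequences = new HashSet<>();
    private int sideEffects = 0;

    /** Handles a message once per stream sequence; returns true if it was processed now. */
    boolean handle(long streamSequence, String payload) {
        if (!processedSequences.add(streamSequence)) {
            return false; // duplicate delivery: skip the work, the caller can still acknowledge
        }
        sideEffects++; // stands in for the real work, e.g. writing to a database
        System.out.println("Processed #" + streamSequence + ": " + payload);
        return true;
    }

    int sideEffects() {
        return sideEffects;
    }

    public static void main(String[] args) {
        IdempotentHandler handler = new IdempotentHandler();
        handler.handle(1, "Hello JetStream 1");
        handler.handle(2, "Hello JetStream 2");
        handler.handle(1, "Hello JetStream 1"); // simulated redelivery of sequence 1
        System.out.println("Side effects executed: " + handler.sideEffects());
    }
}
```

The redelivered message with sequence 1 is recognized as a duplicate, so the side effect runs only twice for the three deliveries.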
Starting the Server with JetStream
To use JetStream, our NATS server must have JetStream enabled. By default, JetStream is disabled. We can enable it by starting the NATS server with the -js flag:
nats-server -js
or, using Docker:
docker run -d --name nats-js -p 4222:4222 -p 8222:8222 nats:latest -js
Our server logs should now indicate that JetStream is enabled:
[1] 2024/11/28 13:10:47.350130 [INF] Starting nats-server
[1] 2024/11/28 13:10:47.353471 [INF] Version: 2.10.22
[1] 2024/11/28 13:10:47.353474 [INF] Git: [240e9a4]
[1] 2024/11/28 13:10:47.353476 [INF] Name: NBKCXV45K5AVLETBZMPDDZW2CS7JE5MCA23NBP4WSLZU2EXAUOYJBCAJ
[1] 2024/11/28 13:10:47.353483 [INF] Node: TcARzI1D
[1] 2024/11/28 13:10:47.353485 [INF] ID: NBKCXV45K5AVLETBZMPDDZW2CS7JE5MCA23NBP4WSLZU2EXAUOYJBCAJ
[1] 2024/11/28 13:10:47.354590 [INF] Starting JetStream
[1] 2024/11/28 13:10:47.354598 [WRN] Temporary storage directory used, data could be lost on system reboot
[1] 2024/11/28 13:10:47.354850 [INF] _ ___ _____ ___ _____ ___ ___ _ __ __
[1] 2024/11/28 13:10:47.354854 [INF] _ | | __|_ _/ __|_ _| _ \ __| /_\ | \/ |
[1] 2024/11/28 13:10:47.354855 [INF] | || | _| | | \__ \ | | | / _| / _ \| |\/| |
[1] 2024/11/28 13:10:47.354857 [INF] \__/|___| |_| |___/ |_| |_|_\___/_/ \_\_| |_|
[1] 2024/11/28 13:10:47.354868 [INF]
[1] 2024/11/28 13:10:47.354869 [INF] https://docs.nats.io/jetstream
[1] 2024/11/28 13:10:47.354871 [INF]
[1] 2024/11/28 13:10:47.354872 [INF] ---------------- JETSTREAM ----------------
[1] 2024/11/28 13:10:47.354875 [INF] Max Memory: 23.39 GB
[1] 2024/11/28 13:10:47.354877 [INF] Max Storage: 14.08 GB
[1] 2024/11/28 13:10:47.354879 [INF] Store Directory: "/tmp/nats/jetstream"
[1] 2024/11/28 13:10:47.354880 [INF] -------------------------------------------
[1] 2024/11/28 13:10:47.355301 [INF] Listening for client connections on 0.0.0.0:4222
[1] 2024/11/28 13:10:47.355449 [INF] Server is ready
We can verify that JetStream is running using the NATS CLI:
nats server report jetstream
Publishing and Reading Streams
The following example creates a new stream named example-stream and configures the subject example-subject for this stream.
It then writes a few messages to the stream and reads them back:
package com.hascode.tutorial;
import io.nats.client.Connection;
import io.nats.client.JetStream;
import io.nats.client.JetStreamApiException;
import io.nats.client.JetStreamManagement;
import io.nats.client.JetStreamSubscription;
import io.nats.client.Message;
import io.nats.client.Nats;
import io.nats.client.api.StorageType;
import io.nats.client.api.StreamConfiguration;
public class StreamingExample {
public static void main(String[] args) {
// Connect to the NATS server
try (Connection nc = Nats.connect("nats://localhost:4222")) {
System.out.println("Connected to NATS server.");
// Access the JetStream context
JetStreamManagement jsm = nc.jetStreamManagement();
JetStream js = nc.jetStream();
// Define and add a stream
String streamName = "example-stream";
String subject = "example-subject";
StreamConfiguration streamConfig = StreamConfiguration.builder()
.name(streamName)
.subjects(subject)
.storageType(StorageType.Memory)
.build();
// Create the stream if it doesn't exist
try {
jsm.addStream(streamConfig);
System.out.println("Stream created: " + streamName);
} catch (JetStreamApiException e) {
System.out.println("Stream already exists or another error occurred.");
}
// Publish a few messages to the stream
for (int i = 1; i <= 5; i++) {
String message = "Hello JetStream " + i;
js.publish(subject, message.getBytes());
System.out.println("Published message: " + message);
}
// Set up a consumer to read messages
JetStreamSubscription subscription = js.subscribe(subject);
System.out.println("Subscribed to subject: " + subject);
// Read messages
Message msg;
while ((msg = subscription.nextMessage(2000)) != null) {
System.out.println("Received message: " + new String(msg.getData()));
msg.ack(); // Acknowledge the message
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
Running the example produces the following output:
Connected to NATS server.
Stream created: example-stream
Published message: Hello JetStream 1
Published message: Hello JetStream 2
Published message: Hello JetStream 3
Published message: Hello JetStream 4
Published message: Hello JetStream 5
Subscribed to subject: example-subject
Received message: Hello JetStream 1
Received message: Hello JetStream 2
Received message: Hello JetStream 3
Received message: Hello JetStream 4
Received message: Hello JetStream 5
Delivery Policies
A consumer may read from the stream using different delivery policies, which determine where in the stream delivery starts:
By sequence number
Policy: DeliverPolicy.ByStartSequence
// ---- Poll from a Specific Sequence ----
System.out.println("\nPolling from sequence 5:");
ConsumerConfiguration seqConsumerConfig = ConsumerConfiguration.builder()
.ackPolicy(AckPolicy.Explicit)
.deliverPolicy(DeliverPolicy.ByStartSequence)
.startSequence(5)
.build();
JetStreamSubscription seqSubscription = js.subscribe(subject, PullSubscribeOptions.builder()
.configuration(seqConsumerConfig)
.build());
// fetch() issues the pull request itself and waits up to 2 seconds for 5 messages
for (Message msg : seqSubscription.fetch(5, Duration.ofSeconds(2))) {
System.out.println("Received: " + new String(msg.getData()));
msg.ack();
}
This is our output:
Polling from sequence 5:
Received: Hello JetStream 5
Received: Message 1
Received: Message 2
Received: Message 3
Received: Message 4
By time
Policy: DeliverPolicy.ByStartTime
// ---- Poll from a Specific Time ----
System.out.println("\nPolling messages after a specific time:");
ZonedDateTime startTime = ZonedDateTime.now().minusMinutes(1); // Messages from the last 1 minute
ConsumerConfiguration timeConsumerConfig = ConsumerConfiguration.builder()
.ackPolicy(AckPolicy.Explicit)
.deliverPolicy(DeliverPolicy.ByStartTime)
.startTime(startTime)
.build();
JetStreamSubscription timeSubscription = js.subscribe(subject, PullSubscribeOptions.builder()
.configuration(timeConsumerConfig)
.build());
// fetch() issues the pull request itself and waits up to 2 seconds for 5 messages
for (Message msg : timeSubscription.fetch(5, Duration.ofSeconds(2))) {
System.out.println("Received: " + new String(msg.getData()));
msg.ack();
}
Polling messages after a specific time:
Received: Message 1
Received: Message 2
Received: Message 3
Received: Message 4
Received: Message 5
Latest
Policy: DeliverPolicy.Last
// ---- Poll the Latest Messages ----
System.out.println("\nPolling the latest message:");
ConsumerConfiguration latestConsumerConfig = ConsumerConfiguration.builder()
.ackPolicy(AckPolicy.Explicit)
.deliverPolicy(DeliverPolicy.Last)
.build();
JetStreamSubscription latestSubscription = js.subscribe(subject, PullSubscribeOptions.builder()
.configuration(latestConsumerConfig)
.build());
// Fetch a single message; delivery starts at the last message in the stream
for (Message msg : latestSubscription.fetch(1, Duration.ofSeconds(2))) {
System.out.println("Received: " + new String(msg.getData()));
msg.ack();
}
Polling the latest message:
Received: Message 10
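To compare the three policies side by side, we can model a stream as an ordered list of stored messages and compute where delivery would start. This is a simplified plain-Java sketch under the assumption that each stored message carries a sequence number and a timestamp; the class DeliverPolicyModel and its field names are hypothetical, and a real consumer would also keep receiving messages published later.

```java
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;

/** Toy model of where delivery starts for a few JetStream deliver policies. */
final class DeliverPolicyModel {

    /** A stored stream message; the field names are hypothetical. */
    static final class Stored {
        final long sequence;
        final Instant timestamp;
        final String data;

        Stored(long sequence, Instant timestamp, String data) {
            this.sequence = sequence;
            this.timestamp = timestamp;
            this.data = data;
        }
    }

    /** ByStartSequence: start at the given stream sequence. */
    static List<Stored> byStartSequence(List<Stored> stream, long startSequence) {
        return stream.stream().filter(m -> m.sequence >= startSequence).collect(Collectors.toList());
    }

    /** ByStartTime: start at the first message stored at or after the given time. */
    static List<Stored> byStartTime(List<Stored> stream, Instant startTime) {
        return stream.stream().filter(m -> !m.timestamp.isBefore(startTime)).collect(Collectors.toList());
    }

    /** Last: start at the most recent message already in the stream. */
    static List<Stored> last(List<Stored> stream) {
        List<Stored> result = new ArrayList<>();
        if (!stream.isEmpty()) {
            result.add(stream.get(stream.size() - 1));
        }
        return result;
    }

    /** Builds a stream of count messages stored one second apart, starting at first. */
    static List<Stored> sampleStream(int count, Instant first) {
        List<Stored> stream = new ArrayList<>();
        for (int i = 1; i <= count; i++) {
            stream.add(new Stored(i, first.plusSeconds(i - 1), "Message " + i));
        }
        return stream;
    }

    public static void main(String[] args) {
        Instant first = Instant.parse("2024-11-28T13:00:00Z");
        List<Stored> stream = sampleStream(10, first);
        System.out.println("From sequence 5: " + byStartSequence(stream, 5).size() + " messages");
        System.out.println("From 13:00:07: " + byStartTime(stream, first.plusSeconds(7)).size() + " messages");
        System.out.println("Last: " + last(stream).get(0).data);
    }
}
```

For the ten sample messages, the Last policy starts at Message 10, which corresponds to the output of the last snippet above.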
NATS CLI
The NATS CLI is a lightweight tool for interacting with a NATS server. It allows us to publish messages, subscribe to subjects, perform request/reply operations, and manage JetStream.
Installation
We can install the NATS CLI by downloading it from the official GitHub releases page of the nats-io/natscli project.
For example, to install on Linux (amd64):
curl -LO https://github.com/nats-io/natscli/releases/download/v0.0.36/nats-0.0.36-linux-amd64
chmod +x nats-0.0.36-linux-amd64
sudo mv nats-0.0.36-linux-amd64 /usr/local/bin/nats
Verify the installation:
nats --version
Using the NATS CLI with Docker
If we prefer not to install the CLI locally, we can use the natsio/nats-box Docker image. This is particularly useful in environments where direct installation isn’t possible.
docker run --rm -it natsio/nats-box nats --version
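To interact with a NATS server running on the Docker host, the container needs access to the host's network, for example via --network host on Linux (otherwise localhost refers to the container itself):
docker run --rm -it --network host natsio/nats-box nats pub -s nats://localhost:4222 tasks "visit www.hascode.com"
The nats-box image includes the CLI and other NATS tools, making it a convenient choice for testing.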
Common NATS CLI Commands
Publishing a Message
Send a message to a subject:
nats pub <subject> <message>
Example:
nats pub tasks "visit www.hascode.com"
Subscribing to a Subject
Listen to messages on a subject:
nats sub <subject>
Example:
nats sub tasks
Request/Reply
Perform a request and wait for a response:
nats req <subject> <message>
Example:
nats req questions.request "what is the best tech blog?"
To reply to the request:
nats reply <subject> <response>
Checking Server Connection
Verify that the NATS server is reachable:
nats server ping
Example: Testing with NATS CLI
- Start a subscriber:
nats sub tasks
- Publish a message:
nats pub tasks "Task #1"
- Observe the output on the subscriber terminal:
[#1] Received on "tasks": Task #1
The NATS CLI is an excellent tool for quickly testing, debugging, and managing our NATS setup, either locally or via Docker.
Useful NATS Snippets
Using Config Files
The following basic configuration, saved as nats.conf, activates JetStream and also adds an administration user:
jetstream {
store_dir: "/tmp/jetstream"
}
authorization {
users = [
{ user: "admin", password: "secret" }
]
}
Plaintext passwords should only be used in development. Starting a server instance with this configuration yields a corresponding warning.
We may now start our server like this:
docker run -d --network host -v "$(pwd)/nats.conf:/etc/nats.conf" nats:latest -c /etc/nats.conf
Tutorial Sources
Please feel free to download the tutorial sources from my GitHub repository, fork it there or clone it using Git:
git clone https://github.com/hascode/nats-messaging.git
Resources
- Official NATS Documentation: https://docs.nats.io
- Java Client GitHub Repository: https://github.com/nats-io/nats.java
- Maven Central: https://search.maven.org/artifact/io.nats/jnats