Description
Defect
We use the Java client library to create a pull subscription and listen on it. When the NATS server is restarted, the library reconnects and even re-sends the subscriptions, but the server no longer delivers any messages to the client.
- Included nats-server -DV output
- Included a [Minimal, Complete, and Verifiable example](https://stackoverflow.com/help/mcve)
nats-server -DV output:
I'm running docker.io/library/nats@sha256:58e483681983f87fdc738e980d582d4cc7218914c4f0056d36bab0c5acfc5a8b locally. Executing the above command in the running container gives me:
Starting nats-server
Version: 2.9.15
Git: [b91fa85]
Go build: go1.19.6
Name: NBDPZPHME36GTJH53Q3RBRJCQAKTAWASAKBCYIOAPRV5PYOBEQHP6UA7
ID: NBDPZPHME36GTJH53Q3RBRJCQAKTAWASAKBCYIOAPRV5PYOBEQHP6UA7
Created system account: "$SYS"
Error listening on port: 0.0.0.0:4222, "listen tcp 0.0.0.0:4222: bind: address already in use"
Versions of nats-server and affected client libraries used:
nats-server: 2.9.15
client: io.nats:jnats:2.16.9
OS/Container environment:
Podman 4.4.1 on Arch Linux.
Steps or code to reproduce the issue:
Starting with this basic listener:
package test.project;

import io.nats.client.*;
import io.nats.client.api.*;
import io.nats.client.impl.ErrorListenerLoggerImpl;

import java.io.IOException;
import java.time.Duration;
import java.util.Optional;

public class NatsSubscriber {

    private static final String STREAM_NAME = "stream-1";

    public static void main(String[] args) throws InterruptedException {
        new NatsSubscriber().run();
    }

    private void run() throws InterruptedException {
        try (var connection = createConnection()) {
            createStreamIfNotExists(connection);
            var subscription = createSubscription(connection);
            subscription.pull(10);
            while (true) {
                try {
                    System.out.println("listening...");
                    var message = subscription.nextMessage(Duration.ZERO);
                    if (message != null) {
                        System.out.println("received msg: " + new String(message.getData()));
                        message.ack();
                    }
                } catch (JetStreamStatusException e) {
                    // ignore "Unknown or unprocessed status message: Server Shutdown"
                }
            }
        }
    }

    public Connection createConnection() {
        var connectionOptions = new Options.Builder()
                .server("nats://localhost:4222")
                .connectionName("nats-subscriber-1")
                .errorListener(new ErrorListenerLoggerImpl())
                .noNoResponders()
                .build();
        try {
            return Nats.connect(connectionOptions);
        } catch (IOException | InterruptedException e) {
            throw new RuntimeException(e);
        }
    }

    private JetStreamSubscription createSubscription(Connection connection) {
        var subscriptionOptions = PullSubscribeOptions.builder()
                .configuration(getConsumerOptions())
                .stream(STREAM_NAME)
                .build();
        try {
            return connection.jetStream().subscribe(STREAM_NAME, subscriptionOptions);
        } catch (IOException | JetStreamApiException e) {
            throw new RuntimeException(e);
        }
    }

    ConsumerConfiguration getConsumerOptions() {
        return ConsumerConfiguration.builder()
                .deliverPolicy(DeliverPolicy.All)
                .ackPolicy(AckPolicy.Explicit)
                .durable("consumer-1")
                .build();
    }

    private void createStreamIfNotExists(Connection connection) {
        try {
            var streamInfo = getStreamInfo(connection);
            if (streamInfo.isEmpty()) {
                createNewStream(connection);
            }
        } catch (IOException | JetStreamApiException e) {
            throw new RuntimeException(e.getMessage(), e);
        }
    }

    private Optional<StreamInfo> getStreamInfo(Connection connection) throws IOException, JetStreamApiException {
        try {
            return Optional.of(connection.jetStreamManagement().getStreamInfo(STREAM_NAME));
        } catch (JetStreamApiException ex) {
            if (ex.getErrorCode() == 404) {
                return Optional.empty();
            }
            throw ex;
        }
    }

    private void createNewStream(Connection connection) throws JetStreamApiException, IOException {
        var streamConfiguration = getStreamConfiguration();
        connection.jetStreamManagement().addStream(streamConfiguration);
    }

    private StreamConfiguration getStreamConfiguration() {
        return StreamConfiguration.builder()
                .name(STREAM_NAME)
                .storageType(StorageType.File)
                .retentionPolicy(RetentionPolicy.WorkQueue)
                .replicas(1)
                .build();
    }
}
(full project: test-project.zip)
Start nats-server:
podman image pull docker.io/library/nats@sha256:58e483681983f87fdc738e980d582d4cc7218914c4f0056d36bab0c5acfc5a8b
podman run -p 4222:4222 -p 8222:8222 -p 6222:6222 --name nats-server -d docker.io/nats --jetstream --trace
(you can also simply replace podman with docker)
Follow the logs in the background:
podman logs -f <container-id>
Start the sample project above. The app waits for messages after subscribing to the stream:
listening...
Observe in the nats-server output that the server received the subscription:
[TRC] 10.0.2.100:60886 - cid:60 - <<- [SUB _INBOX.KQyEsJiBiM22h3CgB48uTJ 2]
Check out the consumer info:
$ nats consumer info stream-1 consumer-1
State:
...
Waiting Pulls: 1 of maximum 512
(I removed irrelevant lines)
So there is a pull registered.
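To script this check instead of reading the output by eye, the count can be extracted from the CLI output. This is a minimal sketch that assumes the line format shown above ("Waiting Pulls: N of maximum M"); the class and method names are hypothetical and not part of jnats or the nats CLI.

```java
import java.util.OptionalInt;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper: parses the "Waiting Pulls" line of `nats consumer info` output.
final class WaitingPullsParser {

    // Assumes the format seen in this report: "Waiting Pulls: 1 of maximum 512".
    private static final Pattern WAITING =
            Pattern.compile("Waiting Pulls:\\s*(\\d+)\\s+of maximum\\s+(\\d+)");

    // Returns the number of waiting pulls, or empty if the line is not present.
    static OptionalInt waitingPulls(String consumerInfoOutput) {
        Matcher m = WAITING.matcher(consumerInfoOutput);
        return m.find()
                ? OptionalInt.of(Integer.parseInt(m.group(1)))
                : OptionalInt.empty();
    }

    public static void main(String[] args) {
        String output = "State:\n...\nWaiting Pulls: 1 of maximum 512\n";
        System.out.println(waitingPulls(output));
    }
}
```

Running the same check after the server restart is expected to report 0 in the failing case described in this report.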
Publish a message:
nats publish stream-1 test
Observe in the app output that the message was received:
received msg: test
Now restart nats-server:
podman container restart <container-id>
Again, observe in the nats-server output that the server receives the subscriptions after the reconnect:
[TRC] 10.0.2.100:57696 - cid:41 - <<- [SUB _INBOX.KQyEsJiBiM22h3CgB48uTJ 2]
[TRC] 10.0.2.100:57696 - cid:41 - <<- [SUB _INBOX.KQyEsJiBiM22h3CgB48uAb.* 1]
Check out the consumer info:
$ nats consumer info stream-1 consumer-1
State:
...
Waiting Pulls: 0 of maximum 512
NATS no longer has a waiting pull!
Publish a message:
nats publish stream-1 test
Observe that the app does not receive the message (NATS does not deliver it). When you restart the app, the message is received.
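One way to read the two consumer info outputs is that the SUB is restored on reconnect, but the outstanding pull request is not, so the server has nobody to deliver to until a new pull arrives. The toy model below only illustrates that reading. It is not the nats-server or jnats implementation, and every name in it is hypothetical.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Toy model of the observed behavior. NOT the nats-server implementation.
final class WaitingPullModel {

    // Outstanding pull requests; each entry holds the remaining batch size.
    private final Deque<int[]> waiting = new ArrayDeque<>();
    // Messages stored in the stream and not yet delivered (kept across a restart).
    private final Deque<String> stored = new ArrayDeque<>();
    // Messages handed to the client so far.
    final List<String> delivered = new ArrayList<>();

    void pull(int batch) {
        waiting.add(new int[] {batch});
        deliver();
    }

    void publish(String msg) {
        stored.add(msg);
        deliver();
    }

    // Assumption: a restart drops the waiting pulls but keeps stored messages.
    void restart() {
        waiting.clear();
    }

    int waitingPulls() {
        return waiting.size();
    }

    // Hands stored messages to the oldest waiting pull until one side runs out.
    private void deliver() {
        while (!stored.isEmpty() && !waiting.isEmpty()) {
            int[] head = waiting.peek();
            delivered.add(stored.poll());
            if (--head[0] == 0) {
                waiting.poll();
            }
        }
    }

    public static void main(String[] args) {
        WaitingPullModel server = new WaitingPullModel();
        server.pull(10);          // like subscription.pull(10) in the listener
        server.publish("test");   // delivered: a pull is waiting
        server.restart();         // waiting pulls drop to 0
        server.publish("test");   // stays stored: nothing is waiting
        System.out.println("waiting=" + server.waitingPulls()
                + " delivered=" + server.delivered.size());
        server.pull(10);          // a fresh pull (e.g. after app restart) delivers it
        System.out.println("delivered=" + server.delivered.size());
    }
}
```

In this model the second message is only delivered once a new pull is issued, which matches the observation that restarting the app makes the message arrive.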
Expected result:
Messages are received after re-connect.
Actual result:
Messages are not received after re-connect.