Re-subscriptions after reconnect are not honored #892

@ewirch

Defect

We use the Java client library to create a pull subscription and listen on it. When the NATS server is restarted, the library reconnects and even re-sends the subscriptions, but NATS no longer sends any messages to the client.

nats-server -DV

I'm running docker.io/library/nats@sha256:58e483681983f87fdc738e980d582d4cc7218914c4f0056d36bab0c5acfc5a8b locally. Executing the above command in the running container gives me:

Starting nats-server
  Version:  2.9.15
  Git:      [b91fa85]
  Go build: go1.19.6
  Name:     NBDPZPHME36GTJH53Q3RBRJCQAKTAWASAKBCYIOAPRV5PYOBEQHP6UA7
  ID:       NBDPZPHME36GTJH53Q3RBRJCQAKTAWASAKBCYIOAPRV5PYOBEQHP6UA7
Created system account: "$SYS"
Error listening on port: 0.0.0.0:4222, "listen tcp 0.0.0.0:4222: bind: address already in use"

Versions of nats-server and affected client libraries used:

nats-server: 2.9.15
client: io.nats:jnats:2.16.9

OS/Container environment:

Podman 4.4.1 on Arch Linux.

Steps or code to reproduce the issue:

Starting with this basic listener:

package test.project;

import io.nats.client.*;
import io.nats.client.api.*;
import io.nats.client.impl.ErrorListenerLoggerImpl;

import java.io.IOException;
import java.time.Duration;
import java.util.Optional;

public class NatsSubscriber {

    private static final String STREAM_NAME = "stream-1";

    public static void main(String[] args) throws InterruptedException {
        new NatsSubscriber().run();
    }

    private void run() throws InterruptedException {
        try (var connection = createConnection()) {
            createStreamIfNotExists(connection);
            var subscription = createSubscription(connection);
            subscription.pull(10);
            while (true) {
                try {
                    System.out.println("listening...");
                    var message = subscription.nextMessage(Duration.ZERO);
                    if (message != null) {
                        System.out.println("received msg: "+ new String(message.getData()));
                        message.ack();
                    }
                } catch (JetStreamStatusException e) {
                    // ignore "Unknown or unprocessed status message: Server Shutdown"
                }
            }
        }
    }

    public Connection createConnection() {
        var connectionOptions = new Options.Builder()
                .server("nats://localhost:4222")
                .connectionName("nats-subscriber-1")
                .errorListener(new ErrorListenerLoggerImpl())
                .noNoResponders()
                .build();

        try {
            return Nats.connect(connectionOptions);
        } catch (IOException | InterruptedException e) {
            throw new RuntimeException(e);
        }
    }

    private JetStreamSubscription createSubscription(Connection connection) {
        var subscriptionOptions = PullSubscribeOptions.builder()
                .configuration(getConsumerOptions())
                .stream(STREAM_NAME)
                .build();

        try {
            return connection.jetStream().subscribe(STREAM_NAME, subscriptionOptions);
        } catch (IOException | JetStreamApiException e) {
            throw new RuntimeException(e);
        }
    }

    ConsumerConfiguration getConsumerOptions() {
        return ConsumerConfiguration.builder()
                .deliverPolicy(DeliverPolicy.All)
                .ackPolicy(AckPolicy.Explicit)
                .durable("consumer-1")
                .build();
    }

    private void createStreamIfNotExists(Connection connection) {
        try {
            var streamInfo = getStreamInfo(connection);

            if (streamInfo.isEmpty()) {
                createNewStream(connection);
            }
        } catch (IOException | JetStreamApiException e) {
            throw new RuntimeException(e.getMessage(), e);
        }
    }


    private Optional<StreamInfo> getStreamInfo(Connection connection) throws IOException, JetStreamApiException {
        try {
            return Optional.of(connection.jetStreamManagement().getStreamInfo(STREAM_NAME));
        } catch (JetStreamApiException ex) {
            if (ex.getErrorCode() == 404) {
                return Optional.empty();
            }
            throw ex;
        }
    }

    private void createNewStream(Connection connection) throws JetStreamApiException, IOException {
        var streamConfiguration = getStreamConfiguration();
        connection.jetStreamManagement().addStream(streamConfiguration);
    }

    private StreamConfiguration getStreamConfiguration() {
        return StreamConfiguration.builder()
                .name(STREAM_NAME)
                .storageType(StorageType.File)
                .retentionPolicy(RetentionPolicy.WorkQueue)
                .replicas(1)
                .build();
    }
}

(full project: test-project.zip)

Start nats-server:

podman image pull docker.io/library/nats@sha256:58e483681983f87fdc738e980d582d4cc7218914c4f0056d36bab0c5acfc5a8b
podman run -p 4222:4222 -p 8222:8222 -p 6222:6222 --name nats-server -d docker.io/nats --jetstream --trace

(You can also simply replace podman with docker.)

Follow the logs in the background:

podman logs -f <container-id>

Start the sample project above. The app waits for messages after subscribing to the stream:

listening...

Observe in the nats-server output that the server received the subscribe:

[TRC] 10.0.2.100:60886 - cid:60 - <<- [SUB _INBOX.KQyEsJiBiM22h3CgB48uTJ 2]

Check out the consumer info:

$ nats consumer info stream-1 consumer-1
State:
    ...
    Waiting Pulls: 1 of maximum 512

(I removed irrelevant lines.)

So there is a pull registered.

Publish a message:

nats publish stream-1 test

Observe in the app output that the message was received:

received msg: test

Now restart nats-server:

podman container restart <container-id>

Again, observe in the nats-server output that the server receives the subscriptions after the reconnect:

[TRC] 10.0.2.100:57696 - cid:41 - <<- [SUB _INBOX.KQyEsJiBiM22h3CgB48uTJ 2]
[TRC] 10.0.2.100:57696 - cid:41 - <<- [SUB _INBOX.KQyEsJiBiM22h3CgB48uAb.* 1]

Check out the consumer info:

$ nats consumer info stream-1 consumer-1
State:
    ...
    Waiting Pulls: 0 of maximum 512

NATS no longer has a waiting pull!

Publish a message:

nats publish stream-1 test

Observe that the app does not receive the message (NATS does not deliver it). When you restart the app, the message is received.

Expected result:

Messages are received after re-connect.

Actual result:

Messages are not received after re-connect.
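
One possible reading of the "Waiting Pulls: 0" output, not confirmed here: the pull request issued by subscription.pull(10) is separate from the SUB that the client replays, and the server may hold it only in memory, so it would be gone after a restart even though the inbox subscription is restored. The sketch below models that assumption with a hypothetical in-memory FakeConsumer (it does not use the jnats API) and shows a loop that re-issues the pull when a bounded wait comes back empty.

```java
import java.util.ArrayDeque;
import java.util.Queue;

class PullAfterRestartSketch {

    /** Hypothetical in-memory stand-in for a pull consumer; this is not the jnats API. */
    static class FakeConsumer {
        // Messages persisted by the stream (the report uses file storage).
        private final Queue<String> stored = new ArrayDeque<>();
        // Remaining capacity of the outstanding pull request.
        // Assumption: the server keeps this in memory only.
        private int outstanding = 0;

        void publish(String message) {
            stored.add(message);
        }

        // Models a pull request; in this toy model a new request replaces the old one.
        void pull(int batchSize) {
            outstanding = batchSize;
        }

        // Models a server restart: stored messages survive, the pull request does not.
        void restart() {
            outstanding = 0;
        }

        int waitingCapacity() {
            return outstanding;
        }

        // Models a bounded wait: returns null when nothing can be delivered.
        String nextMessage() {
            if (outstanding == 0 || stored.isEmpty()) {
                return null;
            }
            outstanding--;
            return stored.poll();
        }
    }

    /** One loop iteration: on an empty poll, re-issue the pull and try once more. */
    static String pollWithRePull(FakeConsumer consumer, int batchSize) {
        String message = consumer.nextMessage();
        if (message == null) {
            consumer.pull(batchSize);
            message = consumer.nextMessage();
        }
        return message;
    }

    public static void main(String[] args) {
        FakeConsumer consumer = new FakeConsumer();
        consumer.pull(10);
        consumer.publish("before restart");
        System.out.println("received: " + consumer.nextMessage());

        consumer.restart();
        consumer.publish("after restart");
        // A loop that pulled only once gets nothing here.
        System.out.println("pull-once loop sees: " + consumer.nextMessage());
        // A loop that re-issues the pull picks the stored message up.
        System.out.println("re-pull loop sees: " + pollWithRePull(consumer, 10));
    }
}
```

If this reading is right, a client-side workaround in the reproducer might be to wait with a non-zero timeout in nextMessage and re-issue the pull when it returns null, ideally with an expiring pull such as pullExpiresIn so stale requests do not accumulate. That is an untested suggestion, and it does not settle whether the library should re-issue pulls itself after a reconnect.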
