From 4d1f751bc560377bcf9f256aebbb3179ecd7ec88 Mon Sep 17 00:00:00 2001 From: liaodongnian <361485583@qq.com> Date: Fri, 6 Feb 2026 19:05:38 +0800 Subject: [PATCH] fix localdistservice missing result local route --- .../mqtt/service/LocalDistService.java | 27 +++-- .../mqtt/service/LocalDistServiceTest.java | 114 +++++++++++++++++- 2 files changed, 129 insertions(+), 12 deletions(-) diff --git a/bifromq-mqtt/bifromq-mqtt-server/src/main/java/org/apache/bifromq/mqtt/service/LocalDistService.java b/bifromq-mqtt/bifromq-mqtt-server/src/main/java/org/apache/bifromq/mqtt/service/LocalDistService.java index 7f4de0a47..6422b802c 100644 --- a/bifromq-mqtt/bifromq-mqtt-server/src/main/java/org/apache/bifromq/mqtt/service/LocalDistService.java +++ b/bifromq-mqtt/bifromq-mqtt-server/src/main/java/org/apache/bifromq/mqtt/service/LocalDistService.java @@ -105,6 +105,7 @@ public CompletableFuture dist(DeliveryRequest request) { Set ok = new HashSet<>(); Set skip = new HashSet<>(); Set noSub = new HashSet<>(); + Set noReceiver = new HashSet<>(); long totalFanOutBytes = 0L; for (DeliveryPack writePack : packageEntry.getValue().getPackList()) { TopicMessagePack topicMsgPack = writePack.getMessagePack(); @@ -124,14 +125,13 @@ public CompletableFuture dist(DeliveryRequest request) { matchInfo); } } else { - // no session found for shared subscription - noSub.add(matchInfo); + noReceiver.add(matchInfo); } } else { Optional> routesFutureOpt = localTopicRouter.getTopicRoutes(tenantId, matchInfo); if (routesFutureOpt.isEmpty()) { - noSub.add(matchInfo); + noReceiver.add(matchInfo); continue; } CompletableFuture routesFuture = @@ -143,15 +143,21 @@ public CompletableFuture dist(DeliveryRequest request) { } ILocalTopicRouter.ILocalRoutes localRoutes = routesFuture.join(); if (!localRoutes.localReceiverId().equals(matchInfo.getReceiverId())) { - noSub.add(matchInfo); + noReceiver.add(matchInfo); continue; } + if (localRoutes.routesInfo().isEmpty()) { + noReceiver.add(matchInfo); + continue; + } + boolean hasUsableSession = false; for (Map.Entry route : localRoutes.routesInfo().entrySet()) { String sessionId = route.getKey(); long incarnation = route.getValue(); // at least one session should publish the message IMQTTSession session = sessionRegistry.get(sessionId); if (session instanceof IMQTTTransientSession) { + hasUsableSession = true; if (isFanOutThrottled && !matchedSessions.isEmpty()) { skip.add(matchInfo); } else { @@ -161,6 +167,9 @@ public CompletableFuture dist(DeliveryRequest request) { } } } + if (!hasUsableSession) { + noReceiver.add(matchInfo); + } } } long msgPackSize = SizeUtil.estSizeOf(topicMsgPack); @@ -185,10 +194,13 @@ public CompletableFuture dist(DeliveryRequest request) { tenantMeter.recordSummary(MqttTransientFanOutBytes, totalFanOutBytes); // don't include duplicated matchInfo in the result // treat skip as ok - Sets.difference(Sets.union(ok, skip), noSub).forEach(matchInfo -> resultsBuilder.addResult( - DeliveryResult.newBuilder().setMatchInfo(matchInfo).setCode(DeliveryResult.Code.OK).build())); + Sets.difference(Sets.union(ok, skip), Sets.union(noSub, noReceiver)).forEach(matchInfo -> + resultsBuilder.addResult( + DeliveryResult.newBuilder().setMatchInfo(matchInfo).setCode(DeliveryResult.Code.OK).build())); noSub.forEach(matchInfo -> resultsBuilder.addResult( DeliveryResult.newBuilder().setMatchInfo(matchInfo).setCode(DeliveryResult.Code.NO_SUB).build())); + noReceiver.forEach(matchInfo -> resultsBuilder.addResult( + DeliveryResult.newBuilder().setMatchInfo(matchInfo).setCode(DeliveryResult.Code.NO_RECEIVER).build())); replyBuilder.putResult(tenantId, resultsBuilder.build()); } return CompletableFuture.completedFuture(replyBuilder.build()); @@ -205,8 +217,7 @@ public CheckReply.Code checkMatchInfo(String tenantId, MatchInfo matchInfo) { return transientSession.hasSubscribed(matchInfo.getMatcher().getMqttTopicFilter()) ? CheckReply.Code.OK : CheckReply.Code.NO_SUB; } else { - // should not be here - return CheckReply.Code.ERROR; + return CheckReply.Code.NO_RECEIVER; } } else { Optional> routesFutureOpt = diff --git a/bifromq-mqtt/bifromq-mqtt-server/src/test/java/org/apache/bifromq/mqtt/service/LocalDistServiceTest.java b/bifromq-mqtt/bifromq-mqtt-server/src/test/java/org/apache/bifromq/mqtt/service/LocalDistServiceTest.java index e1f358809..60ea83091 100644 --- a/bifromq-mqtt/bifromq-mqtt-server/src/test/java/org/apache/bifromq/mqtt/service/LocalDistServiceTest.java +++ b/bifromq-mqtt/bifromq-mqtt-server/src/test/java/org/apache/bifromq/mqtt/service/LocalDistServiceTest.java @@ -38,6 +38,7 @@ import static org.testng.Assert.assertTrue; import org.apache.bifromq.mqtt.MockableTest; +import org.apache.bifromq.mqtt.session.IMQTTSession; import org.apache.bifromq.mqtt.session.IMQTTTransientSession; import org.apache.bifromq.plugin.subbroker.CheckReply; import org.apache.bifromq.plugin.subbroker.DeliveryPack; @@ -158,6 +159,7 @@ public void checkMatchInfoForNonSharedSub() { assertEquals(code, CheckReply.Code.NO_RECEIVER); when(localRoutes.routesInfo()).thenReturn(Map.of(channelId, 1L)); + when(localSessionRegistry.get(channelId)).thenReturn(session); code = localDistService.checkMatchInfo(tenantId, MatchInfo.newBuilder() .setMatcher(TopicUtil.from(topicFilter)) .setReceiverId(ILocalDistService.localize(channelId)) @@ -364,7 +366,7 @@ public void deliverToMismatchedReceiver() { DeliveryResults results = reply.getResultMap().get(tenantId); DeliveryResult result = results.getResult(0); - assertEquals(DeliveryResult.Code.NO_SUB, result.getCode()); + assertEquals(result.getCode(), DeliveryResult.Code.NO_RECEIVER); } @Test @@ -398,7 +400,111 @@ public void deliverToNoLocalRoute() { DeliveryResults results = reply.getResultMap().get(tenantId); DeliveryResult result = results.getResult(0); - assertEquals(DeliveryResult.Code.NO_SUB, result.getCode()); + assertEquals(result.getCode(), DeliveryResult.Code.NO_RECEIVER); + } + + @Test + public void deliverToEmptyLocalRoutes() { + String tenantId = "tenant1"; + String topic = "testTopic"; + String topicFilter = "testTopic/#"; + String channelId = "channel0"; + MatchInfo matchInfo = MatchInfo.newBuilder() + .setMatcher(TopicUtil.from(topicFilter)) + .setReceiverId("receiverId") + .build(); + TopicMessagePack topicMessagePack = TopicMessagePack.newBuilder().setTopic(topic).build(); + DeliveryPackage deliveryPack = DeliveryPackage.newBuilder() + .addPack(DeliveryPack.newBuilder().setMessagePack(topicMessagePack).addMatchInfo(matchInfo).build()) + .build(); + DeliveryRequest request = DeliveryRequest.newBuilder().putPackage(tenantId, deliveryPack).build(); + + ILocalTopicRouter.ILocalRoutes localRoutes = mock(ILocalTopicRouter.ILocalRoutes.class); + when(localRoutes.localReceiverId()).thenReturn("receiverId"); + when(localRoutes.routesInfo()).thenReturn(Map.of()); + when(localTopicRouter.getTopicRoutes(anyString(), any())).thenReturn( + Optional.of(CompletableFuture.completedFuture(localRoutes))); + + LocalDistService localDistService = + new LocalDistService(serverId, localSessionRegistry, localTopicRouter, distClient, resourceThrottler); + + CompletableFuture future = localDistService.dist(request); + DeliveryReply reply = future.join(); + + DeliveryResults results = reply.getResultMap().get(tenantId); + DeliveryResult result = results.getResult(0); + assertEquals(result.getCode(), DeliveryResult.Code.NO_RECEIVER); + } + + @Test + public void deliverToNoLocalSession() { + String tenantId = "tenant1"; + String topic = "testTopic"; + String topicFilter = "testTopic/#"; + String channelId = "channel0"; + MatchInfo matchInfo = MatchInfo.newBuilder() + .setMatcher(TopicUtil.from(topicFilter)) + .setReceiverId("receiverId") + .build(); + TopicMessagePack topicMessagePack = TopicMessagePack.newBuilder().setTopic(topic).build(); + DeliveryPackage deliveryPack = DeliveryPackage.newBuilder() + .addPack(DeliveryPack.newBuilder().setMessagePack(topicMessagePack).addMatchInfo(matchInfo).build()) + .build(); + DeliveryRequest request = DeliveryRequest.newBuilder().putPackage(tenantId, deliveryPack).build(); + + ILocalTopicRouter.ILocalRoutes localRoutes = mock(ILocalTopicRouter.ILocalRoutes.class); + when(localRoutes.localReceiverId()).thenReturn("receiverId"); + when(localRoutes.routesInfo()).thenReturn(Map.of(channelId, 1L)); + when(localTopicRouter.getTopicRoutes(anyString(), any())).thenReturn( + Optional.of(CompletableFuture.completedFuture(localRoutes))); + + when(localSessionRegistry.get(channelId)).thenReturn(null); + + LocalDistService localDistService = + new LocalDistService(serverId, localSessionRegistry, localTopicRouter, distClient, resourceThrottler); + + CompletableFuture future = localDistService.dist(request); + DeliveryReply reply = future.join(); + + DeliveryResults results = reply.getResultMap().get(tenantId); + DeliveryResult result = results.getResult(0); + assertEquals(result.getCode(), DeliveryResult.Code.NO_RECEIVER); + } + + @Test + public void deliverToNonTransientSession() { + String tenantId = "tenant1"; + String topic = "testTopic"; + String topicFilter = "testTopic/#"; + String channelId = "channel0"; + MatchInfo matchInfo = MatchInfo.newBuilder() + .setMatcher(TopicUtil.from(topicFilter)) + .setReceiverId("receiverId") + .build(); + TopicMessagePack topicMessagePack = TopicMessagePack.newBuilder().setTopic(topic).build(); + DeliveryPackage deliveryPack = DeliveryPackage.newBuilder() + .addPack(DeliveryPack.newBuilder().setMessagePack(topicMessagePack).addMatchInfo(matchInfo).build()) + .build(); + DeliveryRequest request = DeliveryRequest.newBuilder().putPackage(tenantId, deliveryPack).build(); + + ILocalTopicRouter.ILocalRoutes localRoutes = mock(ILocalTopicRouter.ILocalRoutes.class); + when(localRoutes.localReceiverId()).thenReturn("receiverId"); + when(localRoutes.routesInfo()).thenReturn(Map.of(channelId, 1L)); + when(localTopicRouter.getTopicRoutes(anyString(), any())).thenReturn( + Optional.of(CompletableFuture.completedFuture(localRoutes))); + + IMQTTSession nonTransientSession = mock(IMQTTSession.class); + when(localSessionRegistry.get(channelId)).thenReturn(nonTransientSession); + + LocalDistService localDistService = + new LocalDistService(serverId, localSessionRegistry, localTopicRouter, distClient, resourceThrottler); + + CompletableFuture future = localDistService.dist(request); + DeliveryReply reply = future.join(); + + DeliveryResults results = reply.getResultMap().get(tenantId); + DeliveryResult result = results.getResult(0); + assertEquals(result.getCode(), DeliveryResult.Code.NO_RECEIVER); } @Test @@ -432,7 +538,7 @@ public void deliverToNoResolvedRoute() { DeliveryResults results = reply.getResultMap().get(tenantId); DeliveryResult result = results.getResult(0); - assertEquals(DeliveryResult.Code.OK, result.getCode()); + assertEquals(result.getCode(), DeliveryResult.Code.OK); } @Test @@ -467,7 +573,7 @@ public void deliverWhileRouteResolveException() { DeliveryResults results = reply.getResultMap().get(tenantId); DeliveryResult result = results.getResult(0); - assertEquals(DeliveryResult.Code.OK, result.getCode()); + assertEquals(result.getCode(), DeliveryResult.Code.OK); } @Test