Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,7 @@ public CompletableFuture<DeliveryReply> dist(DeliveryRequest request) {
Set<MatchInfo> ok = new HashSet<>();
Set<MatchInfo> skip = new HashSet<>();
Set<MatchInfo> noSub = new HashSet<>();
Set<MatchInfo> noReceiver = new HashSet<>();
long totalFanOutBytes = 0L;
for (DeliveryPack writePack : packageEntry.getValue().getPackList()) {
TopicMessagePack topicMsgPack = writePack.getMessagePack();
Expand All @@ -124,14 +125,13 @@ public CompletableFuture<DeliveryReply> dist(DeliveryRequest request) {
matchInfo);
}
} else {
// no session found for shared subscription
noSub.add(matchInfo);
noReceiver.add(matchInfo);
}
} else {
Optional<CompletableFuture<? extends ILocalTopicRouter.ILocalRoutes>> routesFutureOpt =
localTopicRouter.getTopicRoutes(tenantId, matchInfo);
if (routesFutureOpt.isEmpty()) {
noSub.add(matchInfo);
noReceiver.add(matchInfo);
continue;
}
CompletableFuture<? extends ILocalTopicRouter.ILocalRoutes> routesFuture =
Expand All @@ -143,15 +143,21 @@ public CompletableFuture<DeliveryReply> dist(DeliveryRequest request) {
}
ILocalTopicRouter.ILocalRoutes localRoutes = routesFuture.join();
if (!localRoutes.localReceiverId().equals(matchInfo.getReceiverId())) {
noSub.add(matchInfo);
noReceiver.add(matchInfo);
continue;
}
if (localRoutes.routesInfo().isEmpty()) {
noReceiver.add(matchInfo);
continue;
}
boolean hasUsableSession = false;
for (Map.Entry<String, Long> route : localRoutes.routesInfo().entrySet()) {
String sessionId = route.getKey();
long incarnation = route.getValue();
// at least one session should publish the message
IMQTTSession session = sessionRegistry.get(sessionId);
if (session instanceof IMQTTTransientSession) {
hasUsableSession = true;
if (isFanOutThrottled && !matchedSessions.isEmpty()) {
skip.add(matchInfo);
} else {
Expand All @@ -161,6 +167,9 @@ public CompletableFuture<DeliveryReply> dist(DeliveryRequest request) {
}
}
}
if (!hasUsableSession) {
noReceiver.add(matchInfo);
}
}
}
long msgPackSize = SizeUtil.estSizeOf(topicMsgPack);
Expand All @@ -185,10 +194,13 @@ public CompletableFuture<DeliveryReply> dist(DeliveryRequest request) {
tenantMeter.recordSummary(MqttTransientFanOutBytes, totalFanOutBytes);
// don't include duplicated matchInfo in the result
// treat skip as ok
Sets.difference(Sets.union(ok, skip), noSub).forEach(matchInfo -> resultsBuilder.addResult(
DeliveryResult.newBuilder().setMatchInfo(matchInfo).setCode(DeliveryResult.Code.OK).build()));
Sets.difference(Sets.union(ok, skip), Sets.union(noSub, noReceiver)).forEach(matchInfo ->
resultsBuilder.addResult(
DeliveryResult.newBuilder().setMatchInfo(matchInfo).setCode(DeliveryResult.Code.OK).build()));
noSub.forEach(matchInfo -> resultsBuilder.addResult(
DeliveryResult.newBuilder().setMatchInfo(matchInfo).setCode(DeliveryResult.Code.NO_SUB).build()));
noReceiver.forEach(matchInfo -> resultsBuilder.addResult(
DeliveryResult.newBuilder().setMatchInfo(matchInfo).setCode(DeliveryResult.Code.NO_RECEIVER).build()));
replyBuilder.putResult(tenantId, resultsBuilder.build());
}
return CompletableFuture.completedFuture(replyBuilder.build());
Expand All @@ -205,8 +217,7 @@ public CheckReply.Code checkMatchInfo(String tenantId, MatchInfo matchInfo) {
return transientSession.hasSubscribed(matchInfo.getMatcher().getMqttTopicFilter())
? CheckReply.Code.OK : CheckReply.Code.NO_SUB;
} else {
// should not be here
return CheckReply.Code.ERROR;
return CheckReply.Code.NO_RECEIVER;
}
} else {
Optional<CompletableFuture<? extends ILocalTopicRouter.ILocalRoutes>> routesFutureOpt =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import static org.testng.Assert.assertTrue;

import org.apache.bifromq.mqtt.MockableTest;
import org.apache.bifromq.mqtt.session.IMQTTSession;
import org.apache.bifromq.mqtt.session.IMQTTTransientSession;
import org.apache.bifromq.plugin.subbroker.CheckReply;
import org.apache.bifromq.plugin.subbroker.DeliveryPack;
Expand Down Expand Up @@ -158,6 +159,7 @@ public void checkMatchInfoForNonSharedSub() {
assertEquals(code, CheckReply.Code.NO_RECEIVER);

when(localRoutes.routesInfo()).thenReturn(Map.of(channelId, 1L));
when(localSessionRegistry.get(channelId)).thenReturn(session);
code = localDistService.checkMatchInfo(tenantId, MatchInfo.newBuilder()
.setMatcher(TopicUtil.from(topicFilter))
.setReceiverId(ILocalDistService.localize(channelId))
Expand Down Expand Up @@ -364,7 +366,7 @@ public void deliverToMismatchedReceiver() {

DeliveryResults results = reply.getResultMap().get(tenantId);
DeliveryResult result = results.getResult(0);
assertEquals(DeliveryResult.Code.NO_SUB, result.getCode());
assertEquals(result.getCode(), DeliveryResult.Code.NO_RECEIVER);
}

@Test
Expand Down Expand Up @@ -398,7 +400,111 @@ public void deliverToNoLocalRoute() {

DeliveryResults results = reply.getResultMap().get(tenantId);
DeliveryResult result = results.getResult(0);
assertEquals(DeliveryResult.Code.NO_SUB, result.getCode());
assertEquals(result.getCode(), DeliveryResult.Code.NO_RECEIVER);
}

@Test
public void deliverToEmptyLocalRoutes() {
String tenantId = "tenant1";
String topic = "testTopic";
String topicFilter = "testTopic/#";
String channelId = "channel0";
MatchInfo matchInfo = MatchInfo.newBuilder()
.setMatcher(TopicUtil.from(topicFilter))
.setReceiverId("receiverId")
.build();
TopicMessagePack topicMessagePack = TopicMessagePack.newBuilder().setTopic(topic).build();
DeliveryPackage deliveryPack = DeliveryPackage.newBuilder()
.addPack(DeliveryPack.newBuilder().setMessagePack(topicMessagePack).addMatchInfo(matchInfo).build())
.build();
DeliveryRequest request = DeliveryRequest.newBuilder().putPackage(tenantId, deliveryPack).build();

ILocalTopicRouter.ILocalRoutes localRoutes = mock(ILocalTopicRouter.ILocalRoutes.class);
when(localRoutes.localReceiverId()).thenReturn("receiverId");
when(localRoutes.routesInfo()).thenReturn(Map.of());
when(localTopicRouter.getTopicRoutes(anyString(), any())).thenReturn(
Optional.of(CompletableFuture.completedFuture(localRoutes)));

LocalDistService localDistService =
new LocalDistService(serverId, localSessionRegistry, localTopicRouter, distClient, resourceThrottler);

CompletableFuture<DeliveryReply> future = localDistService.dist(request);
DeliveryReply reply = future.join();

DeliveryResults results = reply.getResultMap().get(tenantId);
DeliveryResult result = results.getResult(0);
assertEquals(result.getCode(), DeliveryResult.Code.NO_RECEIVER);
}

@Test
public void deliverToNoLocalSession() {
String tenantId = "tenant1";
String topic = "testTopic";
String topicFilter = "testTopic/#";
String channelId = "channel0";
MatchInfo matchInfo = MatchInfo.newBuilder()
.setMatcher(TopicUtil.from(topicFilter))
.setReceiverId("receiverId")
.build();
TopicMessagePack topicMessagePack = TopicMessagePack.newBuilder().setTopic(topic).build();
DeliveryPackage deliveryPack = DeliveryPackage.newBuilder()
.addPack(DeliveryPack.newBuilder().setMessagePack(topicMessagePack).addMatchInfo(matchInfo).build())
.build();
DeliveryRequest request = DeliveryRequest.newBuilder().putPackage(tenantId, deliveryPack).build();

ILocalTopicRouter.ILocalRoutes localRoutes = mock(ILocalTopicRouter.ILocalRoutes.class);
when(localRoutes.localReceiverId()).thenReturn("receiverId");
when(localRoutes.routesInfo()).thenReturn(Map.of(channelId, 1L));
when(localTopicRouter.getTopicRoutes(anyString(), any())).thenReturn(
Optional.of(CompletableFuture.completedFuture(localRoutes)));

when(localSessionRegistry.get(channelId)).thenReturn(null);

LocalDistService localDistService =
new LocalDistService(serverId, localSessionRegistry, localTopicRouter, distClient, resourceThrottler);

CompletableFuture<DeliveryReply> future = localDistService.dist(request);
DeliveryReply reply = future.join();

DeliveryResults results = reply.getResultMap().get(tenantId);
DeliveryResult result = results.getResult(0);
assertEquals(result.getCode(), DeliveryResult.Code.NO_RECEIVER);
}

@Test
public void deliverToNonTransientSession() {
String tenantId = "tenant1";
String topic = "testTopic";
String topicFilter = "testTopic/#";
String channelId = "channel0";
MatchInfo matchInfo = MatchInfo.newBuilder()
.setMatcher(TopicUtil.from(topicFilter))
.setReceiverId("receiverId")
.build();
TopicMessagePack topicMessagePack = TopicMessagePack.newBuilder().setTopic(topic).build();
DeliveryPackage deliveryPack = DeliveryPackage.newBuilder()
.addPack(DeliveryPack.newBuilder().setMessagePack(topicMessagePack).addMatchInfo(matchInfo).build())
.build();
DeliveryRequest request = DeliveryRequest.newBuilder().putPackage(tenantId, deliveryPack).build();

ILocalTopicRouter.ILocalRoutes localRoutes = mock(ILocalTopicRouter.ILocalRoutes.class);
when(localRoutes.localReceiverId()).thenReturn("receiverId");
when(localRoutes.routesInfo()).thenReturn(Map.of(channelId, 1L));
when(localTopicRouter.getTopicRoutes(anyString(), any())).thenReturn(
Optional.of(CompletableFuture.completedFuture(localRoutes)));

IMQTTSession nonTransientSession = mock(IMQTTSession.class);
when(localSessionRegistry.get(channelId)).thenReturn(nonTransientSession);

LocalDistService localDistService =
new LocalDistService(serverId, localSessionRegistry, localTopicRouter, distClient, resourceThrottler);

CompletableFuture<DeliveryReply> future = localDistService.dist(request);
DeliveryReply reply = future.join();

DeliveryResults results = reply.getResultMap().get(tenantId);
DeliveryResult result = results.getResult(0);
assertEquals(result.getCode(), DeliveryResult.Code.NO_RECEIVER);
}

@Test
Expand Down Expand Up @@ -432,7 +538,7 @@ public void deliverToNoResolvedRoute() {

DeliveryResults results = reply.getResultMap().get(tenantId);
DeliveryResult result = results.getResult(0);
assertEquals(DeliveryResult.Code.OK, result.getCode());
assertEquals(result.getCode(), DeliveryResult.Code.OK);
}

@Test
Expand Down Expand Up @@ -467,7 +573,7 @@ public void deliverWhileRouteResolveException() {

DeliveryResults results = reply.getResultMap().get(tenantId);
DeliveryResult result = results.getResult(0);
assertEquals(DeliveryResult.Code.OK, result.getCode());
assertEquals(result.getCode(), DeliveryResult.Code.OK);
}

@Test
Expand Down
Loading