diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java index d7e85af1ca7a9..e4ce22347cd4e 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java @@ -39,7 +39,10 @@ import static org.apache.pulsar.broker.namespace.NamespaceService.getHeartbeatNamespace; import static org.apache.pulsar.broker.namespace.NamespaceService.getHeartbeatNamespaceV2; import static org.apache.pulsar.broker.namespace.NamespaceService.getSLAMonitorNamespace; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.doCallRealMethod; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; @@ -136,6 +139,7 @@ import org.apache.pulsar.common.policies.data.SchemaCompatibilityStrategy; import org.apache.pulsar.common.stats.Metrics; import org.apache.pulsar.common.util.FutureUtil; +import org.apache.pulsar.metadata.api.MetadataCache; import org.apache.pulsar.metadata.api.MetadataStoreException; import org.apache.pulsar.policies.data.loadbalancer.NamespaceBundleStats; import org.apache.pulsar.policies.data.loadbalancer.ResourceUsage; @@ -677,7 +681,7 @@ public static void testTransferClientReconnectionWithoutLookup( verify(lookupService.getRight(), never()).getBroker(topicName); } } finally { - for (var consumer: consumers) { + for (var consumer : consumers) { consumer.close(); } @@ -1019,6 +1023,7 @@ public void testSplitBundleWithSpecificPositionAdminAPI() throws Exception { assertTrue(bundlesData.getBoundaries().contains(midBundle)); assertTrue(bundlesData.getBoundaries().contains(highBundle)); } + @Test(timeOut = 30 * 1000) public void testDeleteNamespaceBundle() throws Exception { final String namespace = "public/testDeleteNamespaceBundle"; @@ -1315,19 +1320,29 @@ public void testLoadBalancerServiceUnitTableViewSyncer() throws Exception { assertTrue(webServiceUrlBefore2.isPresent()); assertEquals(webServiceUrlBefore2.get().toString(), webServiceUrlBefore1.get().toString()); - String syncerTyp = serviceUnitStateTableViewClassName.equals(ServiceUnitStateTableViewImpl.class.getName()) ? "SystemTopicToMetadataStoreSyncer" : "MetadataStoreToSystemTopicSyncer"; pulsar.getAdminClient().brokers() .updateDynamicConfiguration("loadBalancerServiceUnitTableViewSyncer", syncerTyp); + Awaitility.waitAtMost(10, TimeUnit.SECONDS) + .pollInterval(1, TimeUnit.SECONDS) + .untilAsserted(() -> { + assertTrue(pulsar1.getConfiguration().isLoadBalancerServiceUnitTableViewSyncerEnabled()); + assertTrue(pulsar2.getConfiguration().isLoadBalancerServiceUnitTableViewSyncerEnabled()); + }); + + // We invoke monitor method to ensure SystemTopicToMetadataStoreSyncer to start or close because syncer will not + // be started or close after pulsar.getAdminClient().brokers().updateDynamicConfiguration(); + primaryLoadManager.monitor(); + secondaryLoadManager.monitor(); + + // We invoke monitor method in makePrimaryAsLeader and makeSecondaryAsLeader because monitor can immediately + // trigger serviceUnitStateTableViewSyncer to start or close without wait. makeSecondaryAsLeader(); makePrimaryAsLeader(); - Awaitility.waitAtMost(10, TimeUnit.SECONDS) - .untilAsserted(() -> assertTrue(primaryLoadManager.getServiceUnitStateTableViewSyncer() - .isActive())); - Awaitility.waitAtMost(10, TimeUnit.SECONDS) - .untilAsserted(() -> assertFalse(secondaryLoadManager.getServiceUnitStateTableViewSyncer() - .isActive())); + assertTrue(primaryLoadManager.getServiceUnitStateTableViewSyncer().isActive()); + assertFalse(secondaryLoadManager.getServiceUnitStateTableViewSyncer().isActive()); + ServiceConfiguration defaultConf = getDefaultConf(); defaultConf.setAllowAutoTopicCreation(true); defaultConf.setForceDeleteNamespaceAllowed(true); @@ -1459,33 +1474,48 @@ public void testLoadBalancerServiceUnitTableViewSyncer() throws Exception { } // Check if the broker is available var wrapper = (ExtensibleLoadManagerWrapper) pulsar4.getLoadManager().get(); - var loadManager4 = spy((ExtensibleLoadManagerImpl) - FieldUtils.readField(wrapper, "loadManager", true)); - loadManager4.getBrokerRegistry().unregister(); + var loadManager4 = (ExtensibleLoadManagerImpl) FieldUtils.readField(wrapper, "loadManager", true); + + // simulate pulsar4 store error which will unregister the broker + var brokerRegistry = loadManager4.getBrokerRegistry(); + var brokerLookupDataMetadataCache = (MetadataCache) spy( + FieldUtils.readField(brokerRegistry, "brokerLookupDataMetadataCache", true)); + doReturn(CompletableFuture.failedFuture(new MetadataStoreException("store error"))).when( + brokerLookupDataMetadataCache).put(anyString(), any(), any()); + FieldUtils.writeDeclaredField(brokerRegistry, "brokerLookupDataMetadataCache", + brokerLookupDataMetadataCache, true); + brokerRegistry.unregister(); NamespaceName slaMonitorNamespace = getSLAMonitorNamespace(pulsar4.getBrokerId(), pulsar.getConfiguration()); String slaMonitorTopic = slaMonitorNamespace.getPersistentTopicName("test"); - String result = pulsar.getAdminClient().lookups().lookupTopic(slaMonitorTopic); - assertNotNull(result); - log.info("{} Namespace is re-owned by {}", slaMonitorTopic, result); - assertNotEquals(result, pulsar4.getBrokerServiceUrl()); - Producer producer = pulsar.getClient().newProducer(Schema.STRING) - .topic(slaMonitorTopic).create(); + Awaitility.await() + .atMost(10, TimeUnit.SECONDS) + .pollInterval(1, TimeUnit.SECONDS) + .untilAsserted(() -> { + String currentResult = pulsar.getAdminClient().lookups().lookupTopic(slaMonitorTopic); + assertNotNull(currentResult); + log.info("{} Namespace is re-owned by {}", slaMonitorTopic, currentResult); + assertNotEquals(currentResult, pulsar4.getBrokerServiceUrl()); + }); + + Producer producer = + pulsar.getClient().newProducer(Schema.STRING).topic(slaMonitorTopic).create(); producer.send("t1"); // Test re-register broker and check the lookup result + doCallRealMethod().when(brokerLookupDataMetadataCache).put(anyString(), any(), any()); loadManager4.getBrokerRegistry().registerAsync().get(); - result = pulsar.getAdminClient().lookups().lookupTopic(slaMonitorTopic); + String result = pulsar.getAdminClient().lookups().lookupTopic(slaMonitorTopic); assertNotNull(result); log.info("{} Namespace is re-owned by {}", slaMonitorTopic, result); assertEquals(result, pulsar4.getBrokerServiceUrl()); producer.send("t2"); - Producer producer1 = pulsar.getClient().newProducer(Schema.STRING) - .topic(slaMonitorTopic).create(); + Producer producer1 = + pulsar.getClient().newProducer(Schema.STRING).topic(slaMonitorTopic).create(); producer1.send("t3"); producer.close(); @@ -1558,6 +1588,8 @@ private void makePrimaryAsLeader() throws Exception { Awaitility.await().atMost(30, TimeUnit.SECONDS).ignoreExceptions().untilAsserted(() -> { assertFalse(channel2.isChannelOwner()); }); + primaryLoadManager.monitor(); + secondaryLoadManager.monitor(); } private void makeSecondaryAsLeader() throws Exception { @@ -1576,6 +1608,9 @@ private void makeSecondaryAsLeader() throws Exception { Awaitility.await().atMost(30, TimeUnit.SECONDS).untilAsserted(() -> { assertFalse(channel1.isChannelOwner()); }); + + primaryLoadManager.monitor(); + secondaryLoadManager.monitor(); } @Test(timeOut = 30 * 1000, priority = 2100) @@ -1643,27 +1678,27 @@ public void testRoleChangeIdempotency() throws Exception { } - primaryLoadManager.playFollower(); - secondaryLoadManager.playFollower(); - assertEquals(ExtensibleLoadManagerImpl.Role.Leader, - primaryLoadManager.getRole()); - assertEquals(ExtensibleLoadManagerImpl.Role.Follower, - secondaryLoadManager.getRole()); + primaryLoadManager.playFollower(); + secondaryLoadManager.playFollower(); + assertEquals(ExtensibleLoadManagerImpl.Role.Leader, + primaryLoadManager.getRole()); + assertEquals(ExtensibleLoadManagerImpl.Role.Follower, + secondaryLoadManager.getRole()); - primaryLoadManager.playLeader(); - secondaryLoadManager.playLeader(); - assertEquals(ExtensibleLoadManagerImpl.Role.Leader, - primaryLoadManager.getRole()); - assertEquals(ExtensibleLoadManagerImpl.Role.Follower, - secondaryLoadManager.getRole()); + primaryLoadManager.playLeader(); + secondaryLoadManager.playLeader(); + assertEquals(ExtensibleLoadManagerImpl.Role.Leader, + primaryLoadManager.getRole()); + assertEquals(ExtensibleLoadManagerImpl.Role.Follower, + secondaryLoadManager.getRole()); } @DataProvider(name = "noChannelOwnerMonitorHandler") public Object[][] noChannelOwnerMonitorHandler() { - return new Object[][] { { true }, { false } }; + return new Object[][]{{true}, {false}}; } @Test(dataProvider = "noChannelOwnerMonitorHandler", timeOut = 30 * 1000, priority = 2101) @@ -2164,7 +2199,8 @@ public void testGetOwnedServiceUnitsAndGetOwnedNamespaceStatus() throws Exceptio String topic = "persistent://" + defaultTestNamespace + "/test-get-owned-service-units"; admin.topics().createPartitionedTopic(topic, 1); NamespaceBundle bundle = getBundleAsync(pulsar1, TopicName.get(topic)).join(); - CompletableFuture> owner = primaryLoadManager.assign(Optional.empty(), bundle, LookupOptions.builder().build()); + CompletableFuture> owner = + primaryLoadManager.assign(Optional.empty(), bundle, LookupOptions.builder().build()); assertFalse(owner.join().isEmpty()); BrokerLookupData brokerLookupData = owner.join().get();