From 13d1cbe79f13cef5ebdcba9f67ebee839e333a33 Mon Sep 17 00:00:00 2001 From: xiaocairush Date: Mon, 2 Jun 2025 17:38:04 +0800 Subject: [PATCH 1/6] [fix][test] fix Flaky-test: ExtensibleLoadManagerImplTest.testLoadBalancerServiceUnitTableViewSyncer --- .../ExtensibleLoadManagerImplTest.java | 112 ++++++++++++------ 1 file changed, 75 insertions(+), 37 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java index 0417b6fc14412..e0cac44606276 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java @@ -40,6 +40,7 @@ import static org.apache.pulsar.broker.namespace.NamespaceService.getHeartbeatNamespaceV2; import static org.apache.pulsar.broker.namespace.NamespaceService.getSLAMonitorNamespace; import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.doCallRealMethod; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; @@ -259,7 +260,8 @@ public void testCheckOwnershipAsync() throws Exception { assertFalse(secondaryLoadManager.checkOwnershipAsync(Optional.empty(), bundle).get()); // 2. Assign the bundle to a broker. - Optional lookupData = primaryLoadManager.assign(Optional.empty(), bundle, LookupOptions.builder().build()).get(); + Optional lookupData = + primaryLoadManager.assign(Optional.empty(), bundle, LookupOptions.builder().build()).get(); assertTrue(lookupData.isPresent()); if (lookupData.get().getPulsarServiceUrl().equals(pulsar1.getBrokerServiceUrl())) { assertTrue(primaryLoadManager.checkOwnershipAsync(Optional.empty(), bundle).get()); @@ -292,14 +294,16 @@ public CompletableFuture> filterAsync(Map brokerLookupData = primaryLoadManager.assign(Optional.empty(), bundle, LookupOptions.builder().build()).get(); + Optional brokerLookupData = + primaryLoadManager.assign(Optional.empty(), bundle, LookupOptions.builder().build()).get(); assertTrue(brokerLookupData.isPresent()); assertEquals(brokerLookupData.get().getWebServiceUrl(), pulsar2.getWebServiceAddress()); } @Test public void testFilterHasException() throws Exception { - Pair topicAndBundle = getBundleIsNotOwnByChangeEventTopic("test-filter-has-exception"); + Pair topicAndBundle = + getBundleIsNotOwnByChangeEventTopic("test-filter-has-exception"); NamespaceBundle bundle = topicAndBundle.getRight(); doReturn(List.of(new MockBrokerFilter() { @@ -312,7 +316,8 @@ public CompletableFuture> filterAsync(Map brokerLookupData = primaryLoadManager.assign(Optional.empty(), bundle, LookupOptions.builder().build()).get(); + Optional brokerLookupData = + primaryLoadManager.assign(Optional.empty(), bundle, LookupOptions.builder().build()).get(); assertTrue(brokerLookupData.isPresent()); } @@ -638,7 +643,7 @@ public static void testTransferClientReconnectionWithoutLookup( verify(lookupService.getRight(), never()).getBroker(topicName); } } finally { - for (var consumer: consumers) { + for (var consumer : consumers) { consumer.close(); } @@ -837,7 +842,8 @@ public static void testOptimizeUnloadDisable(List clients, } } - protected static Pair spyLookupService(PulsarClient client) throws IllegalAccessException { + protected static Pair spyLookupService(PulsarClient client) + throws IllegalAccessException { LookupService svc = (LookupService) FieldUtils.readDeclaredField(client, "lookup", true); var lookup = spy(svc); FieldUtils.writeDeclaredField(client, "lookup", lookup, true); @@ -978,6 +984,7 @@ public void testSplitBundleWithSpecificPositionAdminAPI() throws Exception { assertTrue(bundlesData.getBoundaries().contains(midBundle)); assertTrue(bundlesData.getBoundaries().contains(highBundle)); } + @Test(timeOut = 30 * 1000) public void testDeleteNamespaceBundle() throws Exception { final String namespace = "public/testDeleteNamespaceBundle"; @@ -1057,7 +1064,7 @@ public CompletableFuture> filterAsync(Map> filterAsync(Map brokers, ServiceUnitId serviceUnit, @@ -1065,7 +1072,8 @@ public CompletableFuture> filterAsync(Map brokerLookupData = primaryLoadManager.assign(Optional.empty(), bundle, LookupOptions.builder().build()).get(); + Optional brokerLookupData = + primaryLoadManager.assign(Optional.empty(), bundle, LookupOptions.builder().build()).get(); Awaitility.waitAtMost(5, TimeUnit.SECONDS).untilAsserted(() -> { assertTrue(brokerLookupData.isPresent()); assertEquals(brokerLookupData.get().getWebServiceUrl(), pulsar2.getWebServiceAddress()); @@ -1213,7 +1221,8 @@ public void testDeployAndRollbackLoadManager() throws Exception { log.info("{} Namespace is re-owned by {}", slaMonitorTopic, result); assertNotEquals(result, pulsar4.getBrokerServiceUrl()); - Producer producer = pulsar.getClient().newProducer(Schema.STRING).topic(slaMonitorTopic).create(); + Producer producer = + pulsar.getClient().newProducer(Schema.STRING).topic(slaMonitorTopic).create(); producer.send("t1"); // Test re-register broker and check the lookup result @@ -1225,7 +1234,8 @@ public void testDeployAndRollbackLoadManager() throws Exception { assertEquals(result, pulsar4.getBrokerServiceUrl()); producer.send("t2"); - Producer producer1 = pulsar.getClient().newProducer(Schema.STRING).topic(slaMonitorTopic).create(); + Producer producer1 = + pulsar.getClient().newProducer(Schema.STRING).topic(slaMonitorTopic).create(); producer1.send("t3"); producer.close(); @@ -1276,12 +1286,21 @@ public void testLoadBalancerServiceUnitTableViewSyncer() throws Exception { "SystemTopicToMetadataStoreSyncer" : "MetadataStoreToSystemTopicSyncer"; pulsar.getAdminClient().brokers() .updateDynamicConfiguration("loadBalancerServiceUnitTableViewSyncer", syncerTyp); + Awaitility.waitAtMost(10, TimeUnit.SECONDS) + .pollInterval(1, TimeUnit.SECONDS) + .untilAsserted(() -> { + assertTrue(pulsar1.getConfiguration().isLoadBalancerServiceUnitTableViewSyncerEnabled()); + assertTrue(pulsar2.getConfiguration().isLoadBalancerServiceUnitTableViewSyncerEnabled()); + }); + + primaryLoadManager.monitor(); + secondaryLoadManager.monitor(); + makeSecondaryAsLeader(); makePrimaryAsLeader(); - Awaitility.waitAtMost(10, TimeUnit.SECONDS) - .untilAsserted(() -> assertTrue(primaryLoadManager.getServiceUnitStateTableViewSyncer().isActive())); - Awaitility.waitAtMost(10, TimeUnit.SECONDS) - .untilAsserted(() -> assertFalse(secondaryLoadManager.getServiceUnitStateTableViewSyncer().isActive())); + assertTrue(primaryLoadManager.getServiceUnitStateTableViewSyncer().isActive()); + assertFalse(secondaryLoadManager.getServiceUnitStateTableViewSyncer().isActive()); + ServiceConfiguration defaultConf = getDefaultConf(); defaultConf.setAllowAutoTopicCreation(true); defaultConf.setForceDeleteNamespaceAllowed(true); @@ -1415,29 +1434,41 @@ public void testLoadBalancerServiceUnitTableViewSyncer() throws Exception { var wrapper = (ExtensibleLoadManagerWrapper) pulsar4.getLoadManager().get(); var loadManager4 = spy((ExtensibleLoadManagerImpl) FieldUtils.readField(wrapper, "loadManager", true)); + + var brokerRegistry = spy(loadManager4.getBrokerRegistry()); + doReturn(CompletableFuture.completedFuture(null)).when(brokerRegistry).registerAsync(); loadManager4.getBrokerRegistry().unregister(); NamespaceName slaMonitorNamespace = getSLAMonitorNamespace(pulsar4.getBrokerId(), pulsar.getConfiguration()); String slaMonitorTopic = slaMonitorNamespace.getPersistentTopicName("test"); - String result = pulsar.getAdminClient().lookups().lookupTopic(slaMonitorTopic); - assertNotNull(result); - log.info("{} Namespace is re-owned by {}", slaMonitorTopic, result); - assertNotEquals(result, pulsar4.getBrokerServiceUrl()); - Producer producer = pulsar.getClient().newProducer(Schema.STRING).topic(slaMonitorTopic).create(); + Awaitility.await() + .atMost(10, TimeUnit.SECONDS) + .pollInterval(1, TimeUnit.SECONDS) + .untilAsserted(() -> { + String currentResult = pulsar.getAdminClient().lookups().lookupTopic(slaMonitorTopic); + assertNotNull(currentResult); + log.info("{} Namespace is re-owned by {}", slaMonitorTopic, currentResult); + assertNotEquals(currentResult, pulsar4.getBrokerServiceUrl()); + }); + + Producer producer = + pulsar.getClient().newProducer(Schema.STRING).topic(slaMonitorTopic).create(); producer.send("t1"); // Test re-register broker and check the lookup result + doCallRealMethod().when(brokerRegistry).registerAsync(); loadManager4.getBrokerRegistry().registerAsync().get(); - result = pulsar.getAdminClient().lookups().lookupTopic(slaMonitorTopic); + String result = pulsar.getAdminClient().lookups().lookupTopic(slaMonitorTopic); assertNotNull(result); log.info("{} Namespace is re-owned by {}", slaMonitorTopic, result); assertEquals(result, pulsar4.getBrokerServiceUrl()); producer.send("t2"); - Producer producer1 = pulsar.getClient().newProducer(Schema.STRING).topic(slaMonitorTopic).create(); + Producer producer1 = + pulsar.getClient().newProducer(Schema.STRING).topic(slaMonitorTopic).create(); producer1.send("t3"); producer.close(); @@ -1508,6 +1539,8 @@ private void makePrimaryAsLeader() throws Exception { Awaitility.await().atMost(30, TimeUnit.SECONDS).ignoreExceptions().untilAsserted(() -> { assertFalse(channel2.isChannelOwner()); }); + primaryLoadManager.monitor(); + secondaryLoadManager.monitor(); } private void makeSecondaryAsLeader() throws Exception { @@ -1526,6 +1559,9 @@ private void makeSecondaryAsLeader() throws Exception { Awaitility.await().atMost(30, TimeUnit.SECONDS).untilAsserted(() -> { assertFalse(channel1.isChannelOwner()); }); + + primaryLoadManager.monitor(); + secondaryLoadManager.monitor(); } @Test(timeOut = 30 * 1000, priority = 2100) @@ -1593,27 +1629,27 @@ public void testRoleChangeIdempotency() throws Exception { } - primaryLoadManager.playFollower(); - secondaryLoadManager.playFollower(); - assertEquals(ExtensibleLoadManagerImpl.Role.Leader, - primaryLoadManager.getRole()); - assertEquals(ExtensibleLoadManagerImpl.Role.Follower, - secondaryLoadManager.getRole()); + primaryLoadManager.playFollower(); + secondaryLoadManager.playFollower(); + assertEquals(ExtensibleLoadManagerImpl.Role.Leader, + primaryLoadManager.getRole()); + assertEquals(ExtensibleLoadManagerImpl.Role.Follower, + secondaryLoadManager.getRole()); - primaryLoadManager.playLeader(); - secondaryLoadManager.playLeader(); - assertEquals(ExtensibleLoadManagerImpl.Role.Leader, - primaryLoadManager.getRole()); - assertEquals(ExtensibleLoadManagerImpl.Role.Follower, - secondaryLoadManager.getRole()); + primaryLoadManager.playLeader(); + secondaryLoadManager.playLeader(); + assertEquals(ExtensibleLoadManagerImpl.Role.Leader, + primaryLoadManager.getRole()); + assertEquals(ExtensibleLoadManagerImpl.Role.Follower, + secondaryLoadManager.getRole()); } @DataProvider(name = "noChannelOwnerMonitorHandler") public Object[][] noChannelOwnerMonitorHandler() { - return new Object[][] { { true }, { false } }; + return new Object[][]{{true}, {false}}; } @Test(dataProvider = "noChannelOwnerMonitorHandler", timeOut = 30 * 1000, priority = 2101) @@ -1702,7 +1738,8 @@ public void testRoleChange() throws Exception { String bundle = "public/default/0x00000000_0xffffffff"; TopBundlesLoadData topBundlesExpected = new TopBundlesLoadData(); topBundlesExpected.getTopBundlesLoadData().clear(); - topBundlesExpected.getTopBundlesLoadData().add(new TopBundlesLoadData.BundleLoadData(bundle, new NamespaceBundleStats())); + topBundlesExpected.getTopBundlesLoadData() + .add(new TopBundlesLoadData.BundleLoadData(bundle, new NamespaceBundleStats())); Awaitility.await().atMost(30, TimeUnit.SECONDS).untilAsserted(() -> { @@ -1978,7 +2015,7 @@ public void testDisableBroker() throws Exception { var pulsar3 = additionalPulsarTestContext.getPulsarService(); ExtensibleLoadManagerImpl ternaryLoadManager = spy((ExtensibleLoadManagerImpl) FieldUtils.readField(pulsar3.getLoadManager().get(), "loadManager", true)); - String topic = "persistent://" + defaultTestNamespace +"/test"; + String topic = "persistent://" + defaultTestNamespace + "/test"; String lookupResult1 = pulsar3.getAdminClient().lookups().lookupTopic(topic); TopicName topicName = TopicName.get(topic); @@ -2109,7 +2146,8 @@ public void testGetOwnedServiceUnitsAndGetOwnedNamespaceStatus() throws Exceptio String topic = "persistent://" + defaultTestNamespace + "/test-get-owned-service-units"; admin.topics().createPartitionedTopic(topic, 1); NamespaceBundle bundle = getBundleAsync(pulsar1, TopicName.get(topic)).join(); - CompletableFuture> owner = primaryLoadManager.assign(Optional.empty(), bundle, LookupOptions.builder().build()); + CompletableFuture> owner = + primaryLoadManager.assign(Optional.empty(), bundle, LookupOptions.builder().build()); assertFalse(owner.join().isEmpty()); BrokerLookupData brokerLookupData = owner.join().get(); From 12f55884d6eda5748bf9b614516e18ac5c527f43 Mon Sep 17 00:00:00 2001 From: xiaocairush Date: Mon, 2 Jun 2025 21:53:29 +0800 Subject: [PATCH 2/6] [fix][test] mock to failed registerAsyncWithRetries --- .../ExtensibleLoadManagerImplTest.java | 23 ++++++++++++------- 1 file changed, 15 insertions(+), 8 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java index e0cac44606276..2acbb2c66219b 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java @@ -39,6 +39,8 @@ import static org.apache.pulsar.broker.namespace.NamespaceService.getHeartbeatNamespace; import static org.apache.pulsar.broker.namespace.NamespaceService.getHeartbeatNamespaceV2; import static org.apache.pulsar.broker.namespace.NamespaceService.getSLAMonitorNamespace; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.doCallRealMethod; import static org.mockito.Mockito.doReturn; @@ -137,6 +139,7 @@ import org.apache.pulsar.common.policies.data.SchemaCompatibilityStrategy; import org.apache.pulsar.common.stats.Metrics; import org.apache.pulsar.common.util.FutureUtil; +import org.apache.pulsar.metadata.api.MetadataCache; import org.apache.pulsar.metadata.api.MetadataStoreException; import org.apache.pulsar.policies.data.loadbalancer.NamespaceBundleStats; import org.apache.pulsar.policies.data.loadbalancer.ResourceUsage; @@ -1281,7 +1284,6 @@ public void testLoadBalancerServiceUnitTableViewSyncer() throws Exception { assertTrue(webServiceUrlBefore2.isPresent()); assertEquals(webServiceUrlBefore2.get().toString(), webServiceUrlBefore1.get().toString()); - String syncerTyp = serviceUnitStateTableViewClassName.equals(ServiceUnitStateTableViewImpl.class.getName()) ? "SystemTopicToMetadataStoreSyncer" : "MetadataStoreToSystemTopicSyncer"; pulsar.getAdminClient().brokers() @@ -1432,12 +1434,17 @@ public void testLoadBalancerServiceUnitTableViewSyncer() throws Exception { } // Check if the broker is available var wrapper = (ExtensibleLoadManagerWrapper) pulsar4.getLoadManager().get(); - var loadManager4 = spy((ExtensibleLoadManagerImpl) - FieldUtils.readField(wrapper, "loadManager", true)); - - var brokerRegistry = spy(loadManager4.getBrokerRegistry()); - doReturn(CompletableFuture.completedFuture(null)).when(brokerRegistry).registerAsync(); - loadManager4.getBrokerRegistry().unregister(); + var loadManager4 = (ExtensibleLoadManagerImpl) FieldUtils.readField(wrapper, "loadManager", true); + + // simulate pulsar4 store error which will unregister the broker + var brokerRegistry = loadManager4.getBrokerRegistry(); + var brokerLookupDataMetadataCache = (MetadataCache) spy( + FieldUtils.readField(brokerRegistry, "brokerLookupDataMetadataCache", true)); + doReturn(CompletableFuture.failedFuture(new MetadataStoreException("store error"))).when( + brokerLookupDataMetadataCache).put(anyString(), any(), any()); + FieldUtils.writeDeclaredField(brokerRegistry, "brokerLookupDataMetadataCache", + brokerLookupDataMetadataCache, true); + brokerRegistry.unregister(); NamespaceName slaMonitorNamespace = getSLAMonitorNamespace(pulsar4.getBrokerId(), pulsar.getConfiguration()); @@ -1458,7 +1465,7 @@ public void testLoadBalancerServiceUnitTableViewSyncer() throws Exception { producer.send("t1"); // Test re-register broker and check the lookup result - doCallRealMethod().when(brokerRegistry).registerAsync(); + doCallRealMethod().when(brokerLookupDataMetadataCache).put(anyString(), any(), any()); loadManager4.getBrokerRegistry().registerAsync().get(); String result = pulsar.getAdminClient().lookups().lookupTopic(slaMonitorTopic); From 05c0bbf3250d4aa9f5805a860a6bd36f6506e28f Mon Sep 17 00:00:00 2001 From: xiaocairush Date: Mon, 2 Jun 2025 22:15:07 +0800 Subject: [PATCH 3/6] [fix][test] add more comments. --- .../loadbalance/extensions/ExtensibleLoadManagerImplTest.java | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java index 2acbb2c66219b..df9c79ff5feff 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java @@ -1295,9 +1295,13 @@ public void testLoadBalancerServiceUnitTableViewSyncer() throws Exception { assertTrue(pulsar2.getConfiguration().isLoadBalancerServiceUnitTableViewSyncerEnabled()); }); + // We invoke monitor method ensure SystemTopicToMetadataStoreSyncer to start or close because syncer will not + // be started or close after pulsar.getAdminClient().brokers().updateDynamicConfiguration(); primaryLoadManager.monitor(); secondaryLoadManager.monitor(); + // We invoke monitor method in makePrimaryAsLeader and makeSecondaryAsLeader because monitor can immediately + // trigger serviceUnitStateTableViewSyncer to start or close without wait. makeSecondaryAsLeader(); makePrimaryAsLeader(); assertTrue(primaryLoadManager.getServiceUnitStateTableViewSyncer().isActive()); From b443b0a4a93b5eaa424cc62be1f0ff95b33b0209 Mon Sep 17 00:00:00 2001 From: xiaocairush Date: Mon, 2 Jun 2025 22:16:11 +0800 Subject: [PATCH 4/6] [fix][test] add more comments. --- .../loadbalance/extensions/ExtensibleLoadManagerImplTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java index df9c79ff5feff..599b2de82b794 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java @@ -1295,7 +1295,7 @@ public void testLoadBalancerServiceUnitTableViewSyncer() throws Exception { assertTrue(pulsar2.getConfiguration().isLoadBalancerServiceUnitTableViewSyncerEnabled()); }); - // We invoke monitor method ensure SystemTopicToMetadataStoreSyncer to start or close because syncer will not + // We invoke monitor method to ensure SystemTopicToMetadataStoreSyncer to start or close because syncer will not // be started or close after pulsar.getAdminClient().brokers().updateDynamicConfiguration(); primaryLoadManager.monitor(); secondaryLoadManager.monitor(); From c131c657334e7bb31c25e90296240da53c130f2b Mon Sep 17 00:00:00 2001 From: xiaocairush Date: Thu, 17 Jul 2025 15:56:35 +0800 Subject: [PATCH 5/6] fix code style --- .../ExtensibleLoadManagerImplTest.java | 29 ++++++++++--------- 1 file changed, 15 insertions(+), 14 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java index e30e98a4b4aee..176636b168977 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java @@ -1678,27 +1678,27 @@ public void testRoleChangeIdempotency() throws Exception { } - primaryLoadManager.playFollower(); - secondaryLoadManager.playFollower(); - assertEquals(ExtensibleLoadManagerImpl.Role.Leader, - primaryLoadManager.getRole()); - assertEquals(ExtensibleLoadManagerImpl.Role.Follower, - secondaryLoadManager.getRole()); + primaryLoadManager.playFollower(); + secondaryLoadManager.playFollower(); + assertEquals(ExtensibleLoadManagerImpl.Role.Leader, + primaryLoadManager.getRole()); + assertEquals(ExtensibleLoadManagerImpl.Role.Follower, + secondaryLoadManager.getRole()); - primaryLoadManager.playLeader(); - secondaryLoadManager.playLeader(); - assertEquals(ExtensibleLoadManagerImpl.Role.Leader, - primaryLoadManager.getRole()); - assertEquals(ExtensibleLoadManagerImpl.Role.Follower, - secondaryLoadManager.getRole()); + primaryLoadManager.playLeader(); + secondaryLoadManager.playLeader(); + assertEquals(ExtensibleLoadManagerImpl.Role.Leader, + primaryLoadManager.getRole()); + assertEquals(ExtensibleLoadManagerImpl.Role.Follower, + secondaryLoadManager.getRole()); } @DataProvider(name = "noChannelOwnerMonitorHandler") public Object[][] noChannelOwnerMonitorHandler() { - return new Object[][] { { true }, { false } }; + return new Object[][]{{true}, {false}}; } @Test(dataProvider = "noChannelOwnerMonitorHandler", timeOut = 30 * 1000, priority = 2101) @@ -2199,7 +2199,8 @@ public void testGetOwnedServiceUnitsAndGetOwnedNamespaceStatus() throws Exceptio String topic = "persistent://" + defaultTestNamespace + "/test-get-owned-service-units"; admin.topics().createPartitionedTopic(topic, 1); NamespaceBundle bundle = getBundleAsync(pulsar1, TopicName.get(topic)).join(); - CompletableFuture> owner = primaryLoadManager.assign(Optional.empty(), bundle, LookupOptions.builder().build()); + CompletableFuture> owner = + primaryLoadManager.assign(Optional.empty(), bundle, LookupOptions.builder().build()); assertFalse(owner.join().isEmpty()); BrokerLookupData brokerLookupData = owner.join().get(); From 0108fd623361a453063014fc3a95870ba97044c2 Mon Sep 17 00:00:00 2001 From: xiaocairush Date: Thu, 17 Jul 2025 16:03:41 +0800 Subject: [PATCH 6/6] fix code style --- .../loadbalance/extensions/ExtensibleLoadManagerImplTest.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java index 176636b168977..e4ce22347cd4e 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java @@ -1320,8 +1320,8 @@ public void testLoadBalancerServiceUnitTableViewSyncer() throws Exception { assertTrue(webServiceUrlBefore2.isPresent()); assertEquals(webServiceUrlBefore2.get().toString(), webServiceUrlBefore1.get().toString()); - String syncerTyp = serviceUnitStateTableViewClassName.equals(ServiceUnitStateTableViewImpl.class.getName()) ? - "SystemTopicToMetadataStoreSyncer" : "MetadataStoreToSystemTopicSyncer"; + String syncerTyp = serviceUnitStateTableViewClassName.equals(ServiceUnitStateTableViewImpl.class.getName()) + ? "SystemTopicToMetadataStoreSyncer" : "MetadataStoreToSystemTopicSyncer"; pulsar.getAdminClient().brokers() .updateDynamicConfiguration("loadBalancerServiceUnitTableViewSyncer", syncerTyp); Awaitility.waitAtMost(10, TimeUnit.SECONDS)