Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,15 @@ public void startDiskBalancer(DiskBalancerConfigurationProto configProto)
adminChecker.check("startDiskBalancer");
final DiskBalancerInfo info = getDiskBalancerInfoImpl();

// Check if DiskBalancer is already running or paused
DiskBalancerRunningStatus currentStatus = info.getOperationalState();
if (currentStatus == DiskBalancerRunningStatus.RUNNING
|| currentStatus == DiskBalancerRunningStatus.PAUSED) {
// If already running/paused and no configuration change requested, log warning and return early
LOG.warn("DiskBalancer is already in {} state.", currentStatus);
return;
}

// Check node operational state before starting DiskBalancer
// Only IN_SERVICE nodes should actively balance disks
NodeOperationalState nodeState =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -259,5 +259,24 @@ void testUpdateRequiresAdmin() {
assertEquals("Access denied for operation: updateDiskBalancerConfiguration",
exception.getMessage());
}

/**
 * Verifies that a second startDiskBalancer call while the balancer is already
 * RUNNING is an idempotent no-op: it must not throw, must not change the
 * operational state, and must not refresh the service a second time.
 * NOTE: the cumulative Mockito verify counts below depend on statement order.
 */
@Test
void testStartDiskBalancerWhenAlreadyRunning() throws IOException {
// First start: transitions the balancer into RUNNING.
server.startDiskBalancer(null);
assertEquals(DiskBalancerRunningStatus.RUNNING, diskBalancerInfo.getOperationalState());

// The first start should have refreshed the service exactly once.
verify(diskBalancerService, times(1)).refresh(diskBalancerInfo);

// Second start while RUNNING: expected to log a warning and return early
// without throwing (early-return guard in startDiskBalancer).
server.startDiskBalancer(null);

// State must be unchanged by the redundant start.
assertEquals(DiskBalancerRunningStatus.RUNNING, diskBalancerInfo.getOperationalState());

// Refresh count is still 1 (cumulative), proving the early return was taken
// before the service was touched again.
verify(diskBalancerService, times(1)).refresh(diskBalancerInfo);
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,10 @@
import java.io.IOException;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Callable;
import org.apache.hadoop.hdds.HddsConfigKeys;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
Expand Down Expand Up @@ -74,24 +76,30 @@ public Void call() throws Exception {
return null;
}

// Deduplicate target datanodes by address
Set<String> uniqueTargetDatanodes = new LinkedHashSet<>(targetDatanodes);
targetDatanodes = new ArrayList<>(uniqueTargetDatanodes);

// Track if we're using batch mode for display
isBatchMode = options.isInServiceDatanodes();

// Execute on all target datanodes and collect results
List<String> successNodes = new ArrayList<>();
List<String> failedNodes = new ArrayList<>();
List<Object> jsonResults = new ArrayList<>();
Set<String> successNodes = new LinkedHashSet<>();
Set<String> failedNodes = new LinkedHashSet<>();
Map<String, Object> jsonResults = new LinkedHashMap<>();

// Execute commands and collect results
for (String dn : targetDatanodes) {
String hostname = DiskBalancerSubCommandUtil.getDatanodeHostname(dn);

try {
Object result = executeCommand(dn);
successNodes.add(dn);
successNodes.add(hostname);
if (options.isJson()) {
jsonResults.add(result);
jsonResults.put(hostname, result);
}
} catch (Exception e) {
failedNodes.add(dn);
failedNodes.add(hostname);
String errorMsg = e.getMessage();
if (errorMsg != null && errorMsg.contains("\n")) {
errorMsg = errorMsg.split("\n", 2)[0];
Expand All @@ -100,20 +108,20 @@ public Void call() throws Exception {
errorMsg = e.getClass().getSimpleName();
}
if (options.isJson()) {
// Create error result object in JSON format
Map<String, Object> errorResult = createErrorResult(dn, errorMsg);
jsonResults.add(errorResult);
Map<String, Object> errorResult = createErrorResult(hostname, errorMsg);
jsonResults.put(hostname, errorResult);
} else {
// Print error messages in non-JSON mode
System.err.printf("Error on node [%s]: %s%n", dn, errorMsg);
System.err.printf("Error on node [%s]: %s%n", hostname, errorMsg);
}
}
}

// Output results
if (options.isJson()) {
if (!jsonResults.isEmpty()) {
System.out.println(JsonUtils.toJsonStringWithDefaultPrettyPrinter(jsonResults));
// Convert Map values to List for JSON serialization
System.out.println(JsonUtils.toJsonStringWithDefaultPrettyPrinter(new ArrayList<>(jsonResults.values())));
}
} else {
displayResults(successNodes, failedNodes);
Expand Down Expand Up @@ -186,10 +194,10 @@ protected String validateParameters() {
* Display consolidated results after executing on all datanodes.
* For JSON mode, this may be called for summary purposes only.
*
* @param successNodes list of nodes where command succeeded
* @param failedNodes list of nodes where command failed
* @param successNodes set of nodes where command succeeded
* @param failedNodes set of nodes where command failed
*/
protected abstract void displayResults(List<String> successNodes, List<String> failedNodes);
protected abstract void displayResults(Set<String> successNodes, Set<String> failedNodes);

/**
* Get the action name for this command (e.g., "start", "stop", "update", "status", "report").
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.hadoop.hdds.cli.HddsVersionProvider;
import org.apache.hadoop.hdds.protocol.DiskBalancerProtocol;
Expand Down Expand Up @@ -64,7 +65,7 @@ protected Object executeCommand(String hostName) throws IOException {
}

@Override
protected void displayResults(List<String> successNodes, List<String> failedNodes) {
protected void displayResults(Set<String> successNodes, Set<String> failedNodes) {
// In JSON mode, results are already written
if (getOptions().isJson()) {
return;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,14 @@

import java.io.IOException;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;
import org.apache.hadoop.hdds.cli.HddsVersionProvider;
import org.apache.hadoop.hdds.protocol.DiskBalancerProtocol;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.DatanodeDiskBalancerInfoProto;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.DiskBalancerConfigurationProto;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.DiskBalancerRunningStatus;
import picocli.CommandLine.Command;
import picocli.CommandLine.Option;

Expand Down Expand Up @@ -56,28 +59,62 @@ public class DiskBalancerStartSubcommand extends AbstractDiskBalancerSubCommand
arity = "1")
private Boolean stopAfterDiskEven;

// Track nodes that are already running for display purposes (using Set to avoid duplicates)
private final Set<String> alreadyRunningNodes = new LinkedHashSet<>();

@Override
protected Object executeCommand(String hostName) throws IOException {
DiskBalancerProtocol diskBalancerProxy = DiskBalancerSubCommandUtil
.getSingleNodeDiskBalancerProxy(hostName);
try {
DiskBalancerConfigurationProto config = buildConfigProto();
diskBalancerProxy.startDiskBalancer(config);
// Check if DiskBalancer is already running before starting
DatanodeDiskBalancerInfoProto status = diskBalancerProxy.getDiskBalancerInfo();
String dnHostname = DiskBalancerSubCommandUtil.getDatanodeHostname(hostName);

Map<String, Object> result = new LinkedHashMap<>();
result.put("datanode", hostName);
result.put("action", "start");
result.put("status", "success");
Map<String, Object> configMap = getConfigurationMap();
if (configMap != null && !configMap.isEmpty()) {
result.put("configuration", configMap);
if (status.getRunningStatus() == DiskBalancerRunningStatus.RUNNING ||
status.getRunningStatus() == DiskBalancerRunningStatus.PAUSED) {
// Track this node as already running
alreadyRunningNodes.add(dnHostname);

// Return a skipped result
return createJsonResult(dnHostname, "skipped",
"DiskBalancer operation is already running.");
}
return result;

// Not running, proceed with start
DiskBalancerConfigurationProto config = buildConfigProto();
diskBalancerProxy.startDiskBalancer(config);

// Return a success result
return createJsonResult(dnHostname, "success", null);
} finally {
diskBalancerProxy.close();
}
}

/**
 * Builds the insertion-ordered JSON payload reported for one datanode by
 * the start command.
 *
 * @param hostname datanode hostname to report
 * @param status   outcome label ("success" or "skipped")
 * @param message  optional detail text; included only when non-null
 * @return a LinkedHashMap ready for JSON serialization
 */
private Map<String, Object> createJsonResult(String hostname, String status, String message) {
  Map<String, Object> entry = new LinkedHashMap<>();
  entry.put("datanode", hostname);
  entry.put("action", "start");
  entry.put("status", status);
  if (message != null) {
    entry.put("message", message);
  }
  // Echo the effective configuration when one was supplied on the CLI.
  Map<String, Object> effectiveConfig = getConfigurationMap();
  boolean hasConfig = effectiveConfig != null && !effectiveConfig.isEmpty();
  if (hasConfig) {
    entry.put("configuration", effectiveConfig);
  }
  return entry;
}

private DiskBalancerConfigurationProto buildConfigProto() {
DiskBalancerConfigurationProto.Builder builder =
DiskBalancerConfigurationProto.newBuilder();
Expand All @@ -97,29 +134,52 @@ private DiskBalancerConfigurationProto buildConfigProto() {
}

@Override
protected void displayResults(List<String> successNodes,
List<String> failedNodes) {
protected void displayResults(Set<String> successNodes,
Set<String> failedNodes) {
// In JSON mode, results are already written, only show summary if needed
if (getOptions().isJson()) {
return;
}

if (isBatchMode()) {
if (!failedNodes.isEmpty()) {
System.err.printf("Failed to start DiskBalancer on nodes: [%s]%n",
String.join(", ", failedNodes));
} else {
System.out.println("Started DiskBalancer on all IN_SERVICE nodes.");
}
// Filter out skipped nodes from successNodes
Set<String> actualSuccessNodes = new LinkedHashSet<>(successNodes);
actualSuccessNodes.removeAll(alreadyRunningNodes);

// Check if all nodes are already running (batch mode only)
boolean allNodesAlreadyRunning = isBatchMode() && actualSuccessNodes.isEmpty()
&& failedNodes.isEmpty() && !alreadyRunningNodes.isEmpty();

if (allNodesAlreadyRunning) {
System.out.println("DiskBalancer operation is already running on all IN_SERVICE and HEALTHY nodes.");
} else {
// Detailed message for specific nodes
if (!successNodes.isEmpty()) {
System.out.printf("Started DiskBalancer on nodes: [%s]%n",
String.join(", ", successNodes));
// Display warning for nodes that are already running (if not all)
if (!alreadyRunningNodes.isEmpty()) {
System.out.printf("DiskBalancer operation is already running on : [%s]%n",
String.join(", ", alreadyRunningNodes));
}
if (!failedNodes.isEmpty()) {
System.err.printf("Failed to start DiskBalancer on nodes: [%s]%n",
String.join(", ", failedNodes));

if (isBatchMode()) {
if (!failedNodes.isEmpty()) {
System.err.printf("Failed to start DiskBalancer on nodes: [%s]%n",
String.join(", ", failedNodes));
}
if (!actualSuccessNodes.isEmpty()) {
// If no nodes were skipped, use simpler message
if (alreadyRunningNodes.isEmpty()) {
System.out.println("Started DiskBalancer on all IN_SERVICE nodes.");
} else {
System.out.println("Started DiskBalancer operation on all other IN_SERVICE and HEALTHY DNs.");
}
}
} else {
if (!actualSuccessNodes.isEmpty()) {
System.out.printf("Started DiskBalancer on nodes: [%s]%n",
String.join(", ", actualSuccessNodes));
}
if (!failedNodes.isEmpty()) {
System.err.printf("Failed to start DiskBalancer on nodes: [%s]%n",
String.join(", ", failedNodes));
}
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.hadoop.hdds.cli.HddsVersionProvider;
import org.apache.hadoop.hdds.protocol.DiskBalancerProtocol;
Expand Down Expand Up @@ -64,7 +65,7 @@ protected Object executeCommand(String hostName) throws IOException {
}

@Override
protected void displayResults(List<String> successNodes, List<String> failedNodes) {
protected void displayResults(Set<String> successNodes, Set<String> failedNodes) {
// In JSON mode, results are already written
if (getOptions().isJson()) {
return;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,9 @@
package org.apache.hadoop.hdds.scm.cli.datanode;

import java.io.IOException;
import java.util.List;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;
import org.apache.hadoop.hdds.cli.HddsVersionProvider;
import org.apache.hadoop.hdds.protocol.DiskBalancerProtocol;
import picocli.CommandLine.Command;
Expand All @@ -40,8 +41,12 @@ protected Object executeCommand(String hostName) throws IOException {
.getSingleNodeDiskBalancerProxy(hostName);
try {
diskBalancerProxy.stopDiskBalancer();
Map<String, Object> result = new java.util.LinkedHashMap<>();
result.put("datanode", hostName);

// Get hostname for consistent JSON output
String dnHostname = DiskBalancerSubCommandUtil.getDatanodeHostname(hostName);

Map<String, Object> result = new LinkedHashMap<>();
result.put("datanode", dnHostname);
result.put("action", "stop");
result.put("status", "success");
return result;
Expand All @@ -51,7 +56,7 @@ protected Object executeCommand(String hostName) throws IOException {
}

@Override
protected void displayResults(List<String> successNodes, List<String> failedNodes) {
protected void displayResults(Set<String> successNodes, Set<String> failedNodes) {
// In JSON mode, results are already written, only show summary if needed
if (getOptions().isJson()) {
return;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.apache.hadoop.hdds.protocol.DatanodeDetails.Port;
import org.apache.hadoop.hdds.protocol.DiskBalancerProtocol;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.DatanodeDiskBalancerInfoProto;
import org.apache.hadoop.hdds.protocolPB.DiskBalancerProtocolClientSideTranslatorPB;
import org.apache.hadoop.hdds.scm.client.ScmClient;
import org.apache.hadoop.net.NetUtils;
Expand Down Expand Up @@ -119,5 +120,32 @@ public static List<String> getAllOperableNodesClientRpcAddress(

return addresses;
}

/**
 * Resolves the display hostname of a datanode by asking the node itself via
 * a DiskBalancer info RPC, so output stays consistent even when the caller
 * connected using an IP address.
 *
 * @param address the datanode address (can be IP:port or hostname:port)
 * @return the datanode's reported hostname, or the original address when the
 *         lookup fails for any reason
 */
public static String getDatanodeHostname(String address) {
  DiskBalancerProtocol proxy = null;
  try {
    proxy = getSingleNodeDiskBalancerProxy(address);
    DatanodeDiskBalancerInfoProto info = proxy.getDiskBalancerInfo();
    return info.getNode().getHostName();
  } catch (IOException lookupFailure) {
    // Best-effort lookup: fall back to the caller-supplied address.
    return address;
  } finally {
    if (proxy != null) {
      try {
        proxy.close();
      } catch (IOException ignored) {
        // Close failures are non-fatal for this read-only lookup.
      }
    }
  }
}
}

Loading