Description
Is your feature request related to a problem? Please describe.
The bin/pulsar-admin CLI supports the set-clusters and get-clusters commands, so we can set and get the replication clusters of a namespace. However, it lacks a corresponding remove-clusters command to restore the namespace to the unset state. I think this command is necessary so that replication clusters can be managed through a complete set, get, and remove cycle.
Describe the solution you'd like
On the Client Side
Add an inner class RemoveReplicationClusters to CmdNamespaces.java as below:
    private class RemoveReplicationClusters extends CliCommand {
        @Parameter(description = "tenant/namespace", required = true)
        private java.util.List<String> params;

        @Override
        void run() throws PulsarAdminException {
            String namespace = validateNamespace(params);
            getAdmin().namespaces().removeNamespaceReplicationClusters(namespace);
        }
    }
Then add two methods removeNamespaceReplicationClusters and removeNamespaceReplicationClustersAsync in Namespaces.java as below:
    /**
     * Remove the replication clusters for a namespace.
     *
     * @param namespace the namespace name, in the form tenant/namespace
     * @throws PulsarAdminException if the request fails
     */
    void removeNamespaceReplicationClusters(String namespace) throws PulsarAdminException;

    /**
     * Remove the replication clusters for a namespace asynchronously.
     *
     * @param namespace the namespace name, in the form tenant/namespace
     * @return a future that completes when the request has finished
     */
    CompletableFuture<Void> removeNamespaceReplicationClustersAsync(String namespace);
Then implement them in NamespacesImpl.java by issuing an asynchronous DELETE request, as below:
    @Override
    public void removeNamespaceReplicationClusters(String namespace) throws PulsarAdminException {
        sync(() -> removeNamespaceReplicationClustersAsync(namespace));
    }

    @Override
    public CompletableFuture<Void> removeNamespaceReplicationClustersAsync(String namespace) {
        NamespaceName ns = NamespaceName.get(namespace);
        WebTarget path = namespacePath(ns, "replication");
        return asyncDeleteRequest(path);
    }
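The synchronous method above is a thin blocking wrapper around the asynchronous one. The following is a minimal, self-contained sketch of that pattern only. The class SyncOverAsyncSketch, its sync helper, and AdminException are hypothetical stand-ins written for this illustration and are not the Pulsar implementation; in particular, the real PulsarAdminException is a checked exception, while the stand-in here is unchecked to keep the sketch short.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.function.Supplier;

// Hypothetical sketch of a "sync over async" wrapper; not Pulsar code.
final class SyncOverAsyncSketch {

    // Stand-in for PulsarAdminException (assumption: unchecked for brevity).
    static class AdminException extends RuntimeException {
        AdminException(Throwable cause) {
            super(cause);
        }
    }

    // Blocks until the supplied future completes and unwraps its failure cause.
    static <T> T sync(Supplier<CompletableFuture<T>> supplier) {
        try {
            return supplier.get().get();
        } catch (ExecutionException e) {
            // The original failure is carried as the cause of ExecutionException.
            throw new AdminException(e.getCause());
        } catch (InterruptedException e) {
            // Restore the interrupt flag before reporting the failure.
            Thread.currentThread().interrupt();
            throw new AdminException(e);
        }
    }

    // Stand-in for the async call; a real client would send an HTTP DELETE here.
    static CompletableFuture<Void> removeAsync(String namespace) {
        return CompletableFuture.completedFuture(null);
    }

    public static void main(String[] args) {
        sync(() -> removeAsync("my-tenant/my-namespace"));
        System.out.println("removed");
    }
}
```

With this shape, all request logic lives in the async method, and the blocking variant only adds waiting and error translation.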
On the Server Side
Add a removeNamespaceReplicationClusters method to v1/Namespaces.java and v2/Namespaces.java as below:
    // for v1
    @DELETE
    @Path("/{property}/{cluster}/{namespace}/replication")
    @ApiOperation(value = "Remove the replication clusters for namespace")
    @ApiResponses(value = {@ApiResponse(code = 403, message = "Don't have admin permission"),
            @ApiResponse(code = 404, message = "Tenant or cluster or namespace doesn't exist"),
            @ApiResponse(code = 412, message = "Namespace is not global")})
    public void removeNamespaceReplicationClusters(@PathParam("property") String property,
                                                   @PathParam("cluster") String cluster,
                                                   @PathParam("namespace") String namespace) {
        validateNamespaceName(property, cluster, namespace);
        internalSetNamespaceReplicationClusters(Lists.newArrayList());
    }

    // for v2
    @DELETE
    @Path("/{tenant}/{namespace}/replication")
    @ApiOperation(value = "Remove the replication clusters for namespace")
    @ApiResponses(value = {@ApiResponse(code = 403, message = "Don't have admin permission"),
            @ApiResponse(code = 404, message = "Tenant or cluster or namespace doesn't exist"),
            @ApiResponse(code = 412, message = "Namespace is not global")})
    public void removeNamespaceReplicationClusters(@PathParam("tenant") String tenant,
                                                   @PathParam("namespace") String namespace) {
        validateNamespaceName(tenant, namespace);
        internalSetNamespaceReplicationClusters(Lists.newArrayList());
    }
Both methods can reuse the internalSetNamespaceReplicationClusters method in NamespacesBase.java directly by passing an empty List. The important point is that replicationClusterSet in internalSetNamespaceReplicationClusters must be consistent with the initial value that the replication clusters have when the namespace is created. For a v1 namespace, that initial value is an empty Set, as below:
pulsar/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/Policies.java
Line 38 in 9994614
    public Set<String> replication_clusters = new HashSet<>();
and for a v2 namespace it is the local cluster, as below:
pulsar/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
Lines 2025 to 2029 in 9994614
    private void validatePolicies(NamespaceName ns, Policies policies) {
        if (ns.isV2() && policies.replication_clusters.isEmpty()) {
            // Default to local cluster
            policies.replication_clusters = Collections.singleton(config().getClusterName());
        }
So we change how replicationClusterSet is computed in internalSetNamespaceReplicationClusters, as below. The change does not affect callers that pass a non-empty list, because in that case the set is still built from the given cluster IDs:
    Set<String> replicationClusterSet = clusterIds.isEmpty()
            ? (namespaceName.isV2()
                    ? Sets.newHashSet(config().getClusterName())
                    : Sets.newHashSet())
            : Sets.newHashSet(clusterIds);
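To make the three cases of that expression explicit, here is a minimal standalone sketch of the same decision using only the JDK. The class ReplicationClusterDefaults, the method resolve, and its isV2 and localCluster parameters are hypothetical names for this illustration; in the proposal those values come from namespaceName.isV2() and config().getClusterName().

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical sketch of the default-resolution rule; not Pulsar code.
final class ReplicationClusterDefaults {

    // Returns the set to store for the namespace:
    //   non-empty input  -> exactly the given clusters
    //   empty input, v2  -> the local cluster (the v2 creation default)
    //   empty input, v1  -> an empty set (the v1 creation default)
    static Set<String> resolve(List<String> clusterIds, boolean isV2, String localCluster) {
        if (!clusterIds.isEmpty()) {
            return new HashSet<>(clusterIds);
        }
        Set<String> defaults = new HashSet<>();
        if (isV2) {
            defaults.add(localCluster);
        }
        return defaults;
    }

    public static void main(String[] args) {
        // "us-west" is an assumed local cluster name used only for this demo.
        System.out.println(resolve(Collections.emptyList(), true, "us-west"));
        System.out.println(resolve(Collections.emptyList(), false, "us-west"));
        System.out.println(resolve(Arrays.asList("us-east", "us-west"), true, "us-west"));
    }
}
```

Under this rule, a remove request (an empty list) puts the namespace back to the value it had right after creation, which is the consistency requirement described above.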