Discussions about GeaFlow scheduling have been ongoing for some time. I have redesigned a solution and welcome discussion from the community.
Through discussion (see issue-607), we have identified the following background:
A graph is divided into several shards. When a driver starts and schedules a job for the first time, it determines which shards are assigned to which workers. Those workers pull their shards, caching them on their local machines. The cycle scheduling engine knows the shard distribution while it works. A process may start many workers, but those workers do not necessarily share shards.
The current problem is that each job execution has a fixed degree of parallelism. Each unit of parallelism corresponds to a task that is scheduled to a worker, so each task handles a fixed set of shards. If instead the underlying process centrally managed the shards, any worker could read any shard, and the upper-level task could derive its parallelism from the amount of data it needs to read, letting each task handle just a portion of the shards. This would enable dynamic calculation of parallelism and elastic scheduling of execution.
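As a rough illustration of the dynamic-parallelism idea (a sketch only; the class name, method name, and sizing rule below are assumptions for illustration, not part of the design):

```java
// Hypothetical sketch: derive task parallelism from the data volume to
// read, instead of fixing one task per shard. The sizing rule is assumed.
class DynamicParallelism {
    static int parallelismFor(long totalBytesToRead, long targetBytesPerTask, int maxWorkers) {
        // One task per targetBytesPerTask of input, clamped to [1, maxWorkers].
        int p = (int) Math.ceil(totalBytesToRead / (double) targetBytesPerTask);
        return Math.max(1, Math.min(p, maxWorkers));
    }
}
```

A task sized this way reads whichever shards it is assigned at runtime, rather than a statically bound range.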
Based on the above considerations, the following solution is proposed:
1. Background and Objectives
1.1 Current State Analysis
GeaFlow currently adopts a shard manager scheduling model with the following core characteristics:
┌─────────────────────────────────────────────────────────────────┐
│ Current Architecture: Static Task-Shard Binding │
├─────────────────────────────────────────────────────────────────┤
│ Container-0 Container-1 │
│ ┌─────────────────────┐ ┌─────────────────────┐ │
│ │ Worker-0 [Task-0] │ │ Worker-2 [Task-2] │ │
│ │ ├─ KeyGroup[0,31] │ │ ├─ KeyGroup[64,95] │ │
│ │ └─ Shard-0 (Local) │ │ └─ Shard-2 (Local) │ │
│ ├─────────────────────┤ ├─────────────────────┤ │
│ │ Worker-1 [Task-1] │ │ Worker-3 [Task-3] │ │
│ │ ├─ KeyGroup[32,63] │ │ ├─ KeyGroup[96,127] │ │
│ │ └─ Shard-1 (Local) │ │ └─ Shard-3 (Local) │ │
│ └─────────────────────┘ └─────────────────────┘ │
└─────────────────────────────────────────────────────────────────┘
Key Constraints:
| Dimension | Current State | Problem |
|---|---|---|
| Task-Shard Binding | Static binding via `KeyGroupAssignment.computeKeyGroupRangeForOperatorIndex` | Tasks can only access data within fixed KeyGroup ranges |
| State Storage Location | Local RocksDB with path containing shardId | Cannot access across processes/workers |
| Worker Role | Both execution container and data owner | Limited scheduling flexibility |
| Failure Recovery | Must recover specific shards on specific workers | Recovery time coupled with data locality |
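For reference, the static binding in the first row can be sketched as a Flink-style contiguous key-group split (an assumption about how `computeKeyGroupRangeForOperatorIndex` behaves; the exact GeaFlow formula may differ):

```java
// Sketch of static task-to-keygroup binding: each operator index owns a
// fixed, contiguous key-group range (assumed Flink-style formula).
class KeyGroupRanges {
    static int[] rangeFor(int maxKeyGroups, int parallelism, int operatorIndex) {
        int start = operatorIndex * maxKeyGroups / parallelism;
        int end = (operatorIndex + 1) * maxKeyGroups / parallelism - 1;
        return new int[]{start, end};
    }
}
```

With 128 key groups and parallelism 4, this reproduces the `KeyGroup[0,31]` through `KeyGroup[96,127]` ranges shown in the diagram above.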
Existing SERVICE_SHARE_ENABLE Mechanism (Source: AbstractGraphVertexCentricOp.java):
// Current implementation: Container-level sharing
if (shareEnable) {
int taskIndex = processIndex; // Use Container index
int taskPara = containerNum; // Use Container count
desc.withSingleton(); // Singleton GraphState
desc.withStateMode(StateMode.RDONLY); // Read-only mode
}
This approach only supports workers within the same Container sharing GraphState, and cannot achieve cross-Container shard sharing.
1.2 Target Architecture
┌─────────────────────────────────────────────────────────────────┐
│ Target Architecture: Shared Graph Shard │
├─────────────────────────────────────────────────────────────────┤
│ Shard Registry Service │
│ ┌─────────────────────────────────────────────────────────────┐│
│ │ Shard-0 → Container-0:port | Shard-1 → Container-0:port ││
│ │ Shard-2 → Container-1:port | Shard-3 → Container-1:port ││
│ └─────────────────────────────────────────────────────────────┘│
├─────────────────────────────────────────────────────────────────┤
│ Container-0 (Shard Host) Container-1 (Shard Host) │
│ ┌─────────────────────┐ ┌─────────────────────┐ │
│ │ ShardServer │ │ ShardServer │ │
│ │ ├─ Shard-0 (Local) │ │ ├─ Shard-2 (Local) │ │
│ │ └─ Shard-1 (Local) │ │ └─ Shard-3 (Local) │ │
│ └─────────────────────┘ └─────────────────────┘ │
├─────────────────────────────────────────────────────────────────┤
│ Worker-0 Worker-1 Worker-2 │
│ ┌────────────┐ ┌────────────┐ ┌────────────┐ │
│ │ Task-A │ │ Task-B │ │ Task-C │ │
│ │ Read: 0,2 │ │ Read: 1,3 │ │ Read: 0,1 │ │
│ │ (Remote) │ │ (Remote) │ │ (Remote) │ │
│ └────────────┘ └────────────┘ └────────────┘ │
└─────────────────────────────────────────────────────────────────┘
Core Objectives:
- Decouple Task from Shard: Tasks can dynamically choose to read any shard
- Pure Execution Workers: Workers hold no data, only responsible for computation
- Shared Shard Access: All workers within a process share Graph Shard read capability
- Elastic Scheduling: Support dynamic task assignment without considering data locality
2. Technical Design
2.1 Overall Architecture
2.2 Core Component Design
2.2.1 ShardRegistryService
Responsibility: Manage metadata and location information for all shards
// New file: geaflow/geaflow-core/geaflow-engine/geaflow-cluster/
// src/main/java/org/apache/geaflow/cluster/shard/ShardRegistryService.java
public interface ShardRegistryService {
/**
* Register shard location
* @param graphName Graph name
* @param shardId Shard ID
* @param location Shard host information
*/
void registerShard(String graphName, int shardId, ShardLocation location);
/**
* Get shard location
* @param graphName Graph name
* @param shardId Shard ID
* @return Shard location information
*/
ShardLocation getShardLocation(String graphName, int shardId);
/**
* Get all shard locations for a graph
* @param graphName Graph name
* @return shardId → location mapping
*/
Map<Integer, ShardLocation> getAllShardLocations(String graphName);
/**
* Subscribe to shard location changes
* @param graphName Graph name
* @param listener Listener
*/
void subscribe(String graphName, ShardLocationChangeListener listener);
/**
* Get latest version number
*/
long getLatestVersion(String graphName);
}
public class ShardLocation implements Serializable {
private String hostId; // ShardHost identifier
private String host; // Host address
private int port; // RPC port
private int shardId; // Shard ID
private long version; // Data version
private ShardStatus status; // Status: LOADING/READY/RECOVERING
}
public enum ShardStatus {
LOADING, // Loading
READY, // Ready for read
RECOVERING, // Recovering
OFFLINE // Offline
}
Storage Backend Options:
| Option | Pros | Cons | Use Case |
|---|---|---|---|
| Redis | Low latency, supports Pub/Sub | Requires additional deployment | Production |
| ZooKeeper | Strong consistency, supports Watch | Limited write performance | High reliability requirements |
| HAService | Reuses existing components | Limited functionality | Quick validation |
Recommended: Extend existing HAService, reusing Redis/HBase backend.
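As a quick-validation sketch of the interface in 2.2.1, an in-memory registry might look like the following (hypothetical class; a production backend would go through the existing HAService with Redis/HBase as recommended above):

```java
import java.util.*;
import java.util.concurrent.*;

// In-memory sketch of the ShardRegistryService contract, for quick
// validation only. Thread-safe via ConcurrentHashMap; no persistence,
// no Pub/Sub notification.
class InMemoryShardRegistry {
    private final Map<String, Map<Integer, String>> locations = new ConcurrentHashMap<>();
    private final Map<String, Long> versions = new ConcurrentHashMap<>();

    void registerShard(String graph, int shardId, String location, long version) {
        locations.computeIfAbsent(graph, g -> new ConcurrentHashMap<>()).put(shardId, location);
        versions.merge(graph, version, Math::max); // latest version wins
    }

    String getShardLocation(String graph, int shardId) {
        return locations.getOrDefault(graph, Map.of()).get(shardId);
    }

    long getLatestVersion(String graph) {
        return versions.getOrDefault(graph, 0L);
    }
}
```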
2.2.2 ShardServer
Responsibility: Host GraphState on ShardHost and provide remote query service
// New file: geaflow/geaflow-state/geaflow-state-impl/
// src/main/java/org/apache/geaflow/state/server/ShardServer.java
public class ShardServer implements Closeable {
private final Map<String, Map<Integer, GraphState>> graphStates;
private final RpcService rpcService;
private final ShardRegistryService registry;
private final Configuration config;
/**
* Initialize and load specified shard
*/
public void loadShard(String graphName, int shardId, long version) {
GraphStateDescriptor desc = buildDescriptor(graphName, shardId);
desc.withStateMode(StateMode.RDONLY);
GraphState state = StateFactory.buildGraphState(desc, config);
state.manage().operate().setCheckpointId(version);
state.manage().operate().recover();
graphStates.computeIfAbsent(graphName, k -> new ConcurrentHashMap<>())
.put(shardId, state);
// Register to Registry
registry.registerShard(graphName, shardId, buildLocation(shardId));
}
/**
* Query vertex
*/
public <K, VV> IVertex<K, VV> queryVertex(String graphName, int shardId, K vertexId) {
return getState(graphName, shardId).staticGraph().V().query(vertexId).get();
}
/**
* Query edges
*/
public <K, EV> List<IEdge<K, EV>> queryEdges(String graphName, int shardId, K srcId) {
return getState(graphName, shardId).staticGraph().E().query(srcId).asList();
}
/**
* Batch query one-degree graphs
*/
public <K, VV, EV> Map<K, OneDegreeGraph<K, VV, EV>> queryOneDegreeGraphs(
String graphName, int shardId, List<K> vertexIds) {
// Batch query optimization to reduce RPC calls
}
/**
* Query with push-down conditions
*/
public <K, VV> CloseableIterator<IVertex<K, VV>> queryVerticesWithPushDown(
String graphName, int shardId, IStatePushDown pushDown) {
return getState(graphName, shardId).staticGraph().V()
.query(pushDown.getKeyGroup())
.by(pushDown.getFilter())
.iterator();
}
}
RPC Protocol Design:
// New file: geaflow/geaflow-state/geaflow-state-api/
// src/main/proto/shard_service.proto
syntax = "proto3";
service ShardService {
// Single vertex query
rpc QueryVertex(QueryVertexRequest) returns (QueryVertexResponse);
// Single source edge query
rpc QueryEdges(QueryEdgesRequest) returns (QueryEdgesResponse);
// Batch one-degree graph query
rpc QueryOneDegreeGraphBatch(QueryOneDegreeGraphBatchRequest)
returns (QueryOneDegreeGraphBatchResponse);
// Conditional iteration query (streaming response)
rpc QueryWithPushDown(QueryWithPushDownRequest)
returns (stream QueryResultChunk);
}
message QueryVertexRequest {
string graph_name = 1;
int32 shard_id = 2;
bytes vertex_id = 3; // Serialized vertex ID
}
message QueryOneDegreeGraphBatchRequest {
string graph_name = 1;
int32 shard_id = 2;
repeated bytes vertex_ids = 3;
PushDownCondition condition = 4;
}
message PushDownCondition {
int32 start_key_group = 1;
int32 end_key_group = 2;
bytes filter_bytes = 3; // Serialized IGraphFilter
int64 limit = 4;
}
2.2.3 SharedShardClient
Responsibility: Provide transparent shard access capability for tasks
// New file: geaflow/geaflow-state/geaflow-state-impl/
// src/main/java/org/apache/geaflow/state/client/SharedShardClient.java
public class SharedShardClient<K, VV, EV> implements StaticGraphTrait<K, VV, EV> {
private final String graphName;
private final ShardRegistryService registry;
private final IKeyGroupAssigner keyGroupAssigner;
private final Map<ShardLocation, RpcClient> clientPool;
private final Configuration config;
// Local cache (optional)
private final Cache<CacheKey, Object> localCache;
/**
* Query vertex (auto-routing)
*/
@Override
public IVertex<K, VV> getVertex(K vertexId) {
int shardId = keyGroupAssigner.assign(vertexId);
ShardLocation location = registry.getShardLocation(graphName, shardId);
// Check local cache
CacheKey cacheKey = new CacheKey(shardId, vertexId);
IVertex<K, VV> cached = localCache.getIfPresent(cacheKey);
if (cached != null) {
return cached;
}
// Remote query
RpcClient client = getOrCreateClient(location);
IVertex<K, VV> vertex = client.queryVertex(graphName, shardId, vertexId);
localCache.put(cacheKey, vertex);
return vertex;
}
/**
* Batch query one-degree graphs (grouped by shard for optimization)
*/
public Map<K, OneDegreeGraph<K, VV, EV>> getOneDegreeGraphs(List<K> vertexIds) {
// Group by shard
Map<Integer, List<K>> shardToVertices = vertexIds.stream()
.collect(Collectors.groupingBy(keyGroupAssigner::assign));
// Parallel query for each shard
Map<K, OneDegreeGraph<K, VV, EV>> result = new ConcurrentHashMap<>();
shardToVertices.entrySet().parallelStream().forEach(entry -> {
int shardId = entry.getKey();
List<K> ids = entry.getValue();
ShardLocation location = registry.getShardLocation(graphName, shardId);
RpcClient client = getOrCreateClient(location);
Map<K, OneDegreeGraph<K, VV, EV>> partial =
client.queryOneDegreeGraphBatch(graphName, shardId, ids);
result.putAll(partial);
});
return result;
}
/**
* Full graph scan (parallel across shards)
*/
public CloseableIterator<IVertex<K, VV>> scanAllVertices(IStatePushDown pushDown) {
Map<Integer, ShardLocation> allLocations = registry.getAllShardLocations(graphName);
// Create parallel iterators
List<CloseableIterator<IVertex<K, VV>>> iterators = allLocations.entrySet()
.stream()
.map(e -> createRemoteIterator(e.getKey(), e.getValue(), pushDown))
.collect(Collectors.toList());
return new MergedIterator<>(iterators);
}
}
2.2.4 SharedGraphState (Facade)
Responsibility: Adapt to existing GraphState interface, transparent to upper layers
// New file: geaflow/geaflow-state/geaflow-state-impl/
// src/main/java/org/apache/geaflow/state/SharedGraphStateImpl.java
public class SharedGraphStateImpl<K, VV, EV> implements GraphState<K, VV, EV> {
private final SharedShardClient<K, VV, EV> client;
private final ManageableGraphState manageableState;
public SharedGraphStateImpl(SharedShardClient<K, VV, EV> client) {
this.client = client;
this.manageableState = new SharedManageableGraphState(client);
}
@Override
public StaticGraphState<K, VV, EV> staticGraph() {
return new SharedStaticGraphState<>(client);
}
@Override
public DynamicGraphState<K, VV, EV> dynamicGraph() {
return new SharedDynamicGraphState<>(client);
}
@Override
public ManageableGraphState manage() {
return manageableState;
}
}
// Shared static graph state implementation
class SharedStaticGraphState<K, VV, EV> implements StaticGraphState<K, VV, EV> {
private final SharedShardClient<K, VV, EV> client;
@Override
public VertexQuery<K, VV> V() {
return new SharedVertexQuery<>(client);
}
@Override
public EdgeQuery<K, EV> E() {
return new SharedEdgeQuery<>(client);
}
}
2.3 Scheduling Layer Modifications
2.3.1 TaskAssigner Modifications
// Modified file: geaflow/geaflow-core/geaflow-runtime/geaflow-runtime-core/
// src/main/java/org/apache/geaflow/runtime/core/scheduler/resource/TaskAssigner.java
public class TaskAssigner implements Serializable {
// New: shared mode flag
private boolean sharedShardMode;
/**
* Task assignment in shared mode
* - No longer considers data locality
* - Pure load balancing assignment
*/
public Map<Integer, WorkerInfo> assignTasksInSharedMode(
List<Integer> tasks, List<WorkerInfo> workers) {
Map<Integer, WorkerInfo> matches = new HashMap<>();
int workerCount = workers.size();
// Round-robin assignment, pure load balancing
for (int i = 0; i < tasks.size(); i++) {
matches.put(tasks.get(i), workers.get(i % workerCount));
}
return matches;
}
/**
* Check if worker is available (no longer checks shard location)
*/
public boolean isWorkerAvailable(WorkerInfo worker) {
if (sharedShardMode) {
return worker.getStatus() == WorkerStatus.READY;
}
// Original logic...
}
}
2.3.2 New ShardHostManager
// New file: geaflow/geaflow-core/geaflow-engine/geaflow-cluster/
// src/main/java/org/apache/geaflow/cluster/shard/ShardHostManager.java
public class ShardHostManager {
private final ShardRegistryService registry;
private final Map<String, ShardServer> shardServers;
/**
* Initialize ShardHost
* - Load all shards assigned to this host
* - Register to Registry
*/
public void initialize(String graphName, List<Integer> assignedShards, long version) {
ShardServer server = new ShardServer(config);
for (int shardId : assignedShards) {
server.loadShard(graphName, shardId, version);
}
shardServers.put(graphName, server);
}
/**
* Shard reassignment (for scaling)
*/
public void reassignShards(String graphName,
List<Integer> toLoad, List<Integer> toUnload, long version) {
ShardServer server = shardServers.get(graphName);
// Unload shards no longer needed
for (int shardId : toUnload) {
server.unloadShard(shardId);
}
// Load newly assigned shards
for (int shardId : toLoad) {
server.loadShard(graphName, shardId, version);
}
}
}
2.4 Configuration Parameters
// Modified file: geaflow/geaflow-common/src/main/java/org/apache/geaflow/common/config/keys/FrameworkConfigKeys.java
public class FrameworkConfigKeys {
// Existing configuration
public static final ConfigKey SERVICE_SHARE_ENABLE = ConfigKeys
.key("geaflow.analytics.service.share.enable")
.defaultValue(false)
.description("Enable container-level state sharing");
// ========== New Configurations ==========
/**
* Enable cross-process shared shard mode
*/
public static final ConfigKey SHARED_SHARD_ENABLE = ConfigKeys
.key("geaflow.state.shared.shard.enable")
.defaultValue(false)
.description("Enable cross-process shared shard mode");
/**
* ShardServer RPC port
*/
public static final ConfigKey SHARD_SERVER_PORT = ConfigKeys
.key("geaflow.state.shard.server.port")
.defaultValue(9527)
.description("ShardServer RPC port");
/**
* Shard registry service type
*/
public static final ConfigKey SHARD_REGISTRY_TYPE = ConfigKeys
.key("geaflow.state.shard.registry.type")
.defaultValue("redis")
.description("Shard registry service type: redis/zookeeper/haservice");
/**
* Client local cache size
*/
public static final ConfigKey SHARD_CLIENT_CACHE_SIZE = ConfigKeys
.key("geaflow.state.shard.client.cache.size")
.defaultValue(10000)
.description("SharedShardClient local cache size");
/**
* Client cache expiration time (seconds)
*/
public static final ConfigKey SHARD_CLIENT_CACHE_EXPIRE_SECONDS = ConfigKeys
.key("geaflow.state.shard.client.cache.expire.seconds")
.defaultValue(60)
.description("SharedShardClient local cache expire time in seconds");
/**
* RPC call timeout (milliseconds)
*/
public static final ConfigKey SHARD_RPC_TIMEOUT_MS = ConfigKeys
.key("geaflow.state.shard.rpc.timeout.ms")
.defaultValue(30000)
.description("ShardServer RPC call timeout in milliseconds");
/**
* Maximum vertices per batch query
*/
public static final ConfigKey SHARD_BATCH_QUERY_SIZE = ConfigKeys
.key("geaflow.state.shard.batch.query.size")
.defaultValue(1000)
.description("Maximum vertices per batch query");
}
3. Data Flow and Interaction Sequences
3.1 Startup Flow
sequenceDiagram
participant Master
participant Registry as ShardRegistryService
participant SH1 as ShardHost-1
participant SH2 as ShardHost-2
participant W1 as Worker-1
participant W2 as Worker-2
Master->>Master: Calculate shard assignment strategy
Master->>SH1: AssignShards([0,1], version=5)
Master->>SH2: AssignShards([2,3], version=5)
par ShardHost Initialization
SH1->>SH1: loadShard(0, version=5)
SH1->>SH1: loadShard(1, version=5)
SH1->>Registry: registerShard(graph, 0, location1)
SH1->>Registry: registerShard(graph, 1, location1)
and
SH2->>SH2: loadShard(2, version=5)
SH2->>SH2: loadShard(3, version=5)
SH2->>Registry: registerShard(graph, 2, location2)
SH2->>Registry: registerShard(graph, 3, location2)
end
Master->>W1: CreateTask(taskId=A)
Master->>W2: CreateTask(taskId=B)
W1->>Registry: subscribe(graph)
W2->>Registry: subscribe(graph)
Registry-->>W1: ShardLocations([0→SH1, 1→SH1, 2→SH2, 3→SH2])
Registry-->>W2: ShardLocations([0→SH1, 1→SH1, 2→SH2, 3→SH2])
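The Master's "Calculate shard assignment strategy" step is not specified above; one minimal option matching the diagram (ShardHost-1 gets [0,1], ShardHost-2 gets [2,3]) is a contiguous block split. A sketch under that assumption:

```java
import java.util.*;

// Hypothetical contiguous-block shard assignment, as the Master might
// compute it at startup. The real strategy may weight by host capacity.
class ShardAssignment {
    static Map<String, List<Integer>> assign(List<Integer> shards, List<String> hosts) {
        Map<String, List<Integer>> plan = new LinkedHashMap<>();
        int per = (int) Math.ceil(shards.size() / (double) hosts.size());
        for (int h = 0; h < hosts.size(); h++) {
            int from = Math.min(h * per, shards.size());
            int to = Math.min(from + per, shards.size());
            plan.put(hosts.get(h), new ArrayList<>(shards.subList(from, to)));
        }
        return plan;
    }
}
```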
3.2 Query Flow
sequenceDiagram
participant Task as Task-A
participant Client as SharedShardClient
participant Registry as ShardRegistryService
participant SH1 as ShardHost-1
participant SH2 as ShardHost-2
Task->>Client: getVertex(vertexId="v1")
Client->>Client: shardId = keyGroupAssigner.assign("v1") = 2
Client->>Registry: getShardLocation(graph, shardId=2)
Registry-->>Client: location = ShardHost-2
Client->>SH2: QueryVertex(graph, shard=2, "v1")
SH2->>SH2: graphState.staticGraph().V().query("v1")
SH2-->>Client: Vertex("v1", properties)
Client->>Client: localCache.put(key, vertex)
Client-->>Task: Vertex("v1", properties)
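The `keyGroupAssigner.assign` step in this flow could be sketched as hash-then-range mapping (an assumption modeled on common key-group schemes; GeaFlow's actual hashing may differ):

```java
// Hypothetical assigner: hash the vertex id into a key group, then map
// contiguous key-group ranges onto shards. Deterministic, so every
// client routes the same vertex id to the same shard.
class KeyGroupAssigner {
    static int keyGroup(Object vertexId, int maxKeyGroups) {
        return Math.floorMod(vertexId.hashCode(), maxKeyGroups);
    }

    static int shardFor(Object vertexId, int maxKeyGroups, int shardCount) {
        return keyGroup(vertexId, maxKeyGroups) * shardCount / maxKeyGroups;
    }
}
```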
3.3 Batch Query Optimization
sequenceDiagram
participant Task as Task-A
participant Client as SharedShardClient
participant SH1 as ShardHost-1
participant SH2 as ShardHost-2
Task->>Client: getOneDegreeGraphs([v1, v2, v3, v4, v5])
Client->>Client: Group by shard
Note over Client: shard-1: [v1, v3]<br/>shard-2: [v2, v4, v5]
par Parallel Query
Client->>SH1: QueryOneDegreeGraphBatch(shard=1, [v1, v3])
SH1-->>Client: {v1: ODG1, v3: ODG3}
and
Client->>SH2: QueryOneDegreeGraphBatch(shard=2, [v2, v4, v5])
SH2-->>Client: {v2: ODG2, v4: ODG4, v5: ODG5}
end
Client->>Client: merge results
Client-->>Task: {v1: ODG1, v2: ODG2, v3: ODG3, v4: ODG4, v5: ODG5}
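Both this flow and `scanAllVertices` in 2.2.3 fan requests out per shard and merge the partial results; the `MergedIterator` referenced there is not defined in this proposal, so here is a minimal sequential sketch (hypothetical; the real one would wrap CloseableIterator and might interleave or parallelize):

```java
import java.util.*;

// Minimal sketch: chain per-shard iterators one after another,
// dropping each as it is exhausted.
class MergedIterator<T> implements Iterator<T> {
    private final Deque<Iterator<T>> pending;

    MergedIterator(List<Iterator<T>> iterators) {
        this.pending = new ArrayDeque<>(iterators);
    }

    @Override
    public boolean hasNext() {
        while (!pending.isEmpty() && !pending.peekFirst().hasNext()) {
            pending.removeFirst(); // discard exhausted shard iterators
        }
        return !pending.isEmpty();
    }

    @Override
    public T next() {
        if (!hasNext()) {
            throw new NoSuchElementException();
        }
        return pending.peekFirst().next();
    }
}
```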
4. Key Technical Challenges and Solutions
4.1 Data Consistency
Challenge: Consistent reading of multi-version data
Solution: Versioned reading + ViewMetaBookKeeper
public class VersionedShardAccess {
/**
* Ensure reading data of specified version
*/
public IVertex<K, VV> readWithVersion(K vertexId, long expectedVersion) {
int shardId = keyGroupAssigner.assign(vertexId);
ShardLocation location = registry.getShardLocation(graphName, shardId);
// Check version
if (location.getVersion() < expectedVersion) {
// Wait for shard to update to specified version
waitForVersion(shardId, expectedVersion, timeoutMs);
}
return client.queryVertex(graphName, shardId, vertexId);
}
/**
* Snapshot isolation read
*/
public SnapshotReader<K, VV, EV> createSnapshotReader(long snapshotVersion) {
// Lock current version to ensure consistent reads
return new SnapshotReader<>(this, snapshotVersion);
}
}
4.2 Network Overhead Optimization
Challenge: Network latency for remote access
Solution: Multi-level caching + Batch queries + Prefetching
public class OptimizedShardClient<K, VV, EV> extends SharedShardClient<K, VV, EV> {
// L1: Hot data cache
private final Cache<CacheKey, Object> hotCache;
// L2: Prefetch cache
private final Map<Integer, PreFetchBuffer> preFetchBuffers;
/**
* Query with prefetching
*/
public IVertex<K, VV> getVertexWithPrefetch(K vertexId, List<K> neighborHints) {
IVertex<K, VV> vertex = getVertex(vertexId);
// Async prefetch potential neighbor vertices
if (neighborHints != null && !neighborHints.isEmpty()) {
CompletableFuture.runAsync(() -> prefetchVertices(neighborHints));
}
return vertex;
}
/**
* Batch request merging
*/
private final BatchRequestMerger batchMerger = new BatchRequestMerger(
maxBatchSize,
maxWaitMs,
this::executeBatchQuery
);
public CompletableFuture<IVertex<K, VV>> getVertexAsync(K vertexId) {
return batchMerger.submit(vertexId);
}
}
4.3 Failure Recovery
Challenge: Shard migration when ShardHost fails
Solution: Fast failure detection + Shard reassignment
public class ShardFailoverHandler {
private final ShardRegistryService registry;
private final List<ShardHostInfo> availableHosts;
/**
* Handle ShardHost failure
*/
public void handleShardHostFailure(String failedHostId) {
// 1. Get all shards on failed host
List<Integer> affectedShards = registry.getShardsOnHost(failedHostId);
// 2. Select new host
Map<Integer, String> reassignment = computeReassignment(affectedShards);
// 3. Load shards on new host
for (Map.Entry<Integer, String> entry : reassignment.entrySet()) {
int shardId = entry.getKey();
String newHostId = entry.getValue();
ShardHost newHost = getHost(newHostId);
long latestVersion = registry.getLatestVersion(graphName);
newHost.loadShard(graphName, shardId, latestVersion);
}
// 4. Update Registry
for (Map.Entry<Integer, String> entry : reassignment.entrySet()) {
registry.updateShardLocation(graphName, entry.getKey(),
getLocation(entry.getValue()));
}
// 5. Notify all clients to refresh cache
registry.broadcastLocationChange(graphName);
}
}
4.4 Load Balancing
Challenge: Shard access hotspots
Solution: Dynamic replicas + Request routing
public class LoadBalancedShardRouter {
private final Map<Integer, List<ShardLocation>> shardReplicas;
private final LoadMetricsCollector metricsCollector;
/**
* Select best shard replica
*/
public ShardLocation selectBestReplica(int shardId) {
List<ShardLocation> replicas = shardReplicas.get(shardId);
if (replicas.size() == 1) {
return replicas.get(0);
}
// Select based on load
return replicas.stream()
.min(Comparator.comparingDouble(loc ->
metricsCollector.getLoad(loc.getHostId())))
.orElse(replicas.get(0));
}
/**
* Detect hot shards and create replicas
*/
public void handleHotShard(int shardId) {
double qps = metricsCollector.getShardQPS(shardId);
if (qps > hotShardThreshold) {
// Select least loaded host to create replica
String targetHost = selectLeastLoadedHost();
createShardReplica(shardId, targetHost);
}
}
}