Shared Graph Shard Scheduling Architecture Design #747

@kitalkuyo-gita

Description

Discussions about GeaFlow scheduling have been ongoing for some time. I have drafted a redesigned solution and welcome discussion from the community.

Through discussion (see issue-607), we have identified the following background:

A graph is divided into several shards. When a driver starts and schedules a job for the first time, it determines which shards are assigned to which workers. These workers pull their shards and cache them on their machines. The cycle scheduling engine knows the shard distribution during its work. A process may start many workers, but these workers may not share shards.

The current problem is that each job execution has a fixed degree of parallelism. Each unit of parallelism corresponds to a task that is scheduled to a worker, so each task handles a fixed set of shards. If instead the underlying process managed the shards centrally, any worker could read any shard, and the upper-level task could determine its parallelism from the amount of data it needs to read, with each task handling only a portion of the shards. This would enable dynamic calculation of parallelism and elastic scheduling of execution.

Based on the above considerations, the following solution is proposed:

1. Background and Objectives

1.1 Current State Analysis

GeaFlow currently adopts a shard manager scheduling model with the following core characteristics:

┌─────────────────────────────────────────────────────────────────┐
│              Current Architecture: Static Task-Shard Binding     │
├─────────────────────────────────────────────────────────────────┤
│  Container-0                    Container-1                      │
│  ┌─────────────────────┐       ┌─────────────────────┐          │
│  │ Worker-0 [Task-0]   │       │ Worker-2 [Task-2]   │          │
│  │ ├─ KeyGroup[0,31]   │       │ ├─ KeyGroup[64,95]  │          │
│  │ └─ Shard-0 (Local)  │       │ └─ Shard-2 (Local)  │          │
│  ├─────────────────────┤       ├─────────────────────┤          │
│  │ Worker-1 [Task-1]   │       │ Worker-3 [Task-3]   │          │
│  │ ├─ KeyGroup[32,63]  │       │ ├─ KeyGroup[96,127] │          │
│  │ └─ Shard-1 (Local)  │       │ └─ Shard-3 (Local)  │          │
│  └─────────────────────┘       └─────────────────────┘          │
└─────────────────────────────────────────────────────────────────┘

Key Constraints:

| Dimension | Current State | Problem |
| --- | --- | --- |
| Task-Shard Binding | Static binding via `KeyGroupAssignment.computeKeyGroupRangeForOperatorIndex` | Tasks can only access data within fixed KeyGroup ranges |
| State Storage Location | Local RocksDB with path containing `shardId` | Cannot be accessed across processes/workers |
| Worker Role | Both execution container and data owner | Limited scheduling flexibility |
| Failure Recovery | Must recover specific shards on specific workers | Recovery time coupled with data locality |
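
The static binding in the diagram can be reproduced with a Flink-style key-group range formula. The sketch below is standalone and illustrative; the method name mirrors `KeyGroupAssignment.computeKeyGroupRangeForOperatorIndex`, but the exact GeaFlow signature may differ. With 128 key groups and parallelism 4 it yields the ranges shown above ([0,31], [32,63], [64,95], [96,127]):

```java
public class KeyGroupRangeSketch {

    /**
     * Key-group range [start, end] owned by the task at operatorIndex,
     * using the Flink-style contiguous-range formula.
     */
    static int[] rangeForOperatorIndex(int maxKeyGroups, int parallelism, int operatorIndex) {
        // Ceiling division for the start, so ranges are contiguous and non-overlapping.
        int start = (operatorIndex * maxKeyGroups + parallelism - 1) / parallelism;
        int end = ((operatorIndex + 1) * maxKeyGroups - 1) / parallelism;
        return new int[]{start, end};
    }
}
```

Because the range is a pure function of the operator index, a task can never read outside its slice, which is exactly the constraint this design removes.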

Existing SERVICE_SHARE_ENABLE Mechanism (Source: AbstractGraphVertexCentricOp.java):

// Current implementation: Container-level sharing
if (shareEnable) {
    int taskIndex = processIndex;  // Use Container index
    int taskPara = containerNum;   // Use Container count
    desc.withSingleton();          // Singleton GraphState
    desc.withStateMode(StateMode.RDONLY);  // Read-only mode
}

This approach only supports workers within the same Container sharing GraphState, and cannot achieve cross-Container shard sharing.

1.2 Target Architecture

┌─────────────────────────────────────────────────────────────────┐
│                Target Architecture: Shared Graph Shard           │
├─────────────────────────────────────────────────────────────────┤
│                    Shard Registry Service                        │
│  ┌─────────────────────────────────────────────────────────────┐│
│  │  Shard-0 → Container-0:port | Shard-1 → Container-0:port    ││
│  │  Shard-2 → Container-1:port | Shard-3 → Container-1:port    ││
│  └─────────────────────────────────────────────────────────────┘│
├─────────────────────────────────────────────────────────────────┤
│  Container-0 (Shard Host)       Container-1 (Shard Host)         │
│  ┌─────────────────────┐       ┌─────────────────────┐          │
│  │ ShardServer         │       │ ShardServer         │          │
│  │ ├─ Shard-0 (Local)  │       │ ├─ Shard-2 (Local)  │          │
│  │ └─ Shard-1 (Local)  │       │ └─ Shard-3 (Local)  │          │
│  └─────────────────────┘       └─────────────────────┘          │
├─────────────────────────────────────────────────────────────────┤
│  Worker-0              Worker-1              Worker-2            │
│  ┌────────────┐       ┌────────────┐       ┌────────────┐       │
│  │ Task-A     │       │ Task-B     │       │ Task-C     │       │
│  │ Read: 0,2  │       │ Read: 1,3  │       │ Read: 0,1  │       │
│  │ (Remote)   │       │ (Remote)   │       │ (Remote)   │       │
│  └────────────┘       └────────────┘       └────────────┘       │
└─────────────────────────────────────────────────────────────────┘

Core Objectives:

  1. Decouple Task from Shard: Tasks can dynamically choose to read any shard
  2. Pure Execution Workers: Workers hold no data, only responsible for computation
  3. Shared Shard Access: All workers within a process share Graph Shard read capability
  4. Elastic Scheduling: Support dynamic task assignment without considering data locality

2. Technical Design

2.1 Overall Architecture


2.2 Core Component Design

2.2.1 ShardRegistryService

Responsibility: Manage metadata and location information for all shards

// New file: geaflow/geaflow-core/geaflow-engine/geaflow-cluster/
//          src/main/java/org/apache/geaflow/cluster/shard/ShardRegistryService.java

public interface ShardRegistryService {
    
    /**
     * Register shard location
     * @param graphName Graph name
     * @param shardId Shard ID
     * @param location Shard host information
     */
    void registerShard(String graphName, int shardId, ShardLocation location);
    
    /**
     * Get shard location
     * @param graphName Graph name
     * @param shardId Shard ID
     * @return Shard location information
     */
    ShardLocation getShardLocation(String graphName, int shardId);
    
    /**
     * Get all shard locations for a graph
     * @param graphName Graph name
     * @return shardId → location mapping
     */
    Map<Integer, ShardLocation> getAllShardLocations(String graphName);
    
    /**
     * Subscribe to shard location changes
     * @param graphName Graph name
     * @param listener Listener
     */
    void subscribe(String graphName, ShardLocationChangeListener listener);
    
    /**
     * Get latest version number
     */
    long getLatestVersion(String graphName);
}

public class ShardLocation implements Serializable {
    private String hostId;      // ShardHost identifier
    private String host;        // Host address
    private int port;           // RPC port
    private int shardId;        // Shard ID
    private long version;       // Data version
    private ShardStatus status; // Status: LOADING/READY/RECOVERING
}

public enum ShardStatus {
    LOADING,    // Loading
    READY,      // Ready for read
    RECOVERING, // Recovering
    OFFLINE     // Offline
}

Storage Backend Options:

| Option | Pros | Cons | Use Case |
| --- | --- | --- | --- |
| Redis | Low latency, supports Pub/Sub | Requires additional deployment | Production |
| ZooKeeper | Strong consistency, supports Watch | Limited write performance | High reliability requirements |
| HAService | Reuses existing components | Limited functionality | Quick validation |

Recommended: extend the existing HAService, reusing its Redis/HBase backend.
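
For quick validation before wiring in a Redis or HAService backend, the registry contract can be exercised with an in-memory implementation. This is a minimal standalone sketch: the location is a plain `host:port` string and the listener is a `BiConsumer`, simplified stand-ins for `ShardLocation` and `ShardLocationChangeListener` above, not the actual GeaFlow classes.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.BiConsumer;

public class InMemoryShardRegistry {

    // graphName -> (shardId -> "host:port" location)
    private final Map<String, Map<Integer, String>> locations = new ConcurrentHashMap<>();
    // graphName -> listeners notified with (shardId, newLocation)
    private final Map<String, List<BiConsumer<Integer, String>>> listeners = new ConcurrentHashMap<>();

    public void registerShard(String graphName, int shardId, String location) {
        locations.computeIfAbsent(graphName, k -> new ConcurrentHashMap<>()).put(shardId, location);
        // Push the change to subscribers, mirroring the subscribe() contract.
        listeners.getOrDefault(graphName, List.of()).forEach(l -> l.accept(shardId, location));
    }

    public String getShardLocation(String graphName, int shardId) {
        return locations.getOrDefault(graphName, Map.of()).get(shardId);
    }

    public Map<Integer, String> getAllShardLocations(String graphName) {
        return Map.copyOf(locations.getOrDefault(graphName, Map.of()));
    }

    public void subscribe(String graphName, BiConsumer<Integer, String> listener) {
        listeners.computeIfAbsent(graphName, k -> new CopyOnWriteArrayList<>()).add(listener);
    }
}
```

A production backend would add persistence, versioning, and failure detection on top of the same interface.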

2.2.2 ShardServer

Responsibility: Host GraphState on ShardHost and provide remote query service

// New file: geaflow/geaflow-state/geaflow-state-impl/
//          src/main/java/org/apache/geaflow/state/server/ShardServer.java

public class ShardServer<K, VV, EV> implements Closeable {
    
    private final Map<String, Map<Integer, GraphState<K, VV, EV>>> graphStates;
    private final RpcService rpcService;
    private final ShardRegistryService registry;
    private final Configuration config;
    
    /**
     * Initialize and load specified shard
     */
    public void loadShard(String graphName, int shardId, long version) {
        GraphStateDescriptor desc = buildDescriptor(graphName, shardId);
        desc.withStateMode(StateMode.RDONLY);
        
        GraphState state = StateFactory.buildGraphState(desc, config);
        state.manage().operate().setCheckpointId(version);
        state.manage().operate().recover();
        
        graphStates.computeIfAbsent(graphName, k -> new ConcurrentHashMap<>())
                   .put(shardId, state);
        
        // Register to Registry
        registry.registerShard(graphName, shardId, buildLocation(shardId));
    }
    
    /**
     * Query vertex
     */
    public IVertex<K, VV> queryVertex(String graphName, int shardId, K vertexId) {
        return getState(graphName, shardId).staticGraph().V().query(vertexId).get();
    }
    
    /**
     * Query edges
     */
    public List<IEdge<K, EV>> queryEdges(String graphName, int shardId, K srcId) {
        return getState(graphName, shardId).staticGraph().E().query(srcId).asList();
    }
    
    /**
     * Batch query one-degree graphs
     */
    public Map<K, OneDegreeGraph<K, VV, EV>> queryOneDegreeGraphs(
            String graphName, int shardId, List<K> vertexIds) {
        // Batch query optimization to reduce RPC calls
    }
    
    /**
     * Query with push-down conditions
     */
    public CloseableIterator<IVertex<K, VV>> queryVerticesWithPushDown(
            String graphName, int shardId, IStatePushDown pushDown) {
        return getState(graphName, shardId).staticGraph().V()
            .query(pushDown.getKeyGroup())
            .by(pushDown.getFilter())
            .iterator();
    }
}

RPC Protocol Design:

// New file: geaflow/geaflow-state/geaflow-state-api/
//          src/main/proto/shard_service.proto

syntax = "proto3";

service ShardService {
    // Single vertex query
    rpc QueryVertex(QueryVertexRequest) returns (QueryVertexResponse);
    
    // Single source edge query
    rpc QueryEdges(QueryEdgesRequest) returns (QueryEdgesResponse);
    
    // Batch one-degree graph query
    rpc QueryOneDegreeGraphBatch(QueryOneDegreeGraphBatchRequest) 
        returns (QueryOneDegreeGraphBatchResponse);
    
    // Conditional iteration query (streaming response)
    rpc QueryWithPushDown(QueryWithPushDownRequest) 
        returns (stream QueryResultChunk);
}

message QueryVertexRequest {
    string graph_name = 1;
    int32 shard_id = 2;
    bytes vertex_id = 3;  // Serialized vertex ID
}

message QueryOneDegreeGraphBatchRequest {
    string graph_name = 1;
    int32 shard_id = 2;
    repeated bytes vertex_ids = 3;
    PushDownCondition condition = 4;
}

message PushDownCondition {
    int32 start_key_group = 1;
    int32 end_key_group = 2;
    bytes filter_bytes = 3;  // Serialized IGraphFilter
    int64 limit = 4;
}

2.2.3 SharedShardClient

Responsibility: Provide transparent shard access capability for tasks

// New file: geaflow/geaflow-state/geaflow-state-impl/
//          src/main/java/org/apache/geaflow/state/client/SharedShardClient.java

public class SharedShardClient<K, VV, EV> implements StaticGraphTrait<K, VV, EV> {
    
    private final String graphName;
    private final ShardRegistryService registry;
    private final IKeyGroupAssigner keyGroupAssigner;
    private final Map<ShardLocation, RpcClient> clientPool;
    private final Configuration config;
    
    // Local cache (optional)
    private final Cache<CacheKey, Object> localCache;
    
    /**
     * Query vertex (auto-routing)
     */
    @Override
    public IVertex<K, VV> getVertex(K vertexId) {
        int shardId = keyGroupAssigner.assign(vertexId);
        ShardLocation location = registry.getShardLocation(graphName, shardId);
        
        // Check local cache
        CacheKey cacheKey = new CacheKey(shardId, vertexId);
        IVertex<K, VV> cached = localCache.getIfPresent(cacheKey);
        if (cached != null) {
            return cached;
        }
        
        // Remote query
        RpcClient client = getOrCreateClient(location);
        IVertex<K, VV> vertex = client.queryVertex(graphName, shardId, vertexId);
        
        localCache.put(cacheKey, vertex);
        return vertex;
    }
    
    /**
     * Batch query one-degree graphs (grouped by shard for optimization)
     */
    public Map<K, OneDegreeGraph<K, VV, EV>> getOneDegreeGraphs(List<K> vertexIds) {
        // Group by shard
        Map<Integer, List<K>> shardToVertices = vertexIds.stream()
            .collect(Collectors.groupingBy(keyGroupAssigner::assign));
        
        // Parallel query for each shard
        Map<K, OneDegreeGraph<K, VV, EV>> result = new ConcurrentHashMap<>();
        shardToVertices.entrySet().parallelStream().forEach(entry -> {
            int shardId = entry.getKey();
            List<K> ids = entry.getValue();
            ShardLocation location = registry.getShardLocation(graphName, shardId);
            RpcClient client = getOrCreateClient(location);
            
            Map<K, OneDegreeGraph<K, VV, EV>> partial = 
                client.queryOneDegreeGraphBatch(graphName, shardId, ids);
            result.putAll(partial);
        });
        
        return result;
    }
    
    /**
     * Full graph scan (parallel across shards)
     */
    public CloseableIterator<IVertex<K, VV>> scanAllVertices(IStatePushDown pushDown) {
        Map<Integer, ShardLocation> allLocations = registry.getAllShardLocations(graphName);
        
        // Create parallel iterators
        List<CloseableIterator<IVertex<K, VV>>> iterators = allLocations.entrySet()
            .stream()
            .map(e -> createRemoteIterator(e.getKey(), e.getValue(), pushDown))
            .collect(Collectors.toList());
        
        return new MergedIterator<>(iterators);
    }
}

2.2.4 SharedGraphState (Facade)

Responsibility: Adapt to existing GraphState interface, transparent to upper layers

// New file: geaflow/geaflow-state/geaflow-state-impl/
//          src/main/java/org/apache/geaflow/state/SharedGraphStateImpl.java

public class SharedGraphStateImpl<K, VV, EV> implements GraphState<K, VV, EV> {
    
    private final SharedShardClient<K, VV, EV> client;
    private final ManageableGraphState manageableState;
    
    public SharedGraphStateImpl(SharedShardClient<K, VV, EV> client) {
        this.client = client;
        this.manageableState = new SharedManageableGraphState(client);
    }
    
    @Override
    public StaticGraphState<K, VV, EV> staticGraph() {
        return new SharedStaticGraphState<>(client);
    }
    
    @Override
    public DynamicGraphState<K, VV, EV> dynamicGraph() {
        return new SharedDynamicGraphState<>(client);
    }
    
    @Override
    public ManageableGraphState manage() {
        return manageableState;
    }
}

// Shared static graph state implementation
class SharedStaticGraphState<K, VV, EV> implements StaticGraphState<K, VV, EV> {
    
    private final SharedShardClient<K, VV, EV> client;
    
    @Override
    public VertexQuery<K, VV> V() {
        return new SharedVertexQuery<>(client);
    }
    
    @Override
    public EdgeQuery<K, EV> E() {
        return new SharedEdgeQuery<>(client);
    }
}

2.3 Scheduling Layer Modifications

2.3.1 TaskAssigner Modifications

// Modified file: geaflow/geaflow-core/geaflow-runtime/geaflow-runtime-core/
//          src/main/java/org/apache/geaflow/runtime/core/scheduler/resource/TaskAssigner.java

public class TaskAssigner implements Serializable {
    
    // New: shared mode flag
    private boolean sharedShardMode;
    
    /**
     * Task assignment in shared mode
     * - No longer considers data locality
     * - Pure load balancing assignment
     */
    public Map<Integer, WorkerInfo> assignTasksInSharedMode(
            List<Integer> tasks, List<WorkerInfo> workers) {
        
        Map<Integer, WorkerInfo> matches = new HashMap<>();
        int workerCount = workers.size();
        
        // Round-robin assignment, pure load balancing
        for (int i = 0; i < tasks.size(); i++) {
            matches.put(tasks.get(i), workers.get(i % workerCount));
        }
        
        return matches;
    }
    
    /**
     * Check if worker is available (no longer checks shard location)
     */
    public boolean isWorkerAvailable(WorkerInfo worker) {
        if (sharedShardMode) {
            return worker.getStatus() == WorkerStatus.READY;
        }
        // Original logic...
    }
}

2.3.2 New ShardHostManager

// New file: geaflow/geaflow-core/geaflow-engine/geaflow-cluster/
//          src/main/java/org/apache/geaflow/cluster/shard/ShardHostManager.java

public class ShardHostManager {
    
    private final ShardRegistryService registry;
    private final Map<String, ShardServer> shardServers;
    
    /**
     * Initialize ShardHost
     * - Load all shards assigned to this host
     * - Register to Registry
     */
    public void initialize(String graphName, List<Integer> assignedShards, long version) {
        ShardServer server = new ShardServer(config);
        
        for (int shardId : assignedShards) {
            server.loadShard(graphName, shardId, version);
        }
        
        shardServers.put(graphName, server);
    }
    
    /**
     * Shard reassignment (for scaling)
     */
    public void reassignShards(String graphName, 
            List<Integer> toLoad, List<Integer> toUnload, long version) {
        
        ShardServer server = shardServers.get(graphName);
        
        // Unload shards no longer needed
        for (int shardId : toUnload) {
            server.unloadShard(shardId);
        }
        
        // Load newly assigned shards
        for (int shardId : toLoad) {
            server.loadShard(graphName, shardId, version);
        }
    }
}

2.4 Configuration Parameters

// Modified file: geaflow/geaflow-common/src/main/java/org/apache/geaflow/common/config/keys/FrameworkConfigKeys.java

public class FrameworkConfigKeys {
    
    // Existing configuration
    public static final ConfigKey SERVICE_SHARE_ENABLE = ConfigKeys
        .key("geaflow.analytics.service.share.enable")
        .defaultValue(false)
        .description("Enable container-level state sharing");
    
    // ========== New Configurations ==========
    
    /**
     * Enable cross-process shared shard mode
     */
    public static final ConfigKey SHARED_SHARD_ENABLE = ConfigKeys
        .key("geaflow.state.shared.shard.enable")
        .defaultValue(false)
        .description("Enable cross-process shared shard mode");
    
    /**
     * ShardServer RPC port
     */
    public static final ConfigKey SHARD_SERVER_PORT = ConfigKeys
        .key("geaflow.state.shard.server.port")
        .defaultValue(9527)
        .description("ShardServer RPC port");
    
    /**
     * Shard registry service type
     */
    public static final ConfigKey SHARD_REGISTRY_TYPE = ConfigKeys
        .key("geaflow.state.shard.registry.type")
        .defaultValue("redis")
        .description("Shard registry service type: redis/zookeeper/haservice");
    
    /**
     * Client local cache size
     */
    public static final ConfigKey SHARD_CLIENT_CACHE_SIZE = ConfigKeys
        .key("geaflow.state.shard.client.cache.size")
        .defaultValue(10000)
        .description("SharedShardClient local cache size");
    
    /**
     * Client cache expiration time (seconds)
     */
    public static final ConfigKey SHARD_CLIENT_CACHE_EXPIRE_SECONDS = ConfigKeys
        .key("geaflow.state.shard.client.cache.expire.seconds")
        .defaultValue(60)
        .description("SharedShardClient local cache expire time in seconds");
    
    /**
     * RPC call timeout (milliseconds)
     */
    public static final ConfigKey SHARD_RPC_TIMEOUT_MS = ConfigKeys
        .key("geaflow.state.shard.rpc.timeout.ms")
        .defaultValue(30000)
        .description("ShardServer RPC call timeout in milliseconds");
    
    /**
     * Maximum vertices per batch query
     */
    public static final ConfigKey SHARD_BATCH_QUERY_SIZE = ConfigKeys
        .key("geaflow.state.shard.batch.query.size")
        .defaultValue(1000)
        .description("Maximum vertices per batch query");
}
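
Putting these keys together, a job that opts into the shared-shard mode might carry configuration like the following (values are illustrative defaults; only the key names are taken from the definitions above):

```yaml
geaflow.state.shared.shard.enable: true
geaflow.state.shard.registry.type: redis
geaflow.state.shard.server.port: 9527
geaflow.state.shard.client.cache.size: 10000
geaflow.state.shard.client.cache.expire.seconds: 60
geaflow.state.shard.rpc.timeout.ms: 30000
geaflow.state.shard.batch.query.size: 1000
```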

3. Data Flow and Interaction Sequences

3.1 Startup Flow

sequenceDiagram
    participant Master
    participant Registry as ShardRegistryService
    participant SH1 as ShardHost-1
    participant SH2 as ShardHost-2
    participant W1 as Worker-1
    participant W2 as Worker-2

    Master->>Master: Calculate shard assignment strategy
    Master->>SH1: AssignShards([0,1], version=5)
    Master->>SH2: AssignShards([2,3], version=5)
    
    par ShardHost Initialization
        SH1->>SH1: loadShard(0, version=5)
        SH1->>SH1: loadShard(1, version=5)
        SH1->>Registry: registerShard(graph, 0, location1)
        SH1->>Registry: registerShard(graph, 1, location1)
    and
        SH2->>SH2: loadShard(2, version=5)
        SH2->>SH2: loadShard(3, version=5)
        SH2->>Registry: registerShard(graph, 2, location2)
        SH2->>Registry: registerShard(graph, 3, location2)
    end
    
    Master->>W1: CreateTask(taskId=A)
    Master->>W2: CreateTask(taskId=B)
    
    W1->>Registry: subscribe(graph)
    W2->>Registry: subscribe(graph)
    
    Registry-->>W1: ShardLocations([0→SH1, 1→SH1, 2→SH2, 3→SH2])
    Registry-->>W2: ShardLocations([0→SH1, 1→SH1, 2→SH2, 3→SH2])
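The master's "calculate shard assignment strategy" step can be as simple as handing contiguous shard ranges to each ShardHost, which is what the flow above shows (SH1 gets [0,1], SH2 gets [2,3]). The sketch below is one possible strategy, not the actual GeaFlow policy, which may also weigh memory and existing placement:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class ShardAssignmentSketch {

    /** Assign shard IDs [0, shardCount) to hosts as contiguous blocks. */
    static Map<String, List<Integer>> assign(int shardCount, List<String> hosts) {
        Map<String, List<Integer>> plan = new HashMap<>();
        int hostCount = hosts.size();
        for (String host : hosts) {
            plan.put(host, new ArrayList<>());
        }
        for (int shardId = 0; shardId < shardCount; shardId++) {
            // shard s lands on host floor(s * hostCount / shardCount), keeping blocks contiguous
            plan.get(hosts.get(shardId * hostCount / shardCount)).add(shardId);
        }
        return plan;
    }
}
```

Because tasks no longer need locality, this plan only has to balance shard count (or size) per host, not co-locate shards with particular workers.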

3.2 Query Flow

sequenceDiagram
    participant Task as Task-A
    participant Client as SharedShardClient
    participant Registry as ShardRegistryService
    participant SH1 as ShardHost-1
    participant SH2 as ShardHost-2

    Task->>Client: getVertex(vertexId="v1")
    Client->>Client: shardId = keyGroupAssigner.assign("v1") = 2
    Client->>Registry: getShardLocation(graph, shardId=2)
    Registry-->>Client: location = ShardHost-2
    
    Client->>SH2: QueryVertex(graph, shard=2, "v1")
    SH2->>SH2: graphState.staticGraph().V().query("v1")
    SH2-->>Client: Vertex("v1", properties)
    
    Client->>Client: localCache.put(key, vertex)
    Client-->>Task: Vertex("v1", properties)

3.3 Batch Query Optimization

sequenceDiagram
    participant Task as Task-A
    participant Client as SharedShardClient
    participant SH1 as ShardHost-1
    participant SH2 as ShardHost-2

    Task->>Client: getOneDegreeGraphs([v1, v2, v3, v4, v5])
    
    Client->>Client: Group by shard
    Note over Client: shard-1: [v1, v3]<br/>shard-2: [v2, v4, v5]
    
    par Parallel Query
        Client->>SH1: QueryOneDegreeGraphBatch(shard=1, [v1, v3])
        SH1-->>Client: {v1: ODG1, v3: ODG3}
    and
        Client->>SH2: QueryOneDegreeGraphBatch(shard=2, [v2, v4, v5])
        SH2-->>Client: {v2: ODG2, v4: ODG4, v5: ODG5}
    end
    
    Client->>Client: merge results
    Client-->>Task: {v1: ODG1, v2: ODG2, v3: ODG3, v4: ODG4, v5: ODG5}

4. Key Technical Challenges and Solutions

4.1 Data Consistency

Challenge: Consistent reading of multi-version data

Solution: Versioned reading + ViewMetaBookKeeper

public class VersionedShardAccess {
    
    /**
     * Ensure reading data of specified version
     */
    public IVertex<K, VV> readWithVersion(K vertexId, long expectedVersion) {
        int shardId = keyGroupAssigner.assign(vertexId);
        ShardLocation location = registry.getShardLocation(graphName, shardId);
        
        // Check version
        if (location.getVersion() < expectedVersion) {
            // Wait for shard to update to specified version
            waitForVersion(shardId, expectedVersion, timeoutMs);
        }
        
        return client.queryVertex(graphName, shardId, vertexId);
    }
    
    /**
     * Snapshot isolation read
     */
    public SnapshotReader<K, VV, EV> createSnapshotReader(long snapshotVersion) {
        // Lock current version to ensure consistent reads
        return new SnapshotReader<>(this, snapshotVersion);
    }
}

4.2 Network Overhead Optimization

Challenge: Network latency for remote access

Solution: Multi-level caching + Batch queries + Prefetching

public class OptimizedShardClient<K, VV, EV> extends SharedShardClient<K, VV, EV> {
    
    // L1: Hot data cache
    private final Cache<CacheKey, Object> hotCache;
    
    // L2: Prefetch cache
    private final Map<Integer, PreFetchBuffer> preFetchBuffers;
    
    /**
     * Query with prefetching
     */
    public IVertex<K, VV> getVertexWithPrefetch(K vertexId, List<K> neighborHints) {
        IVertex<K, VV> vertex = getVertex(vertexId);
        
        // Async prefetch potential neighbor vertices
        if (neighborHints != null && !neighborHints.isEmpty()) {
            CompletableFuture.runAsync(() -> prefetchVertices(neighborHints));
        }
        
        return vertex;
    }
    
    /**
     * Batch request merging
     */
    private final BatchRequestMerger batchMerger = new BatchRequestMerger(
        maxBatchSize,
        maxWaitMs,
        this::executeBatchQuery
    );
    
    public CompletableFuture<IVertex<K, VV>> getVertexAsync(K vertexId) {
        return batchMerger.submit(vertexId);
    }
}

4.3 Failure Recovery

Challenge: Shard migration when ShardHost fails

Solution: Fast failure detection + Shard reassignment

public class ShardFailoverHandler {
    
    private final ShardRegistryService registry;
    private final List<ShardHostInfo> availableHosts;
    
    /**
     * Handle ShardHost failure
     */
    public void handleShardHostFailure(String failedHostId) {
        // 1. Get all shards on failed host
        List<Integer> affectedShards = registry.getShardsOnHost(failedHostId);
        
        // 2. Select new host
        Map<Integer, String> reassignment = computeReassignment(affectedShards);
        
        // 3. Load shards on new host
        for (Map.Entry<Integer, String> entry : reassignment.entrySet()) {
            int shardId = entry.getKey();
            String newHostId = entry.getValue();
            
            ShardHost newHost = getHost(newHostId);
            long latestVersion = registry.getLatestVersion(graphName);
            newHost.loadShard(graphName, shardId, latestVersion);
        }
        
        // 4. Update Registry
        for (Map.Entry<Integer, String> entry : reassignment.entrySet()) {
            registry.updateShardLocation(graphName, entry.getKey(), 
                getLocation(entry.getValue()));
        }
        
        // 5. Notify all clients to refresh cache
        registry.broadcastLocationChange(graphName);
    }
}

4.4 Load Balancing

Challenge: Shard access hotspots

Solution: Dynamic replicas + Request routing

public class LoadBalancedShardRouter {
    
    private final Map<Integer, List<ShardLocation>> shardReplicas;
    private final LoadMetricsCollector metricsCollector;
    
    /**
     * Select best shard replica
     */
    public ShardLocation selectBestReplica(int shardId) {
        List<ShardLocation> replicas = shardReplicas.get(shardId);
        
        if (replicas.size() == 1) {
            return replicas.get(0);
        }
        
        // Select based on load
        return replicas.stream()
            .min(Comparator.comparingDouble(loc -> 
                metricsCollector.getLoad(loc.getHostId())))
            .orElse(replicas.get(0));
    }
    
    /**
     * Detect hot shards and create replicas
     */
    public void handleHotShard(int shardId) {
        double qps = metricsCollector.getShardQPS(shardId);
        
        if (qps > hotShardThreshold) {
            // Select least loaded host to create replica
            String targetHost = selectLeastLoadedHost();
            createShardReplica(shardId, targetHost);
        }
    }
}
