-
Notifications
You must be signed in to change notification settings - Fork 16
Open
Labels
enhancementNew feature or requestNew feature or requesthacktoberfesthelp wantedExtra attention is neededExtra attention is needed
Description
Feature Description
Implement Operational Transformation (OT) or Conflict-free Replicated Data Types (CRDTs) to handle concurrent editing conflicts gracefully and ensure eventual consistency across all connected clients.
Problem Statement
Current concurrency issues:
- Race conditions when multiple users edit simultaneously
- Last-write-wins approach can lose data
- No conflict detection or resolution
- Undo/redo breaks when others are editing
- Strokes can appear in wrong order
- No guarantee of consistency across clients
Operational Transformation Basics
OT ensures that concurrent operations converge to the same state by transforming operations against each other.
Example Conflict:
- User A: Adds stroke at position 5
- User B: Deletes stroke at position 3 (simultaneously)
Without OT, User A's stroke might reference a position that no longer exists after User B's deletion.
1. OT Core Implementation
// frontend/src/services/OperationalTransformation.js
export class OperationTransformer {
/**
* Transform operation A against operation B
* Returns transformed version of A that accounts for B's changes
*/
transform(opA, opB) {
const type = `${opA.type}_${opB.type}`;
switch (type) {
case 'insert_insert':
return this.transformInsertInsert(opA, opB);
case 'insert_delete':
return this.transformInsertDelete(opA, opB);
case 'delete_insert':
return this.transformDeleteInsert(opA, opB);
case 'delete_delete':
return this.transformDeleteDelete(opA, opB);
default:
return opA;
}
}
transformInsertInsert(opA, opB) {
// If B inserts before A's position, shift A's position
if (opB.position <= opA.position) {
return { ...opA, position: opA.position + 1 };
}
return opA;
}
transformInsertDelete(opA, opB) {
// If B deletes before A's position, shift A's position down
if (opB.position < opA.position) {
return { ...opA, position: opA.position - 1 };
}
return opA;
}
transformDeleteInsert(opA, opB) {
// If B inserts before A's delete position, shift A up
if (opB.position <= opA.position) {
return { ...opA, position: opA.position + 1 };
}
return opA;
}
transformDeleteDelete(opA, opB) {
// Both trying to delete same position - B wins
if (opA.position === opB.position) {
return null; // Cancel A's operation
}
// B deleted before A's position
if (opB.position < opA.position) {
return { ...opA, position: opA.position - 1 };
}
return opA;
}
}2. Vector Clock for Causality
// frontend/src/services/VectorClock.js
export class VectorClock {
constructor(clientId) {
this.clientId = clientId;
this.clock = { [clientId]: 0 };
}
increment() {
this.clock[this.clientId]++;
return this.getCopy();
}
update(otherClock) {
for (const [clientId, timestamp] of Object.entries(otherClock)) {
this.clock[clientId] = Math.max(
this.clock[clientId] || 0,
timestamp
);
}
}
getCopy() {
return { ...this.clock };
}
happensBefore(otherClock) {
let strictlyLess = false;
for (const clientId in this.clock) {
if (this.clock[clientId] > (otherClock[clientId] || 0)) {
return false;
}
if (this.clock[clientId] < (otherClock[clientId] || 0)) {
strictlyLess = true;
}
}
return strictlyLess;
}
concurrent(otherClock) {
}
happensAfter(otherClock) {
return new VectorClock(this.clientId).setClock(otherClock).happensBefore(this.clock);
}
setClock(clock) {
this.clock = { ...clock };
return this;
}
}3. Operation History Buffer
// frontend/src/services/OperationHistory.js
export class OperationHistory {
constructor(clientId) {
this.clientId = clientId;
this.history = [];
this.vectorClock = new VectorClock(clientId);
this.transformer = new OperationTransformer();
}
addLocalOperation(operation) {
const op = {
...operation,
clientId: this.clientId,
vectorClock: this.vectorClock.increment(),
id: generateOperationId()
};
this.history.push(op);
return op;
}
addRemoteOperation(operation) {
// Update our vector clock
this.vectorClock.update(operation.vectorClock);
// Transform against concurrent local operations
let transformedOp = operation;
for (const localOp of this.getLocalOperationsSince(operation.vectorClock)) {
transformedOp = this.transformer.transform(transformedOp, localOp);
}
this.history.push(transformedOp);
return transformedOp;
}
getLocalOperationsSince(vectorClock) {
return this.history.filter(op =>
op.clientId === this.clientId &&
);
}
pruneHistory(beforeTimestamp) {
this.history = this.history.filter(op =>
op.timestamp > beforeTimestamp
);
}
}4. Canvas Operation Types
// frontend/src/models/operations.js
export const OperationType = {
INSERT_STROKE: 'insert_stroke',
DELETE_STROKE: 'delete_stroke',
MODIFY_STROKE: 'modify_stroke',
MOVE_STROKE: 'move_stroke',
CLEAR_CANVAS: 'clear_canvas'
};
export class Operation {
static insertStroke(stroke, position) {
return {
type: OperationType.INSERT_STROKE,
stroke: stroke,
position: position,
timestamp: Date.now()
};
}
static deleteStroke(strokeId, position) {
return {
type: OperationType.DELETE_STROKE,
strokeId: strokeId,
position: position,
timestamp: Date.now()
};
}
static modifyStroke(strokeId, changes) {
return {
type: OperationType.MODIFY_STROKE,
strokeId: strokeId,
changes: changes,
timestamp: Date.now()
};
}
}5. Collaborative Canvas Manager
// frontend/src/services/CollaborativeCanvas.js
export class CollaborativeCanvas {
constructor(roomId, userId, socket) {
this.roomId = roomId;
this.userId = userId;
this.socket = socket;
this.operationHistory = new OperationHistory(userId);
this.pendingOperations = [];
}
localStrokeAdded(stroke) {
const operation = Operation.insertStroke(stroke, this.getStrokeCount());
const op = this.operationHistory.addLocalOperation(operation);
// Send to server
this.socket.emit('operation', {
roomId: this.roomId,
operation: op
});
// Apply locally
this.applyOperation(op);
}
remoteOperationReceived(operation) {
const transformedOp = this.operationHistory.addRemoteOperation(operation);
if (transformedOp) {
this.applyOperation(transformedOp);
}
}
applyOperation(operation) {
switch (operation.type) {
case OperationType.INSERT_STROKE:
this.canvas.insertStrokeAt(operation.stroke, operation.position);
break;
case OperationType.DELETE_STROKE:
this.canvas.deleteStrokeAt(operation.position);
break;
case OperationType.MODIFY_STROKE:
this.canvas.modifyStroke(operation.strokeId, operation.changes);
break;
}
}
undo() {
// Get last local operation
const lastOp = this.operationHistory.getLastLocalOperation();
// Create inverse operation
const inverseOp = this.createInverseOperation(lastOp);
const op = this.operationHistory.addLocalOperation(inverseOp);
// Broadcast
this.socket.emit('operation', {
roomId: this.roomId,
operation: op
});
this.applyOperation(op);
}
createInverseOperation(operation) {
switch (operation.type) {
case OperationType.INSERT_STROKE:
return Operation.deleteStroke(operation.stroke.id, operation.position);
case OperationType.DELETE_STROKE:
return Operation.insertStroke(operation.stroke, operation.position);
default:
return null;
}
}
}6. Backend Operation Server
# backend/services/operation_server.py
class OperationServer:
def __init__(self):
self.room_operations = {} # room_id -> list of operations
def receive_operation(self, room_id, operation):
if room_id not in self.room_operations:
self.room_operations[room_id] = []
# Validate operation
if not self.validate_operation(operation):
return {'error': 'Invalid operation'}
# Store operation
self.room_operations[room_id].append(operation)
# Broadcast to other clients
return {'success': True, 'operation': operation}
def validate_operation(self, operation):
# Check required fields
required = ['type', 'clientId', 'vectorClock', 'timestamp']
return all(field in operation for field in required)
def get_operations_since(self, room_id, vector_clock):
if room_id not in self.room_operations:
return []
# Return operations that happened after the given vector clock
return [
op for op in self.room_operations[room_id]
if not self.happened_before(op['vectorClock'], vector_clock)
]
def happened_before(self, clock_a, clock_b):
# Vector clock comparison
for client_id in clock_a:
if clock_a[client_id] > clock_b.get(client_id, 0):
return False
return True7. Socket.IO Integration
# backend/routes/socketio_handlers.py
from services.operation_server import OperationServer
operation_server = OperationServer()
@socketio.on('operation')
@require_auth_socketio
def handle_operation(data):
user = get_current_user()
room_id = data.get('roomId')
operation = data.get('operation')
# Process operation
result = operation_server.receive_operation(room_id, operation)
if 'error' in result:
emit('operation_error', result)
return
# Broadcast to room (except sender)
emit('operation', {
'operation': operation
}, room=room_id, skip_sid=request.sid)Files to Create/Modify
Frontend:
frontend/src/services/OperationalTransformation.js⭐ (NEW)frontend/src/services/VectorClock.js⭐ (NEW)frontend/src/services/OperationHistory.js⭐ (NEW)frontend/src/services/CollaborativeCanvas.js⭐ (NEW)frontend/src/models/operations.js⭐ (NEW)frontend/src/components/Canvas.js(MODIFY)
Backend:
backend/services/operation_server.py⭐ (NEW)backend/routes/socketio_handlers.py(MODIFY)
Benefits
- Guaranteed eventual consistency
- No lost edits due to conflicts
- Proper concurrent undo/redo
- Resilient to network delays
- Scales to many simultaneous users
- Professional collaborative editing like Google Docs
Testing Requirements
- Simulate concurrent operations
- Test network partition scenarios
- Verify convergence with 3+ clients
- Stress test with rapid operations
- Test undo/redo during conflicts
Reactions are currently unavailable
Metadata
Metadata
Assignees
Labels
enhancementNew feature or requestNew feature or requesthacktoberfesthelp wantedExtra attention is neededExtra attention is needed