diff --git a/kubernetes/internal/controller/batchsandbox_controller.go b/kubernetes/internal/controller/batchsandbox_controller.go index 6008d3cd..d8d22c2f 100644 --- a/kubernetes/internal/controller/batchsandbox_controller.go +++ b/kubernetes/internal/controller/batchsandbox_controller.go @@ -347,6 +347,16 @@ func (r *BatchSandboxReconciler) getTaskScheduler(ctx context.Context, batchSbx } // Update the pods list for this scheduler tSch.UpdatePods(pods) + // Handle scale-out: register task specs for any replicas added since the + // scheduler was first created. Already-tracked task names are skipped. + taskStrategy := strategy.NewTaskSchedulingStrategy(batchSbx) + taskSpecs, err := taskStrategy.GenerateTaskSpecs() + if err != nil { + return nil, fmt.Errorf("failed to generate task specs for scale-out: %w", err) + } + if err := tSch.AddTasks(taskSpecs); err != nil { + return nil, fmt.Errorf("failed to add tasks on scale-out: %w", err) + } } return tSch, nil } @@ -464,7 +474,6 @@ func (r *BatchSandboxReconciler) scaleBatchSandbox(ctx context.Context, batchSan for i := range pods { pod := pods[i] BatchSandboxScaleExpectations.ObserveScale(controllerutils.GetControllerKey(batchSandbox), expectations.Create, pod.Name) - pods = append(pods, pod) idx, err := parseIndex(pod) if err != nil { return fmt.Errorf("failed to parse idx Pod %s, err %w", pod.Name, err) diff --git a/kubernetes/internal/scheduler/default_scheduler.go b/kubernetes/internal/scheduler/default_scheduler.go index a4bf818f..4aa0b554 100644 --- a/kubernetes/internal/scheduler/default_scheduler.go +++ b/kubernetes/internal/scheduler/default_scheduler.go @@ -215,6 +215,23 @@ func (sch *defaultTaskScheduler) UpdatePods(pods []*corev1.Pod) { sch.allPods = pods } +// AddTasks registers task specs that are not yet tracked by the scheduler. +// Tasks whose names are already tracked are silently skipped, making this +// safe to call with the full task list during a scale-out reconciliation. +func (sch *defaultTaskScheduler) AddTasks(tasks []*api.Task) error { + newNodes, err := initTaskNodes(tasks) + if err != nil { + return err + } + for _, node := range newNodes { + if _, exists := sch.taskNodeByNameIndex[node.Name]; !exists { + sch.taskNodes = append(sch.taskNodes, node) + sch.taskNodeByNameIndex[node.Name] = node + } + } + return nil +} + func (sch *defaultTaskScheduler) ListTask() []Task { ret := make([]Task, len(sch.taskNodes), len(sch.taskNodes)) for i := range sch.taskNodes { diff --git a/kubernetes/internal/scheduler/default_scheduler_test.go b/kubernetes/internal/scheduler/default_scheduler_test.go index fe5673eb..7fec1454 100644 --- a/kubernetes/internal/scheduler/default_scheduler_test.go +++ b/kubernetes/internal/scheduler/default_scheduler_test.go @@ -1290,3 +1290,81 @@ func Test_initTaskNodes(t *testing.T) { }) } } + +func Test_addTasks(t *testing.T) { + tests := []struct { + name string + initial []*api.Task + addTasks []*api.Task + wantNodeNames []string + wantNodeCount int + }{ + { + name: "scale-out: new tasks are appended, existing tasks are skipped", + initial: []*api.Task{ + {Name: "sandbox-0", Process: &api.Process{Command: []string{"echo", "0"}}}, + }, + addTasks: []*api.Task{ + {Name: "sandbox-0", Process: &api.Process{Command: []string{"echo", "0"}}}, + {Name: "sandbox-1", Process: &api.Process{Command: []string{"echo", "1"}}}, + }, + wantNodeNames: []string{"sandbox-0", "sandbox-1"}, + wantNodeCount: 2, + }, + { + name: "no-op: add same tasks as already tracked", + initial: []*api.Task{ + {Name: "sandbox-0"}, + {Name: "sandbox-1"}, + }, + addTasks: []*api.Task{ + {Name: "sandbox-0"}, + {Name: "sandbox-1"}, + }, + wantNodeNames: []string{"sandbox-0", "sandbox-1"}, + wantNodeCount: 2, + }, + { + name: "empty scheduler: add initial tasks", + initial: []*api.Task{}, + addTasks: []*api.Task{ + {Name: "sandbox-0"}, + }, + wantNodeNames: []string{"sandbox-0"}, + wantNodeCount: 1, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + initialNodes, err := initTaskNodes(tt.initial) + if err != nil { + t.Fatalf("initTaskNodes() error = %v", err) + } + sch := &defaultTaskScheduler{ + taskNodes: initialNodes, + taskNodeByNameIndex: indexByName(initialNodes), + logger: testLogger, + } + + if err := sch.AddTasks(tt.addTasks); err != nil { + t.Fatalf("AddTasks() unexpected error = %v", err) + } + + if len(sch.taskNodes) != tt.wantNodeCount { + t.Errorf("AddTasks() taskNodes count = %d, want %d", len(sch.taskNodes), tt.wantNodeCount) + } + + nodeNames := make([]string, len(sch.taskNodes)) + for i, n := range sch.taskNodes { + nodeNames[i] = n.Name + } + if !reflect.DeepEqual(nodeNames, tt.wantNodeNames) { + t.Errorf("AddTasks() taskNode names = %v, want %v", nodeNames, tt.wantNodeNames) + } + + if len(sch.taskNodeByNameIndex) != tt.wantNodeCount { + t.Errorf("AddTasks() taskNodeByNameIndex size = %d, want %d", len(sch.taskNodeByNameIndex), tt.wantNodeCount) + } + }) + } +} diff --git a/kubernetes/internal/scheduler/interface.go b/kubernetes/internal/scheduler/interface.go index 2de1476a..02e40994 100644 --- a/kubernetes/internal/scheduler/interface.go +++ b/kubernetes/internal/scheduler/interface.go @@ -27,6 +27,10 @@ type TaskScheduler interface { UpdatePods(pod []*corev1.Pod) ListTask() []Task StopTask() []Task + // AddTasks registers task specs that are not yet tracked by the scheduler. + // Tasks whose names are already tracked are silently skipped, making this + // safe to call with the full task list during a scale-out reconciliation. + AddTasks(tasks []*apis.Task) error } func NewTaskScheduler(name string, tasks []*apis.Task, pods []*corev1.Pod, resPolicyWhenTaskCompleted sandboxv1alpha1.TaskResourcePolicy, logger logr.Logger) (TaskScheduler, error) { diff --git a/kubernetes/internal/scheduler/mock/interface.go b/kubernetes/internal/scheduler/mock/interface.go index 7d421734..96f363e7 100644 --- a/kubernetes/internal/scheduler/mock/interface.go +++ b/kubernetes/internal/scheduler/mock/interface.go @@ -11,6 +11,7 @@ import ( v1 "k8s.io/api/core/v1" scheduler "github.com/alibaba/OpenSandbox/sandbox-k8s/internal/scheduler" + api "github.com/alibaba/OpenSandbox/sandbox-k8s/pkg/task-executor" ) // MockTaskScheduler is a mock of TaskScheduler interface. @@ -36,6 +37,20 @@ func (m *MockTaskScheduler) EXPECT() *MockTaskSchedulerMockRecorder { return m.recorder } +// AddTasks mocks base method. +func (m *MockTaskScheduler) AddTasks(tasks []*api.Task) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "AddTasks", tasks) + ret0, _ := ret[0].(error) + return ret0 +} + +// AddTasks indicates an expected call of AddTasks. +func (mr *MockTaskSchedulerMockRecorder) AddTasks(tasks interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "AddTasks", reflect.TypeOf((*MockTaskScheduler)(nil).AddTasks), tasks) +} + // ListTask mocks base method. func (m *MockTaskScheduler) ListTask() []scheduler.Task { m.ctrl.T.Helper()