diff --git a/internal/translator/gemini/openai/chat-completions/gemini_openai_request.go b/internal/translator/gemini/openai/chat-completions/gemini_openai_request.go index f18f45bec3..c8948ac593 100644 --- a/internal/translator/gemini/openai/chat-completions/gemini_openai_request.go +++ b/internal/translator/gemini/openai/chat-completions/gemini_openai_request.go @@ -147,21 +147,21 @@ func ConvertOpenAIRequestToGemini(modelName string, inputRawJSON []byte, _ bool) content := m.Get("content") if (role == "system" || role == "developer") && len(arr) > 1 { - // system -> system_instruction as a user message style + // system -> systemInstruction as a user message style if content.Type == gjson.String { - out, _ = sjson.SetBytes(out, "system_instruction.role", "user") - out, _ = sjson.SetBytes(out, fmt.Sprintf("system_instruction.parts.%d.text", systemPartIndex), content.String()) + out, _ = sjson.SetBytes(out, "systemInstruction.role", "user") + out, _ = sjson.SetBytes(out, fmt.Sprintf("systemInstruction.parts.%d.text", systemPartIndex), content.String()) systemPartIndex++ } else if content.IsObject() && content.Get("type").String() == "text" { - out, _ = sjson.SetBytes(out, "system_instruction.role", "user") - out, _ = sjson.SetBytes(out, fmt.Sprintf("system_instruction.parts.%d.text", systemPartIndex), content.Get("text").String()) + out, _ = sjson.SetBytes(out, "systemInstruction.role", "user") + out, _ = sjson.SetBytes(out, fmt.Sprintf("systemInstruction.parts.%d.text", systemPartIndex), content.Get("text").String()) systemPartIndex++ } else if content.IsArray() { contents := content.Array() if len(contents) > 0 { - out, _ = sjson.SetBytes(out, "system_instruction.role", "user") + out, _ = sjson.SetBytes(out, "systemInstruction.role", "user") for j := 0; j < len(contents); j++ { - out, _ = sjson.SetBytes(out, fmt.Sprintf("system_instruction.parts.%d.text", systemPartIndex), contents[j].Get("text").String()) + out, _ = sjson.SetBytes(out, fmt.Sprintf("systemInstruction.parts.%d.text", systemPartIndex), contents[j].Get("text").String()) systemPartIndex++ } } diff --git a/internal/translator/gemini/openai/responses/gemini_openai-responses_request.go b/internal/translator/gemini/openai/responses/gemini_openai-responses_request.go index 143359d60e..463203a759 100644 --- a/internal/translator/gemini/openai/responses/gemini_openai-responses_request.go +++ b/internal/translator/gemini/openai/responses/gemini_openai-responses_request.go @@ -26,7 +26,7 @@ func ConvertOpenAIResponsesRequestToGemini(modelName string, inputRawJSON []byte if instructions := root.Get("instructions"); instructions.Exists() { systemInstr := `{"parts":[{"text":""}]}` systemInstr, _ = sjson.Set(systemInstr, "parts.0.text", instructions.String()) - out, _ = sjson.SetRaw(out, "system_instruction", systemInstr) + out, _ = sjson.SetRaw(out, "systemInstruction", systemInstr) } // Convert input messages to Gemini contents format @@ -119,7 +119,7 @@ func ConvertOpenAIResponsesRequestToGemini(modelName string, inputRawJSON []byte if strings.EqualFold(itemRole, "system") { if contentArray := item.Get("content"); contentArray.Exists() { systemInstr := "" - if systemInstructionResult := gjson.Get(out, "system_instruction"); systemInstructionResult.Exists() { + if systemInstructionResult := gjson.Get(out, "systemInstruction"); systemInstructionResult.Exists() { systemInstr = systemInstructionResult.Raw } else { systemInstr = `{"parts":[]}` @@ -140,7 +140,7 @@ func ConvertOpenAIResponsesRequestToGemini(modelName string, inputRawJSON []byte } if systemInstr != `{"parts":[]}` { - out, _ = sjson.SetRaw(out, "system_instruction", systemInstr) + out, _ = sjson.SetRaw(out, "systemInstruction", systemInstr) } } continue diff --git a/sdk/cliproxy/auth/conductor.go b/sdk/cliproxy/auth/conductor.go index 5e4ac0d6cc..9f41243804 100644 --- a/sdk/cliproxy/auth/conductor.go +++ b/sdk/cliproxy/auth/conductor.go @@ -213,6 +213,26 @@ func (m *Manager) syncScheduler() { m.syncSchedulerFromSnapshot(m.snapshotAuths()) } +// RefreshSchedulerEntry re-upserts a single auth into the scheduler so that its +// supportedModelSet is rebuilt from the current global model registry state. +// This must be called after models have been registered for a newly added auth, +// because the initial scheduler.upsertAuth during Register/Update runs before +// registerModelsForAuth and therefore snapshots an empty model set. +func (m *Manager) RefreshSchedulerEntry(authID string) { + if m == nil || m.scheduler == nil || authID == "" { + return + } + m.mu.RLock() + auth, ok := m.auths[authID] + if !ok || auth == nil { + m.mu.RUnlock() + return + } + snapshot := auth.Clone() + m.mu.RUnlock() + m.scheduler.upsertAuth(snapshot) +} + func (m *Manager) SetSelector(selector Selector) { if m == nil { return @@ -2038,6 +2058,10 @@ func shouldRetrySchedulerPick(err error) bool { if err == nil { return false } + var cooldownErr *modelCooldownError + if errors.As(err, &cooldownErr) { + return true + } var authErr *Error if !errors.As(err, &authErr) || authErr == nil { return false diff --git a/sdk/cliproxy/auth/conductor_scheduler_refresh_test.go b/sdk/cliproxy/auth/conductor_scheduler_refresh_test.go new file mode 100644 index 0000000000..5c6eff7805 --- /dev/null +++ b/sdk/cliproxy/auth/conductor_scheduler_refresh_test.go @@ -0,0 +1,163 @@ +package auth + +import ( + "context" + "errors" + "net/http" + "testing" + + "github.com/router-for-me/CLIProxyAPI/v6/internal/registry" + cliproxyexecutor "github.com/router-for-me/CLIProxyAPI/v6/sdk/cliproxy/executor" +) + +type schedulerProviderTestExecutor struct { + provider string +} + +func (e schedulerProviderTestExecutor) Identifier() string { return e.provider } + +func (e schedulerProviderTestExecutor) Execute(ctx context.Context, auth *Auth, req cliproxyexecutor.Request, opts cliproxyexecutor.Options) (cliproxyexecutor.Response, error) { + return cliproxyexecutor.Response{}, nil +} + +func (e schedulerProviderTestExecutor) ExecuteStream(ctx context.Context, auth *Auth, req cliproxyexecutor.Request, opts cliproxyexecutor.Options) (*cliproxyexecutor.StreamResult, error) { + return nil, nil +} + +func (e schedulerProviderTestExecutor) Refresh(ctx context.Context, auth *Auth) (*Auth, error) { + return auth, nil +} + +func (e schedulerProviderTestExecutor) CountTokens(ctx context.Context, auth *Auth, req cliproxyexecutor.Request, opts cliproxyexecutor.Options) (cliproxyexecutor.Response, error) { + return cliproxyexecutor.Response{}, nil +} + +func (e schedulerProviderTestExecutor) HttpRequest(ctx context.Context, auth *Auth, req *http.Request) (*http.Response, error) { + return nil, nil +} + +func TestManager_RefreshSchedulerEntry_RebuildsSupportedModelSetAfterModelRegistration(t *testing.T) { + ctx := context.Background() + + testCases := []struct { + name string + prime func(*Manager, *Auth) error + }{ + { + name: "register", + prime: func(manager *Manager, auth *Auth) error { + _, errRegister := manager.Register(ctx, auth) + return errRegister + }, + }, + { + name: "update", + prime: func(manager *Manager, auth *Auth) error { + _, errRegister := manager.Register(ctx, auth) + if errRegister != nil { + return errRegister + } + updated := auth.Clone() + updated.Metadata = map[string]any{"updated": true} + _, errUpdate := manager.Update(ctx, updated) + return errUpdate + }, + }, + } + + for _, testCase := range testCases { + testCase := testCase + t.Run(testCase.name, func(t *testing.T) { + manager := NewManager(nil, &RoundRobinSelector{}, nil) + auth := &Auth{ + ID: "refresh-entry-" + testCase.name, + Provider: "gemini", + } + if errPrime := testCase.prime(manager, auth); errPrime != nil { + t.Fatalf("prime auth %s: %v", testCase.name, errPrime) + } + + registerSchedulerModels(t, "gemini", "scheduler-refresh-model", auth.ID) + + got, errPick := manager.scheduler.pickSingle(ctx, "gemini", "scheduler-refresh-model", cliproxyexecutor.Options{}, nil) + var authErr *Error + if !errors.As(errPick, &authErr) || authErr == nil { + t.Fatalf("pickSingle() before refresh error = %v, want auth_not_found", errPick) + } + if authErr.Code != "auth_not_found" { + t.Fatalf("pickSingle() before refresh code = %q, want %q", authErr.Code, "auth_not_found") + } + if got != nil { + t.Fatalf("pickSingle() before refresh auth = %v, want nil", got) + } + + manager.RefreshSchedulerEntry(auth.ID) + + got, errPick = manager.scheduler.pickSingle(ctx, "gemini", "scheduler-refresh-model", cliproxyexecutor.Options{}, nil) + if errPick != nil { + t.Fatalf("pickSingle() after refresh error = %v", errPick) + } + if got == nil || got.ID != auth.ID { + t.Fatalf("pickSingle() after refresh auth = %v, want %q", got, auth.ID) + } + }) + } +} + +func TestManager_PickNext_RebuildsSchedulerAfterModelCooldownError(t *testing.T) { + ctx := context.Background() + manager := NewManager(nil, &RoundRobinSelector{}, nil) + manager.RegisterExecutor(schedulerProviderTestExecutor{provider: "gemini"}) + + registerSchedulerModels(t, "gemini", "scheduler-cooldown-rebuild-model", "cooldown-stale-old") + + oldAuth := &Auth{ + ID: "cooldown-stale-old", + Provider: "gemini", + } + if _, errRegister := manager.Register(ctx, oldAuth); errRegister != nil { + t.Fatalf("register old auth: %v", errRegister) + } + + manager.MarkResult(ctx, Result{ + AuthID: oldAuth.ID, + Provider: "gemini", + Model: "scheduler-cooldown-rebuild-model", + Success: false, + Error: &Error{HTTPStatus: http.StatusTooManyRequests, Message: "quota"}, + }) + + newAuth := &Auth{ + ID: "cooldown-stale-new", + Provider: "gemini", + } + if _, errRegister := manager.Register(ctx, newAuth); errRegister != nil { + t.Fatalf("register new auth: %v", errRegister) + } + + reg := registry.GetGlobalRegistry() + reg.RegisterClient(newAuth.ID, "gemini", []*registry.ModelInfo{{ID: "scheduler-cooldown-rebuild-model"}}) + t.Cleanup(func() { + reg.UnregisterClient(newAuth.ID) + }) + + got, errPick := manager.scheduler.pickSingle(ctx, "gemini", "scheduler-cooldown-rebuild-model", cliproxyexecutor.Options{}, nil) + var cooldownErr *modelCooldownError + if !errors.As(errPick, &cooldownErr) { + t.Fatalf("pickSingle() before sync error = %v, want modelCooldownError", errPick) + } + if got != nil { + t.Fatalf("pickSingle() before sync auth = %v, want nil", got) + } + + got, executor, errPick := manager.pickNext(ctx, "gemini", "scheduler-cooldown-rebuild-model", cliproxyexecutor.Options{}, nil) + if errPick != nil { + t.Fatalf("pickNext() error = %v", errPick) + } + if executor == nil { + t.Fatal("pickNext() executor = nil") + } + if got == nil || got.ID != newAuth.ID { + t.Fatalf("pickNext() auth = %v, want %q", got, newAuth.ID) + } +} diff --git a/sdk/cliproxy/service.go b/sdk/cliproxy/service.go index 0867a4fea8..4dbf44f3e7 100644 --- a/sdk/cliproxy/service.go +++ b/sdk/cliproxy/service.go @@ -323,6 +323,12 @@ func (s *Service) applyCoreAuthAddOrUpdate(ctx context.Context, auth *coreauth.A // This operation may block on network calls, but the auth configuration // is already effective at this point. s.registerModelsForAuth(auth) + + // Refresh the scheduler entry so that the auth's supportedModelSet is rebuilt + // from the now-populated global model registry. Without this, newly added auths + // have an empty supportedModelSet (because Register/Update upserts into the + // scheduler before registerModelsForAuth runs) and are invisible to the scheduler. + s.coreManager.RefreshSchedulerEntry(auth.ID) } func (s *Service) applyCoreAuthRemoval(ctx context.Context, id string) {