forked from stoktimeskipkagayomo/Luma
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathapi_server.py
More file actions
3687 lines (3136 loc) · 178 KB
/
api_server.py
File metadata and controls
3687 lines (3136 loc) · 178 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
# api_server.py
# Luma API Backend Service
import asyncio
import json
import logging
import os
import sys
import subprocess
import time
import uuid
import re
import threading
import random
import mimetypes
from datetime import datetime
from contextlib import asynccontextmanager
from collections import deque
from threading import Lock
from pathlib import Path
import uvicorn
import requests
import aiohttp # 新增:用于异步HTTP请求
from asyncio import Semaphore
from typing import Optional, Tuple
from packaging.version import parse as parse_version
from fastapi import FastAPI, WebSocket, WebSocketDisconnect, Request, HTTPException
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import StreamingResponse, JSONResponse, Response, HTMLResponse
# --- 内部模块导入 ---
from modules.file_uploader import upload_to_file_bed
from modules.monitoring import monitoring_service, MonitorConfig
from modules.token_manager import token_manager
from modules.geo_platform import geo_platform_service
from modules.logging_system import log_system, LogLevel, LogType
# 图像自动增强功能已移除(已剥离为独立项目)
import urllib3
# 全局禁用SSL警告(可选,但推荐)
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
# --- 基础配置 ---
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
# --- 日志过滤器 ---
class EndpointFilter(logging.Filter):
    """Suppress uvicorn access-log entries generated by the monitoring UI."""

    def filter(self, record: logging.LogRecord) -> bool:
        # Drop GET requests for /api/monitor/ or /monitor; keep everything else.
        text = record.getMessage()
        return not ("GET /api/monitor/" in text or "GET /monitor " in text)
# Register the filter on uvicorn's access logger so monitor polling
# does not flood the console.
logging.getLogger("uvicorn.access").addFilter(EndpointFilter())
# --- Global state & configuration ---
CONFIG = {} # Settings loaded from config.jsonc
# browser_ws holds the WebSocket connection to the single Tampermonkey script.
# NOTE: this architecture assumes exactly one browser tab is serving requests;
# supporting concurrent tabs would require a dict of connections instead.
browser_ws: WebSocket | None = None
# response_channels maps each API request to its response queue.
# Key is request_id, value is an asyncio.Queue.
response_channels: dict[str, asyncio.Queue] = {}
# Request metadata, kept so requests can be restored after a WebSocket reconnect.
request_metadata: dict[str, dict] = {}
last_activity_time = None # Timestamp of the most recent activity (idle-restart clock)
idle_monitor_thread = None # Background thread running idle_monitor()
main_event_loop = None # Captured in lifespan(); used by worker threads
# True while the page is being refreshed to clear a human-verification challenge.
IS_REFRESHING_FOR_VERIFICATION = False
# Holding queue for requests that will be retried automatically.
pending_requests_queue = asyncio.Queue()
# Lock protecting concurrent access to the browser WebSocket.
ws_lock = asyncio.Lock()
# Global aiohttp session (created in lifespan()).
aiohttp_session = None
# --- Automatic image download configuration ---
IMAGE_SAVE_DIR = Path("./downloaded_images")
IMAGE_SAVE_DIR.mkdir(exist_ok=True)
# Bounded deque avoids unbounded memory growth while remembering recent URLs.
downloaded_image_urls = deque(maxlen=5000) # Remember at most 5000 URLs
downloaded_urls_set = set() # Fast membership checks for de-duplication
# File-bed endpoints temporarily disabled after failures.
DISABLED_ENDPOINTS = {} # Dict: {endpoint: disable_time}
# Global index for the round-robin endpoint selection strategy.
ROUND_ROBIN_INDEX = 0
# Seconds before a disabled file-bed endpoint is re-enabled.
FILEBED_RECOVERY_TIME = 300 # Auto-recover after 5 minutes
# Per-model round-robin indices (guarded by a lock for thread safety).
MODEL_ROUND_ROBIN_INDEX = {} # {model_name: current_index}
MODEL_ROUND_ROBIN_LOCK = Lock() # Protects the round-robin indices
# Cache of downloaded images as base64, to skip repeat downloads/conversions.
IMAGE_BASE64_CACHE = {} # {url: (base64_data, timestamp)}
IMAGE_CACHE_MAX_SIZE = 1000 # Cache at most 1000 images (value governs; old comment said 100)
IMAGE_CACHE_TTL = 3600 # Cache entries live for 1 hour (seconds)
# Cache of file-bed upload URLs, so identical images are not re-uploaded.
FILEBED_URL_CACHE = {} # {image_hash: (uploaded_url, timestamp)}
FILEBED_URL_CACHE_TTL = 300 # File-bed links cached for 5 minutes (seconds)
FILEBED_URL_CACHE_MAX_SIZE = 500 # At most 500 cached file-bed links
# Concurrency limit for downloads (semaphore created in lifespan()).
DOWNLOAD_SEMAPHORE: Optional[Semaphore] = None
MAX_CONCURRENT_DOWNLOADS = 50 # Default maximum concurrent downloads
# --- Model mapping ---
# MODEL_NAME_TO_ID_MAP now stores richer objects: { "model_name": {"id": "...", "type": "..."} }
MODEL_NAME_TO_ID_MAP = {}
MODEL_ENDPOINT_MAP = {} # Maps models to their session/message IDs
DEFAULT_MODEL_ID = None # Default model id: None
def load_model_endpoint_map():
    """Load the model -> endpoint mapping from model_endpoint_map.json.

    A missing file or a file containing only whitespace yields an empty
    mapping; JSON parse errors are logged and also fall back to empty.
    """
    global MODEL_ENDPOINT_MAP
    try:
        with open('model_endpoint_map.json', 'r', encoding='utf-8') as f:
            raw = f.read()
        # An empty (or whitespace-only) file is treated as an empty mapping.
        MODEL_ENDPOINT_MAP = json.loads(raw) if raw.strip() else {}
        logger.info(f"成功从 'model_endpoint_map.json' 加载了 {len(MODEL_ENDPOINT_MAP)} 个模型端点映射。")
    except FileNotFoundError:
        logger.warning("'model_endpoint_map.json' 文件未找到。将使用空映射。")
        MODEL_ENDPOINT_MAP = {}
    except json.JSONDecodeError as e:
        logger.error(f"加载或解析 'model_endpoint_map.json' 失败: {e}。将使用空映射。")
        MODEL_ENDPOINT_MAP = {}
def _parse_jsonc(jsonc_string: str) -> dict:
"""
稳健地解析 JSONC 字符串,移除注释。
改进版:正确处理字符串内的 // 和 /* */
"""
lines = jsonc_string.splitlines()
no_comments_lines = []
in_block_comment = False
for line in lines:
if in_block_comment:
# 在块注释中,查找结束标记
if '*/' in line:
in_block_comment = False
# 保留块注释结束后的内容
line = line.split('*/', 1)[1]
else:
continue
# 处理可能的块注释开始
if '/*' in line:
# 需要更智能地处理,避免删除字符串中的 /*
before_comment, _, after_comment = line.partition('/*')
if '*/' in after_comment:
# 单行块注释
_, _, after_block = after_comment.partition('*/')
line = before_comment + after_block
else:
# 多行块注释开始
line = before_comment
in_block_comment = True
# 处理单行注释 //,但要避免删除字符串中的 //
# 使用更智能的方法:查找不在引号内的 //
processed_line = ""
in_string = False
escape_next = False
i = 0
while i < len(line):
char = line[i]
if escape_next:
processed_line += char
escape_next = False
i += 1
continue
if char == '\\':
processed_line += char
escape_next = True
i += 1
continue
if char == '"' and not in_string:
in_string = True
processed_line += char
elif char == '"' and in_string:
in_string = False
processed_line += char
elif char == '/' and i + 1 < len(line) and line[i + 1] == '/' and not in_string:
# 找到了真正的注释,停止处理这一行
break
else:
processed_line += char
i += 1
# 只有非空行才添加
if processed_line.strip():
no_comments_lines.append(processed_line)
return json.loads("\n".join(no_comments_lines))
def load_config():
    """Load settings from config.jsonc, stripping JSONC comments first.

    On any read/parse failure the global CONFIG falls back to an empty dict.
    """
    global CONFIG
    try:
        with open('config.jsonc', 'r', encoding='utf-8') as f:
            CONFIG = _parse_jsonc(f.read())
        logger.info("成功从 'config.jsonc' 加载配置。")
        # Surface the state of the headline feature toggles.
        logger.info(f" - 酒馆模式 (Tavern Mode): {'✅ 启用' if CONFIG.get('tavern_mode_enabled') else '❌ 禁用'}")
        logger.info(f" - 绕过模式 (Bypass Mode): {'✅ 启用' if CONFIG.get('bypass_enabled') else '❌ 禁用'}")
    except (FileNotFoundError, json.JSONDecodeError) as e:
        logger.error(f"加载或解析 'config.jsonc' 失败: {e}。将使用默认配置。")
        CONFIG = {}
def load_model_map():
    """Load models.json, expanding 'id:type' string values into dicts.

    Each entry becomes {"id": ..., "type": ...}; a bare value (legacy
    format) is treated as a text model, and the literal id 'null' maps
    to None.
    """
    global MODEL_NAME_TO_ID_MAP
    try:
        with open('models.json', 'r', encoding='utf-8') as f:
            raw_map = json.load(f)
        parsed = {}
        for name, value in raw_map.items():
            if isinstance(value, str) and ':' in value:
                id_part, type_part = value.split(':', 1)
                # 'null' (case-insensitive) means "no specific model id".
                parsed[name] = {
                    "id": None if id_part.lower() == 'null' else id_part,
                    "type": type_part,
                }
            else:
                # Legacy format: a bare id implies a text model.
                parsed[name] = {"id": value, "type": "text"}
        MODEL_NAME_TO_ID_MAP = parsed
        logger.info(f"成功从 'models.json' 加载并解析了 {len(MODEL_NAME_TO_ID_MAP)} 个模型。")
    except (FileNotFoundError, json.JSONDecodeError) as e:
        logger.error(f"加载 'models.json' 失败: {e}。将使用空模型列表。")
        MODEL_NAME_TO_ID_MAP = {}
# --- 公告处理 ---
def check_and_display_announcement():
    """Log the one-shot announcement file to the console, then delete it.

    The file is removed in all cases (success or failure) so the
    announcement is never shown more than once.
    """
    announcement_file = "announcement-lmarena.json"
    if not os.path.exists(announcement_file):
        return
    try:
        logger.info("="*60)
        logger.info("📢 检测到更新公告,内容如下:")
        with open(announcement_file, 'r', encoding='utf-8') as f:
            announcement = json.load(f)
        title = announcement.get("title", "公告")
        logger.info(f" --- {title} ---")
        for line in announcement.get("content", []):
            logger.info(f" {line}")
        logger.info("="*60)
    except json.JSONDecodeError:
        logger.error(f"无法解析公告文件 '{announcement_file}'。文件内容可能不是有效的JSON。")
    except Exception as e:
        logger.error(f"读取公告文件时发生错误: {e}")
    finally:
        # Remove the file regardless of outcome.
        try:
            os.remove(announcement_file)
            logger.info(f"公告文件 '{announcement_file}' 已被移除。")
        except OSError as e:
            logger.error(f"删除公告文件 '{announcement_file}' 失败: {e}")
# --- Update checking ---
GITHUB_REPO = "zhongruichen/LMArenaBridge-mogai" # Repository name unchanged
def download_and_extract_update(version):
    """Download the latest branch archive and extract it into ./update_temp.

    Args:
        version: Version string being fetched (informational; the download
            always pulls the head of the mogai-version branch).

    Returns:
        True on success, False on any download or extraction failure.
    """
    # Import up front so the except clauses below can always resolve
    # zipfile.BadZipFile. Previously the import happened mid-try, so an
    # early failure that was not a RequestException raised NameError
    # while Python evaluated the except clause.
    import io
    import zipfile

    update_dir = "update_temp"
    os.makedirs(update_dir, exist_ok=True)
    try:
        zip_url = f"https://github.com/{GITHUB_REPO}/archive/refs/heads/mogai-version.zip"
        logger.info(f"正在从 {zip_url} 下载新版本...")
        response = requests.get(zip_url, timeout=60)
        response.raise_for_status()
        with zipfile.ZipFile(io.BytesIO(response.content)) as z:
            z.extractall(update_dir)
        logger.info(f"新版本已成功下载并解压到 '{update_dir}' 文件夹。")
        return True
    except requests.RequestException as e:
        logger.error(f"下载更新失败: {e}")
    except zipfile.BadZipFile:
        logger.error("下载的文件不是一个有效的zip压缩包。")
    except Exception as e:
        logger.error(f"解压更新时发生未知错误: {e}")
    return False
def check_for_updates():
    """Check GitHub for a newer version and, if found, hand off to the updater.

    Honors CONFIG['enable_auto_update']. Compares CONFIG['version'] against
    the 'version' field of the remote config.jsonc on the mogai-version
    branch. On a successful download this launches the update script and
    terminates the current process via os._exit(0).
    """
    if not CONFIG.get("enable_auto_update", True):
        logger.info("自动更新已禁用,跳过检查。")
        return
    current_version = CONFIG.get("version", "0.0.0")
    logger.info(f"当前版本: {current_version}。正在从 GitHub 检查更新...")
    try:
        config_url = f"https://raw.githubusercontent.com/{GITHUB_REPO}/mogai-version/config.jsonc"
        response = requests.get(config_url, timeout=10)
        response.raise_for_status()
        jsonc_content = response.text
        remote_config = _parse_jsonc(jsonc_content)
        remote_version_str = remote_config.get("version")
        if not remote_version_str:
            logger.warning("远程配置文件中未找到版本号,跳过更新检查。")
            return
        # packaging.version gives a proper semantic comparison (not string order).
        if parse_version(remote_version_str) > parse_version(current_version):
            logger.info("="*60)
            logger.info(f"🎉 发现新版本! 🎉")
            logger.info(f" - 当前版本: {current_version}")
            logger.info(f" - 最新版本: {remote_version_str}")
            if download_and_extract_update(remote_version_str):
                logger.info("准备应用更新。服务器将在5秒后关闭并启动更新脚本。")
                time.sleep(5)
                update_script_path = os.path.join("modules", "update_script.py")
                # Launch the updater as an independent process via Popen.
                subprocess.Popen([sys.executable, update_script_path])
                # Terminate this server process immediately so the updater can replace files.
                os._exit(0)
            else:
                logger.error(f"自动更新失败。请访问 https://github.com/{GITHUB_REPO}/releases/latest 手动下载。")
            logger.info("="*60)
        else:
            logger.info("您的程序已是最新版本。")
    except requests.RequestException as e:
        logger.error(f"检查更新失败: {e}")
    except json.JSONDecodeError:
        logger.error("解析远程配置文件失败。")
    except Exception as e:
        logger.error(f"检查更新时发生未知错误: {e}")
# --- 模型更新 ---
def extract_models_from_html(html_content):
    """
    Extract complete model JSON objects from an HTML page, using brace
    matching so each object is captured in full. Returns the list of model
    dicts, or None when nothing parseable was found.
    """
    models = []
    seen_names = set()
    # Every candidate object starts with an escaped {"id":"<hex-uuid>" fragment.
    for start_match in re.finditer(r'\{\\"id\\":\\"[a-f0-9-]+\\"', html_content):
        start_index = start_match.start()
        # Walk forward balancing braces; cap the scan so a malformed page
        # cannot force a full-document walk (a model definition is assumed
        # to fit in 10000 characters).
        search_limit = start_index + 10000
        depth = 0
        end_index = -1
        for i in range(start_index, min(len(html_content), search_limit)):
            c = html_content[i]
            if c == '{':
                depth += 1
            elif c == '}':
                depth -= 1
                if depth == 0:
                    end_index = i + 1
                    break
        if end_index == -1:
            continue
        # Un-escape the embedded JSON before parsing it.
        json_string = html_content[start_index:end_index].replace('\\"', '"').replace('\\\\', '\\')
        try:
            model_data = json.loads(json_string)
        except json.JSONDecodeError as e:
            logger.warning(f"解析提取的JSON对象时出错: {e} - 内容: {json_string[:150]}...")
            continue
        # De-duplicate on publicName.
        model_name = model_data.get('publicName')
        if model_name and model_name not in seen_names:
            models.append(model_data)
            seen_names.add(model_name)
    if models:
        logger.info(f"成功提取并解析了 {len(models)} 个独立模型。")
        return models
    logger.error("错误:在HTML响应中找不到任何匹配的完整模型JSON对象。")
    return None
def save_available_models(new_models_list, models_path="available_models.json"):
    """
    Persist the full list of extracted model objects to a JSON file.
    """
    count = len(new_models_list)
    logger.info(f"检测到 {count} 个模型,正在更新 '{models_path}'...")
    try:
        # Write the complete model objects, keeping non-ASCII characters readable.
        with open(models_path, 'w', encoding='utf-8') as f:
            json.dump(new_models_list, f, indent=4, ensure_ascii=False)
        logger.info(f"✅ '{models_path}' 已成功更新,包含 {count} 个模型。")
    except IOError as e:
        logger.error(f"❌ 写入 '{models_path}' 文件时出错: {e}")
# --- 自动重启逻辑 ---
def restart_server():
    """Notify the browser client to refresh, then re-exec this process.

    Runs on the idle-monitor worker thread; the WebSocket notification is
    scheduled onto the main event loop, and the process image is replaced
    with os.execv (this call does not return).
    """
    logger.warning("="*60)
    logger.warning("检测到服务器空闲超时,准备自动重启...")
    logger.warning("="*60)
    # 1. (async) Tell the browser to refresh.
    async def notify_browser_refresh():
        if browser_ws:
            try:
                # Send a 'reconnect' command first so the front end knows
                # this is a planned restart.
                await browser_ws.send_text(json.dumps({"command": "reconnect"}, ensure_ascii=False))
                logger.info("已向浏览器发送 'reconnect' 指令。")
            except Exception as e:
                logger.error(f"发送 'reconnect' 指令失败: {e}")
    # Run the notification on the main event loop;
    # run_coroutine_threadsafe is the thread-safe way to do so from here.
    if browser_ws and browser_ws.client_state.name == 'CONNECTED' and main_event_loop:
        asyncio.run_coroutine_threadsafe(notify_browser_refresh(), main_event_loop)
    # 2. Pause briefly so the message has time to flush.
    time.sleep(3)
    # 3. Replace this process with a fresh server instance.
    logger.info("正在重启服务器...")
    os.execv(sys.executable, ['python'] + sys.argv)
def idle_monitor():
    """Background-thread loop that restarts the server after an idle timeout.

    Blocks until last_activity_time is first set (by lifespan), then polls
    every 10 seconds. A configured timeout of -1 disables the restart check
    while keeping the thread alive.
    """
    global last_activity_time
    # Wait until last_activity_time has been initialised.
    while last_activity_time is None:
        time.sleep(1)
    logger.info("空闲监控线程已启动。")
    while True:
        if CONFIG.get("enable_idle_restart", False):
            timeout = CONFIG.get("idle_restart_timeout_seconds", 300)
            # A timeout of -1 disables the restart check.
            if timeout == -1:
                time.sleep(10) # still sleep to avoid a busy loop
                continue
            idle_time = (datetime.now() - last_activity_time).total_seconds()
            if idle_time > timeout:
                logger.info(f"服务器空闲时间 ({idle_time:.0f}s) 已超过阈值 ({timeout}s)。")
                restart_server()
                break # the process is about to be replaced; exit the loop
        # Check again in 10 seconds.
        time.sleep(10)
# --- FastAPI 生命周期事件 ---
@asynccontextmanager
async def lifespan(app: FastAPI):
    """FastAPI lifespan hook: one-time startup and shutdown for the server.

    Startup: loads config, builds the tuned global aiohttp session and
    download semaphore, reports the operating mode, checks for updates,
    loads model maps, and starts the idle monitor / memory monitor / log
    system. Shutdown: closes the aiohttp session and stops the log system.
    """
    global idle_monitor_thread, last_activity_time, main_event_loop, aiohttp_session, DOWNLOAD_SEMAPHORE, MAX_CONCURRENT_DOWNLOADS
    main_event_loop = asyncio.get_running_loop() # capture the main event loop
    load_config() # load configuration first
    # Concurrency and connection-pool settings come from config.
    MAX_CONCURRENT_DOWNLOADS = CONFIG.get("max_concurrent_downloads", 50)
    pool_config = CONFIG.get("connection_pool", {})
    # Build the tuned global aiohttp session.
    connector = aiohttp.TCPConnector(
        ssl=False,
        limit=pool_config.get("total_limit", 200), # total connection cap
        limit_per_host=pool_config.get("per_host_limit", 50), # per-host connection cap
        ttl_dns_cache=pool_config.get("dns_cache_ttl", 300), # DNS cache TTL
        force_close=False, # keep connections alive
        enable_cleanup_closed=True, # reap closed connections automatically
        keepalive_timeout=pool_config.get("keepalive_timeout", 30) # keep-alive timeout
    )
    # Timeout configuration for downloads.
    timeout_config = CONFIG.get("download_timeout", {})
    timeout = aiohttp.ClientTimeout(
        total=timeout_config.get("total", 30), # overall timeout
        connect=timeout_config.get("connect", 5), # connect timeout
        sock_read=timeout_config.get("sock_read", 10) # read timeout
    )
    aiohttp_session = aiohttp.ClientSession(
        connector=connector,
        timeout=timeout,
        trust_env=True
    )
    # Initialise the download semaphore.
    DOWNLOAD_SEMAPHORE = Semaphore(MAX_CONCURRENT_DOWNLOADS)
    logger.info(f"全局aiohttp会话已创建(优化配置)")
    logger.info(f" - 最大连接数: {pool_config.get('total_limit', 200)}")
    logger.info(f" - 每主机连接数: {pool_config.get('per_host_limit', 50)}")
    logger.info(f" - 最大并发下载: {MAX_CONCURRENT_DOWNLOADS}")
    # (Automatic image enhancement removed; it now lives in the separate
    # image_enhancer project.)
    # --- Report the current operating mode ---
    mode = CONFIG.get("id_updater_last_mode", "direct_chat")
    target = CONFIG.get("id_updater_battle_target", "A")
    logger.info("="*60)
    logger.info(f" 当前操作模式: {mode.upper()}")
    if mode == 'battle':
        logger.info(f" - Battle 模式目标: Assistant {target}")
    logger.info(" (可通过运行 id_updater.py 修改模式)")
    logger.info("="*60)
    # Monitoring dashboard URL.
    logger.info(f"📊 监控面板: http://127.0.0.1:5102/monitor")
    logger.info("="*60)
    check_for_updates() # check for program updates
    load_model_map() # load the model map
    load_model_endpoint_map() # load the model endpoint map
    logger.info("服务器启动完成。等待油猴脚本连接...")
    # Show the announcement last so it stands out in the startup log.
    check_and_display_announcement()
    # Start the activity clock only after startup work completes.
    last_activity_time = datetime.now()
    # Start the idle-monitor thread if enabled.
    if CONFIG.get("enable_idle_restart", False):
        idle_monitor_thread = threading.Thread(target=idle_monitor, daemon=True)
        idle_monitor_thread.start()
    # Start the memory-monitor task (defined later in this module).
    asyncio.create_task(memory_monitor())
    # Start the new async logging system.
    log_system.start()
    logger.info("✅ 新的异步日志系统已启动")
    yield
    # --- Shutdown: release resources ---
    if aiohttp_session:
        await aiohttp_session.close()
        logger.info("全局aiohttp会话已关闭")
    # Stop the log system and flush all buffered records.
    await log_system.stop()
    logger.info("✅ 日志系统已停止并刷新所有缓冲")
    logger.info("服务器正在关闭。")
app = FastAPI(lifespan=lifespan)
# --- CORS middleware configuration ---
# Allow every origin, method, and header — acceptable for a local dev tool.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)
# --- 辅助函数 ---
def save_config():
    """Write the current session_id/message_id back into config.jsonc.

    Uses targeted regex substitution so the file's comments and layout
    survive. Requires CONFIG['session_id'] and CONFIG['message_id'] to be
    present (raises KeyError into the except below otherwise).
    """
    try:
        # Read the original file so comments and formatting are preserved.
        with open('config.jsonc', 'r', encoding='utf-8') as f:
            lines = f.readlines()
        # Replace a key's value in place with a regex.
        def replacer(key, value, content):
            # Matches `"key": "<old>"` up to the trailing comma / end of line.
            pattern = re.compile(rf'("{key}"\s*:\s*").*?("?)(,?\s*)$', re.MULTILINE)
            replacement = rf'\g<1>{value}\g<2>\g<3>'
            if not pattern.search(content): # key absent: append before the closing brace (simplified handling)
                content = re.sub(r'}\s*$', f' ,"{key}": "{value}"\n}}', content)
            else:
                content = pattern.sub(replacement, content)
            return content
        content_str = "".join(lines)
        content_str = replacer("session_id", CONFIG["session_id"], content_str)
        content_str = replacer("message_id", CONFIG["message_id"], content_str)
        with open('config.jsonc', 'w', encoding='utf-8') as f:
            f.write(content_str)
        logger.info("✅ 成功将会话信息更新到 config.jsonc。")
    except Exception as e:
        logger.error(f"❌ 写入 config.jsonc 时发生错误: {e}", exc_info=True)
async def _process_openai_message(message: dict) -> dict:
    """
    Split an OpenAI-style message into plain text plus attachment lists.

    - Multimodal content lists are flattened into text and attachments.
    - File-bed logic runs earlier (in chat_completions preprocessing);
      this only builds regular attachment dicts.
    - Empty user content is replaced with a single space so LMArena does
      not reject it.
    - Assistant messages get special handling: Markdown images embedded in
      the text are lifted into experimental_attachments.

    Args:
        message: An OpenAI chat message dict with 'role' and 'content'
            (string or multimodal list).

    Returns:
        A dict with 'role', 'content', 'attachments', and — for assistant
        messages that carried images — 'experimental_attachments'.
    """
    content = message.get("content")
    role = message.get("role")
    attachments = []
    experimental_attachments = []
    text_content = ""
    # Diagnostic logging.
    logger.debug(f"[MSG_PROCESS] 处理消息 - 角色: {role}, 内容类型: {type(content).__name__}")
    # Assistant string content: lift embedded Markdown images out of the text.
    if role == "assistant" and isinstance(content, str):
        # Match ![alt](url) Markdown image syntax (re is imported at module level).
        markdown_pattern = r'!\[([^\]]*)\]\(([^)]+)\)'
        matches = re.findall(markdown_pattern, content)
        if matches:
            logger.info(f"[MSG_PROCESS] 在assistant消息中检测到 {len(matches)} 个Markdown图片")
            # Strip the images, keeping only the surrounding text.
            text_content = re.sub(markdown_pattern, '', content).strip()
            # Convert each image into experimental_attachments format.
            for alt_text, url in matches:
                # Work out the content type from the URL shape.
                if url.startswith("data:"):
                    # base64 data URL
                    content_type = url.split(';')[0].split(':')[1] if ':' in url else 'image/png'
                elif url.startswith("http"):
                    # HTTP URL: guess from the extension.
                    content_type = mimetypes.guess_type(url)[0] or 'image/jpeg'
                else:
                    content_type = 'image/jpeg'
                # Derive a file name (from the URL path when possible).
                if '/' in url and not url.startswith("data:"):
                    filename = url.split('/')[-1].split('?')[0]
                    if '.' not in filename:
                        filename = f"image_{uuid.uuid4()}.{content_type.split('/')[-1]}"
                else:
                    filename = f"image_{uuid.uuid4()}.{content_type.split('/')[-1]}"
                experimental_attachment = {
                    "name": filename,
                    "contentType": content_type,
                    "url": url
                }
                experimental_attachments.append(experimental_attachment)
                # NOTE(review): this logs a literal "(unknown)"; it likely
                # meant to interpolate the attachment name — confirm intent.
                logger.debug(f"[MSG_PROCESS] 添加experimental_attachment: (unknown)")
        else:
            text_content = content
    elif isinstance(content, list):
        text_parts = []
        for part in content:
            if part.get("type") == "text":
                text_parts.append(part.get("text", ""))
            elif part.get("type") == "image_url":
                # The URL may be base64 or an http URL (already swapped in
                # by the preprocessor).
                image_url_data = part.get("image_url", {})
                url = image_url_data.get("url")
                original_filename = image_url_data.get("detail")
                try:
                    if url.startswith("data:"):
                        # base64 data URL: content type is embedded in the prefix.
                        content_type = url.split(';')[0].split(':')[1]
                    else:
                        # http URL: guess the content type from the extension.
                        content_type = mimetypes.guess_type(url)[0] or 'application/octet-stream'
                    # FIX: mimetypes.guess_extension() can return None; guard
                    # it so unusual content types fall back to .png instead of
                    # raising AttributeError and silently dropping the attachment.
                    file_name = original_filename or f"image_{uuid.uuid4()}.{(mimetypes.guess_extension(content_type) or '.png').lstrip('.')}"
                    attachment = {
                        "name": file_name,
                        "contentType": content_type,
                        "url": url
                    }
                    # Assistant images go into experimental_attachments.
                    if role == "assistant":
                        experimental_attachments.append(attachment)
                        logger.debug(f"[MSG_PROCESS] Assistant图片添加到experimental_attachments")
                    else:
                        attachments.append(attachment)
                        logger.debug(f"[MSG_PROCESS] {role}图片添加到attachments")
                except (AttributeError, IndexError, ValueError) as e:
                    logger.warning(f"处理附件URL时出错: {url[:100]}... 错误: {e}")
        text_content = "\n\n".join(text_parts)
    elif isinstance(content, str):
        text_content = content
    # LMArena rejects empty user content; substitute a single space.
    if role == "user" and not text_content.strip():
        text_content = " "
    # Build the result.
    result = {
        "role": role,
        "content": text_content,
        "attachments": attachments
    }
    # Assistant messages carry their images in experimental_attachments.
    if role == "assistant" and experimental_attachments:
        result["experimental_attachments"] = experimental_attachments
        logger.info(f"[MSG_PROCESS] Assistant消息包含 {len(experimental_attachments)} 个experimental_attachments")
    return result
async def convert_openai_to_lmarena_payload(openai_data: dict, session_id: str, message_id: str, mode_override: str = None, battle_target_override: str = None) -> dict:
"""
将 OpenAI 请求体转换为油猴脚本所需的简化载荷,并应用酒馆模式、绕过模式以及对战模式。
新增了模式覆盖参数,以支持模型特定的会话模式。
"""
# 0. 预处理:从历史消息中剥离思维链(如果配置启用)
messages = openai_data.get("messages", [])
if CONFIG.get("strip_reasoning_from_history", True) and CONFIG.get("enable_lmarena_reasoning", False):
reasoning_mode = CONFIG.get("reasoning_output_mode", "openai")
# 仅对think_tag模式有效(OpenAI模式的reasoning_content不在content中)
if reasoning_mode == "think_tag":
import re
think_pattern = re.compile(r'<think>.*?</think>\s*', re.DOTALL)
for msg in messages:
if msg.get("role") == "assistant" and isinstance(msg.get("content"), str):
original_content = msg["content"]
# 移除<think>标签及其内容
cleaned_content = think_pattern.sub('', original_content).strip()
if cleaned_content != original_content:
msg["content"] = cleaned_content
logger.debug(f"[REASONING_STRIP] 从历史消息中剥离了思维链内容")
# 1. 规范化角色并处理消息
# - 将非标准的 'developer' 角色转换为 'system' 以提高兼容性。
# - 分离文本和附件。
for msg in messages:
if msg.get("role") == "developer":
msg["role"] = "system"
logger.info("消息角色规范化:将 'developer' 转换为 'system'。")
processed_messages = []
for msg in messages:
processed_msg = await _process_openai_message(msg.copy())
processed_messages.append(processed_msg)
# 2. 应用酒馆模式 (Tavern Mode)
if CONFIG.get("tavern_mode_enabled"):
system_prompts = [msg['content'] for msg in processed_messages if msg['role'] == 'system']
other_messages = [msg for msg in processed_messages if msg['role'] != 'system']
merged_system_prompt = "\n\n".join(system_prompts)
final_messages = []
if merged_system_prompt:
# 系统消息不应有附件
final_messages.append({"role": "system", "content": merged_system_prompt, "attachments": []})
final_messages.extend(other_messages)
processed_messages = final_messages
# 3. 确定目标模型 ID 和类型
model_name = openai_data.get("model", "claude-3-5-sonnet-20241022")
# 优先从 MODEL_ENDPOINT_MAP 获取模型类型(如果定义了)
model_type = "text" # 默认类型
endpoint_info = MODEL_ENDPOINT_MAP.get(model_name, {})
# 诊断日志:记录模型类型判断过程
logger.info(f"[BYPASS_DEBUG] 开始判断模型 '{model_name}' 的类型...")
logger.info(f"[BYPASS_DEBUG] endpoint_info 类型: {type(endpoint_info).__name__}, 内容: {endpoint_info}")
if isinstance(endpoint_info, dict) and "type" in endpoint_info:
model_type = endpoint_info.get("type", "text")
logger.info(f"[BYPASS_DEBUG] 从 model_endpoint_map.json (dict) 获取模型类型: {model_type}")
elif isinstance(endpoint_info, list) and endpoint_info:
# 如果是列表格式,取第一个元素的类型
first_endpoint = endpoint_info[0] if isinstance(endpoint_info[0], dict) else {}
if "type" in first_endpoint:
model_type = first_endpoint.get("type", "text")
logger.info(f"[BYPASS_DEBUG] 从 model_endpoint_map.json (list) 获取模型类型: {model_type}")
# 回退到 models.json 中的定义
model_info = MODEL_NAME_TO_ID_MAP.get(model_name, {}) # 关键修复:确保 model_info 总是一个字典
if not endpoint_info.get("type") and model_info:
old_type = model_type
model_type = model_info.get("type", "text")
logger.info(f"[BYPASS_DEBUG] 从 models.json 获取模型类型: {old_type} -> {model_type}")
logger.info(f"[BYPASS_DEBUG] 最终确定的模型类型: {model_type}")
target_model_id = None
if model_info:
target_model_id = model_info.get("id")
else:
logger.warning(f"模型 '{model_name}' 在 'models.json' 中未找到。请求将不带特定模型ID发送。")
if not target_model_id:
logger.warning(f"模型 '{model_name}' 在 'models.json' 中未找到对应的ID。请求将不带特定模型ID发送。")
# 4. 构建消息模板
message_templates = []
for msg in processed_messages:
msg_template = {
"role": msg["role"],
"content": msg.get("content", ""),
"attachments": msg.get("attachments", [])
}
# 对于user角色,附件需要放在experimental_attachments中
if msg["role"] == "user" and msg.get("attachments"):
msg_template["experimental_attachments"] = msg.get("attachments", [])
logger.info(f"[LMARENA_CONVERT] 将user的 {len(msg['attachments'])} 个附件添加到experimental_attachments")
# 保留assistant的experimental_attachments字段(图片生成模型需要)
if msg["role"] == "assistant" and "experimental_attachments" in msg:
msg_template["experimental_attachments"] = msg["experimental_attachments"]
logger.info(f"[LMARENA_CONVERT] 保留assistant的 {len(msg['experimental_attachments'])} 个experimental_attachments")
message_templates.append(msg_template)
# 4.5 应用图片附件审查绕过 (Image Attachment Bypass) - 专用于image模型
# 当使用image模型且最新的用户请求包含图片附件时,将文本内容分离到新请求中
# 注:text模型有自己的绕过机制,search模型不需要(空内容会报错)
if CONFIG.get("image_attachment_bypass_enabled", False) and model_type == "image":
# 查找最后一条用户消息
last_user_msg_idx = None
for i in range(len(message_templates) - 1, -1, -1):
if message_templates[i]["role"] == "user":
last_user_msg_idx = i
break
if last_user_msg_idx is not None:
last_user_msg = message_templates[last_user_msg_idx]
# 检查是否包含图片附件
has_image_attachment = False
if last_user_msg.get("attachments"):
for attachment in last_user_msg["attachments"]:
if attachment.get("contentType", "").startswith("image/"):
has_image_attachment = True
break
# 如果包含图片附件且有文本内容,执行分离
if has_image_attachment and last_user_msg.get("content", "").strip():
original_content = last_user_msg["content"]
original_attachments = last_user_msg["attachments"]
# 创建两条消息:
# 第一条:只包含图片附件(成为历史记录)
image_only_msg = {
"role": "user",
"content": " ", # 空内容或空格
"experimental_attachments": original_attachments,
"attachments": original_attachments
}
# 第二条:只包含文本内容(作为最新请求)
text_only_msg = {
"role": "user",
"content": original_content,
"attachments": []
}
# 替换原消息为两条分离的消息
message_templates[last_user_msg_idx] = image_only_msg
message_templates.insert(last_user_msg_idx + 1, text_only_msg)
logger.info(f"图片模型审查绕过已启用:将包含 {len(original_attachments)} 个附件的请求分离为两条消息")
# 5. 应用绕过模式 (Bypass Mode) - 根据模型类型和配置决定是否启用
# 获取细粒度的绕过设置
bypass_settings = CONFIG.get("bypass_settings", {})
global_bypass_enabled = CONFIG.get("bypass_enabled", False)
# 诊断日志:详细记录绕过决策过程
logger.info(f"[BYPASS_DEBUG] ===== 绕过决策开始 =====")
logger.info(f"[BYPASS_DEBUG] 全局 bypass_enabled: {global_bypass_enabled}")
logger.info(f"[BYPASS_DEBUG] bypass_settings: {bypass_settings}")
logger.info(f"[BYPASS_DEBUG] 当前模型类型: {model_type}")
# 根据模型类型确定是否启用绕过
bypass_enabled_for_type = False
# 修复:全局bypass_enabled为False时,无论bypass_settings如何设置都应该禁用
if not global_bypass_enabled:
bypass_enabled_for_type = False
logger.info(f"[BYPASS_DEBUG] ⛔ 全局 bypass_enabled=False,强制禁用所有绕过功能")
elif bypass_settings:
# 如果有细粒度配置,检查是否明确定义了该类型
if model_type in bypass_settings:
# 如果明确定义了,使用定义的值(但仍受全局开关控制)
bypass_enabled_for_type = bypass_settings.get(model_type, False)
logger.info(f"[BYPASS_DEBUG] 使用 bypass_settings 中明确定义的值: bypass_settings['{model_type}'] = {bypass_enabled_for_type}")
else:
# 如果未明确定义,默认为False(更安全的默认值)
bypass_enabled_for_type = False
logger.info(f"[BYPASS_DEBUG] model_type '{model_type}' 未在 bypass_settings 中定义,默认禁用")
else:
# 如果没有细粒度配置,使用全局设置(保持向后兼容)
# 但对于 image 和 search 类型,默认为 False(保持原有行为)
if model_type in ["image", "search"]:
bypass_enabled_for_type = False
logger.info(f"[BYPASS_DEBUG] 无 bypass_settings,模型类型 '{model_type}' 属于 ['image', 'search'],强制设为 False")
else:
bypass_enabled_for_type = global_bypass_enabled
logger.info(f"[BYPASS_DEBUG] 无 bypass_settings,使用全局 bypass_enabled: {bypass_enabled_for_type}")
logger.info(f"[BYPASS_DEBUG] 最终决策:bypass_enabled_for_type = {bypass_enabled_for_type}")
if bypass_enabled_for_type:
# 从配置中读取绕过注入内容
bypass_injection = CONFIG.get("bypass_injection", {})
# 支持预设模式
bypass_presets = bypass_injection.get("presets", {})
active_preset_name = bypass_injection.get("active_preset", "default")
# 尝试获取激活的预设
injection_config = bypass_presets.get(active_preset_name)
# 如果预设不存在,回退到自定义配置或默认值
if not injection_config:
logger.warning(f"[BYPASS_DEBUG] 预设 '{active_preset_name}' 不存在,使用自定义配置")
injection_config = bypass_injection.get("custom", {
"role": "user",
"content": " ",
"participantPosition": "a"
})
# 获取注入参数(带默认值)
inject_role = injection_config.get("role", "user")
inject_content = injection_config.get("content", " ")
inject_position = injection_config.get("participantPosition", "a")
logger.info(f"[BYPASS_DEBUG] ⚠️ 模型类型 '{model_type}' 的绕过模式已启用")
logger.info(f"[BYPASS_DEBUG] - 使用预设: {active_preset_name}")
logger.info(f"[BYPASS_DEBUG] - 注入角色: {inject_role}")
logger.info(f"[BYPASS_DEBUG] - 注入位置: {inject_position}")
logger.info(f"[BYPASS_DEBUG] - 注入内容: {inject_content[:50]}{'...' if len(inject_content) > 50 else ''}")
message_templates.append({
"role": inject_role,
"content": inject_content,
"participantPosition": inject_position,
"attachments": []
})
else:
if global_bypass_enabled or any(bypass_settings.values()) if bypass_settings else False:
# 如果有任何绕过设置启用,但当前类型未启用,记录日志
logger.info(f"[BYPASS_DEBUG] ✅ 模型类型 '{model_type}' 的绕过模式已禁用。")
logger.info(f"[BYPASS_DEBUG] ===== 绕过决策结束 =====")
# 6. 应用参与者位置 (Participant Position)
# 优先使用覆盖的模式,否则回退到全局配置
mode = mode_override or CONFIG.get("id_updater_last_mode", "direct_chat")
target_participant = battle_target_override or CONFIG.get("id_updater_battle_target", "A")
target_participant = target_participant.lower() # 确保是小写
logger.info(f"正在根据模式 '{mode}' (目标: {target_participant if mode == 'battle' else 'N/A'}) 设置 Participant Positions...")