diff --git a/runner/README.md b/runner/README.md index 7eb3491..2fad316 100644 --- a/runner/README.md +++ b/runner/README.md @@ -81,6 +81,20 @@ python -m runner.mobiagent.mobiagent --service_ip <服务IP> --decider_port <决 - `--data_dir `:结果数据保存目录,默认为脚本目录下的 `data/`(若不存在会自动创建)。 - `--task_file `:任务列表文件路径,默认为脚本目录下的 `task.json`。 +### 基于经验模板的 AgentRR(实验性功能) + +(这是一个实验性功能,可能导致任务执行出错或不稳定) + +要启用基于经验模板的AgentRR(通过经验模板提供的子任务拆分来回放历史操作,从而加速任务执行),需要同时设置 `--use_experience` 和 `--enable_agentrr`: + +```bash +python -m runner.mobiagent.mobiagent \ + ... + --use_experience \ + --enable_agentrr +``` + +在执行 `task.json` 中指定的任务时,AgentRR 将搜索之前已完成任务中的可重用操作,并回放它们,从而绕过 agent 模型调用。 ### 用户画像与偏好记忆(Mem0/GraphRAG) diff --git a/runner/README_en.md b/runner/README_en.md index 2286cb1..1dfa2bd 100644 --- a/runner/README_en.md +++ b/runner/README_en.md @@ -73,6 +73,22 @@ Parameters - `--grounder_port`: Grounder service port (default: `8001`) - `--planner_port`: Planner service port (default: `8002`) +### Experience Template-based AgentRR (experimental) + +(This is an experimental feature which can be unstable) + +To enable experience template-based Agent Record & Replay, which accelerates task execution by replaying historical actions based on the subtask splitting provided by experience templates, set both `--use_experience` and `--enable_agentrr`: + +```bash +python -m runner.mobiagent.mobiagent \ + ... + --use_experience \ + --enable_agentrr +``` + +When executing a task in `task.json`, AgentRR will search for reusable actions in previously completed tasks and replay them, bypassing agent model invocation. + + ### User Profile & Preference Memory (Mem0/GraphRAG) MobiAgent integrates a user preference memory system (Mem0) to provide personalized context for planning. 
diff --git a/runner/mobiagent/agent_rr.py b/runner/mobiagent/agent_rr.py new file mode 100644 index 0000000..0a782ea --- /dev/null +++ b/runner/mobiagent/agent_rr.py @@ -0,0 +1,529 @@ +from collections import defaultdict +import itertools +from pathlib import Path +import random +from unittest import result +from pydantic import BaseModel +from typing import Any, Optional, Sequence, Union +from openai import OpenAI +import json, logging, os +import hashlib +import cv2 +import numpy as np +from PIL import Image +from utils.load_md_prompt import load_prompt +from utils.local_experience import PromptTemplateSearch +import pandas as pd +from skimage.metrics import structural_similarity + +logger = logging.getLogger(__name__) +logger.setLevel(os.environ.get("LOG_LEVEL", "INFO").upper()) + +BINDING_PROMPT = """ +## 角色定义 + +你是一个经验重用系统智能体,负责将一个手机使用任务的各个子任务和相应动作序列进行绑定。 + +## 输入格式 + +输入包含一个任务拆分的子任务序列,以及这个任务实际执行的动作序列。 + +一个子任务是一个高层次的自然语言描述;一个动作是实际可在手机上执行的低层次描述,为一个JSON对象,包含以下字段: + +- `reasoning`: 产生该动作的推理过程描述。 +- `action`: 动作类别,可选项:`click`, `input`, `scroll`, `wait`, `done`。 +- `parameters`: 动作所需的参数,例如点击位置、输入内容等。 + +一个子任务可能对应动作序列中的一到多步操作,你需要为每个子任务找到对应的动作序列范围。 + +## 输出格式 + +请**按照接下来给定的子任务顺序**输出一个JSON列表,列表中的每一项代表一个绑定关系,包含以下字段: + +- `subtask`:重复一遍子任务的自然语言描述,确保你正确理解了子任务内容,并能够选择正确的索引。 +- `subtask_index`: 子任务在子任务序列中的索引,索引从1开始,每个子任务的索引已经在输入中给出。 +- `start_action`:重复一遍起始动作,确保你能够正确选择起始动作的索引。 +- `start_action_index`: 起始动作在动作序列中的索引,索引从1开始。 +- `end_action`:重复一遍结束动作,确保你能够正确选择结束动作的索引。 +- `end_action_index`: 结束动作在动作序列中的索引,索引从1开始。 + +## 子任务序列 + +{subtasks} + +## 实际动作序列 + +{actions} + +## 要求 + +你的输出需要严格遵守以下要求: + +1. 确保上一个子任务的结束动作索引+1等于下一个子任务的起始动作索引 +2. 确保索引从1开始,从最后一个子任务/动作的索引结束 +3. 确保按照输入顺序输出所有子任务 +4. 
确保每个子任务至少对应一个动作 +""" + +GENERATE_TEXT_PROMPT = """根据以下描述生成一个随机文本,只返回生成的文本内容,不要包含任何解释、说明或其他额外信息: + +{description}""" + +GENERATE_TEXT_WITH_HISTORY_PROMPT = """根据以下描述生成一个随机文本,只返回生成的文本内容,不要包含任何解释、说明或其他额外信息: + +描述:{description} + +重要要求:生成的值**必须**与以下值**不重复**: +{history_text} + +请确保生成一个全新的、与上述值都不相同的文本。""" + + +""" +## 示例 + +假设有以下子任务序列: + +1. 进入旅游应用的火车票分区 +2. 输入目的地为“北京” +3. 执行搜索 + +对应的动作序列为: + +1. {{"reasoning": "我需要点击屏幕左上角的火车票按钮,进入火车票分区", "action": "click", "parameters": {{"target_element": "火车票按钮"}}}} +2. {{"reasoning": "我需要点击目的地输入框,以便开始输入我的目的地", "action": "click", "parameters": {{"target_element": "目的地输入框"}}}} +3. {{"reasoning": "我需要输入目的地“北京”", "action": "input", "parameters": {{ "text": "北京"}}}} +4. {{"reasoning": "我需要点击搜索按钮,执行搜索操作", "action": "click", "parameters": {{"target_element": "搜索按钮"}}}} + +那么正确的输出应该是: + +[ + {{ + "subtask": "进入旅游应用的火车票分区", + "subtask_index": 1, + "start_action": {{"reasoning": "我需要点击屏幕左上角的火车票按钮,进入火车票分区", "action": "click", "parameters": {{"target_element": "火车票按钮"}}}}, + "start_action_index": 1, + "end_action": {{"reasoning": "我需要点击屏幕左上角的火车票按钮,进入火车票分区", "action": "click", "parameters": {{"target_element": "火车票按钮"}}}}, + "end_action_index": 1 + }}, + {{ + "subtask": "输入目的地为“北京”", + "subtask_index": 2, + "start_action": {{"reasoning": "我需要点击目的地输入框,以便开始输入我的目的地", "action": "click", "parameters": {{"target_element": "目的地输入框"}}}}, + "start_action_index": 2, + "end_action": {{"reasoning": "我需要输入目的地“北京”", "action": "input", "parameters": {{ "text": "北京"}}}}, + "end_action_index": 3 + }}, + {{ + "subtask": "执行搜索", + "subtask_index": 3, + "start_action": {{"reasoning": "我需要点击搜索按钮,执行搜索操作", "action": "click", "parameters": {{"target_element": "搜索按钮"}}}}, + "start_action_index": 4, + "end_action": {{"reasoning": "我需要点击搜索按钮,执行搜索操作", "action": "click", "parameters": {{"target_element": "搜索按钮"}}}}, + "end_action_index": 4 + }} +] +""" + +class MidLevelSequence(BaseModel): + template_hash: str + sequence: list[str] + + @classmethod + def from_experience(cls, 
final_desc: str, template: str) -> "MidLevelSequence": + template_hash = hashlib.md5(template.encode('utf-8')).hexdigest() + lines = final_desc.strip().split("\n") + idx = 1 + sequence = [] + for line in lines: + if not line.startswith(f"{idx}."): + continue + subtask = line[len(f"{idx}."):].strip() + # replace chinese quotes with english quotes + subtask = subtask.replace("“", "\"").replace("”", "\"") + sequence.append(subtask) + idx += 1 + return cls(template_hash=template_hash, sequence=sequence) + + def __str__(self): + # skip first open_app + # filtered_sequence = self.sequence[1:] if len(self.sequence) > 1 else self.sequence + return "\n".join([f"{idx}. {subtask}" for idx, subtask in enumerate(self.sequence, 1)]) + + def __len__(self): + return len(self.sequence) + + def __getitem__(self, index): + return self.sequence[index] + +class MobiAgentAction(BaseModel): + reasoning: str + action: str + parameters: dict[str, Any] + extra_info: Optional[dict[str, Any]] = None + +class LowLevelSequence(BaseModel): + template_hash: str + sequence: list[MobiAgentAction] + + @classmethod + def from_history(cls, template_hash: str, history: list[str], extra_info: list[Optional[dict[str, Any]]]) -> "LowLevelSequence": + sequence = [] + for h, extra in zip(history, extra_info): + h = json.loads(h) + action = MobiAgentAction( + reasoning=h.get("reasoning"), + action=h.get("action"), + parameters=h.get("parameters"), + extra_info=extra, + ) + sequence.append(action) + return cls(template_hash=template_hash, sequence=sequence) + + def __str__(self): + filtered_sequence = self.sequence + return "\n".join([f"{idx}. 
{action.model_dump_json(exclude={'extra_info'})}" for idx, action in enumerate(filtered_sequence, 1)]) + + def __len__(self): + return len(self.sequence) + + def __getitem__(self, index): + return self.sequence[index] + +class Binding(BaseModel): + template_hash: str + # right exclusive + ranges: list[tuple[int, int]] + +class ReplayInfo(BaseModel): + fisrt_group_replayable: bool + replay_groups: list[list[MobiAgentAction]] + periods: list[tuple[int, int]] = [] + + +class Subtask(BaseModel): + description: str + actions: Optional[list[MobiAgentAction]] = None + + +class BBoxChangeResult(BaseModel): + """ + 表示一次UI元素变化检测的结果。 + """ + + changed: bool + change_ratio: float + mean_intensity_delta: float + ssim_score: Optional[float] = None + + +def _load_image(image: Union[str, Path, "np.ndarray", Image.Image]) -> "np.ndarray": + """ + 支持从路径或已有的numpy数组加载OpenCV图像。 + """ + + if isinstance(image, np.ndarray): + return image + if isinstance(image, Image.Image): + rgb = image.convert("RGB") + return cv2.cvtColor(np.array(rgb), cv2.COLOR_RGB2BGR) + image_path = Path(image) + if not image_path.exists(): + raise FileNotFoundError(f"Screenshot not found: {image_path}") + img = cv2.imread(str(image_path)) + if img is None: + raise ValueError(f"Failed to read image: {image_path}") + return img + + +def _sanitize_bbox(bbox: Sequence[int], width: int, height: int) -> tuple[int, int, int, int]: + if len(bbox) != 4: + raise ValueError(f"bbox must contain four values, got {bbox}") + x1, y1, x2, y2 = map(int, bbox) + if x1 > x2: + x1, x2 = x2, x1 + if y1 > y2: + y1, y2 = y2, y1 + width = max(1, int(width)) + height = max(1, int(height)) + x1 = max(0, min(x1, width - 1)) + y1 = max(0, min(y1, height - 1)) + x2 = max(x1 + 1, min(x2, width)) + y2 = max(y1 + 1, min(y2, height)) + return x1, y1, x2, y2 + + +def detect_bbox_change( + prev_screenshot: Union[str, Path, "np.ndarray", Image.Image], + curr_screenshot: Union[str, Path, "np.ndarray", Image.Image], + bbox: Sequence[int], + *, + 
ssim_threshold: float = 0.9, + diff_activation: float = 0.15, + blur_kernel_size: int = 3, +) -> BBoxChangeResult: + """ + 使用 SSIM (结构相似度) 判断 bbox 区域是否发生显著变化。 + + Args: + ssim_threshold: SSIM 分数低于该值则判定变化 + diff_activation: SSIM 差分图中视为变化的阈值,范围 0-1 + """ + + prev_img = _load_image(prev_screenshot) + curr_img = _load_image(curr_screenshot) + + if prev_img.shape[:2] != curr_img.shape[:2]: + raise ValueError("Screenshots must share the same resolution for comparison") + + x1, y1, x2, y2 = _sanitize_bbox(bbox, prev_img.shape[1], prev_img.shape[0]) + prev_roi = prev_img[y1:y2, x1:x2] + curr_roi = curr_img[y1:y2, x1:x2] + + if blur_kernel_size and blur_kernel_size > 1: + if blur_kernel_size % 2 == 0: + blur_kernel_size += 1 + prev_roi = cv2.GaussianBlur(prev_roi, (blur_kernel_size, blur_kernel_size), 0) + curr_roi = cv2.GaussianBlur(curr_roi, (blur_kernel_size, blur_kernel_size), 0) + + prev_gray = cv2.cvtColor(prev_roi, cv2.COLOR_BGR2GRAY) + curr_gray = cv2.cvtColor(curr_roi, cv2.COLOR_BGR2GRAY) + + ssim_score, diff_map = structural_similarity(prev_gray, curr_gray, full=True) + diff_map = 1.0 - diff_map # 差异越大数值越大 + change_mask = diff_map >= diff_activation + change_ratio = float(np.mean(change_mask)) + mean_intensity_delta = float(diff_map.mean()) + changed = ssim_score <= ssim_threshold + + return BBoxChangeResult( + changed=changed, + change_ratio=change_ratio, + mean_intensity_delta=mean_intensity_delta, + ssim_score=ssim_score, + ) + +def append_subtask(subtasks: list[Subtask], new_subtask: Subtask) -> list[Subtask]: + subtasks = subtasks + [new_subtask] + # merge two consecutive non-replayable subtasks + # if len(subtasks) >= 2 and subtasks[-2].actions is None and subtasks[-1].actions is None: + # merged_subtask = Subtask( + # description=subtasks[-2].description.rstrip("。") + ",然后" + subtasks[-1].description, + # actions=None, + # ) + # return subtasks[:-2] + [merged_subtask] + return subtasks + +def convert_subtasks(subtasks: list[Subtask]) -> list[Union[str, 
list[MobiAgentAction]]]: + if all(subtask.actions is None for subtask in subtasks): + return [] + ret: list[Union[str, list[MobiAgentAction]]] = [] + for subtask in subtasks: + if subtask.actions is None: + ret.append(subtask.description) + else: + ret.extend(subtask.actions) + return ret + +class ExperienceRR: + def __init__(self, planner_client: OpenAI, planner_model: str) -> None: + if not planner_client: + raise ValueError("planner_client is required") + + self.planner_client = planner_client + self.planner_model = planner_model + + self.mid_level_table: dict[str, list[MidLevelSequence]] = defaultdict(list) + self.low_level_table: dict[str, list[LowLevelSequence]] = defaultdict(list) + self.bindings: dict[str, list[Binding]] = defaultdict(list) + + self.subtask_table: dict[str, dict[str, Subtask]] = defaultdict(dict) + self.global_subtask_table: dict[str, Subtask] = {} + + self.query_result_cache: dict[str, list[Subtask]] = {} + + def _update_subtasks(self, template_hash: str, idx: int) -> None: + low_level_seq = self.low_level_table.get(template_hash)[idx] + mid_level_seq = self.mid_level_table.get(template_hash)[idx] + binding = self.bindings.get(template_hash)[idx] + + for i, subtask_desc in enumerate(mid_level_seq.sequence): + range_start, range_end = binding.ranges[i] + actions = low_level_seq.sequence[range_start:range_end] + subtask = Subtask(description=subtask_desc, actions=actions) + + if subtask_desc not in self.subtask_table[template_hash]: + self.subtask_table[template_hash][subtask_desc] = subtask + if subtask_desc not in self.global_subtask_table: + self.global_subtask_table[subtask_desc] = subtask + + def _bind(self, template_hash: str, idx: int) -> None: + assert idx < len(self.bindings[template_hash]), f"Index out of range in binding table for {template_hash}" + low_level_seq = self.low_level_table.get(template_hash)[idx] + mid_level_seq = self.mid_level_table.get(template_hash)[idx] + actions_str = str(low_level_seq) + subtasks_str = 
str(mid_level_seq) + prompt = BINDING_PROMPT.format(subtasks=subtasks_str, actions=actions_str) + logger.info(f"Binding prompt: {prompt}") + response = self.planner_client.chat.completions.create( + model=self.planner_model, + messages=[ + { + "role": "user", + "content": prompt, + } + ], + ).choices[0].message.content + if response.startswith("```json"): + response = response[len("```json"):].strip() + if response.endswith("```"): + response = response[:-len("```")].strip() + + logger.info(f"Binding response: {response}") + + bindings_json = json.loads(response) + # skip first open_app subtask + # ranges = [(0, 0)] + # cur_subtask_index = 1 + ranges = [] + cur_subtask_index = 0 + for item in bindings_json: + # response_subtask_index = item["subtask_index"] - 1 + 1 # 0-based index, skip first + response_subtask_index = item["subtask_index"] - 1 + if response_subtask_index != cur_subtask_index: + raise ValueError(f"Subtask index mismatch: expected {cur_subtask_index}, got {response_subtask_index}") + start_action_index = item["start_action_index"] - 1 + end_action_index = item["end_action_index"] - 1 + 1 # 0-based index, change to exclusive + if start_action_index < 0 or end_action_index > len(low_level_seq): + raise ValueError(f"Action index out of range: start {start_action_index}, end {end_action_index}, total {len(low_level_seq)}") + if start_action_index >= end_action_index: + raise ValueError(f"Invalid action index range: start {start_action_index}, end {end_action_index}") + ranges.append((start_action_index, end_action_index)) + cur_subtask_index += 1 + + # further validate ranges + # the ranges should cover all actions continuously + if ranges[0][0] != 0: + raise ValueError(f"First subtask should start from action index 0, got {ranges[0][0]}") + if ranges[-1][1] != len(low_level_seq): + raise ValueError(f"Last subtask should end at action index {len(low_level_seq)}, got {ranges[-1][1]}") + for i in range(len(ranges) - 1): + if ranges[i][1] != ranges[i + 
1][0]: + raise ValueError(f"Action index ranges are not continuous between subtask {i} and {i+1}: end {ranges[i][1]}, start {ranges[i+1][0]}") + + binding = Binding(template_hash=template_hash, ranges=ranges) + self.bindings[template_hash][idx] = binding + self._update_subtasks(template_hash, idx) + + def record(self, final_desc: str, template: str, history: list[str], extra_info: list[Optional[dict[str, Any]]]) -> None: + mid_level_seq = MidLevelSequence.from_experience(final_desc, template) + # if any(existing == mid_level_seq for existing in self.mid_level_table[mid_level_seq.template_hash]): + # return + self.mid_level_table[mid_level_seq.template_hash].append(mid_level_seq) + low_level_seq = LowLevelSequence.from_history(mid_level_seq.template_hash, history, extra_info) + self.low_level_table[low_level_seq.template_hash].append(low_level_seq) + self.bindings[mid_level_seq.template_hash].append(None) # placeholder + + query_key = f"{final_desc}{template}" + cached_query_result = self.query_result_cache.get(query_key, None) + if cached_query_result: + logger.info("Using cached query result to update subtasks") + cur_start_idx = 0 + ranges = [] + for subtask in cached_query_result: + if subtask.actions is not None: + range_start = cur_start_idx + range_end = cur_start_idx + len(subtask.actions) + ranges.append((range_start, range_end)) + cur_start_idx = range_end + else: + desc = subtask.description + # find range in extra_info + filtered_info = [(i, info) for i, info in enumerate(extra_info) if info and info.get("subtask_desc", None) == desc] + range_start = filtered_info[0][0] + range_end = filtered_info[-1][0] + 1 + ranges.append((range_start, range_end)) + cur_start_idx = range_end + binding = Binding(template_hash=mid_level_seq.template_hash, ranges=ranges) + self.bindings[mid_level_seq.template_hash][-1] = binding + self._update_subtasks(mid_level_seq.template_hash, len(self.mid_level_table[mid_level_seq.template_hash]) - 1) + else: + max_attempts = 3 + for 
attempt in range(max_attempts): + try: + self._bind(mid_level_seq.template_hash, len(self.mid_level_table[mid_level_seq.template_hash]) - 1) + break + except Exception as e: + logger.error(f"Error processing bindings: {e.__class__.__name__}: {e}") + if attempt == max_attempts - 1: + logger.error(f"Failed to process bindings after {max_attempts} attempts. Recording skipped.") + self.mid_level_table[mid_level_seq.template_hash].pop() + self.low_level_table[low_level_seq.template_hash].pop() + self.bindings[mid_level_seq.template_hash].pop() + + def _query(self, mid_level_seq: MidLevelSequence) -> list[Subtask]: + result: tuple[MidLevelSequence, LowLevelSequence, Binding] = None + max_match_len = 0 + existing_mid_level_seqs = self.mid_level_table.get(mid_level_seq.template_hash, []) + for i, existing_mid_level_seq in enumerate(existing_mid_level_seqs): + if len(existing_mid_level_seq) != len(mid_level_seq): + continue + match_len = 0 + for subtask1, subtask2 in zip(existing_mid_level_seq.sequence, mid_level_seq.sequence): + if subtask1 == subtask2: + match_len += 1 + if match_len > max_match_len: + max_match_len = match_len + result = ( + existing_mid_level_seq, + self.low_level_table[mid_level_seq.template_hash][i], + self.bindings[mid_level_seq.template_hash][i], + ) + + ret_subtasks: list[Subtask] = [] + if result is None: + for subtask_desc in mid_level_seq.sequence: + ret_subtasks = append_subtask(ret_subtasks, Subtask(description=subtask_desc, actions=None)) + return ret_subtasks + + existing_mid_level_seq, low_level_seq, binding = result + + for i, (subtask_desc1, subtask_desc2) in enumerate(zip(existing_mid_level_seq.sequence, mid_level_seq.sequence)): + range_start, range_end = binding.ranges[i] + if subtask_desc1 == subtask_desc2: + subtask = Subtask(description=subtask_desc1, actions=low_level_seq[range_start:range_end]) + else: + subtask = Subtask(description=subtask_desc2, actions=None) + ret_subtasks = append_subtask(ret_subtasks, subtask) + return 
ret_subtasks + + def _query_cross_task(self, mid_level_seq: MidLevelSequence, enable_cross_template: bool = False) -> list[Subtask]: + ret_subtasks: list[Subtask] = [] + available_subtasks = self.global_subtask_table if enable_cross_template else self.subtask_table.get(mid_level_seq.template_hash, {}) + for subtask_desc in mid_level_seq.sequence: + if subtask_desc in available_subtasks: + ret_subtasks = append_subtask(ret_subtasks, available_subtasks[subtask_desc]) + else: + ret_subtasks = append_subtask(ret_subtasks, Subtask(description=subtask_desc, actions=None)) + return ret_subtasks + + def query(self, + final_desc: str, + template: str, + enable_cross_task: bool = False, + enable_cross_template: bool = False, + convert: bool = True + ) -> list[MobiAgentAction] | list[Union[str, list[MobiAgentAction]]]: + mid_level_seq = MidLevelSequence.from_experience(final_desc, template) + subtasks: list[Subtask] = [] + if enable_cross_task: + subtasks = self._query_cross_task(mid_level_seq, enable_cross_template) + else: + subtasks = self._query(mid_level_seq) + # cache query result + if any(subtask.actions is not None for subtask in subtasks): + cache_key = f"{final_desc}{template}" + self.query_result_cache[cache_key] = subtasks + if convert: + subtasks = convert_subtasks(subtasks) + return subtasks \ No newline at end of file diff --git a/runner/mobiagent/mobiagent.py b/runner/mobiagent/mobiagent.py index 2fd2093..9f870d8 100644 --- a/runner/mobiagent/mobiagent.py +++ b/runner/mobiagent/mobiagent.py @@ -1,3 +1,4 @@ +from typing import Any, Optional, Union from openai import OpenAI import uiautomator2 as u2 import base64 @@ -28,6 +29,11 @@ should_extract_preferences, combine_context ) +from .agent_rr import ( + MobiAgentAction, + ExperienceRR, + Subtask +) # 清除可能已存在的 handlers,避免重复配置 for handler in logging.root.handlers[:]: @@ -266,8 +272,9 @@ def dump_hierarchy(self): # 全局偏好提取器 preference_extractor = None -def init(service_ip, decider_port, grounder_port, planner_port, 
enable_user_profile=False, use_graphrag=False): - global decider_client, grounder_client, planner_client, general_client, general_model, apps, preference_extractor +experience_rr: ExperienceRR = None +def init(service_ip, decider_port, grounder_port, planner_port, enable_user_profile=False, use_graphrag=False, use_experience_rr=False): + global decider_client, grounder_client, planner_client, general_client, general_model, apps, preference_extractor, experience_rr # 加载环境变量 env_path = Path(__file__).parent / ".env" @@ -291,6 +298,9 @@ def init(service_ip, decider_port, grounder_port, planner_port, enable_user_prof else: preference_extractor = None + if use_experience_rr: + experience_rr = ExperienceRR(planner_client, planner_model) + # 截图缩放比例 factor = 0.5 @@ -527,11 +537,16 @@ def robust_json_loads(s): logging.error(f"原始内容: {s[:300]}...") raise ValueError(f"无法解析 JSON 响应: 响应格式不正确") -def task_in_app(app, old_task, task, device, data_dir, bbox_flag=True, use_qwen3=True, device_type="Android"): +def task_in_app(app, old_task, task, template, device, data_dir, bbox_flag=True, use_qwen3=True, device_type="Android"): history = [] actions = [] reacts = [] + # full history for experience record + # if experience_rr is not enabled, full_history is the same as history + # otherwise, history only contains partial history in current subtask + full_history = [] + if use_qwen3: grounder_prompt_template_bbox = load_prompt("grounder_qwen3_bbox.md") grounder_prompt_template_no_bbox = load_prompt("grounder_qwen3_coordinates.md") @@ -541,10 +556,53 @@ def task_in_app(app, old_task, task, device, data_dir, bbox_flag=True, use_qwen3 grounder_prompt_template_bbox = load_prompt("grounder_bbox.md") grounder_prompt_template_no_bbox = load_prompt("grounder_coordinates.md") decider_prompt_template = load_prompt("decider_v2.md") + + # only for experience rr + # store original task description since `task` can be modified during execution + orig_task = task + replay_info: list[Union[str, 
list[MobiAgentAction]]] = [] + executing_subtask = False + replay_idx = 0 + rr_enabled = experience_rr and old_task != task and template + if rr_enabled: + logging.info("Finding replayable actions") + replay_info = experience_rr.query(task, template, enable_cross_task=True) + for item in replay_info: + if isinstance(item, str): + logging.info(f"Non-replayable subtask: {item}") + elif isinstance(item, MobiAgentAction): + logging.info(f"Replayable historical action: {item.model_dump_json(exclude={'extra_info'})}") + if not replay_info: + logging.info("No replayable actions found") + + # record extra info list + extra_info: list[Optional[dict[str, Any]]] = [] + while True: if len(actions) >= MAX_STEPS: logging.info("Reached maximum steps, stopping the task.") break + + replay_this_step = False + replay_grounder_bbox = None + if replay_info and replay_idx < len(replay_info) and (not executing_subtask): + replay_item = replay_info[replay_idx] + if isinstance(replay_item, str): + # a subtask + task = replay_item.rstrip("。") + ",然后结束任务,不要进行其他操作。" + executing_subtask = replay_idx != len(replay_info) - 1 # not the last subtask + history.clear() + logging.info(f"The next subtask cannot be replayed: {task}, let agent take it over.") + elif isinstance(replay_item, MobiAgentAction): + replay_this_step = True + action_dict = replay_item.model_dump(exclude={"extra_info"}) + if not action_dict["reasoning"].startswith("Replay:"): + action_dict["reasoning"] = "Replay: " + action_dict["reasoning"] + decider_response_str = json.dumps(action_dict, ensure_ascii=False) + if replay_item.extra_info: + replay_grounder_bbox = replay_item.extra_info.get("bbox", None) + logging.info(f"Replaying historical action for step {len(full_history) + 1}: \n{decider_response_str}") + replay_idx += 1 if len(history) == 0: history_str = "(No history)" @@ -552,168 +610,198 @@ def task_in_app(app, old_task, task, device, data_dir, bbox_flag=True, use_qwen3 history_str = "\n".join(f"{idx}. 
{h}" for idx, h in enumerate(history, 1)) screenshot_resize = get_screenshot(device, device_type) - - decider_prompt = decider_prompt_template.format( - task=task, - history=history_str - ) - logging.info(f"Decider prompt: \n{decider_prompt}") - - decider_start_time = time.time() - # --- 修改 API 调用 --- - # vLLM 将会强制输出一个符合 ActionPlan 结构的 JSON 字符串 - # 若响应超时或者返回结果解析失败,则进行重试 - temperature = 0.0 - for attempt in range(5): # 最多重试5次 - # while True: - try: - decider_start_time = time.time() - decider_response_str = decider_client.chat.completions.create( - model=decider_model, - messages=[ - { - "role": "user", - "content": [ - {"type": "image_url", "image_url": {"url": f"data:image/jpeg;base64,{screenshot_resize}"}}, - {"type": "text", "text": decider_prompt}, - ] - } - ], - temperature=temperature, - - timeout=30, - max_tokens=256, - ).choices[0].message.content - decider_end_time = time.time() - logging.info(f"[evaluation] Decider time taken: {decider_end_time - decider_start_time} seconds") - logging.info(f"Decider response: \n{decider_response_str}") - decider_response = robust_json_loads(decider_response_str) - converted_item = { - "reasoning": decider_response["reasoning"], - "function": { - "name": decider_response["action"], - "parameters": decider_response["parameters"] - } - } - break # 成功获取响应,跳出重试循环 - except Exception as e: - temperature = 0.1 + attempt * 0.1 # 每次重试时增加温度,增加多样性 - logging.error(f"Decider 调用失败: {e}, 正在重试 temperature={temperature}...") - time.sleep(2) - - - reacts.append(converted_item) - action = decider_response["action"] - - # compute image index for this loop iteration (1-based) - image_index = len(actions) + 1 - current_dir = os.getcwd() - current_image = "" - if device_type == "Android": - img_path = os.path.join(current_dir, f"screenshot-Android.jpg") - save_path = os.path.join(data_dir, f"{image_index}.jpg") - current_image = f"screenshot-Android.jpg" - else: - img_path = os.path.join(current_dir, f"screenshot-Harmony.jpg") - save_path = 
os.path.join(data_dir, f"{image_index}.jpg") - current_image = f"screenshot-Harmony.jpg" - img = Image.open(img_path) - img.save(save_path) - - # attach index to the most recent react (reasoning) - if reacts: - try: - reacts[-1]["action_index"] = image_index - except Exception: - pass - - # 根据设备类型保存hierarchy - if device_type == "Android": - logging.info("Dumping UI hierarchy...") - hierarchy = device.dump_hierarchy() - # Android设备保存为XML格式 - hierarchy_path = os.path.join(data_dir, f"{image_index}.xml") - with open(hierarchy_path, "w", encoding="utf-8") as f: - f.write(hierarchy) - else: - hierarchy = device.dump_hierarchy() - # Harmony设备保存为JSON格式 - hierarchy_path = os.path.join(data_dir, f"{image_index}.json") - try: - # 尝试将hierarchy解析为JSON(如果已是JSON字符串) - if isinstance(hierarchy, str): - hierarchy_json = json.loads(hierarchy) - else: - hierarchy_json = hierarchy - with open(hierarchy_path, "w", encoding="utf-8") as f: - json.dump(hierarchy_json, f, ensure_ascii=False, indent=2) - except (json.JSONDecodeError, TypeError): - # 如果解析失败,直接保存为字符串 - logging.warning(f"Failed to parse hierarchy as JSON, saving as plain text") - with open(hierarchy_path, "w", encoding="utf-8") as f: - f.write(str(hierarchy)) - - if action == "done": - print("Task completed.") - status = decider_response["parameters"]["status"] - actions.append({ - "type": "done", - "status": status, - "action_index": image_index - }) - break - if action == "click": - reasoning = decider_response["reasoning"] - target_element = decider_response["parameters"]["target_element"] - grounder_prompt = (grounder_prompt_template_bbox if bbox_flag else grounder_prompt_template_no_bbox).format(reasoning=reasoning, description=target_element) - - # 重试5次获取grounder响应,同时调整temperature + if not replay_this_step: + decider_prompt = decider_prompt_template.format( + task=task, + history=history_str + ) + logging.info(f"Decider prompt: \n{decider_prompt}") + + decider_start_time = time.time() + # --- 修改 API 调用 --- + # vLLM 
将会强制输出一个符合 ActionPlan 结构的 JSON 字符串 + # 若响应超时或者返回结果解析失败,则进行重试 temperature = 0.0 - for attempt in range(5): + for attempt in range(5): # 最多重试5次 + # while True: try: - grounder_start_time = time.time() - grounder_response_str = grounder_client.chat.completions.create( - model=grounder_model, + decider_start_time = time.time() + decider_response_str = decider_client.chat.completions.create( + model=decider_model, messages=[ { "role": "user", "content": [ {"type": "image_url", "image_url": {"url": f"data:image/jpeg;base64,{screenshot_resize}"}}, - {"type": "text", "text": grounder_prompt}, + {"type": "text", "text": decider_prompt}, ] } ], temperature=temperature, + timeout=30, - max_tokens=128, + max_tokens=256, ).choices[0].message.content - grounder_end_time = time.time() - logging.info(f"[evaluation] Grounder time taken: {grounder_end_time - grounder_start_time} seconds") - logging.info(f"Grounder response: \n{grounder_response_str}") - # grounder_response = json.loads(grounder_response_str) - grounder_response = parse_json_response(grounder_response_str) + decider_end_time = time.time() + logging.info(f"[evaluation] Decider time taken: {decider_end_time - decider_start_time} seconds") + logging.info(f"Decider response: \n{decider_response_str}") + decider_response = robust_json_loads(decider_response_str) + converted_item = { + "reasoning": decider_response["reasoning"], + "function": { + "name": decider_response["action"], + "parameters": decider_response["parameters"] + } + } break # 成功获取响应,跳出重试循环 except Exception as e: temperature = 0.1 + attempt * 0.1 # 每次重试时增加温度,增加多样性 - logging.error(f"Grounder 调用失败: {e}, 正在重试 temperature={temperature}...") - time.sleep(2) - - if(bbox_flag): - # 直接尝试获取含有bbox的字段,而不要求完全匹配 - bbox = None - for key in grounder_response: - if key.lower() in ["bbox", "bbox_2d", "bbox-2d", "bbox_2D", "bbox2d", "bbox_2009"]: - bbox = grounder_response[key] - break - + logging.error(f"Decider 调用失败: {e}, 正在重试 temperature={temperature}...") - # 如果使用 Qwen3 
模型,进行坐标转换 - if use_qwen3: - bbox = convert_qwen3_coordinates_to_absolute(bbox, img.width, img.height, is_bbox=True) - x1, y1, x2, y2 = bbox - else: - x1, y1, x2, y2 = [int(coord/factor) for coord in bbox] + else: + decider_response = robust_json_loads(decider_response_str) + converted_item = { + "reasoning": decider_response["reasoning"], + "function": { + "name": decider_response["action"], + "parameters": decider_response["parameters"] + } + } + + action = decider_response["action"] + # ignore `done` action of subtasks in persistant execution logs and full_history + if not (executing_subtask and action == "done"): + reacts.append(converted_item) + + # compute image index for this loop iteration (1-based) + image_index = len(actions) + 1 + current_dir = os.getcwd() + current_image = "" + if device_type == "Android": + img_path = os.path.join(current_dir, f"screenshot-Android.jpg") + save_path = os.path.join(data_dir, f"{image_index}.jpg") + current_image = f"screenshot-Android.jpg" + else: + img_path = os.path.join(current_dir, f"screenshot-Harmony.jpg") + save_path = os.path.join(data_dir, f"{image_index}.jpg") + current_image = f"screenshot-Harmony.jpg" + img = Image.open(img_path) + img.save(save_path) + + # attach index to the most recent react (reasoning) + if reacts: + try: + reacts[-1]["action_index"] = image_index + except Exception: + pass + + # 根据设备类型保存hierarchy + if device_type == "Android": + logging.info("Dumping UI hierarchy...") + hierarchy = device.dump_hierarchy() + # Android设备保存为XML格式 + hierarchy_path = os.path.join(data_dir, f"{image_index}.xml") + with open(hierarchy_path, "w", encoding="utf-8") as f: + f.write(hierarchy) + else: + hierarchy = device.dump_hierarchy() + # Harmony设备保存为JSON格式 + hierarchy_path = os.path.join(data_dir, f"{image_index}.json") + try: + # 尝试将hierarchy解析为JSON(如果已是JSON字符串) + if isinstance(hierarchy, str): + hierarchy_json = json.loads(hierarchy) + else: + hierarchy_json = hierarchy + with open(hierarchy_path, "w", 
encoding="utf-8") as f: + json.dump(hierarchy_json, f, ensure_ascii=False, indent=2) + except (json.JSONDecodeError, TypeError): + # 如果解析失败,直接保存为字符串 + logging.warning(f"Failed to parse hierarchy as JSON, saving as plain text") + with open(hierarchy_path, "w", encoding="utf-8") as f: + f.write(str(hierarchy)) + full_history.append(decider_response_str) + if rr_enabled: + extra_info.append(None) + if executing_subtask: + subtask = replay_info[replay_idx - 1] + extra_info[-1] = {"subtask_desc": subtask} + + if action == "done": + if replay_info and executing_subtask: + logging.info(f"Subtask '{task}' completed.") + history.clear() + executing_subtask = False + else: + logging.info("Task completed.") + actions.append({ + "type": "done", + "action_index": image_index + }) + break + if action == "click": + if replay_grounder_bbox is None: + reasoning = decider_response["reasoning"] + target_element = decider_response["parameters"]["target_element"] + grounder_prompt = (grounder_prompt_template_bbox if bbox_flag else grounder_prompt_template_no_bbox).format(reasoning=reasoning, description=target_element) + + # 重试5次获取grounder响应,同时调整temperature + temperature = 0.0 + for attempt in range(5): + try: + grounder_start_time = time.time() + grounder_response_str = grounder_client.chat.completions.create( + model=grounder_model, + messages=[ + { + "role": "user", + "content": [ + {"type": "image_url", "image_url": {"url": f"data:image/jpeg;base64,{screenshot_resize}"}}, + {"type": "text", "text": grounder_prompt}, + ] + } + ], + temperature=temperature, + timeout=30, + max_tokens=128, + ).choices[0].message.content + grounder_end_time = time.time() + logging.info(f"[evaluation] Grounder time taken: {grounder_end_time - grounder_start_time} seconds") + logging.info(f"Grounder response: \n{grounder_response_str}") + # grounder_response = json.loads(grounder_response_str) + grounder_response = parse_json_response(grounder_response_str) + break # 成功获取响应,跳出重试循环 + except Exception as 
e: + temperature = 0.1 + attempt * 0.1 # 每次重试时增加温度,增加多样性 + logging.error(f"Grounder 调用失败: {e}, 正在重试 temperature={temperature}...") + + if bbox_flag: + if replay_grounder_bbox is None: + # 直接尝试获取含有bbox的字段,而不要求完全匹配 + bbox = None + for key in grounder_response: + if key.lower() in ["bbox", "bbox_2d", "bbox-2d", "bbox_2D", "bbox2d", "bbox_2009"]: + bbox = grounder_response[key] + break + + + # 如果使用 Qwen3 模型,进行坐标转换 + if use_qwen3: + bbox = convert_qwen3_coordinates_to_absolute(bbox, img.width, img.height, is_bbox=True) + x1, y1, x2, y2 = bbox + else: + x1, y1, x2, y2 = [int(coord/factor) for coord in bbox] + else: + logging.info(f"Using replayed grounder bbox: {replay_grounder_bbox}") + x1, y1, x2, y2 = replay_grounder_bbox + + # record click bbox in extra info for experience rr + if rr_enabled: + if extra_info[-1] is not None: + extra_info[-1]["bbox"] = [x1, y1, x2, y2] + else: + extra_info[-1] = {"bbox": [x1, y1, x2, y2]} print(f"Clicking on bbox: [{x1}, {y1}, {x2}, {y2}]") print(f"Image size: width={img.width}, height={img.height}") @@ -791,26 +879,7 @@ def task_in_app(app, old_task, task, device, data_dir, bbox_flag=True, use_qwen3 direction = decider_response["parameters"]["direction"] direction = direction.upper() - if direction == "DOWN": - device.swipe(direction.lower(), 0.4) - # record the swipe as an action (index only) - actions.append({ - "type": "swipe", - "press_position_x": None, - "press_position_y": None, - "release_position_x": None, - "release_position_y": None, - "direction": direction.lower(), - "action_index": image_index - }) - - history.append(decider_response_str) - - # 为向下滑动创建可视化 - create_swipe_visualization(data_dir, image_index, direction.lower()) - continue - - if direction in ["UP", "LEFT", "RIGHT"]: + if direction in ["UP", "LEFT", "RIGHT", "DOWN"]: device.swipe(direction.lower(), 0.4) actions.append({ "type": "swipe", @@ -839,6 +908,8 @@ def task_in_app(app, old_task, task, device, data_dir, bbox_flag=True, use_qwen3 else: raise 
ValueError(f"Unknown action: {action}") + # always restore task description + task = orig_task data = { "app_name": app, @@ -865,6 +936,11 @@ def task_in_app(app, old_task, task, device, data_dir, bbox_flag=True, use_qwen3 preference_extractor.extract_async(task_data) logging.info("Submitted preference extraction task") + if rr_enabled: + logging.info("Recording experience for future replay") + experience_rr.record(task, template, full_history, extra_info) + logging.info("Experience recorded") + def parse_planner_response(response_str: str): @@ -939,7 +1015,7 @@ def get_app_package_name(task_description, use_graphrag=False, device_type="Andr app_name = response_json.get("app_name") package_name = response_json.get("package_name") final_desc = response_json.get("final_task_description", task_description) - return app_name, package_name, final_desc + return app_name, package_name, final_desc, experience_content def execute_single_task(task_description, device, data_dir, use_experience, use_graphrag, current_device_type, use_qwen3_model): @@ -957,7 +1033,7 @@ def execute_single_task(task_description, device, data_dir, use_experience, use_ """ # 调用 planner 获取应用名称和包名 logging.info(f"Calling planner to get app_name and package_name") - app_name, package_name, planner_task_description = get_app_package_name( + app_name, package_name, planner_task_description, template = get_app_package_name( task_description, use_graphrag=use_graphrag, device_type=current_device_type, use_experience=use_experience ) @@ -972,7 +1048,7 @@ def execute_single_task(task_description, device, data_dir, use_experience, use_ logging.info(f"Starting task in app: {app_name} (package: {package_name})") device.app_start(package_name) - task_in_app(app_name, task_description, new_task_description, device, data_dir, True, use_qwen3_model, current_device_type) + task_in_app(app_name, task_description, new_task_description, template, device, data_dir, True, use_qwen3_model, current_device_type) 
logging.info(f"Stopping app: {app_name} (package: {package_name})") device.app_stop(package_name) @@ -993,13 +1069,21 @@ def execute_single_task(task_description, device, data_dir, use_experience, use_ parser.add_argument("--use_experience", action="store_true", default=False, help="Whether to use experience (use planner for task rewriting) (default: False)") parser.add_argument("--data_dir", type=str, default=None, help="Directory to save data (default: ./data relative to script location)") parser.add_argument("--task_file", type=str, default=None, help="Path to task.json file (default: ./task.json relative to script location)") + parser.add_argument("--enable_agentrr", action="store_true", default=False, help="Whether to enable Agent Record & Replay (experimental, default: False)") args = parser.parse_args() + use_experience_rr = args.enable_agentrr + if use_experience_rr and (not args.use_experience): + logging.warning("use_experience_rr is enabled but use_experience is disabled; disabling use_experience_rr.") + use_experience_rr = False + if use_experience_rr: + logging.warning("Experimental feature AgentRR is enabled.") + # 使用命令行参数初始化 enable_user_profile = (args.user_profile == "on") use_graphrag = (args.use_graphrag == "on") init(args.service_ip, args.decider_port, args.grounder_port, args.planner_port, - enable_user_profile=enable_user_profile, use_graphrag=use_graphrag) + enable_user_profile=enable_user_profile, use_graphrag=use_graphrag, use_experience_rr=use_experience_rr) # 如果需要清除记忆,优先执行并退出 if args.clear_memory: