Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 13 additions & 10 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
.env/*
build/*
dist/*
qt4a/*
version.py
version_file.txt
*.spec
*.pyc
*.log
*.png
.env/*
build/*
dist/*
qt4a/*
version.py
version_file.txt
*.spec
*.pyc
*.log
*.png
.DS_Store
.vscode
.github
164 changes: 164 additions & 0 deletions manager/controlmanager.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,9 @@
from .activitymanager import ActivityManager
from .windowmanager import WindowManager
from utils.logger import Log
from qt4f.driver import FlutterDriver
from qt4f.finder import FlutterFinder
from qt4f.qpath import QPath


class EnumWebViewType(object):
Expand All @@ -46,6 +49,7 @@ def __init__(self, device):
self._activity_manager = ActivityManager.get_instance(device)
self._window_manager = WindowManager.get_instance(device)
self._driver_dict = {}
self._flutter_driver_dict = {}

def _get_driver(self, process_name):
"""获取AndroidDriver实例"""
Expand All @@ -60,6 +64,104 @@ def get_driver(self, window_title):
"""获取AndroidDriver实例"""
process_name = self._get_window_process(window_title)
return self._get_driver(process_name)

def _get_flutter_driver(self, window_title=None):
"""获取FlutterDriver实例"""

# 为了通过adb读取应用的Flutter Observatory URL地址, 如果该应用不是由adb的, 则需要由adb重新打开
if not window_title:
window = self._window_manager.get_current_window()
window_title = window.title
window_process = self._get_window_process(window.title)
else:
window_process = self._get_window_process(window_title)

if window_process in self._flutter_driver_dict:
return self._flutter_driver_dict[window_process]

try:
observatory_url = self._process_log_to_get_debugger_url(duration=0)
except:
observatory_url = self.adb_reopen_window(window_title)

Log.i('Flutter observatory listen on: {}'.format(observatory_url))
self._flutter_driver_dict[window_process] = FlutterDriver(observatory_url)
return self._flutter_driver_dict[window_process]

def flutter_driver_created(self, window_title):
window_process = self._get_window_process(window_title)
return window_process in self._flutter_driver_dict

def get_flutter_driver(self, window_title):
return self._get_flutter_driver(window_title)

def try_get_flutter_driver(self, window_title):
window_process = self._get_window_process(window_title)
if window_process in self._flutter_driver_dict:
print('direct return')
return self._flutter_driver_dict[window_process]

try:
observatory_url = self._process_log_to_get_debugger_url(duration=0)
Log.i('Flutter observatory listen on: {}'.format(observatory_url))
self._flutter_driver_dict[window_process] = FlutterDriver(observatory_url)
return self._flutter_driver_dict[window_process]
except Exception as e:
print(e)
return None

def adb_reopen_window(self, window_title):
"""使用ADB重新打开应用, 用来读取logcat"""
window_process = self._get_window_process(window_title)
if window_process not in self._flutter_driver_dict:
self._device.kill_process(window_process)
# self.grant_all_runtime_permissions()
# self.enable_media_permission()
try:
launcher_activity = "%s/%s" % (window_process, window_title)
ret = self._device.adb.start_activity(launcher_activity, extra={})
if ret['Status'].find('ok') < 0 or ret['LaunchState'].find('UNKNOWN') >= 0:
raise RuntimeError('重新打开应用失败。')
except:
# 尝试找到该activity的LAUNCHER activity, 没找到的话再从当前activity启动
launcher_activity = self._device.adb.get_package_launcher_activity(window_process)
ret = self._device.adb.start_activity(launcher_activity, extra={})
if ret['Status'].find('ok') < 0:
raise RuntimeError('重新打开应用失败。')

time.sleep(2) # 等一段时间是因为flutter_driver可能尚未加载extensionRPCs
return self._process_log_to_get_debugger_url(duration=20)

def _process_log_to_get_debugger_url(self, duration=10):
"""
解析logcat内容, 获取flutter debugger url信息
"""
REG_EXP = "The Dart VM service is listening on http://(127.0.0.1:(\d+)/.*?/)"

while duration >= 0:
duration -= 1
time.sleep(1)

log_list = self._device.adb.get_log(clear=False)
log_list = [i.decode("utf-8") for i in log_list]
debugger_url_info = None

for item in log_list:
res = re.search(REG_EXP, item)
if res:
debugger_url_info = res

if not debugger_url_info:
continue

# 将http格式的url转换成websocket格式的url, 并创建隧道
port = debugger_url_info.group(2)
# adb.create_tunnel 创建隧道
self._device.adb.forward(int(port), int(port))
ws_url = "ws://" + debugger_url_info.group(1) + "ws"
Log.i('ws_url: ', ws_url)
return ws_url
raise RuntimeError('获取Flutter Observatory listen url failed.')

def update(self):
""" """
Expand Down Expand Up @@ -341,6 +443,62 @@ def get_webview(self, process_name, hashcode):
# process_name = self._get_window_process(window_title)
driver = self._get_driver(process_name)
return WebView(driver, hashcode)

def get_flutter_control(self, window_title, parent, qpath, get_err_pos=False):
"""查找控件"""
# if isinstance(qpath, str): qpath = qpath.encode('utf8')
qpath = QPath(qpath)
driver = self._get_flutter_driver(window_title)
try:
return driver.get_widget(FlutterFinder().by_qpath(json.dumps(qpath._parsed_qpath)))
except Exception as e:
repeat_list = []
for line in e.args[0].split("\n")[1:]:
if not line:
continue
pos = line.find("[")
pos2 = line.find("]", pos)
repeat_list.append(line[pos + 1 : pos2])
return [int(item, 16) for item in repeat_list]
else:
raise e

def _get_flutter_control_tree(self, window_title):
"""获取指定进程中的所有控件树"""
driver = self._get_flutter_driver(window_title)
Log.i("ControlManager", "get flutter control tree in process")
result = driver.get_widget_tree()
Log.i("ControlManager", "get flutter control tree complete")
return result

def get_flutter_control_tree(self):
"""获取当前需要获取的所有控件树列表"""
process_list = [] # 已经抓取过控件树的进程列表
self.update()
current_window = self._window_manager.get_current_window()
if current_window == None:
# 获取当前Activity
current_activity = self._device._send_command("GetCurrentActivity")
self._window_manager.update()
for window in self._window_manager.get_window_list():
if window.title == current_activity:
current_window = window
break
else:
raise RuntimeError("find window %s failed" % current_activity)
package_name = current_window.package_name # 只获取该包名对应的窗口
if package_name == None:
raise RuntimeError("get %s package name failed" % current_window)

result = {}
current_process = self._get_window_process(current_window)

if current_process == None:
Log.w("ControlManager", "get process of %s failed" % current_window)
current_process = package_name
result = self._get_flutter_control_tree(current_window.title) # 先获取当前窗口所在进程中的所有控件树
process_list.append(current_process)
return result


class WebView(object):
Expand Down Expand Up @@ -382,5 +540,11 @@ def eval_script(self, frame_xpaths, script):
return self._driver.eval_script(self._hashcode, frame_xpaths, script)


class FlutterView(object):
def __init__(self, driver, hash_code):
self._driver = driver # Flutter Driver
self._hash_code = hash_code


if __name__ == "__main__":
pass
15 changes: 15 additions & 0 deletions manager/windowmanager.py
Original file line number Diff line number Diff line change
Expand Up @@ -210,6 +210,8 @@ def _get_windows_data(self):
p3 = re.compile(
r"mShownFrame=\[([-\d\.]+),([-\d\.]+)\]\[([-\d\.]+),([-\d\.]+)\]"
)
record = {}

for line in result.split("\n")[1:]:
# print repr(line)
ret = p1.match(line)
Expand All @@ -221,10 +223,12 @@ def _get_windows_data(self):
title = items[1]
else:
title = items[0]

window = Window(
self, int(ret.group(1)), ret.group(2), title
) # 此逻辑可能有bug
windows.append(window)
record[title] = {'id': int(ret.group(1)), 'hashcode': ret.group(2)}
elif "mHoldScreenWindow" in line:
ret = p2.search(line)
if ret:
Expand Down Expand Up @@ -284,6 +288,17 @@ def _get_windows_data(self):
window[key] = val
else:
pass

if not self._current_window:
result = self._device.adb.run_shell_cmd("dumpsys activity activities | grep mResumedActivity")
result = result.replace("\r", "")
p = re.compile(r"mResumedActivity: ActivityRecord{(\S+) \S+ (\S+) .*?}")
ret = p.search(result)

if ret is not None:
parts = ret.group(2).split('/')
title = parts[0]+'/'+parts[0]+parts[1]
self._current_window = Window(self, record[title]['id'], record[title]['hashcode'], title)

return windows

Expand Down
5 changes: 3 additions & 2 deletions requirements.txt
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
wxPython==4.1.1
PyInstaller==4.3
qt4a==2.3.6
PyInstaller
qt4a==2.3.9
pywin32==228; sys_platform == 'win32'
websocket-client==1.3.1
Loading