-
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathvortex_run.py
More file actions
executable file
·159 lines (143 loc) · 7.14 KB
/
vortex_run.py
File metadata and controls
executable file
·159 lines (143 loc) · 7.14 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
#!/usr/bin/env python3
# vortex - GCode machine emulator
# Copyright (C) 2024-2025 Mitko Haralanov
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import sys
import argparse
import logging
import errno
import traceback
import threading
import vortex.emulator
import vortex.emulator.config
import vortex.core.lib.logging as logging
def create_arg_parser():
parser = argparse.ArgumentParser(formatter_class=argparse.ArgumentDefaultsHelpFormatter)
frontend = parser.add_argument_group("Frontend Options")
frontend.add_argument("-f", "--frontend", default="direct",
help="""The frontend that will be started for
the emulation.""")
frontend.add_argument("-s", action="store_true", dest="sequential",
help="""Enable sequential mode. In this
mode, the frontent will execute one command
at a time rather than submit commands to the
command queue as they are received.""")
controller = parser.add_argument_group("Controller Options")
controller.add_argument("-c", "--controller", default=None,
help="""The HW controller to be used for the
emulation. This argument is required.""")
controller.add_argument("-F", "--frequency", default="1MHz",
help="""Frequency of control loop. This
control loop is the main emulator control loop. It's
the one that updates controller clock and emulation
runtime. Higher values provide more precise
emulation, at the cost of CPU load.""")
controller.add_argument("-T", "--process-frequency", default="100KHz",
help="""This is the frequency with which the
core's event processing threads updates will run.
The event processing thread are responsible for
processing command submission and completion,
event processing, etc.""")
controller.add_argument("-P", "--set-priority", action="store_true",
help="""Set the priority of the emulator to
real-time. This will make the emulator run
with higher priority than other processes
on the system. This is useful for more
precise emulation but may affect system
performance as the emulator will take up more
CPU cycles.""")
debug = parser.add_argument_group("Debug Options")
debug_levels = sorted(logging.LOG_LEVELS.values())
debug.add_argument("-d", "--debug", choices=debug_levels, metavar="LEVEL",
default="INFO",
help="""Set logging level. Higher logging levels will
provide more information but will also affect
conroller timing more.""")
debug.add_argument("--filter", type=str, action="append", default=[],
help="""Filter log messages by the specified
module/object. Filter format is a dot-separated
hierarchy of modules/objects. For example, the filter
'vortex.core.stepper.X' will only show log messages from
the core HW stepper object with name 'X'. '*' can
be used to match all modules/objects at the particular
level. This option can be used multiple times to
filter multiple modules. The filter is applied to
the module name and not the logger name.""")
debug.add_argument("-l", "--logfile", type=str, default=None,
help="""Log messages are sent to the file specified
by this option.""")
debug.add_argument("--extended-logging", action="store_true",
help="""Enable extended debugging. When enabled, log
messages will also contain the source of the message
(filename and line number). """)
debug.add_argument("-R", "--remote", action="store_true",
help="""Start remote API server thread. This thread
processes requests from the monitoring application.""")
debug.add_argument("--enable-profiling", action="store_true")
parser.add_argument("-C", "--config", required=True,
help="""HW object configuration file. This argument
is required.""")
return parser
def emulator_exception_handler(exc_class, exc, tb):
_tb = tb
while _tb.tb_next is not None:
_tb = _tb.tb_next
frame = _tb.tb_frame
co = frame.f_code
logging.critical(f"Exception occured at {co.co_name}:{frame.f_lineno} [{co.co_filename}]:")
logging.critical(f" Exception: {str(exc)}")
if logging.get_level() <= logging.DEBUG:
lines = traceback.format_tb(tb)
for entry in lines:
for line in entry.split("\n"):
logging.critical(" " + line.rstrip())
def emulator_thread_exception_handler(args):
emulator_exception_handler(args.exc_type, args.exc_value, args.exc_traceback)
def main():
parser = create_arg_parser()
opts = parser.parse_args()
logging.init(opts.debug, opts.logfile, opts.extended_logging)
logging.add_filter(opts.filter)
config = vortex.emulator.config.Configuration()
try:
config.read(opts.config)
except Exception as err:
logging.critical(str(err))
return errno.EINVAL
if opts.controller:
config.override_controller(opts.controller)
# Setup the exception hook so uncaught exceptions
# are handled more gracefully.
sys.excepthook = emulator_exception_handler
threading.excepthook = emulator_thread_exception_handler
try:
emulation = vortex.emulator.Emulator(config, opts.frontend, opts.sequential)
except vortex.emulator.EmulatorError as err:
logging.critical(str(err))
return errno.ENOENT
if opts.enable_profiling:
emulation.enable_profiler()
emulation.set_frequency(opts.frequency, opts.process_frequency)
emulation.set_thread_priority_change(opts.set_priority)
if opts.remote:
emulation.start_remote_server()
try:
emulation.start()
except KeyboardInterrupt:
pass
finally:
emulation.stop()
return 0
sys.exit(main())