forked from smart-needle-manual/system_integration
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathexample.py
More file actions
73 lines (60 loc) · 2.26 KB
/
example.py
File metadata and controls
73 lines (60 loc) · 2.26 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
import slicer
import time
# Get ROS2 logic and default node
rosLogic = slicer.util.getModuleLogic('ROS2')
rosNode = rosLogic.GetDefaultROS2Node()
topic_name = '/needle/state/current_shape'
msg_type = 'PoseArray'
# Create subscriber node
subShape = rosNode.CreateAndAddSubscriberNode(msg_type, topic_name)
if subShape is None:
raise RuntimeError(f"Failed to create subscriber for {topic_name}")
print(f"✅ Subscribed to {topic_name}")
# Create curve node if needed
curveNode = slicer.mrmlScene.GetFirstNodeByName("NeedleCurve")
if curveNode is None:
curveNode = slicer.mrmlScene.AddNewNodeByClass("vtkMRMLMarkupsCurveNode", "NeedleCurve")
curveNode.SetCurveTypeToLinear()
print("✅ Created vtkMRMLMarkupsCurveNode 'NeedleCurve'")
# Parse YAML manually
def parse_posearray_yaml(yaml_str):
poses = []
lines = yaml_str.splitlines()
current_pose = None
for line in lines:
line = line.strip()
if line.startswith("position:"):
current_pose = {}
elif current_pose is not None:
if line.startswith("x:"):
current_pose['x'] = float(line.split(":")[1])
elif line.startswith("y:"):
current_pose['y'] = float(line.split(":")[1])
elif line.startswith("z:"):
current_pose['z'] = float(line.split(":")[1])
poses.append(current_pose)
current_pose = None
return poses
# Throttle setup: 1 update per minute
last_update_time = 0
update_interval = 60.0 # seconds → 1 update per minute
# Callback
def onShapeUpdate(caller=None, event=None):
global last_update_time
t = time.time()
if t - last_update_time < update_interval:
return # skip update if too soon
last_update_time = t
yaml_str = subShape.GetLastMessageYAML()
if not yaml_str:
return
poses = parse_posearray_yaml(yaml_str)
if not poses:
return
curveNode.RemoveAllControlPoints()
for p in poses:
curveNode.AddControlPoint(p['x'], p['y'], p['z'])
print(f"✅ Curve updated with {len(poses)} points at {time.strftime('%H:%M:%S')}")
# Add observer
observerId = subShape.AddObserver('ModifiedEvent', onShapeUpdate)
print("\n🚀 Listening for /needle/state/current_shape (1 update per minute) ...")