-
Notifications
You must be signed in to change notification settings - Fork 16
Expand file tree
/
Copy pathclient.py
More file actions
141 lines (105 loc) · 4.71 KB
/
client.py
File metadata and controls
141 lines (105 loc) · 4.71 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
import time
from queue import Empty, Queue
from typing import TypeVar
import websocket
from dataclasses_jsonschema import ValidationError
from arcor2 import json
from arcor2.data import events, rpc
from arcor2.data.rpc import get_id
from arcor2.exceptions import Arcor2Exception
from arcor2.logging import get_logger
from arcor2_arserver_data import rpc as srpc
class ARServerClientException(Arcor2Exception):
pass
RR = TypeVar("RR", bound=rpc.common.RPC.Response)
class ARServer:
"""Really simple client for ARServer.
Instead of having a separate method for each RPC, it has one method
(call_rpc) which takes instance of Request and returns instance of
Response.
"""
def __init__(
self,
ws_connection_str: str = "ws://0.0.0.0:6789",
timeout: float = 3.0,
event_mapping: None | dict[str, type[events.Event]] = None,
):
self._ws = websocket.WebSocket()
self._logger = get_logger(__name__)
self._event_queue: Queue[events.Event] = Queue()
if event_mapping is None:
event_mapping = {}
self.event_mapping = event_mapping
start_time = time.monotonic()
while time.monotonic() < start_time + timeout:
try:
self._ws.connect(ws_connection_str)
break
except ConnectionRefusedError:
time.sleep(0.25)
if not self._ws.connected:
raise ARServerClientException(f"Failed to connect to '{ws_connection_str}'.")
self._ws.settimeout(timeout)
system_info = self._call_rpc(srpc.c.SystemInfo.Request(get_id()), srpc.c.SystemInfo.Response).data
if system_info is None:
raise ARServerClientException("Failed to get SystemInfo.")
self._logger.info(f"Connected to server version {system_info.version}.")
self._supported_rpcs = system_info.supported_rpc_requests
def call_rpc(self, req: rpc.common.RPC.Request, resp_type: type[RR]) -> RR:
if req.request not in self._supported_rpcs:
raise ARServerClientException(f"{req.request} RPC not supported by the server.")
return self._call_rpc(req, resp_type)
def _call_rpc(self, req: rpc.common.RPC.Request, resp_type: type[RR]) -> RR:
self._ws.send(req.to_json())
# wait for RPC response, put any incoming event into the queue
while True:
try:
data = self._ws.recv()
assert isinstance(data, str), "Binary payload not supported"
recv_dict = json.loads(data)
except websocket.WebSocketTimeoutException:
raise ARServerClientException("RPC timeouted.")
if not isinstance(recv_dict, dict):
self._logger.debug(f"Invalid data received: {recv_dict}")
continue
if "response" in recv_dict:
break
elif "event" in recv_dict:
self._event_queue.put(self.event_mapping[recv_dict["event"]].from_dict(recv_dict))
try:
resp = resp_type.from_dict(recv_dict)
except ValidationError as e:
self._logger.error(f"Request: {req.to_dict()}, response: {recv_dict}.")
raise ARServerClientException("RPC response validation failed.") from e
assert req.id == resp.id
assert req.request == resp.response
return resp
def get_event(self, drop_everything_until: None | type[events.Event] = None) -> events.Event:
"""Returns queued events (if any) or wait until some event arrives.
:param drop_everything_until: Drop any event until there is one of required type.
:return:
"""
while True:
try:
evt = self._event_queue.get_nowait()
except Empty:
try:
data = self._ws.recv()
assert isinstance(data, str), "Binary payload not supported."
recv_dict = json.loads(data)
except websocket.WebSocketTimeoutException:
raise ARServerClientException("Timeouted.")
if not isinstance(recv_dict, dict):
raise ARServerClientException(f"Invalid data received: {recv_dict}")
if "event" not in recv_dict:
raise ARServerClientException(f"Expected event, got: {recv_dict}")
evt = self.event_mapping[recv_dict["event"]].from_dict(recv_dict)
if drop_everything_until and not isinstance(evt, drop_everything_until):
continue
return evt
def close(self) -> None:
self._ws.close()
def __enter__(self) -> "ARServer":
return self
def __exit__(self, *args) -> None:
self.close()