]> git.proxmox.com Git - mirror_qemu.git/blame - python/qemu/aqmp/legacy.py
python: upgrade mypy to 0.780
[mirror_qemu.git] / python / qemu / aqmp / legacy.py
CommitLineData
f122be60
JS
1"""
2Sync QMP Wrapper
3
4This class pretends to be qemu.qmp.QEMUMonitorProtocol.
5"""
6
7import asyncio
8from typing import (
0e6bfd8b 9 Any,
f122be60 10 Awaitable,
0e6bfd8b 11 Dict,
f122be60
JS
12 List,
13 Optional,
14 TypeVar,
15 Union,
16)
17
18import qemu.qmp
f122be60 19
6e7751dc 20from .error import QMPError
0e6bfd8b 21from .protocol import Runstate, SocketAddrT
f122be60
JS
22from .qmp_client import QMPClient
23
24
f3efd129
JS
25# (Temporarily) Re-export QMPBadPortError
26QMPBadPortError = qemu.qmp.QMPBadPortError
27
0e6bfd8b
JS
28#: QMPMessage is an entire QMP message of any kind.
29QMPMessage = Dict[str, Any]
30
31#: QMPReturnValue is the 'return' value of a command.
32QMPReturnValue = object
33
34#: QMPObject is any object in a QMP message.
35QMPObject = Dict[str, object]
36
37# QMPMessage can be outgoing commands or incoming events/returns.
38# QMPReturnValue is usually a dict/json object, but due to QAPI's
39# 'returns-whitelist', it can actually be anything.
40#
41# {'return': {}} is a QMPMessage,
42# {} is the QMPReturnValue.
43
44
f122be60
JS
45# pylint: disable=missing-docstring
46
47
48class QEMUMonitorProtocol(qemu.qmp.QEMUMonitorProtocol):
49 def __init__(self, address: SocketAddrT,
50 server: bool = False,
51 nickname: Optional[str] = None):
52
53 # pylint: disable=super-init-not-called
54 self._aqmp = QMPClient(nickname)
55 self._aloop = asyncio.get_event_loop()
56 self._address = address
57 self._timeout: Optional[float] = None
58
59 _T = TypeVar('_T')
60
61 def _sync(
62 self, future: Awaitable[_T], timeout: Optional[float] = None
63 ) -> _T:
64 return self._aloop.run_until_complete(
65 asyncio.wait_for(future, timeout=timeout)
66 )
67
68 def _get_greeting(self) -> Optional[QMPMessage]:
69 if self._aqmp.greeting is not None:
70 # pylint: disable=protected-access
71 return self._aqmp.greeting._asdict()
72 return None
73
74 # __enter__ and __exit__ need no changes
75 # parse_address needs no changes
76
77 def connect(self, negotiate: bool = True) -> Optional[QMPMessage]:
78 self._aqmp.await_greeting = negotiate
79 self._aqmp.negotiate = negotiate
80
81 self._sync(
82 self._aqmp.connect(self._address)
83 )
84 return self._get_greeting()
85
86 def accept(self, timeout: Optional[float] = 15.0) -> QMPMessage:
87 self._aqmp.await_greeting = True
88 self._aqmp.negotiate = True
89
90 self._sync(
91 self._aqmp.accept(self._address),
92 timeout
93 )
94
95 ret = self._get_greeting()
96 assert ret is not None
97 return ret
98
99 def cmd_obj(self, qmp_cmd: QMPMessage) -> QMPMessage:
100 return dict(
101 self._sync(
102 # pylint: disable=protected-access
103
104 # _raw() isn't a public API, because turning off
105 # automatic ID assignment is discouraged. For
106 # compatibility with iotests *only*, do it anyway.
107 self._aqmp._raw(qmp_cmd, assign_id=False),
108 self._timeout
109 )
110 )
111
112 # Default impl of cmd() delegates to cmd_obj
113
114 def command(self, cmd: str, **kwds: object) -> QMPReturnValue:
115 return self._sync(
116 self._aqmp.execute(cmd, kwds),
117 self._timeout
118 )
119
120 def pull_event(self,
121 wait: Union[bool, float] = False) -> Optional[QMPMessage]:
122 if not wait:
123 # wait is False/0: "do not wait, do not except."
124 if self._aqmp.events.empty():
125 return None
126
127 # If wait is 'True', wait forever. If wait is False/0, the events
128 # queue must not be empty; but it still needs some real amount
129 # of time to complete.
130 timeout = None
131 if wait and isinstance(wait, float):
132 timeout = wait
133
134 return dict(
135 self._sync(
136 self._aqmp.events.get(),
137 timeout
138 )
139 )
140
141 def get_events(self, wait: Union[bool, float] = False) -> List[QMPMessage]:
142 events = [dict(x) for x in self._aqmp.events.clear()]
143 if events:
144 return events
145
146 event = self.pull_event(wait)
147 return [event] if event is not None else []
148
149 def clear_events(self) -> None:
150 self._aqmp.events.clear()
151
152 def close(self) -> None:
153 self._sync(
154 self._aqmp.disconnect()
155 )
156
157 def settimeout(self, timeout: Optional[float]) -> None:
158 self._timeout = timeout
159
160 def send_fd_scm(self, fd: int) -> None:
161 self._aqmp.send_fd_scm(fd)
3bc72e3a
JS
162
163 def __del__(self) -> None:
164 if self._aqmp.runstate == Runstate.IDLE:
165 return
166
167 if not self._aloop.is_running():
168 self.close()
169 else:
170 # Garbage collection ran while the event loop was running.
171 # Nothing we can do about it now, but if we don't raise our
172 # own error, the user will be treated to a lot of traceback
173 # they might not understand.
6e7751dc 174 raise QMPError(
3bc72e3a
JS
175 "QEMUMonitorProtocol.close()"
176 " was not called before object was garbage collected"
177 )