git.proxmox.com Git - mirror_qemu.git/blob - python/qemu/aqmp/legacy.py
hw/display/artist: Fix draw_line() artefacts
[mirror_qemu.git] / python / qemu / aqmp / legacy.py
1 """
2 Sync QMP Wrapper
3
4 This class pretends to be qemu.qmp.QEMUMonitorProtocol.
5 """
6
7 import asyncio
8 from typing import (
9 Awaitable,
10 List,
11 Optional,
12 TypeVar,
13 Union,
14 )
15
16 import qemu.qmp
17 from qemu.qmp import QMPMessage, QMPReturnValue, SocketAddrT
18
19 from .qmp_client import QMPClient
20
21
22 # pylint: disable=missing-docstring
23
24
class QEMUMonitorProtocol(qemu.qmp.QEMUMonitorProtocol):
    """
    Synchronous wrapper around the asynchronous QMPClient.

    Exposes the qemu.qmp.QEMUMonitorProtocol interface while delegating
    every operation to a QMPClient, running each awaitable to completion
    on an asyncio event loop.
    """

    def __init__(self, address: SocketAddrT,
                 server: bool = False,
                 nickname: Optional[str] = None):
        """
        Create the wrapper; no connection is established here.

        :param address: Address later handed to connect() or accept().
        :param server: Not read by this implementation; kept in the
                       signature for callers that pass it.
        :param nickname: Passed through to the QMPClient constructor.
        """
        # The base class initializer is deliberately skipped.
        # pylint: disable=super-init-not-called
        self._aqmp = QMPClient(nickname)
        # NOTE(review): asyncio.get_event_loop() is deprecated in newer
        # Python releases when no loop is running -- confirm which loop
        # this wrapper is expected to bind to.
        self._aloop = asyncio.get_event_loop()
        self._address = address
        # Timeout applied to cmd_obj() and command(); see settimeout().
        self._timeout: Optional[float] = None

    _T = TypeVar('_T')

    def _sync(
            self, future: Awaitable[_T], timeout: Optional[float] = None
    ) -> _T:
        """
        Run an awaitable to completion on this object's event loop.

        :param future: The awaitable to run.
        :param timeout: Seconds passed to asyncio.wait_for();
                        None means no limit.
        :return: Whatever the awaitable produces.
        """
        return self._aloop.run_until_complete(
            asyncio.wait_for(future, timeout=timeout)
        )

    def _get_greeting(self) -> Optional[QMPMessage]:
        """
        Return the client's greeting as a dict, or None if there is none.
        """
        if self._aqmp.greeting is not None:
            # pylint: disable=protected-access
            return self._aqmp.greeting._asdict()
        return None

    # __enter__ and __exit__ need no changes
    # parse_address needs no changes

    def connect(self, negotiate: bool = True) -> Optional[QMPMessage]:
        """
        Connect to the stored address.

        :param negotiate: Assigned to both the client's await_greeting
                          and negotiate settings before connecting.
        :return: The greeting as a dict, or None if none is available.
        """
        self._aqmp.await_greeting = negotiate
        self._aqmp.negotiate = negotiate

        self._sync(
            self._aqmp.connect(self._address)
        )
        return self._get_greeting()

    def accept(self, timeout: Optional[float] = 15.0) -> QMPMessage:
        """
        Accept a connection on the stored address.

        The client's await_greeting and negotiate settings are both
        enabled before accepting.

        :param timeout: Seconds to allow for the accept; None for no limit.
        :return: The greeting as a dict.
        """
        self._aqmp.await_greeting = True
        self._aqmp.negotiate = True

        self._sync(
            self._aqmp.accept(self._address),
            timeout
        )

        ret = self._get_greeting()
        assert ret is not None
        return ret

    def cmd_obj(self, qmp_cmd: QMPMessage) -> QMPMessage:
        """
        Send a raw QMP message and return the reply as a dict.

        The timeout configured via settimeout() applies.

        :param qmp_cmd: The message to send, passed through unmodified.
        :return: The reply, converted to a plain dict.
        """
        return dict(
            self._sync(
                # pylint: disable=protected-access

                # _raw() isn't a public API, because turning off
                # automatic ID assignment is discouraged. For
                # compatibility with iotests *only*, do it anyway.
                self._aqmp._raw(qmp_cmd, assign_id=False),
                self._timeout
            )
        )

    # Default impl of cmd() delegates to cmd_obj

    def command(self, cmd: str, **kwds: object) -> QMPReturnValue:
        """
        Execute a command through the client's execute() method.

        The timeout configured via settimeout() applies.

        :param cmd: Command name.
        :param kwds: Command arguments, forwarded as a dict.
        :return: The value produced by the client's execute().
        """
        return self._sync(
            self._aqmp.execute(cmd, kwds),
            self._timeout
        )

    def pull_event(self,
                   wait: Union[bool, float] = False) -> Optional[QMPMessage]:
        """
        Fetch a single event from the client's event queue.

        :param wait: False/0 returns None immediately when the queue is
                     empty; True waits without a time limit; any other
                     non-zero number is a timeout in seconds.
        :return: The event as a dict, or None when not waiting and the
                 queue is empty.
        """
        if not wait:
            # wait is False/0: "do not wait, do not except."
            if self._aqmp.events.empty():
                return None

        # If wait is 'True', wait forever. If wait is False/0, the events
        # queue must not be empty; but it still needs some real amount
        # of time to complete.
        timeout: Optional[float] = None
        # bool is excluded explicitly (True must mean "no limit"); every
        # other truthy number, including an int such as 5, is a timeout.
        # Checking for float alone would make integers wait forever.
        if wait and not isinstance(wait, bool):
            timeout = wait

        return dict(
            self._sync(
                self._aqmp.events.get(),
                timeout
            )
        )

    def get_events(self, wait: Union[bool, float] = False) -> List[QMPMessage]:
        """
        Return the events removed by clearing the queue, or pull one.

        If clearing the queue yields nothing, fall back to
        pull_event(wait) and return its result as a zero- or
        one-element list.

        :param wait: Forwarded to pull_event() when no events were queued.
        :return: A list of events as dicts; may be empty.
        """
        events = [dict(x) for x in self._aqmp.events.clear()]
        if events:
            return events

        event = self.pull_event(wait)
        return [event] if event is not None else []

    def clear_events(self) -> None:
        """Clear the client's event queue, discarding the result."""
        self._aqmp.events.clear()

    def close(self) -> None:
        """Disconnect the client, waiting for the disconnect to finish."""
        self._sync(
            self._aqmp.disconnect()
        )

    def settimeout(self, timeout: Optional[float]) -> None:
        """
        Set the timeout used by cmd_obj() and command().

        :param timeout: Seconds, or None for no limit.
        """
        self._timeout = timeout

    def send_fd_scm(self, fd: int) -> None:
        """
        Forward a file descriptor to the client's send_fd_scm().

        :param fd: The file descriptor to pass along.
        """
        self._aqmp.send_fd_scm(fd)