]>
git.proxmox.com Git - mirror_qemu.git/blob - python/qemu/aqmp/legacy.py
4 This class pretends to be qemu.qmp.QEMUMonitorProtocol.
17 from qemu
.qmp
import QMPMessage
, QMPReturnValue
, SocketAddrT
19 from .qmp_client
import QMPClient
22 # pylint: disable=missing-docstring
25 class QEMUMonitorProtocol(qemu
.qmp
.QEMUMonitorProtocol
):
26 def __init__(self
, address
: SocketAddrT
,
28 nickname
: Optional
[str] = None):
30 # pylint: disable=super-init-not-called
31 self
._aqmp
= QMPClient(nickname
)
32 self
._aloop
= asyncio
.get_event_loop()
33 self
._address
= address
34 self
._timeout
: Optional
[float] = None
39 self
, future
: Awaitable
[_T
], timeout
: Optional
[float] = None
41 return self
._aloop
.run_until_complete(
42 asyncio
.wait_for(future
, timeout
=timeout
)
45 def _get_greeting(self
) -> Optional
[QMPMessage
]:
46 if self
._aqmp
.greeting
is not None:
47 # pylint: disable=protected-access
48 return self
._aqmp
.greeting
._asdict
()
51 # __enter__ and __exit__ need no changes
52 # parse_address needs no changes
54 def connect(self
, negotiate
: bool = True) -> Optional
[QMPMessage
]:
55 self
._aqmp
.await_greeting
= negotiate
56 self
._aqmp
.negotiate
= negotiate
59 self
._aqmp
.connect(self
._address
)
61 return self
._get
_greeting
()
63 def accept(self
, timeout
: Optional
[float] = 15.0) -> QMPMessage
:
64 self
._aqmp
.await_greeting
= True
65 self
._aqmp
.negotiate
= True
68 self
._aqmp
.accept(self
._address
),
72 ret
= self
._get
_greeting
()
73 assert ret
is not None
76 def cmd_obj(self
, qmp_cmd
: QMPMessage
) -> QMPMessage
:
79 # pylint: disable=protected-access
81 # _raw() isn't a public API, because turning off
82 # automatic ID assignment is discouraged. For
83 # compatibility with iotests *only*, do it anyway.
84 self
._aqmp
._raw
(qmp_cmd
, assign_id
=False),
89 # Default impl of cmd() delegates to cmd_obj
91 def command(self
, cmd
: str, **kwds
: object) -> QMPReturnValue
:
93 self
._aqmp
.execute(cmd
, kwds
),
98 wait
: Union
[bool, float] = False) -> Optional
[QMPMessage
]:
100 # wait is False/0: "do not wait, do not except."
101 if self
._aqmp
.events
.empty():
104 # If wait is 'True', wait forever. If wait is False/0, the events
105 # queue must not be empty; but it still needs some real amount
106 # of time to complete.
108 if wait
and isinstance(wait
, float):
113 self
._aqmp
.events
.get(),
118 def get_events(self
, wait
: Union
[bool, float] = False) -> List
[QMPMessage
]:
119 events
= [dict(x
) for x
in self
._aqmp
.events
.clear()]
123 event
= self
.pull_event(wait
)
124 return [event
] if event
is not None else []
126 def clear_events(self
) -> None:
127 self
._aqmp
.events
.clear()
129 def close(self
) -> None:
131 self
._aqmp
.disconnect()
134 def settimeout(self
, timeout
: Optional
[float]) -> None:
135 self
._timeout
= timeout
137 def send_fd_scm(self
, fd
: int) -> None:
138 self
._aqmp
.send_fd_scm(fd
)