]> git.proxmox.com Git - mirror_qemu.git/blame - python/qemu/aqmp/qmp_client.py
Merge remote-tracking branch 'remotes/thuth/tags/pull-request-2021-10-15' into staging
[mirror_qemu.git] / python / qemu / aqmp / qmp_client.py
CommitLineData
c67d696b
JS
1"""
2QMP Protocol Implementation
3
4This module provides the `QMPClient` class, which can be used to connect
5and send commands to a QMP server such as QEMU. The QMP class can be
6used to either connect to a listening server, or used to listen and
7accept an incoming connection from that server.
8"""
9
e0fea0b3 10import asyncio
c67d696b 11import logging
6e2f6ec5
JS
12import socket
13import struct
c67d696b
JS
14from typing import (
15 Dict,
16 List,
17 Mapping,
18 Optional,
577737be
JS
19 Union,
20 cast,
c67d696b
JS
21)
22
577737be 23from .error import AQMPError, ProtocolError
c67d696b
JS
24from .events import Events
25from .message import Message
e0fea0b3
JS
26from .models import ErrorResponse, Greeting
27from .protocol import AsyncProtocol, Runstate, require
c67d696b
JS
28from .util import (
29 bottom_half,
30 exception_summary,
31 pretty_traceback,
32 upper_half,
33)
34
35
36class _WrappedProtocolError(ProtocolError):
37 """
38 Abstract exception class for Protocol errors that wrap an Exception.
39
40 :param error_message: Human-readable string describing the error.
41 :param exc: The root-cause exception.
42 """
43 def __init__(self, error_message: str, exc: Exception):
44 super().__init__(error_message)
45 self.exc = exc
46
47 def __str__(self) -> str:
48 return f"{self.error_message}: {self.exc!s}"
49
50
51class GreetingError(_WrappedProtocolError):
52 """
53 An exception occurred during the Greeting phase.
54
55 :param error_message: Human-readable string describing the error.
56 :param exc: The root-cause exception.
57 """
58
59
60class NegotiationError(_WrappedProtocolError):
61 """
62 An exception occurred during the Negotiation phase.
63
64 :param error_message: Human-readable string describing the error.
65 :param exc: The root-cause exception.
66 """
67
68
e0fea0b3
JS
69class ExecuteError(AQMPError):
70 """
71 Exception raised by `QMPClient.execute()` on RPC failure.
72
73 :param error_response: The RPC error response object.
74 :param sent: The sent RPC message that caused the failure.
75 :param received: The raw RPC error reply received.
76 """
77 def __init__(self, error_response: ErrorResponse,
78 sent: Message, received: Message):
79 super().__init__(error_response.error.desc)
80 #: The sent `Message` that caused the failure
81 self.sent: Message = sent
82 #: The received `Message` that indicated failure
83 self.received: Message = received
84 #: The parsed error response
85 self.error: ErrorResponse = error_response
86 #: The QMP error class
87 self.error_class: str = error_response.error.class_
88
89
577737be
JS
90class ExecInterruptedError(AQMPError):
91 """
e0fea0b3 92 Exception raised by `execute()` (et al) when an RPC is interrupted.
577737be 93
e0fea0b3 94 This error is raised when an `execute()` statement could not be
577737be
JS
95 completed. This can occur because the connection itself was
96 terminated before a reply was received.
97
98 The true cause of the interruption will be available via `disconnect()`.
99 """
100
101
102class _MsgProtocolError(ProtocolError):
103 """
104 Abstract error class for protocol errors that have a `Message` object.
105
106 This Exception class is used for protocol errors where the `Message`
107 was mechanically understood, but was found to be inappropriate or
108 malformed.
109
110 :param error_message: Human-readable string describing the error.
111 :param msg: The QMP `Message` that caused the error.
112 """
113 def __init__(self, error_message: str, msg: Message):
114 super().__init__(error_message)
115 #: The received `Message` that caused the error.
116 self.msg: Message = msg
117
118 def __str__(self) -> str:
119 return "\n".join([
120 super().__str__(),
121 f" Message was: {str(self.msg)}\n",
122 ])
123
124
125class ServerParseError(_MsgProtocolError):
126 """
127 The Server sent a `Message` indicating parsing failure.
128
129 i.e. A reply has arrived from the server, but it is missing the "ID"
130 field, indicating a parsing error.
131
132 :param error_message: Human-readable string describing the error.
133 :param msg: The QMP `Message` that caused the error.
134 """
135
136
e0fea0b3
JS
137class BadReplyError(_MsgProtocolError):
138 """
139 An execution reply was successfully routed, but not understood.
140
141 If a QMP message is received with an 'id' field to allow it to be
142 routed, but is otherwise malformed, this exception will be raised.
143
144 A reply message is malformed if it is missing either the 'return' or
145 'error' keys, or if the 'error' value has missing keys or members of
146 the wrong type.
147
148 :param error_message: Human-readable string describing the error.
149 :param msg: The malformed reply that was received.
150 :param sent: The message that was sent that prompted the error.
151 """
152 def __init__(self, error_message: str, msg: Message, sent: Message):
153 super().__init__(error_message, msg)
154 #: The sent `Message` that caused the failure
155 self.sent = sent
156
157
c67d696b
JS
158class QMPClient(AsyncProtocol[Message], Events):
159 """
160 Implements a QMP client connection.
161
162 QMP can be used to establish a connection as either the transport
163 client or server, though this class always acts as the QMP client.
164
165 :param name: Optional nickname for the connection, used for logging.
166
167 Basic script-style usage looks like this::
168
169 qmp = QMPClient('my_virtual_machine_name')
170 await qmp.connect(('127.0.0.1', 1234))
171 ...
172 res = await qmp.execute('block-query')
173 ...
174 await qmp.disconnect()
175
176 Basic async client-style usage looks like this::
177
178 class Client:
179 def __init__(self, name: str):
180 self.qmp = QMPClient(name)
181
182 async def watch_events(self):
183 try:
184 async for event in self.qmp.events:
185 print(f"Event: {event['event']}")
186 except asyncio.CancelledError:
187 return
188
189 async def run(self, address='/tmp/qemu.socket'):
190 await self.qmp.connect(address)
191 asyncio.create_task(self.watch_events())
192 await self.qmp.runstate_changed.wait()
193 await self.disconnect()
194
195 See `aqmp.events` for more detail on event handling patterns.
196 """
197 #: Logger object used for debugging messages.
198 logger = logging.getLogger(__name__)
199
200 # Read buffer limit; large enough to accept query-qmp-schema
201 _limit = (256 * 1024)
202
577737be
JS
203 # Type alias for pending execute() result items
204 _PendingT = Union[Message, ExecInterruptedError]
205
c67d696b
JS
206 def __init__(self, name: Optional[str] = None) -> None:
207 super().__init__(name)
208 Events.__init__(self)
209
210 #: Whether or not to await a greeting after establishing a connection.
211 self.await_greeting: bool = True
212
213 #: Whether or not to perform capabilities negotiation upon connection.
214 #: Implies `await_greeting`.
215 self.negotiate: bool = True
216
217 # Cached Greeting, if one was awaited.
218 self._greeting: Optional[Greeting] = None
219
e0fea0b3
JS
220 # Command ID counter
221 self._execute_id = 0
222
577737be
JS
223 # Incoming RPC reply messages.
224 self._pending: Dict[
225 Union[str, None],
226 'asyncio.Queue[QMPClient._PendingT]'
227 ] = {}
228
0257209a
JS
229 @property
230 def greeting(self) -> Optional[Greeting]:
231 """The `Greeting` from the QMP server, if any."""
232 return self._greeting
233
c67d696b
JS
234 @upper_half
235 async def _establish_session(self) -> None:
236 """
237 Initiate the QMP session.
238
239 Wait for the QMP greeting and perform capabilities negotiation.
240
241 :raise GreetingError: When the greeting is not understood.
242 :raise NegotiationError: If the negotiation fails.
243 :raise EOFError: When the server unexpectedly hangs up.
244 :raise OSError: For underlying stream errors.
245 """
577737be
JS
246 self._greeting = None
247 self._pending = {}
248
c67d696b
JS
249 if self.await_greeting or self.negotiate:
250 self._greeting = await self._get_greeting()
251
252 if self.negotiate:
253 await self._negotiate()
254
255 # This will start the reader/writers:
256 await super()._establish_session()
257
258 @upper_half
259 async def _get_greeting(self) -> Greeting:
260 """
261 :raise GreetingError: When the greeting is not understood.
262 :raise EOFError: When the server unexpectedly hangs up.
263 :raise OSError: For underlying stream errors.
264
265 :return: the Greeting object given by the server.
266 """
267 self.logger.debug("Awaiting greeting ...")
268
269 try:
270 msg = await self._recv()
271 return Greeting(msg)
272 except (ProtocolError, KeyError, TypeError) as err:
273 emsg = "Did not understand Greeting"
274 self.logger.error("%s: %s", emsg, exception_summary(err))
275 self.logger.debug("%s:\n%s\n", emsg, pretty_traceback())
276 raise GreetingError(emsg, err) from err
277 except BaseException as err:
278 # EOFError, OSError, or something unexpected.
279 emsg = "Failed to receive Greeting"
280 self.logger.error("%s: %s", emsg, exception_summary(err))
281 self.logger.debug("%s:\n%s\n", emsg, pretty_traceback())
282 raise
283
284 @upper_half
285 async def _negotiate(self) -> None:
286 """
287 Perform QMP capabilities negotiation.
288
289 :raise NegotiationError: When negotiation fails.
290 :raise EOFError: When the server unexpectedly hangs up.
291 :raise OSError: For underlying stream errors.
292 """
293 self.logger.debug("Negotiating capabilities ...")
294
295 arguments: Dict[str, List[str]] = {'enable': []}
296 if self._greeting and 'oob' in self._greeting.QMP.capabilities:
297 arguments['enable'].append('oob')
298 msg = self.make_execute_msg('qmp_capabilities', arguments=arguments)
299
300 # It's not safe to use execute() here, because the reader/writers
301 # aren't running. AsyncProtocol *requires* that a new session
302 # does not fail after the reader/writers are running!
303 try:
304 await self._send(msg)
305 reply = await self._recv()
306 assert 'return' in reply
307 assert 'error' not in reply
308 except (ProtocolError, AssertionError) as err:
309 emsg = "Negotiation failed"
310 self.logger.error("%s: %s", emsg, exception_summary(err))
311 self.logger.debug("%s:\n%s\n", emsg, pretty_traceback())
312 raise NegotiationError(emsg, err) from err
313 except BaseException as err:
314 # EOFError, OSError, or something unexpected.
315 emsg = "Negotiation failed"
316 self.logger.error("%s: %s", emsg, exception_summary(err))
317 self.logger.debug("%s:\n%s\n", emsg, pretty_traceback())
318 raise
319
577737be
JS
320 @bottom_half
321 async def _bh_disconnect(self) -> None:
322 try:
323 await super()._bh_disconnect()
324 finally:
325 if self._pending:
326 self.logger.debug("Cancelling pending executions")
327 keys = self._pending.keys()
328 for key in keys:
329 self.logger.debug("Cancelling execution '%s'", key)
330 self._pending[key].put_nowait(
331 ExecInterruptedError("Disconnected")
332 )
333
334 self.logger.debug("QMP Disconnected.")
335
336 @upper_half
337 def _cleanup(self) -> None:
338 super()._cleanup()
339 assert not self._pending
340
c67d696b
JS
341 @bottom_half
342 async def _on_message(self, msg: Message) -> None:
343 """
344 Add an incoming message to the appropriate queue/handler.
577737be
JS
345
346 :raise ServerParseError: When Message indicates server parse failure.
c67d696b
JS
347 """
348 # Incoming messages are not fully parsed/validated here;
349 # do only light peeking to know how to route the messages.
350
351 if 'event' in msg:
352 await self._event_dispatch(msg)
353 return
354
355 # Below, we assume everything left is an execute/exec-oob response.
577737be
JS
356
357 exec_id = cast(Optional[str], msg.get('id'))
358
359 if exec_id in self._pending:
360 await self._pending[exec_id].put(msg)
361 return
362
363 # We have a message we can't route back to a caller.
364
365 is_error = 'error' in msg
366 has_id = 'id' in msg
367
368 if is_error and not has_id:
369 # This is very likely a server parsing error.
370 # It doesn't inherently belong to any pending execution.
371 # Instead of performing clever recovery, just terminate.
372 # See "NOTE" in qmp-spec.txt, section 2.4.2
373 raise ServerParseError(
374 ("Server sent an error response without an ID, "
375 "but there are no ID-less executions pending. "
376 "Assuming this is a server parser failure."),
377 msg
378 )
379
380 # qmp-spec.txt, section 2.4:
381 # 'Clients should drop all the responses
382 # that have an unknown "id" field.'
383 self.logger.log(
384 logging.ERROR if is_error else logging.WARNING,
385 "Unknown ID '%s', message dropped.",
386 exec_id,
387 )
388 self.logger.debug("Unroutable message: %s", str(msg))
c67d696b
JS
389
390 @upper_half
391 @bottom_half
392 async def _do_recv(self) -> Message:
393 """
394 :raise OSError: When a stream error is encountered.
395 :raise EOFError: When the stream is at EOF.
396 :raise ProtocolError:
397 When the Message is not understood.
398 See also `Message._deserialize`.
399
400 :return: A single QMP `Message`.
401 """
402 msg_bytes = await self._readline()
403 msg = Message(msg_bytes, eager=True)
404 return msg
405
406 @upper_half
407 @bottom_half
408 def _do_send(self, msg: Message) -> None:
409 """
410 :raise ValueError: JSON serialization failure
411 :raise TypeError: JSON serialization failure
412 :raise OSError: When a stream error is encountered.
413 """
414 assert self._writer is not None
415 self._writer.write(bytes(msg))
416
e0fea0b3
JS
417 @upper_half
418 def _get_exec_id(self) -> str:
419 exec_id = f"__aqmp#{self._execute_id:05d}"
420 self._execute_id += 1
421 return exec_id
422
423 @upper_half
424 async def _issue(self, msg: Message) -> Union[None, str]:
425 """
426 Issue a QMP `Message` and do not wait for a reply.
427
428 :param msg: The QMP `Message` to send to the server.
429
430 :return: The ID of the `Message` sent.
431 """
432 msg_id: Optional[str] = None
433 if 'id' in msg:
434 assert isinstance(msg['id'], str)
435 msg_id = msg['id']
436
437 self._pending[msg_id] = asyncio.Queue(maxsize=1)
438 await self._outgoing.put(msg)
439
440 return msg_id
441
442 @upper_half
443 async def _reply(self, msg_id: Union[str, None]) -> Message:
444 """
445 Await a reply to a previously issued QMP message.
446
447 :param msg_id: The ID of the previously issued message.
448
449 :return: The reply from the server.
450 :raise ExecInterruptedError:
451 When the reply could not be retrieved because the connection
452 was lost, or some other problem.
453 """
454 queue = self._pending[msg_id]
455 result = await queue.get()
456
457 try:
458 if isinstance(result, ExecInterruptedError):
459 raise result
460 return result
461 finally:
462 del self._pending[msg_id]
463
464 @upper_half
465 async def _execute(self, msg: Message, assign_id: bool = True) -> Message:
466 """
467 Send a QMP `Message` to the server and await a reply.
468
469 This method *assumes* you are sending some kind of an execute
470 statement that *will* receive a reply.
471
472 An execution ID will be assigned if assign_id is `True`. It can be
473 disabled, but this requires that an ID is manually assigned
474 instead. For manually assigned IDs, you must not use the string
475 '__aqmp#' anywhere in the ID.
476
477 :param msg: The QMP `Message` to execute.
478 :param assign_id: If True, assign a new execution ID.
479
480 :return: Execution reply from the server.
481 :raise ExecInterruptedError:
482 When the reply could not be retrieved because the connection
483 was lost, or some other problem.
484 """
485 if assign_id:
486 msg['id'] = self._get_exec_id()
487 elif 'id' in msg:
488 assert isinstance(msg['id'], str)
489 assert '__aqmp#' not in msg['id']
490
491 exec_id = await self._issue(msg)
492 return await self._reply(exec_id)
493
41f4f922
JS
494 @upper_half
495 @require(Runstate.RUNNING)
496 async def _raw(
497 self,
498 msg: Union[Message, Mapping[str, object], bytes],
499 assign_id: bool = True,
500 ) -> Message:
501 """
502 Issue a raw `Message` to the QMP server and await a reply.
503
504 :param msg:
505 A Message to send to the server. It may be a `Message`, any
506 Mapping (including Dict), or raw bytes.
507 :param assign_id:
508 Assign an arbitrary execution ID to this message. If
509 `False`, the existing id must either be absent (and no other
510 such pending execution may omit an ID) or a string. If it is
511 a string, it must not start with '__aqmp#' and no other such
512 pending execution may currently be using that ID.
513
514 :return: Execution reply from the server.
515
516 :raise ExecInterruptedError:
517 When the reply could not be retrieved because the connection
518 was lost, or some other problem.
519 :raise TypeError:
520 When assign_id is `False`, an ID is given, and it is not a string.
521 :raise ValueError:
522 When assign_id is `False`, but the ID is not usable;
523 Either because it starts with '__aqmp#' or it is already in-use.
524 """
525 # 1. convert generic Mapping or bytes to a QMP Message
526 # 2. copy Message objects so that we assign an ID only to the copy.
527 msg = Message(msg)
528
529 exec_id = msg.get('id')
530 if not assign_id and 'id' in msg:
531 if not isinstance(exec_id, str):
532 raise TypeError(f"ID ('{exec_id}') must be a string.")
533 if exec_id.startswith('__aqmp#'):
534 raise ValueError(
535 f"ID ('{exec_id}') must not start with '__aqmp#'."
536 )
537
538 if not assign_id and exec_id in self._pending:
539 raise ValueError(
540 f"ID '{exec_id}' is in-use and cannot be used."
541 )
542
543 return await self._execute(msg, assign_id=assign_id)
544
e0fea0b3
JS
545 @upper_half
546 @require(Runstate.RUNNING)
547 async def execute_msg(self, msg: Message) -> object:
548 """
549 Execute a QMP command and return its value.
550
551 :param msg: The QMP `Message` to execute.
552
553 :return:
554 The command execution return value from the server. The type of
555 object returned depends on the command that was issued,
556 though most in QEMU return a `dict`.
557 :raise ValueError:
558 If the QMP `Message` does not have either the 'execute' or
559 'exec-oob' fields set.
560 :raise ExecuteError: When the server returns an error response.
561 :raise ExecInterruptedError: if the connection was terminated early.
562 """
563 if not ('execute' in msg or 'exec-oob' in msg):
564 raise ValueError("Requires 'execute' or 'exec-oob' message")
565
566 # Copy the Message so that the ID assigned by _execute() is
567 # local to this method; allowing the ID to be seen in raised
568 # Exceptions but without modifying the caller's held copy.
569 msg = Message(msg)
570 reply = await self._execute(msg)
571
572 if 'error' in reply:
573 try:
574 error_response = ErrorResponse(reply)
575 except (KeyError, TypeError) as err:
576 # Error response was malformed.
577 raise BadReplyError(
578 "QMP error reply is malformed", reply, msg,
579 ) from err
580
581 raise ExecuteError(error_response, msg, reply)
582
583 if 'return' not in reply:
584 raise BadReplyError(
585 "QMP reply is missing a 'error' or 'return' member",
586 reply, msg,
587 )
588
589 return reply['return']
590
c67d696b
JS
591 @classmethod
592 def make_execute_msg(cls, cmd: str,
593 arguments: Optional[Mapping[str, object]] = None,
594 oob: bool = False) -> Message:
595 """
e0fea0b3 596 Create an executable message to be sent by `execute_msg` later.
c67d696b
JS
597
598 :param cmd: QMP command name.
599 :param arguments: Arguments (if any). Must be JSON-serializable.
600 :param oob: If `True`, execute "out of band".
601
602 :return: An executable QMP `Message`.
603 """
604 msg = Message({'exec-oob' if oob else 'execute': cmd})
605 if arguments is not None:
606 msg['arguments'] = arguments
607 return msg
e0fea0b3
JS
608
609 @upper_half
610 async def execute(self, cmd: str,
611 arguments: Optional[Mapping[str, object]] = None,
612 oob: bool = False) -> object:
613 """
614 Execute a QMP command and return its value.
615
616 :param cmd: QMP command name.
617 :param arguments: Arguments (if any). Must be JSON-serializable.
618 :param oob: If `True`, execute "out of band".
619
620 :return:
621 The command execution return value from the server. The type of
622 object returned depends on the command that was issued,
623 though most in QEMU return a `dict`.
624 :raise ExecuteError: When the server returns an error response.
625 :raise ExecInterruptedError: if the connection was terminated early.
626 """
627 msg = self.make_execute_msg(cmd, arguments, oob=oob)
628 return await self.execute_msg(msg)
6e2f6ec5
JS
629
630 @upper_half
631 @require(Runstate.RUNNING)
632 def send_fd_scm(self, fd: int) -> None:
633 """
634 Send a file descriptor to the remote via SCM_RIGHTS.
635 """
636 assert self._writer is not None
637 sock = self._writer.transport.get_extra_info('socket')
638
639 if sock.family != socket.AF_UNIX:
640 raise AQMPError("Sending file descriptors requires a UNIX socket.")
641
642 # Void the warranty sticker.
643 # Access to sendmsg in asyncio is scheduled for removal in Python 3.11.
644 sock = sock._sock # pylint: disable=protected-access
645 sock.sendmsg(
646 [b' '],
647 [(socket.SOL_SOCKET, socket.SCM_RIGHTS, struct.pack('@i', fd))]
648 )