]> git.proxmox.com Git - mirror_qemu.git/blob - python/qemu/aqmp/qmp_client.py
dtc: Update to version 1.6.1
[mirror_qemu.git] / python / qemu / aqmp / qmp_client.py
1 """
2 QMP Protocol Implementation
3
4 This module provides the `QMPClient` class, which can be used to connect
5 and send commands to a QMP server such as QEMU. The QMP class can be
6 used to either connect to a listening server, or used to listen and
7 accept an incoming connection from that server.
8 """
9
10 import asyncio
11 import logging
12 from typing import (
13 Dict,
14 List,
15 Mapping,
16 Optional,
17 Union,
18 cast,
19 )
20
21 from .error import AQMPError, ProtocolError
22 from .events import Events
23 from .message import Message
24 from .models import ErrorResponse, Greeting
25 from .protocol import AsyncProtocol, Runstate, require
26 from .util import (
27 bottom_half,
28 exception_summary,
29 pretty_traceback,
30 upper_half,
31 )
32
33
class _WrappedProtocolError(ProtocolError):
    """
    Base class for protocol errors that carry an underlying exception.

    The string form of this error is the error message, a colon, and
    the string form of the wrapped exception.

    :param error_message: Human-readable string describing the error.
    :param exc: The root-cause exception.
    """
    def __init__(self, error_message: str, exc: Exception):
        super().__init__(error_message)
        #: The root-cause exception wrapped by this error.
        self.exc: Exception = exc

    def __str__(self) -> str:
        return "{}: {!s}".format(self.error_message, self.exc)
47
48
class GreetingError(_WrappedProtocolError):
    """
    The Greeting phase of a new session failed.

    Raised when the message received in place of the server greeting
    could not be understood; the triggering exception is wrapped.

    :param error_message: Human-readable string describing the error.
    :param exc: The root-cause exception.
    """
56
57
class NegotiationError(_WrappedProtocolError):
    """
    The capabilities Negotiation phase of a new session failed.

    The exception that caused negotiation to fail is wrapped.

    :param error_message: Human-readable string describing the error.
    :param exc: The root-cause exception.
    """
65
66
class ExecuteError(AQMPError):
    """
    Raised by `QMPClient.execute()` when the server reports an RPC failure.

    The exception text is the description carried by the error response.

    :param error_response: The RPC error response object.
    :param sent: The sent RPC message that caused the failure.
    :param received: The raw RPC error reply received.
    """
    def __init__(self, error_response: ErrorResponse,
                 sent: Message, received: Message):
        info = error_response.error
        super().__init__(info.desc)
        #: The sent `Message` that caused the failure
        self.sent: Message = sent
        #: The received `Message` that indicated failure
        self.received: Message = received
        #: The parsed error response
        self.error: ErrorResponse = error_response
        #: The QMP error class
        self.error_class: str = info.class_
86
87
class ExecInterruptedError(AQMPError):
    """
    Raised by `execute()` (et al) when an RPC could not be completed.

    An `execute()` call is interrupted when no reply can be delivered
    to it, for example because the connection was terminated before
    the reply was received.

    The true cause of the interruption will be available via `disconnect()`.
    """
98
99
class _MsgProtocolError(ProtocolError):
    """
    Base class for protocol errors that are tied to a `Message` object.

    Used when a `Message` could be mechanically parsed, but its
    content turned out to be inappropriate or malformed.  The string
    form appends the offending message to the base error text.

    :param error_message: Human-readable string describing the error.
    :param msg: The QMP `Message` that caused the error.
    """
    def __init__(self, error_message: str, msg: Message):
        super().__init__(error_message)
        #: The received `Message` that caused the error.
        self.msg: Message = msg

    def __str__(self) -> str:
        summary = super().__str__()
        return f"{summary}\n Message was: {self.msg!s}\n"
121
122
class ServerParseError(_MsgProtocolError):
    """
    The server replied with a `Message` that signals a parsing failure.

    That is: an error reply arrived without an "ID" field, so it cannot
    be attributed to any pending execution and is assumed to report a
    parsing error on the server side.

    :param error_message: Human-readable string describing the error.
    :param msg: The QMP `Message` that caused the error.
    """
133
134
class BadReplyError(_MsgProtocolError):
    """
    A reply was routed to its execution, but could not be understood.

    This is raised for a QMP message that carries an 'id' field (so it
    can be routed) but is otherwise malformed: it has neither a
    'return' nor an 'error' key, or its 'error' value is missing keys
    or has members of the wrong type.

    :param error_message: Human-readable string describing the error.
    :param msg: The malformed reply that was received.
    :param sent: The message that was sent that prompted the error.
    """
    def __init__(self, error_message: str, msg: Message, sent: Message):
        super().__init__(error_message, msg)
        #: The sent `Message` that caused the failure
        self.sent: Message = sent
154
155
class QMPClient(AsyncProtocol[Message], Events):
    """
    Implements a QMP client connection.

    QMP can be used to establish a connection as either the transport
    client or server, though this class always acts as the QMP client.

    :param name: Optional nickname for the connection, used for logging.

    Basic script-style usage looks like this::

      qmp = QMPClient('my_virtual_machine_name')
      await qmp.connect(('127.0.0.1', 1234))
      ...
      res = await qmp.execute('query-block')
      ...
      await qmp.disconnect()

    Basic async client-style usage looks like this::

      class Client:
          def __init__(self, name: str):
              self.qmp = QMPClient(name)

          async def watch_events(self):
              try:
                  async for event in self.qmp.events:
                      print(f"Event: {event['event']}")
              except asyncio.CancelledError:
                  return

          async def run(self, address='/tmp/qemu.socket'):
              await self.qmp.connect(address)
              asyncio.create_task(self.watch_events())
              await self.qmp.runstate_changed.wait()
              await self.qmp.disconnect()

    See `aqmp.events` for more detail on event handling patterns.
    """
    #: Logger object used for debugging messages.
    logger = logging.getLogger(__name__)

    # Read buffer limit; large enough to accept query-qmp-schema
    _limit = (256 * 1024)

    # Type alias for pending execute() result items
    _PendingT = Union[Message, ExecInterruptedError]

    def __init__(self, name: Optional[str] = None) -> None:
        """
        Create a new, unconnected QMP client.

        :param name: Optional nickname for the connection, used for logging.
        """
        super().__init__(name)
        Events.__init__(self)

        #: Whether or not to await a greeting after establishing a connection.
        self.await_greeting: bool = True

        #: Whether or not to perform capabilities negotiation upon connection.
        #: Implies `await_greeting`.
        self.negotiate: bool = True

        # Cached Greeting, if one was awaited.
        self._greeting: Optional[Greeting] = None

        # Command ID counter
        self._execute_id = 0

        # Incoming RPC reply messages.
        # Maps an execution ID (None for ID-less executions) to the
        # single-slot queue its reply will be delivered on.
        self._pending: Dict[
            Union[str, None],
            'asyncio.Queue[QMPClient._PendingT]'
        ] = {}

    @upper_half
    async def _establish_session(self) -> None:
        """
        Initiate the QMP session.

        Wait for the QMP greeting and perform capabilities negotiation.

        :raise GreetingError: When the greeting is not understood.
        :raise NegotiationError: If the negotiation fails.
        :raise EOFError: When the server unexpectedly hangs up.
        :raise OSError: For underlying stream errors.
        """
        # Start every session from a clean slate.
        self._greeting = None
        self._pending = {}

        # Negotiation implies awaiting the greeting first.
        if self.await_greeting or self.negotiate:
            self._greeting = await self._get_greeting()

        if self.negotiate:
            await self._negotiate()

        # This will start the reader/writers:
        await super()._establish_session()

    @upper_half
    async def _get_greeting(self) -> Greeting:
        """
        Receive and parse the server greeting.

        :raise GreetingError: When the greeting is not understood.
        :raise EOFError: When the server unexpectedly hangs up.
        :raise OSError: For underlying stream errors.

        :return: the Greeting object given by the server.
        """
        self.logger.debug("Awaiting greeting ...")

        try:
            msg = await self._recv()
            return Greeting(msg)
        except (ProtocolError, KeyError, TypeError) as err:
            emsg = "Did not understand Greeting"
            self.logger.error("%s: %s", emsg, exception_summary(err))
            self.logger.debug("%s:\n%s\n", emsg, pretty_traceback())
            raise GreetingError(emsg, err) from err
        except BaseException as err:
            # EOFError, OSError, or something unexpected.
            # Logged here, then re-raised unchanged.
            emsg = "Failed to receive Greeting"
            self.logger.error("%s: %s", emsg, exception_summary(err))
            self.logger.debug("%s:\n%s\n", emsg, pretty_traceback())
            raise

    @upper_half
    async def _negotiate(self) -> None:
        """
        Perform QMP capabilities negotiation.

        Sends 'qmp_capabilities', enabling 'oob' when the cached
        greeting advertises it, and checks the reply directly.

        :raise NegotiationError: When negotiation fails.
        :raise EOFError: When the server unexpectedly hangs up.
        :raise OSError: For underlying stream errors.
        """
        self.logger.debug("Negotiating capabilities ...")

        arguments: Dict[str, List[str]] = {'enable': []}
        if self._greeting and 'oob' in self._greeting.QMP.capabilities:
            arguments['enable'].append('oob')
        msg = self.make_execute_msg('qmp_capabilities', arguments=arguments)

        # It's not safe to use execute() here, because the reader/writers
        # aren't running. AsyncProtocol *requires* that a new session
        # does not fail after the reader/writers are running!
        try:
            await self._send(msg)
            reply = await self._recv()
            assert 'return' in reply
            assert 'error' not in reply
        except (ProtocolError, AssertionError) as err:
            emsg = "Negotiation failed"
            self.logger.error("%s: %s", emsg, exception_summary(err))
            self.logger.debug("%s:\n%s\n", emsg, pretty_traceback())
            raise NegotiationError(emsg, err) from err
        except BaseException as err:
            # EOFError, OSError, or something unexpected.
            # Logged here, then re-raised unchanged.
            emsg = "Negotiation failed"
            self.logger.error("%s: %s", emsg, exception_summary(err))
            self.logger.debug("%s:\n%s\n", emsg, pretty_traceback())
            raise

    @bottom_half
    async def _bh_disconnect(self) -> None:
        """
        Disconnect, then interrupt every execution still awaiting a reply.

        Each pending queue receives an `ExecInterruptedError`, which
        `_reply()` raises to its waiting caller.
        """
        try:
            await super()._bh_disconnect()
        finally:
            if self._pending:
                self.logger.debug("Cancelling pending executions")
            for key, queue in self._pending.items():
                self.logger.debug("Cancelling execution '%s'", key)
                queue.put_nowait(
                    ExecInterruptedError("Disconnected")
                )

            self.logger.debug("QMP Disconnected.")

    @upper_half
    def _cleanup(self) -> None:
        """
        Run the parent cleanup and verify no executions are still pending.
        """
        super()._cleanup()
        assert not self._pending

    @bottom_half
    async def _on_message(self, msg: Message) -> None:
        """
        Add an incoming message to the appropriate queue/handler.

        :raise ServerParseError: When Message indicates server parse failure.
        """
        # Incoming messages are not fully parsed/validated here;
        # do only light peeking to know how to route the messages.

        if 'event' in msg:
            await self._event_dispatch(msg)
            return

        # Below, we assume everything left is an execute/exec-oob response.

        exec_id = cast(Optional[str], msg.get('id'))

        if exec_id in self._pending:
            await self._pending[exec_id].put(msg)
            return

        # We have a message we can't route back to a caller.

        is_error = 'error' in msg
        has_id = 'id' in msg

        if is_error and not has_id:
            # This is very likely a server parsing error.
            # It doesn't inherently belong to any pending execution.
            # Instead of performing clever recovery, just terminate.
            # See "NOTE" in qmp-spec.txt, section 2.4.2
            raise ServerParseError(
                ("Server sent an error response without an ID, "
                 "but there are no ID-less executions pending. "
                 "Assuming this is a server parser failure."),
                msg
            )

        # qmp-spec.txt, section 2.4:
        # 'Clients should drop all the responses
        # that have an unknown "id" field.'
        self.logger.log(
            logging.ERROR if is_error else logging.WARNING,
            "Unknown ID '%s', message dropped.",
            exec_id,
        )
        self.logger.debug("Unroutable message: %s", str(msg))

    @upper_half
    @bottom_half
    async def _do_recv(self) -> Message:
        """
        Read one line from the stream and parse it as a `Message`.

        :raise OSError: When a stream error is encountered.
        :raise EOFError: When the stream is at EOF.
        :raise ProtocolError:
            When the Message is not understood.
            See also `Message._deserialize`.

        :return: A single QMP `Message`.
        """
        msg_bytes = await self._readline()
        msg = Message(msg_bytes, eager=True)
        return msg

    @upper_half
    @bottom_half
    def _do_send(self, msg: Message) -> None:
        """
        Serialize a `Message` and write it to the stream writer.

        :raise ValueError: JSON serialization failure
        :raise TypeError: JSON serialization failure
        :raise OSError: When a stream error is encountered.
        """
        assert self._writer is not None
        self._writer.write(bytes(msg))

    @upper_half
    def _get_exec_id(self) -> str:
        """
        Return a fresh execution ID and advance the ID counter.

        IDs have the form '__aqmp#NNNNN', zero-padded to five digits.
        """
        exec_id = f"__aqmp#{self._execute_id:05d}"
        self._execute_id += 1
        return exec_id

    @upper_half
    async def _issue(self, msg: Message) -> Union[None, str]:
        """
        Issue a QMP `Message` and do not wait for a reply.

        A pending-reply queue is registered under the message ID before
        the message is queued for sending.  If queueing fails or is
        cancelled, that registration is removed again so that no stale
        pending record is left behind.

        :param msg: The QMP `Message` to send to the server.

        :return: The ID of the `Message` sent.
        """
        msg_id: Optional[str] = None
        if 'id' in msg:
            assert isinstance(msg['id'], str)
            msg_id = msg['id']

        self._pending[msg_id] = asyncio.Queue(maxsize=1)
        try:
            await self._outgoing.put(msg)
        except BaseException:
            # Covers asyncio.CancelledError as well: the message was
            # never queued, so nobody will ever await or fill this entry.
            del self._pending[msg_id]
            raise

        return msg_id

    @upper_half
    async def _reply(self, msg_id: Union[str, None]) -> Message:
        """
        Await a reply to a previously issued QMP message.

        The pending record for ``msg_id`` is always removed on the way
        out, including when the wait itself is cancelled (for instance
        by a timeout wrapped around the caller).

        :param msg_id: The ID of the previously issued message.

        :return: The reply from the server.
        :raise ExecInterruptedError:
            When the reply could not be retrieved because the connection
            was lost, or some other problem.
        """
        queue = self._pending[msg_id]

        try:
            # The await must sit inside the try block: a cancellation
            # injected here would otherwise skip the cleanup below and
            # leave a stale entry for _cleanup() to trip over.
            result = await queue.get()
            if isinstance(result, ExecInterruptedError):
                raise result
            return result
        finally:
            del self._pending[msg_id]

    @upper_half
    async def _execute(self, msg: Message, assign_id: bool = True) -> Message:
        """
        Send a QMP `Message` to the server and await a reply.

        This method *assumes* you are sending some kind of an execute
        statement that *will* receive a reply.

        An execution ID will be assigned if assign_id is `True`. It can be
        disabled, but this requires that an ID is manually assigned
        instead. For manually assigned IDs, you must not use the string
        '__aqmp#' anywhere in the ID.

        :param msg: The QMP `Message` to execute.
        :param assign_id: If True, assign a new execution ID.

        :return: Execution reply from the server.
        :raise ExecInterruptedError:
            When the reply could not be retrieved because the connection
            was lost, or some other problem.
        """
        if assign_id:
            msg['id'] = self._get_exec_id()
        elif 'id' in msg:
            assert isinstance(msg['id'], str)
            assert '__aqmp#' not in msg['id']

        exec_id = await self._issue(msg)
        return await self._reply(exec_id)

    @upper_half
    @require(Runstate.RUNNING)
    async def _raw(
            self,
            msg: Union[Message, Mapping[str, object], bytes],
            assign_id: bool = True,
    ) -> Message:
        """
        Issue a raw `Message` to the QMP server and await a reply.

        :param msg:
            A Message to send to the server. It may be a `Message`, any
            Mapping (including Dict), or raw bytes.
        :param assign_id:
            Assign an arbitrary execution ID to this message. If
            `False`, the existing id must either be absent (and no other
            such pending execution may omit an ID) or a string. If it is
            a string, it must not start with '__aqmp#' and no other such
            pending execution may currently be using that ID.

        :return: Execution reply from the server.

        :raise ExecInterruptedError:
            When the reply could not be retrieved because the connection
            was lost, or some other problem.
        :raise TypeError:
            When assign_id is `False`, an ID is given, and it is not a string.
        :raise ValueError:
            When assign_id is `False`, but the ID is not usable;
            Either because it starts with '__aqmp#' or it is already in-use.
        """
        # 1. convert generic Mapping or bytes to a QMP Message
        # 2. copy Message objects so that we assign an ID only to the copy.
        msg = Message(msg)

        exec_id = msg.get('id')
        if not assign_id and 'id' in msg:
            if not isinstance(exec_id, str):
                raise TypeError(f"ID ('{exec_id}') must be a string.")
            if exec_id.startswith('__aqmp#'):
                raise ValueError(
                    f"ID ('{exec_id}') must not start with '__aqmp#'."
                )

        # Also covers the ID-less case: exec_id is None when 'id' is absent.
        if not assign_id and exec_id in self._pending:
            raise ValueError(
                f"ID '{exec_id}' is in-use and cannot be used."
            )

        return await self._execute(msg, assign_id=assign_id)

    @upper_half
    @require(Runstate.RUNNING)
    async def execute_msg(self, msg: Message) -> object:
        """
        Execute a QMP command and return its value.

        :param msg: The QMP `Message` to execute.

        :return:
            The command execution return value from the server. The type of
            object returned depends on the command that was issued,
            though most in QEMU return a `dict`.
        :raise ValueError:
            If the QMP `Message` does not have either the 'execute' or
            'exec-oob' fields set.
        :raise ExecuteError: When the server returns an error response.
        :raise BadReplyError:
            When the reply has neither a usable 'error' nor a 'return'
            member.
        :raise ExecInterruptedError: if the connection was terminated early.
        """
        if not ('execute' in msg or 'exec-oob' in msg):
            raise ValueError("Requires 'execute' or 'exec-oob' message")

        # Copy the Message so that the ID assigned by _execute() is
        # local to this method; allowing the ID to be seen in raised
        # Exceptions but without modifying the caller's held copy.
        msg = Message(msg)
        reply = await self._execute(msg)

        if 'error' in reply:
            try:
                error_response = ErrorResponse(reply)
            except (KeyError, TypeError) as err:
                # Error response was malformed.
                raise BadReplyError(
                    "QMP error reply is malformed", reply, msg,
                ) from err

            raise ExecuteError(error_response, msg, reply)

        if 'return' not in reply:
            raise BadReplyError(
                "QMP reply is missing a 'error' or 'return' member",
                reply, msg,
            )

        return reply['return']

    @classmethod
    def make_execute_msg(cls, cmd: str,
                         arguments: Optional[Mapping[str, object]] = None,
                         oob: bool = False) -> Message:
        """
        Create an executable message to be sent by `execute_msg` later.

        :param cmd: QMP command name.
        :param arguments: Arguments (if any). Must be JSON-serializable.
        :param oob: If `True`, execute "out of band".

        :return: An executable QMP `Message`.
        """
        msg = Message({'exec-oob' if oob else 'execute': cmd})
        if arguments is not None:
            msg['arguments'] = arguments
        return msg

    @upper_half
    async def execute(self, cmd: str,
                      arguments: Optional[Mapping[str, object]] = None,
                      oob: bool = False) -> object:
        """
        Execute a QMP command and return its value.

        :param cmd: QMP command name.
        :param arguments: Arguments (if any). Must be JSON-serializable.
        :param oob: If `True`, execute "out of band".

        :return:
            The command execution return value from the server. The type of
            object returned depends on the command that was issued,
            though most in QEMU return a `dict`.
        :raise ExecuteError: When the server returns an error response.
        :raise ExecInterruptedError: if the connection was terminated early.
        """
        msg = self.make_execute_msg(cmd, arguments, oob=oob)
        return await self.execute_msg(msg)