# Origin: mirror_qemu.git (git.proxmox.com) - python/qemu/aqmp/qmp_client.py
"""
QMP Protocol Implementation

This module provides the `QMPClient` class, which can be used to connect
and send commands to a QMP server such as QEMU. The `QMPClient` class can
be used either to connect to a listening server, or to listen for and
accept an incoming connection from that server.
"""
import asyncio
import logging
from typing import (
    Dict,
    List,
    Mapping,
    Optional,
    Union,
    cast,
)

from .error import AQMPError, ProtocolError
from .events import Events
from .message import Message
from .models import ErrorResponse, Greeting
from .protocol import AsyncProtocol, Runstate, require
# NOTE(review): this import line was lost in extraction; the helpers are
# used below, and `.util` is presumed to be their home -- confirm.
from .util import exception_summary, pretty_traceback
class _WrappedProtocolError(ProtocolError):
    """
    Abstract exception class for Protocol errors that wrap an Exception.

    :param error_message: Human-readable string describing the error.
    :param exc: The root-cause exception.
    """
    def __init__(self, error_message: str, exc: Exception):
        super().__init__(error_message)
        #: The root-cause exception; rendered by `__str__`.
        self.exc = exc

    def __str__(self) -> str:
        # "<message>: <str() of the wrapped exception>"
        return f"{self.error_message}: {self.exc!s}"
class GreetingError(_WrappedProtocolError):
    """
    An exception occurred during the Greeting phase.

    :param error_message: Human-readable string describing the error.
    :param exc: The root-cause exception.
    """
class NegotiationError(_WrappedProtocolError):
    """
    An exception occurred during the Negotiation phase.

    :param error_message: Human-readable string describing the error.
    :param exc: The root-cause exception.
    """
class ExecuteError(AQMPError):
    """
    Exception raised by `QMPClient.execute()` on RPC failure.

    The exception message is the description carried by the error
    response (``error_response.error.desc``).

    :param error_response: The RPC error response object.
    :param sent: The sent RPC message that caused the failure.
    :param received: The raw RPC error reply received.
    """
    def __init__(self, error_response: ErrorResponse,
                 sent: Message, received: Message):
        super().__init__(error_response.error.desc)
        #: The sent `Message` that caused the failure
        self.sent: Message = sent
        #: The received `Message` that indicated failure
        self.received: Message = received
        #: The parsed error response
        self.error: ErrorResponse = error_response
        #: The QMP error class
        self.error_class: str = error_response.error.class_
class ExecInterruptedError(AQMPError):
    """
    Exception raised by `execute()` (et al) when an RPC is interrupted.

    This error is raised when an `execute()` statement could not be
    completed. This can occur because the connection itself was
    terminated before a reply was received.

    The true cause of the interruption will be available via `disconnect()`.
    """
class _MsgProtocolError(ProtocolError):
    """
    Abstract error class for protocol errors that have a `Message` object.

    This Exception class is used for protocol errors where the `Message`
    was mechanically understood, but was found to be inappropriate or
    malformed.

    :param error_message: Human-readable string describing the error.
    :param msg: The QMP `Message` that caused the error.
    """
    def __init__(self, error_message: str, msg: Message):
        super().__init__(error_message)
        #: The received `Message` that caused the error.
        self.msg: Message = msg

    def __str__(self) -> str:
        # Base error text first, then the offending message on its own line.
        return "\n".join([
            super().__str__(),
            f" Message was: {str(self.msg)}\n",
        ])
class ServerParseError(_MsgProtocolError):
    """
    The Server sent a `Message` indicating parsing failure.

    i.e. A reply has arrived from the server, but it is missing the "ID"
    field, indicating a parsing error.

    :param error_message: Human-readable string describing the error.
    :param msg: The QMP `Message` that caused the error.
    """
class BadReplyError(_MsgProtocolError):
    """
    An execution reply was successfully routed, but not understood.

    If a QMP message is received with an 'id' field to allow it to be
    routed, but is otherwise malformed, this exception will be raised.

    A reply message is malformed if it is missing either the 'return' or
    'error' keys, or if the 'error' value has missing keys or members of
    the wrong type.

    :param error_message: Human-readable string describing the error.
    :param msg: The malformed reply that was received.
    :param sent: The message that was sent that prompted the error.
    """
    def __init__(self, error_message: str, msg: Message, sent: Message):
        super().__init__(error_message, msg)
        #: The sent `Message` that caused the failure
        self.sent = sent
class QMPClient(AsyncProtocol[Message], Events):
    """
    Implements a QMP client connection.

    QMP can be used to establish a connection as either the transport
    client or server, though this class always acts as the QMP client.

    :param name: Optional nickname for the connection, used for logging.

    Basic script-style usage looks like this::

      qmp = QMPClient('my_virtual_machine_name')
      await qmp.connect(('127.0.0.1', 1234))
      ...
      res = await qmp.execute('block-query')
      ...
      await qmp.disconnect()

    Basic async client-style usage looks like this::

      class Client:
          def __init__(self, name: str):
              self.qmp = QMPClient(name)

          async def watch_events(self):
              try:
                  async for event in self.qmp.events:
                      print(f"Event: {event['event']}")
              except asyncio.CancelledError:
                  return

          async def run(self, address='/tmp/qemu.socket'):
              await self.qmp.connect(address)
              asyncio.create_task(self.watch_events())
              await self.qmp.runstate_changed.wait()
              await self.qmp.disconnect()

    See `aqmp.events` for more detail on event handling patterns.
    """
    #: Logger object used for debugging messages.
    logger = logging.getLogger(__name__)

    # Read buffer limit; large enough to accept query-qmp-schema
    _limit = (256 * 1024)

    # Type alias for pending execute() result items
    _PendingT = Union[Message, ExecInterruptedError]

    def __init__(self, name: Optional[str] = None) -> None:
        super().__init__(name)
        Events.__init__(self)

        #: Whether or not to await a greeting after establishing a connection.
        self.await_greeting: bool = True

        #: Whether or not to perform capabilities negotiation upon connection.
        #: Implies `await_greeting`.
        self.negotiate: bool = True

        # Cached Greeting, if one was awaited.
        self._greeting: Optional[Greeting] = None

        # Command ID counter; consumed by _get_exec_id().
        self._execute_id = 0

        # Incoming RPC reply messages.
        # Maps execution ID (or None for ID-less executions) to the
        # single-slot queue on which the reply will be delivered.
        self._pending: Dict[
            Union[str, None],
            'asyncio.Queue[QMPClient._PendingT]'
        ] = {}

    async def _establish_session(self) -> None:
        """
        Initiate the QMP session.

        Wait for the QMP greeting and perform capabilities negotiation.

        :raise GreetingError: When the greeting is not understood.
        :raise NegotiationError: If the negotiation fails.
        :raise EOFError: When the server unexpectedly hangs up.
        :raise OSError: For underlying stream errors.
        """
        # Start every session from a clean slate.
        self._greeting = None
        self._pending = {}

        # Negotiation needs the greeting's capability list, so
        # `negotiate` implies awaiting the greeting.
        if self.await_greeting or self.negotiate:
            self._greeting = await self._get_greeting()

        if self.negotiate:
            await self._negotiate()

        # This will start the reader/writers:
        await super()._establish_session()

    async def _get_greeting(self) -> Greeting:
        """
        Receive and parse the server's greeting.

        :raise GreetingError: When the greeting is not understood.
        :raise EOFError: When the server unexpectedly hangs up.
        :raise OSError: For underlying stream errors.

        :return: the Greeting object given by the server.
        """
        self.logger.debug("Awaiting greeting ...")

        try:
            msg = await self._recv()
            return Greeting(msg)
        except (ProtocolError, KeyError, TypeError) as err:
            emsg = "Did not understand Greeting"
            self.logger.error("%s: %s", emsg, exception_summary(err))
            self.logger.debug("%s:\n%s\n", emsg, pretty_traceback())
            raise GreetingError(emsg, err) from err
        except BaseException as err:
            # EOFError, OSError, or something unexpected.
            # Log it, then let the original exception propagate.
            emsg = "Failed to receive Greeting"
            self.logger.error("%s: %s", emsg, exception_summary(err))
            self.logger.debug("%s:\n%s\n", emsg, pretty_traceback())
            raise

    async def _negotiate(self) -> None:
        """
        Perform QMP capabilities negotiation.

        :raise NegotiationError: When negotiation fails.
        :raise EOFError: When the server unexpectedly hangs up.
        :raise OSError: For underlying stream errors.
        """
        self.logger.debug("Negotiating capabilities ...")

        arguments: Dict[str, List[str]] = {'enable': []}
        if self._greeting and 'oob' in self._greeting.QMP.capabilities:
            arguments['enable'].append('oob')
        msg = self.make_execute_msg('qmp_capabilities', arguments=arguments)

        # It's not safe to use execute() here, because the reader/writers
        # aren't running. AsyncProtocol *requires* that a new session
        # does not fail after the reader/writers are running!
        try:
            await self._send(msg)
            reply = await self._recv()
            # Validate explicitly instead of with `assert`: assertions
            # are stripped under `python -O`, which would let an error
            # reply from the server pass as a successful negotiation.
            if 'return' not in reply or 'error' in reply:
                raise ProtocolError(
                    "Server did not acknowledge 'qmp_capabilities'"
                )
        except (ProtocolError, AssertionError) as err:
            emsg = "Negotiation failed"
            self.logger.error("%s: %s", emsg, exception_summary(err))
            self.logger.debug("%s:\n%s\n", emsg, pretty_traceback())
            raise NegotiationError(emsg, err) from err
        except BaseException as err:
            # EOFError, OSError, or something unexpected.
            # Log it, then let the original exception propagate.
            emsg = "Negotiation failed"
            self.logger.error("%s: %s", emsg, exception_summary(err))
            self.logger.debug("%s:\n%s\n", emsg, pretty_traceback())
            raise

    async def _bh_disconnect(self) -> None:
        """
        Disconnect, then wake every caller still waiting on a reply.

        Each pending execution queue receives an `ExecInterruptedError`
        so that its waiter in `_reply()` raises instead of hanging.
        """
        try:
            await super()._bh_disconnect()
        finally:
            if self._pending:
                self.logger.debug("Cancelling pending executions")
            for key, queue in self._pending.items():
                self.logger.debug("Cancelling execution '%s'", key)
                queue.put_nowait(
                    ExecInterruptedError("Disconnected")
                )

            self.logger.debug("QMP Disconnected.")

    def _cleanup(self) -> None:
        """Run the base-class cleanup and check nothing is left pending."""
        super()._cleanup()
        assert not self._pending

    async def _on_message(self, msg: Message) -> None:
        """
        Add an incoming message to the appropriate queue/handler.

        :param msg: The message received from the server.

        :raise ServerParseError: When Message indicates server parse failure.
        """
        # Incoming messages are not fully parsed/validated here;
        # do only light peeking to know how to route the messages.

        if 'event' in msg:
            await self._event_dispatch(msg)
            return

        # Below, we assume everything left is an execute/exec-oob response.

        exec_id = cast(Optional[str], msg.get('id'))

        if exec_id in self._pending:
            await self._pending[exec_id].put(msg)
            return

        # We have a message we can't route back to a caller.

        is_error = 'error' in msg
        has_id = 'id' in msg

        if is_error and not has_id:
            # This is very likely a server parsing error.
            # It doesn't inherently belong to any pending execution.
            # Instead of performing clever recovery, just terminate.
            # See "NOTE" in qmp-spec.txt, section 2.4.2
            raise ServerParseError(
                ("Server sent an error response without an ID, "
                 "but there are no ID-less executions pending. "
                 "Assuming this is a server parser failure."),
                msg
            )

        # qmp-spec.txt, section 2.4:
        # 'Clients should drop all the responses
        # that have an unknown "id" field.'
        self.logger.log(
            logging.ERROR if is_error else logging.WARNING,
            "Unknown ID '%s', message dropped.",
            exec_id,
        )
        self.logger.debug("Unroutable message: %s", str(msg))

    async def _do_recv(self) -> Message:
        """
        Read one line from the stream and parse it as a `Message`.

        :raise OSError: When a stream error is encountered.
        :raise EOFError: When the stream is at EOF.
        :raise ProtocolError:
            When the Message is not understood.
            See also `Message._deserialize`.

        :return: A single QMP `Message`.
        """
        msg_bytes = await self._readline()
        msg = Message(msg_bytes, eager=True)
        return msg

    def _do_send(self, msg: Message) -> None:
        """
        Serialize a `Message` and write it to the stream.

        :param msg: The `Message` to send.

        :raise ValueError: JSON serialization failure
        :raise TypeError: JSON serialization failure
        :raise OSError: When a stream error is encountered.
        """
        assert self._writer is not None
        self._writer.write(bytes(msg))

    def _get_exec_id(self) -> str:
        """Return the next automatically generated execution ID."""
        exec_id = f"__aqmp#{self._execute_id:05d}"
        self._execute_id += 1
        return exec_id

    async def _issue(self, msg: Message) -> Union[None, str]:
        """
        Issue a QMP `Message` and do not wait for a reply.

        :param msg: The QMP `Message` to send to the server.

        :return: The ID of the `Message` sent.
        """
        msg_id: Optional[str] = None
        if 'id' in msg:
            assert isinstance(msg['id'], str)
            msg_id = msg['id']

        # Register the reply slot *before* queueing the message, so the
        # reply always has somewhere to be routed.
        self._pending[msg_id] = asyncio.Queue(maxsize=1)
        await self._outgoing.put(msg)

        return msg_id

    async def _reply(self, msg_id: Union[str, None]) -> Message:
        """
        Await a reply to a previously issued QMP message.

        :param msg_id: The ID of the previously issued message.

        :return: The reply from the server.
        :raise ExecInterruptedError:
            When the reply could not be retrieved because the connection
            was lost, or some other problem.
        """
        queue = self._pending[msg_id]
        result = await queue.get()

        try:
            if isinstance(result, ExecInterruptedError):
                raise result
            return result
        finally:
            # The execution is finished either way; free the ID.
            del self._pending[msg_id]

    async def _execute(self, msg: Message, assign_id: bool = True) -> Message:
        """
        Send a QMP `Message` to the server and await a reply.

        This method *assumes* you are sending some kind of an execute
        statement that *will* receive a reply.

        An execution ID will be assigned if assign_id is `True`. It can be
        disabled, but this requires that an ID is manually assigned
        instead. For manually assigned IDs, you must not use the string
        '__aqmp#' anywhere in the ID.

        :param msg: The QMP `Message` to execute.
        :param assign_id: If True, assign a new execution ID.

        :return: Execution reply from the server.
        :raise ExecInterruptedError:
            When the reply could not be retrieved because the connection
            was lost, or some other problem.
        """
        if assign_id:
            msg['id'] = self._get_exec_id()
        elif 'id' in msg:
            assert isinstance(msg['id'], str)
            assert '__aqmp#' not in msg['id']

        exec_id = await self._issue(msg)
        return await self._reply(exec_id)

    # NOTE(review): the `def` line of this method was lost in extraction;
    # the name `_raw` is presumed -- confirm against the real file.
    @require(Runstate.RUNNING)
    async def _raw(
            self,
            msg: Union[Message, Mapping[str, object], bytes],
            assign_id: bool = True,
    ) -> Message:
        """
        Issue a raw `Message` to the QMP server and await a reply.

        :param msg:
            A Message to send to the server. It may be a `Message`, any
            Mapping (including Dict), or raw bytes.
        :param assign_id:
            Assign an arbitrary execution ID to this message. If
            `False`, the existing id must either be absent (and no other
            such pending execution may omit an ID) or a string. If it is
            a string, it must not start with '__aqmp#' and no other such
            pending execution may currently be using that ID.

        :return: Execution reply from the server.

        :raise ExecInterruptedError:
            When the reply could not be retrieved because the connection
            was lost, or some other problem.
        :raise TypeError:
            When assign_id is `False`, an ID is given, and it is not a string.
        :raise ValueError:
            When assign_id is `False`, but the ID is not usable;
            Either because it starts with '__aqmp#' or it is already in-use.
        """
        # 1. convert generic Mapping or bytes to a QMP Message
        # 2. copy Message objects so that we assign an ID only to the copy.
        msg = Message(msg)

        exec_id = msg.get('id')
        if not assign_id and 'id' in msg:
            if not isinstance(exec_id, str):
                raise TypeError(f"ID ('{exec_id}') must be a string.")
            if exec_id.startswith('__aqmp#'):
                raise ValueError(
                    f"ID ('{exec_id}') must not start with '__aqmp#'."
                )

        if not assign_id and exec_id in self._pending:
            raise ValueError(
                f"ID '{exec_id}' is in-use and cannot be used."
            )

        return await self._execute(msg, assign_id=assign_id)

    @require(Runstate.RUNNING)
    async def execute_msg(self, msg: Message) -> object:
        """
        Execute a QMP command and return its value.

        :param msg: The QMP `Message` to execute.

        :return:
            The command execution return value from the server. The type of
            object returned depends on the command that was issued,
            though most in QEMU return a `dict`.
        :raise ValueError:
            If the QMP `Message` does not have either the 'execute' or
            'exec-oob' fields set.
        :raise BadReplyError:
            When the reply has neither a 'return' nor a well-formed
            'error' member.
        :raise ExecuteError: When the server returns an error response.
        :raise ExecInterruptedError: if the connection was terminated early.
        """
        if not ('execute' in msg or 'exec-oob' in msg):
            raise ValueError("Requires 'execute' or 'exec-oob' message")

        # Copy the Message so that the ID assigned by _execute() is
        # local to this method; allowing the ID to be seen in raised
        # Exceptions but without modifying the caller's held copy.
        msg = Message(msg)
        reply = await self._execute(msg)

        if 'error' in reply:
            try:
                error_response = ErrorResponse(reply)
            except (KeyError, TypeError) as err:
                # Error response was malformed.
                raise BadReplyError(
                    "QMP error reply is malformed", reply, msg,
                ) from err

            raise ExecuteError(error_response, msg, reply)

        if 'return' not in reply:
            raise BadReplyError(
                "QMP reply is missing a 'error' or 'return' member",
                reply, msg,
            )

        return reply['return']

    @classmethod
    def make_execute_msg(cls, cmd: str,
                         arguments: Optional[Mapping[str, object]] = None,
                         oob: bool = False) -> Message:
        """
        Create an executable message to be sent by `execute_msg` later.

        :param cmd: QMP command name.
        :param arguments: Arguments (if any). Must be JSON-serializable.
        :param oob: If `True`, execute "out of band".

        :return: An executable QMP `Message`.
        """
        msg = Message({'exec-oob' if oob else 'execute': cmd})
        if arguments is not None:
            msg['arguments'] = arguments
        return msg

    async def execute(self, cmd: str,
                      arguments: Optional[Mapping[str, object]] = None,
                      oob: bool = False) -> object:
        """
        Execute a QMP command and return its value.

        :param cmd: QMP command name.
        :param arguments: Arguments (if any). Must be JSON-serializable.
        :param oob: If `True`, execute "out of band".

        :return:
            The command execution return value from the server. The type of
            object returned depends on the command that was issued,
            though most in QEMU return a `dict`.
        :raise ExecuteError: When the server returns an error response.
        :raise ExecInterruptedError: if the connection was terminated early.
        """
        msg = self.make_execute_msg(cmd, arguments, oob=oob)
        return await self.execute_msg(msg)