ceph/src/pybind/mgr/rest/app/manager/user_request.py
import json
import logging
import uuid

from rest.app.types import OsdMap, PgSummary, USER_REQUEST_COMPLETE, USER_REQUEST_SUBMITTED
from rest.app.util import now
from mgr_module import CommandResult

from rest.logger import logger
log = logger()
from rest.module import global_instance as rest_plugin


class UserRequestBase(object):
    """
    A request acts on one or more Ceph-managed objects, i.e.
    mon, mds, osd, pg.

    Amidst the terminology mess of 'jobs', 'commands', 'operations', this class
    is named for clarity: it's an operation at an end-user level of
    granularity, something that might be a button in the UI.

    UserRequests are usually remotely executed on a mon. However, there
    may be a final step of updating the state of ClusterMonitor in order
    that subsequent REST API consumer reads return values consistent with
    the job having completed, e.g. waiting for the OSD map to be up
    to date before declaring a pool creation complete. For this reason,
    UserRequests have a local ID and completion state that is independent
    of their remote ID (salt jid). UserRequests may also execute more than
    one JID in the course of their lifetime.

    Requests have the following lifecycle:
      NEW         the object is created; it has all the information needed to do its
                  job other than where it should execute.
      SUBMITTED   the request has started executing; usually this will have involved
                  sending out a salt job, so .jid is often set, but not always.
      COMPLETE    no further action; this instance will remain constant from this
                  point on. This does not indicate anything about success or failure.
    """

    NEW = 'new'
    SUBMITTED = USER_REQUEST_SUBMITTED
    COMPLETE = USER_REQUEST_COMPLETE
    states = [NEW, SUBMITTED, COMPLETE]
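    # Typical driver flow, as a sketch only (the real call sites live in the rest
    # plugin's request collection; the subclass and command used here are illustrative):
    #
    #   request = OsdMapModifyingRequest("Setting noout", [('osd set', {'key': 'noout'})])
    #   request.submit()          # NEW -> SUBMITTED, first mon command is sent
    #   ...                       # on command completion the driver calls complete_jid()
    #   ...                       # and feeds map updates to on_map() until complete()
    #   request.state             # -> COMPLETE; check .error / .error_message for outcome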

    def __init__(self):
        """
        Requiring cluster_name and fsid is redundant (ideally everything would
        speak in terms of fsid) but convenient, because the librados interface
        wants a cluster name when you create a client, and otherwise we would
        have to look it up via ceph.conf.
        """
        # getChild isn't in 2.6
        logname = '.'.join((log.name, self.__class__.__name__))
        self.log = logging.getLogger(logname)
        self.requested_at = now()
        self.completed_at = None

        # This is actually kind of overkill compared with having a counter
        # somewhere, but it's easy.
        self.id = str(uuid.uuid4())

        self.state = self.NEW
        self.result = None
        self.error = False
        self.error_message = ""

        # Time at which we last believed the current JID to be really running
        self.alive_at = None

    def set_error(self, message):
        self.error = True
        self.error_message = message

    @property
    def associations(self):
        """
        A dictionary of Event-compatible associations for this request, indicating
        which cluster/server/services we are affecting.
        """
        return {}

    @property
    def headline(self):
        """
        Single line describing what the request is trying to accomplish.
        """
        raise NotImplementedError()

    @property
    def status(self):
        """
        Single line describing which phase of the request is currently happening, useful
        to distinguish what's going on for long-running operations. For simple, quick
        operations there is no need to return anything here, as the headline tells all.
        """
        if self.state != self.COMPLETE:
            return "Running"
        elif self.error:
            return "Failed (%s)" % self.error_message
        else:
            return "Completed successfully"

    @property
    def awaiting_versions(self):
        """
        Requests indicate that they are waiting for particular sync objects, optionally
        specifying the particular version they are waiting for (otherwise set the version
        to None).

        :return dict of SyncObject subclass to (version or None)
        """
        return {}

    def submit(self):
        """
        Start the remote execution phase by publishing a job to salt.
        """
        assert self.state == self.NEW

        self._submit()

        self.state = self.SUBMITTED

    def _submit(self):
        raise NotImplementedError()

    def complete_jid(self):
        """
        Call this when remote execution is done.

        Implementations must always update .jid appropriately here: either to the
        jid of a new job, or to None.
        """

        # This is the default behaviour for UserRequests which don't override this method:
        # assume completion of a JID means the job is now done.
        self.complete()

    def complete(self):
        """
        Call this when you're all done.
        """
        assert self.state != self.COMPLETE

        self.log.info("Request %s completed with error=%s (%s)" % (self.id, self.error, self.error_message))
        self.state = self.COMPLETE
        self.completed_at = now()

    def on_map(self, sync_type, sync_object):
        """
        It is only valid to call this for sync_types which are currently in awaiting_versions.
        """
        pass


class UserRequest(UserRequestBase):
    def __init__(self, headline):
        super(UserRequest, self).__init__()
        self._await_version = None
        self._headline = headline

    @property
    def headline(self):
        return self._headline


class RadosCommands(object):
    def __init__(self, tag, commands):
        self.result = None
        self._tag = tag
        self._commands = commands

        self.r = None
        self.outs = None
        self.outb = None

    def run(self):
        cmd = self._commands[0]
        self._commands = self._commands[1:]
        self.result = CommandResult(self._tag)

        log.debug("cmd={0}".format(cmd))

        # Commands come in as a 2-tuple of (prefix, args); convert them
        # to the dict form that send_command expects
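        # e.g. (illustrative values) ('osd pool create', {'pool': 'foo', 'pg_num': 64})
        # becomes {'prefix': 'osd pool create', 'pool': 'foo', 'pg_num': 64}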
        command = cmd[1]
        command['prefix'] = cmd[0]

        rest_plugin().send_command(self.result, json.dumps(command), self._tag)

    def is_complete(self):
        return self.result is None and not self._commands

    def advance(self):
        self.r, self.outb, self.outs = self.result.wait()
        self.result = None

        if self.r == 0:
            if self._commands:
                self.run()
        else:
            # Stop on errors
            self._commands = []


class RadosRequest(UserRequest):
    """
    A user request whose remote operations consist of librados mon commands
    """
    def __init__(self, headline, commands):
        super(RadosRequest, self).__init__(headline)
        self.rados_commands = RadosCommands(self.id, commands)
        self._commands = commands

    def _submit(self, commands=None):
        if commands is None:
            commands = self._commands
        else:
            commands = commands + [["osd stat", {"format": "json-pretty"}]]
            self.rados_commands = RadosCommands(self.id, commands)

        self.rados_commands.run()

        self.log.info("Request %s started" % (self.id,))
        self.alive_at = now()

        return self.id


class OsdMapModifyingRequest(RadosRequest):
    """
    Specialization of UserRequest which waits for Calamari's copy of
    the OsdMap sync object to catch up after execution of RADOS commands.
    """

    def __init__(self, headline, commands):
        commands = commands + [["osd stat", {"format": "json-pretty"}]]

        super(OsdMapModifyingRequest, self).__init__(headline, commands)
        self._await_version = None

        # FIXME: would be nice to make all ceph commands return epochs
        # on completion, so we don't always have to do this to find out what
        # epoch to wait for to see the results of a command
        # FIXME: OR we could enforce that the C++ layer of ceph-mgr should
        # always wait_for_latest before passing notifications to python land

    @property
    def status(self):
        if self.state != self.COMPLETE and self._await_version:
            return "Waiting for OSD map epoch %s" % self._await_version
        else:
            return super(OsdMapModifyingRequest, self).status

    @property
    def associations(self):
        return {
        }

    @property
    def awaiting_versions(self):
        if self._await_version and self.state != self.COMPLETE:
            return {
                OsdMap: self._await_version
            }
        else:
            return {}

    def complete_jid(self):
        # My remote work is done; record the version of the map that I will wait for,
        # and start waiting for it.
        log.debug("decoding outb: '{0}'".format(self.rados_commands.outb))
        self._await_version = json.loads(self.rados_commands.outb)['epoch']

    def on_map(self, sync_type, osd_map):
        assert sync_type == OsdMap
        assert self._await_version is not None

        ready = osd_map.version >= self._await_version
        if ready:
            self.log.debug("check passed (%s >= %s)" % (osd_map.version, self._await_version))
            self.complete()
        else:
            self.log.debug("check pending (%s < %s)" % (osd_map.version, self._await_version))


class PoolCreatingRequest(OsdMapModifyingRequest):
    """
    Like an OsdMapModifyingRequest, but additionally waits for all PGs in the resulting pool
    to leave state 'creating' before completing.
    """

    def __init__(self, headline, pool_name, commands):
        super(PoolCreatingRequest, self).__init__(headline, commands)
        self._awaiting_pgs = False
        self._pool_name = pool_name

        self._pool_id = None
        self._pg_count = None

    @property
    def awaiting_versions(self):
        if self._awaiting_pgs:
            return {PgSummary: None}
        elif self._await_version:
            return {OsdMap: self._await_version}
        else:
            return {}

    def on_map(self, sync_type, sync_object):
        if self._awaiting_pgs:
            assert sync_type == PgSummary
            pg_summary = sync_object
            pgs_not_creating = 0
            for state_tuple, count in pg_summary.data['by_pool'][str(self._pool_id)].items():
                states = state_tuple.split("+")
                if 'creating' not in states:
                    pgs_not_creating += count

            if pgs_not_creating >= self._pg_count:
                self.complete()

        elif self._await_version:
            assert sync_type == OsdMap
            osd_map = sync_object
            if osd_map.version >= self._await_version:
                for pool_id, pool in osd_map.pools_by_id.items():
                    if pool['pool_name'] == self._pool_name:
                        self._pool_id = pool_id
                        self._pg_count = pool['pg_num']
                        break

                if self._pool_id is None:
                    log.error("'{0}' not found, pools are {1}".format(
                        self._pool_name, [p['pool_name'] for p in osd_map.pools_by_id.values()]
                    ))
                    self.set_error("Expected pool '{0}' not found".format(self._pool_name))
                    self.complete()
                    # Nothing more to wait for if the pool never appeared
                    return

                self._awaiting_pgs = True
        else:
            raise NotImplementedError("Unexpected map {0}".format(sync_type))


class PgProgress(object):
    """
    Encapsulate the state that PgCreatingRequest uses for splitting up
    creation operations into blocks.
    """
    def __init__(self, initial, final, block_size):
        self.initial = initial
        self.final = final
        self._block_size = block_size

        self._still_to_create = self.final - self.initial

        self._intermediate_goal = self.initial
        if self._still_to_create > 0:
            self.advance_goal()

    def advance_goal(self):
        assert not self.is_final_block()
        self._intermediate_goal = min(self.final, self._intermediate_goal + self._block_size)

    def set_created_pg_count(self, pg_count):
        self._still_to_create = max(self.final - pg_count, 0)

    def get_status(self):
        total_creating = (self.final - self.initial)
        created = total_creating - self._still_to_create

        if self._intermediate_goal != self.final:
            currently_creating_min = max(self._intermediate_goal - self._block_size, self.initial)
            currently_creating_max = self._intermediate_goal
            return "Waiting for PG creation (%s/%s), currently creating PGs %s-%s" % (
                created, total_creating, currently_creating_min, currently_creating_max)
        else:
            return "Waiting for PG creation (%s/%s)" % (created, total_creating)

    def expected_count(self):
        """
        After a successful 'osd pool set' operation, what should pg_num be?
        """
        return self._intermediate_goal

    def is_final_block(self):
        """
        Is the current expansion under way the final one?
        """
        return self._intermediate_goal == self.final

    def is_complete(self):
        """
        Have all expected PGs been created?
        """
        return self._still_to_create == 0

    @property
    def goal(self):
        return self._intermediate_goal


class PgCreatingRequest(OsdMapModifyingRequest):
    """
    Specialization of OsdMapModifyingRequest which issues a second set of
    commands after the PGs created by an initial set of commands have left
    the 'creating' state.

    This handles issuing multiple smaller "osd pool set pg_num" calls when
    the number of new PGs requested is greater than mon_osd_max_split_count;
    the caller is responsible for telling us how many we may create at once.
    """

    # Simple state machine for phases:
    #  - have sent a job, waiting for the JID to complete
    #  - a JID completed, waiting for the corresponding OSD map update
    #  - OSD map has updated, waiting for created PGs to leave state 'creating'
    JID_WAIT = 'jid_wait'
    OSD_MAP_WAIT = 'osd_map_wait'
    PG_MAP_WAIT = 'pg_map_wait'
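    # Phase transitions, as a sketch of the logic below:
    #   JID_WAIT     -> complete_jid() records the new OSD map epoch    -> OSD_MAP_WAIT
    #   OSD_MAP_WAIT -> OSD map catches up: if PGs remain to be created -> PG_MAP_WAIT,
    #                   otherwise (post-create commands finished)       -> complete()
    #   PG_MAP_WAIT  -> PGs for the current goal left 'creating': submit either the next
    #                   pg_num block or the post-create commands        -> JID_WAIT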

    def __init__(self, headline, commands,
                 pool_id, pool_name, pgp_num,
                 initial_pg_count, final_pg_count, block_size):
        """
        :param commands: Commands to execute before creating PGs
        :param initial_pg_count: How many PGs the pool has before we change anything
        :param final_pg_count: How many PGs the pool should have when we are done
        :param block_size: How many PGs we may create in one "osd pool set" command
        """

        self._await_osd_version = None

        self._pool_id = pool_id
        self._pool_name = pool_name
        self._headline = headline

        self._pg_progress = PgProgress(initial_pg_count, final_pg_count, block_size)
        if initial_pg_count != final_pg_count:
            commands.append(('osd pool set', {
                'pool': self._pool_name,
                'var': 'pg_num',
                'val': self._pg_progress.goal
            }))
            self._post_create_commands = [("osd pool set", {'pool': pool_name, 'var': 'pgp_num', 'val': pgp_num})]

        super(PgCreatingRequest, self).__init__(headline, commands)
        self._phase = self.JID_WAIT

    @property
    def status(self):
        if not self.state == self.COMPLETE and not self._pg_progress.is_complete():
            return self._pg_progress.get_status()
        else:
            return super(PgCreatingRequest, self).status

    def complete_jid(self):
        self._await_version = json.loads(self.rados_commands.outb)['epoch']
        self._phase = self.OSD_MAP_WAIT

    @property
    def awaiting_versions(self):
        if self._phase == self.JID_WAIT:
            return {}
        elif self._phase == self.OSD_MAP_WAIT:
            return {
                OsdMap: self._await_version
            }
        elif self._phase == self.PG_MAP_WAIT:
            return {
                PgSummary: None,
                OsdMap: None
            }

    def on_map(self, sync_type, sync_object):
        self.log.debug("PgCreatingRequest %s %s" % (sync_type.str, self._phase))
        if self._phase == self.PG_MAP_WAIT:
            if sync_type == PgSummary:
                # Count the PGs in this pool which are not in state 'creating'
                pg_summary = sync_object
                pgs_not_creating = 0

                for state_tuple, count in pg_summary.data['by_pool'][str(self._pool_id)].items():
                    states = state_tuple.split("+")
                    if 'creating' not in states:
                        pgs_not_creating += count

                self._pg_progress.set_created_pg_count(pgs_not_creating)
                self.log.debug("PgCreatingRequest.on_map: pg_counter=%s/%s (final %s)" % (
                    pgs_not_creating, self._pg_progress.goal, self._pg_progress.final))
                if pgs_not_creating >= self._pg_progress.goal:
                    if self._pg_progress.is_final_block():
                        self.log.debug("PgCreatingRequest.on_map Creations complete")
                        if self._post_create_commands:
                            self.log.debug("PgCreatingRequest.on_map Issuing post-create commands")
                            self._submit(self._post_create_commands)
                            self._phase = self.JID_WAIT
                        else:
                            self.log.debug("PgCreatingRequest.on_map All done")
                            self.complete()
                    else:
                        self.log.debug("PgCreatingRequest.on_map Issuing more creates")
                        self._pg_progress.advance_goal()
                        # Request another tranche of PGs up to _block_size
                        self._submit([('osd pool set', {
                            'pool': self._pool_name,
                            'var': 'pg_num',
                            'val': self._pg_progress.goal
                        })])
                        self._phase = self.JID_WAIT
            elif sync_type == OsdMap:
                # Keep an eye on the OsdMap to check that pg_num is what we expect: otherwise
                # if forces of darkness changed pg_num then our PG creation check could
                # get confused and fail to complete.
                osd_map = sync_object
                pool = osd_map.pools_by_id[self._pool_id]
                if pool['pg_num'] != self._pg_progress.expected_count():
                    self.set_error("PG creation interrupted (unexpected change to pg_num)")
                    self.complete()
                    return
            else:
                raise NotImplementedError("Unexpected map {0} in state {1}".format(
                    sync_type, self._phase
                ))

        elif self._phase == self.OSD_MAP_WAIT:
            # Read back the pg_num for my pool from the OSD map
            osd_map = sync_object
            pool = osd_map.pools_by_id[self._pool_id]

            # In Ceph <= 0.67.7, "osd pool set pg_num" will return success even if it hasn't
            # really increased pg_num, so we must examine the OSD map to see if it really succeeded
            if pool['pg_num'] != self._pg_progress.expected_count():
                self.set_error("PG creation failed (check that there aren't already PGs in 'creating' state)")
                self.complete()
                return

            assert self._await_version
            ready = osd_map.version >= self._await_version
            if ready:
                # OSD map advancement either means a PG creation round completed, or that
                # the post_create_commands completed. Distinguish by looking at pg_progress.
                if self._pg_progress.is_complete():
                    # This was the OSD map update from the post_create_commands; we're all done!
                    self.complete()
                else:
                    # This was the OSD map update from a PG creation command, so start waiting
                    # for the pgs
                    self._phase = self.PG_MAP_WAIT
        else:
            raise NotImplementedError("Unexpected {0} in phase {1}".format(sync_type, self._phase))