# Source: ceph/src/pybind/mgr/rest/app/manager/user_request.py
# (recovered from a gitweb extraction; original page header removed)
import json
import logging
import uuid

from mgr_module import CommandResult

from rest.app.types import OsdMap, PgSummary, USER_REQUEST_COMPLETE, USER_REQUEST_SUBMITTED
from rest.app.util import now
from rest.logger import logger
from rest.logger import logger as log
from rest.module import global_instance as rest_plugin
class UserRequestBase(object):
    """
    A request acts on one or more Ceph-managed objects, i.e.
    requires remote execution against the cluster.

    Amist the terminology mess of 'jobs', 'commands', 'operations', this class
    is named for clarity: it's an operation at an end-user level of
    granularity, something that might be a button in the UI.

    UserRequests are usually remotely executed on a mon.  However, there
    may be a final step of updating the state of ClusterMonitor in order
    that subsequent REST API consumer reads return values consistent with
    the job having completed, e.g. waiting for the OSD map to be up
    to date before calling a pool creation complete.  For this reason,
    UserRequests have a local ID and completion state that is independent
    of their remote ID (salt jid).  UserRequests may also execute more than
    one JID in the course of their lifetime.

    Requests have the following lifecycle:
    NEW       object is created, it has all the information needed to do its job
              other than where it should execute.
    SUBMITTED the request has started executing, usually this will have involved sending
              out a salt job, so .jid is often set but not always.
    COMPLETE  no further action, this instance will remain constant from this point on.
              this does not indicate anything about success or failure.
    """

    # NOTE(review): the NEW constant was lost in extraction; by analogy with
    # USER_REQUEST_SUBMITTED/'submitted' etc. it is assumed to be 'new' — confirm upstream.
    NEW = 'new'
    SUBMITTED = USER_REQUEST_SUBMITTED
    COMPLETE = USER_REQUEST_COMPLETE
    states = [NEW, SUBMITTED, COMPLETE]

    def __init__(self):
        """
        Requiring cluster_name and fsid is redundant (ideally everything would
        speak in terms of fsid) but convenient, because the librados interface
        wants a cluster name when you create a client, and otherwise we would
        have to look up via ceph.conf.
        """
        # getChild isn't in 2.6
        logname = '.'.join((log.name, self.__class__.__name__))
        self.log = logging.getLogger(logname)

        self.requested_at = now()
        self.completed_at = None

        # This is actually kind of overkill compared with having a counter,
        # somewhere but it's easy.
        self.id = uuid.uuid4().__str__()

        # NOTE(review): the following attribute initialisations were lost in
        # extraction and are reconstructed from how the rest of the class uses
        # them (state transitions, set_error, complete) — confirm upstream.
        self.jid = None
        self.state = self.NEW
        self.result = None
        self.error = False
        self.error_message = ""

        # Time at which we last believed the current JID to be really running
        self.alive_at = None

    def set_error(self, message):
        # Record a failure; the request still has to be complete()d separately.
        self.error = True
        self.error_message = message

    @property
    def associations(self):
        """
        A dictionary of Event-compatible assocations for this request, indicating
        which cluster/server/services we are affecting.
        """
        return {}

    @property
    def headline(self):
        """
        Single line describing what the request is trying to accomplish.
        """
        raise NotImplementedError()

    @property
    def status(self):
        """
        Single line describing which phase of the request is currently happening, useful
        to distinguish what's going on for long running operations.  For simple quick
        operations no need to return anything here as the headline tells all.
        """
        if self.state != self.COMPLETE:
            return "Running"
        elif self.error:
            return "Failed (%s)" % self.error_message
        else:
            return "Completed successfully"

    @property
    def awaiting_versions(self):
        """
        Requests indicate that they are waiting for particular sync objects, optionally
        specifying the particular version they are waiting for (otherwise set version
        to None).

        :return dict of SyncObject subclass to (version or None)
        """
        return {}

    def submit(self):
        """
        Start remote execution phase by publishing a job to salt.
        """
        assert self.state == self.NEW

        self._submit()

        self.state = self.SUBMITTED

    def _submit(self):
        # Subclasses implement the actual remote execution.
        raise NotImplementedError()

    def complete_jid(self):
        """
        Call this when remote execution is done.

        Implementations must always update .jid appropriately here: either to the
        jid of a new job, or to None.
        """
        # This is a default behaviour for UserRequests which don't override this method:
        # assume completion of a JID means the job is now done.
        self.complete()

    def complete(self):
        """
        Call this when you're all done
        """
        assert self.state != self.COMPLETE

        self.log.info("Request %s completed with error=%s (%s)" % (self.id, self.error, self.error_message))
        self.state = self.COMPLETE
        self.completed_at = now()

    def on_map(self, sync_type, sync_object):
        """
        It is only valid to call this for sync_types which are currently in awaiting_versions
        """
        pass
class UserRequest(UserRequestBase):
    """
    A UserRequestBase with a fixed headline string supplied at construction.
    """
    def __init__(self, headline):
        super(UserRequest, self).__init__()
        # Version of a sync object (e.g. OsdMap epoch) a subclass may wait for.
        self._await_version = None
        self._headline = headline

    @property
    def headline(self):
        return self._headline
class RadosCommands(object):
    """
    Drive a list of mon commands through the ceph-mgr send_command interface,
    one at a time, recording the final return code and output.
    """
    def __init__(self, tag, commands):
        # NOTE(review): parts of __init__ were lost in extraction; the _tag
        # assignment and result-field initialisation are reconstructed from
        # their use in run()/is_complete() — confirm upstream.
        self._tag = tag
        self._commands = commands
        self.result = None

        self.r = None
        self.outb = None
        self.outs = None

    def run(self):
        # Pop the next command off the queue and dispatch it asynchronously.
        cmd = self._commands[0]
        self._commands = self._commands[1:]
        self.result = CommandResult(self._tag)

        log.debug("cmd={0}".format(cmd))

        # Commands come in as 2-tuple of args and prefix, convert them
        # to the form that send_command uses
        command = cmd[1]
        command['prefix'] = cmd[0]

        rest_plugin().send_command(self.result, json.dumps(command), self._tag)

    def is_complete(self):
        return self.result is None and not self._commands

    def on_completion(self):
        # NOTE(review): this method's name and its tail were lost in
        # extraction; the wait() line is original, the remainder is
        # reconstructed so is_complete() can become true — confirm upstream.
        self.r, self.outb, self.outs = self.result.wait()
        self.result = None
        if self.r == 0 and self._commands:
            # Previous command succeeded and there are more queued: run the next.
            self.run()
class RadosRequest(UserRequest):
    """
    A user request whose remote operations consist of librados mon commands
    """
    def __init__(self, headline, commands):
        super(RadosRequest, self).__init__(headline)
        self.rados_commands = RadosCommands(self.id, commands)
        self._commands = commands

    def _submit(self, commands=None):
        if commands is None:
            commands = self._commands
        else:
            # Append an "osd stat" so the final command's output carries an
            # OSD map epoch for subclasses to wait on.
            commands = commands + [["osd stat", {"format": "json-pretty"}]]
            self.rados_commands = RadosCommands(self.id, commands)

        self.rados_commands.run()

        self.log.info("Request %s started" % (self.id,))
        self.alive_at = now()
class OsdMapModifyingRequest(RadosRequest):
    """
    Specialization of UserRequest which waits for Calamari's copy of
    the OsdMap sync object to catch up after execution of RADOS commands.
    """

    def __init__(self, headline, commands):
        # Tack on an "osd stat" so the command output includes the map epoch
        # we must wait for before declaring the request complete.
        commands = commands + [["osd stat", {"format": "json-pretty"}]]
        super(OsdMapModifyingRequest, self).__init__(headline, commands)
        self._await_version = None

    # FIXME: would be nice to make all ceph command return epochs
    # on completion, so we don't always do this to find out what
    # epoch to wait for to see results of command
    # FIXME: OR we could enforce that the C++ layer of ceph-mgr should
    # always wait_for_latest before passing notifications to pythno land

    @property
    def status(self):
        if self.state != self.COMPLETE and self._await_version:
            return "Waiting for OSD map epoch %s" % self._await_version
        else:
            return super(OsdMapModifyingRequest, self).status

    @property
    def associations(self):
        # NOTE(review): this property's body was lost in extraction; falling
        # back to the base implementation (an empty dict) — confirm upstream.
        return super(OsdMapModifyingRequest, self).associations

    @property
    def awaiting_versions(self):
        if self._await_version and self.state != self.COMPLETE:
            return {
                OsdMap: self._await_version
            }
        else:
            return {}

    def complete_jid(self):
        # My remote work is done, record the version of the map that I will wait for
        # and start waiting for it.
        log.debug("decoding outb: '{0}'".format(self.rados_commands.outb))
        self._await_version = json.loads(self.rados_commands.outb)['epoch']

    def on_map(self, sync_type, osd_map):
        assert sync_type == OsdMap
        assert self._await_version is not None

        ready = osd_map.version >= self._await_version
        if ready:
            self.log.debug("check passed (%s >= %s)" % (osd_map.version, self._await_version))
            self.complete()
        else:
            self.log.debug("check pending (%s < %s)" % (osd_map.version, self._await_version))
class PoolCreatingRequest(OsdMapModifyingRequest):
    """
    Like an OsdMapModifyingRequest, but additionally wait for all PGs in the resulting pool
    to leave state 'creating' before completing.
    """

    def __init__(self, headline, pool_name, commands):
        super(PoolCreatingRequest, self).__init__(headline, commands)
        self._awaiting_pgs = False
        self._pool_name = pool_name

        # Filled in from the OSD map once the pool appears.
        self._pool_id = None
        self._pg_count = None

    @property
    def awaiting_versions(self):
        if self._awaiting_pgs:
            return {PgSummary: None}
        elif self._await_version:
            return {OsdMap: self._await_version}
        else:
            return {}

    def on_map(self, sync_type, sync_object):
        if self._awaiting_pgs:
            assert sync_type == PgSummary
            pg_summary = sync_object
            # Count PGs in our pool which are no longer in state 'creating'.
            pgs_not_creating = 0
            for state_tuple, count in pg_summary.data['by_pool'][self._pool_id.__str__()].items():
                states = state_tuple.split("+")
                if 'creating' not in states:
                    pgs_not_creating += count

            if pgs_not_creating >= self._pg_count:
                self.complete()

        elif self._await_version:
            assert sync_type == OsdMap
            osd_map = sync_object
            if osd_map.version >= self._await_version:
                # Find the pool we just created so we know how many PGs to wait for.
                for pool_id, pool in osd_map.pools_by_id.items():
                    if pool['pool_name'] == self._pool_name:
                        self._pool_id = pool_id
                        self._pg_count = pool['pg_num']
                        break

                if self._pool_id is None:
                    log.error("'{0}' not found, pools are {1}".format(
                        self._pool_name, [p['pool_name'] for p in osd_map.pools_by_id.values()]
                    ))
                    self.set_error("Expected pool '{0}' not found".format(self._pool_name))
                    self.complete()
                else:
                    # Switch to waiting for the PG summary to show creation done.
                    self._awaiting_pgs = True
        else:
            raise NotImplementedError("Unexpected map {0}".format(sync_type))
class PgProgress(object):
    """
    Encapsulate the state that PgCreatingRequest uses for splitting up
    creation operations into blocks.
    """
    def __init__(self, initial, final, block_size):
        """
        :param initial: pg_num the pool starts with
        :param final: pg_num the pool should end with
        :param block_size: maximum PGs to request in one step
        """
        self.initial = initial
        self.final = final
        self._block_size = block_size

        self._still_to_create = self.final - self.initial

        # The pg_num we have most recently asked the cluster for.
        self._intermediate_goal = self.initial
        if self._still_to_create > 0:
            self.advance_goal()

    def advance_goal(self):
        # Move the goal forward one block, capped at the final pg_num.
        assert not self.is_final_block()
        self._intermediate_goal = min(self.final, self._intermediate_goal + self._block_size)

    def set_created_pg_count(self, pg_count):
        self._still_to_create = max(self.final - pg_count, 0)

    def get_status(self):
        total_creating = (self.final - self.initial)
        created = total_creating - self._still_to_create

        if self._intermediate_goal != self.final:
            currently_creating_min = max(self._intermediate_goal - self._block_size, self.initial)
            currently_creating_max = self._intermediate_goal
            return "Waiting for PG creation (%s/%s), currently creating PGs %s-%s" % (
                created, total_creating, currently_creating_min, currently_creating_max)
        else:
            return "Waiting for PG creation (%s/%s)" % (created, total_creating)

    def expected_count(self):
        """
        After a successful 'osd pool set' operation, what should pg_num be?
        """
        return self._intermediate_goal

    def is_final_block(self):
        """
        Is the current expansion under way the final one?
        """
        return self._intermediate_goal == self.final

    def is_complete(self):
        """
        Have all expected PGs been created?
        """
        return self._still_to_create == 0

    @property
    def goal(self):
        # The pg_num currently being aimed for.
        return self._intermediate_goal
class PgCreatingRequest(OsdMapModifyingRequest):
    """
    Specialization of OsdMapModifyingRequest to issue a request
    to issue a second set of commands after PGs created by an
    initial set of commands have left the 'creating' state.

    This handles issuing multiple smaller "osd pool set pg_num" calls when
    the number of new PGs requested is greater than mon_osd_max_split_count,
    caller is responsible for telling us how many we may create at once.
    """

    # Simple state machine for phases:
    #  - have send a job, waiting for JID to complete
    #  - a jid completed, waiting for corresponding OSD map update
    #  - OSD map has updated, waiting for created PGs to leave state 'creating'
    JID_WAIT = 'jid_wait'
    OSD_MAP_WAIT = 'osd_map_wait'
    PG_MAP_WAIT = 'pg_map_wait'

    def __init__(self, headline, commands,
                 pool_id, pool_name, pgp_num,
                 initial_pg_count, final_pg_count, block_size):
        """
        :param commands: Commands to execute before creating PGs
        :param initial_pg_count: How many PGs the pool has before we change anything
        :param final_pg_count: How many PGs the pool should have when we are done
        :param block_size: How many PGs we may create in one "osd pool set" command
        """
        self._await_osd_version = None

        self._pool_id = pool_id
        self._pool_name = pool_name
        self._headline = headline

        self._pg_progress = PgProgress(initial_pg_count, final_pg_count, block_size)
        if initial_pg_count != final_pg_count:
            # First tranche of PG creation is folded into the initial commands.
            commands.append(('osd pool set', {
                'pool': self._pool_name,
                'var': 'pg_num',
                'val': self._pg_progress.goal
            }))

        # Once all PGs exist, bring pgp_num up to match.
        self._post_create_commands = [("osd pool set", {'pool': pool_name, 'var': 'pgp_num', 'val': pgp_num})]

        super(PgCreatingRequest, self).__init__(headline, commands)
        self._phase = self.JID_WAIT

    @property
    def status(self):
        if not self.state == self.COMPLETE and not self._pg_progress.is_complete():
            return self._pg_progress.get_status()
        else:
            return super(PgCreatingRequest, self).status

    def complete_jid(self):
        self._await_version = json.loads(self.rados_commands.outb)['epoch']
        self._phase = self.OSD_MAP_WAIT

    @property
    def awaiting_versions(self):
        # NOTE(review): the bodies of these branches were partly lost in
        # extraction; reconstructed from the phase semantics — confirm upstream.
        if self._phase == self.JID_WAIT:
            return {}
        elif self._phase == self.OSD_MAP_WAIT:
            return {
                OsdMap: self._await_version
            }
        elif self._phase == self.PG_MAP_WAIT:
            return {
                PgSummary: None,
                OsdMap: None
            }
        else:
            raise NotImplementedError("Unknown phase %s" % self._phase)

    def on_map(self, sync_type, sync_object):
        self.log.debug("PgCreatingRequest %s %s" % (sync_type.str, self._phase))
        if self._phase == self.PG_MAP_WAIT:
            if sync_type == PgSummary:
                # Count the PGs in this pool which are not in state 'creating'
                pg_summary = sync_object
                pgs_not_creating = 0
                for state_tuple, count in pg_summary.data['by_pool'][self._pool_id.__str__()].items():
                    states = state_tuple.split("+")
                    if 'creating' not in states:
                        pgs_not_creating += count

                self._pg_progress.set_created_pg_count(pgs_not_creating)
                self.log.debug("PgCreatingRequest.on_map: pg_counter=%s/%s (final %s)" % (
                    pgs_not_creating, self._pg_progress.goal, self._pg_progress.final))
                if pgs_not_creating >= self._pg_progress.goal:
                    if self._pg_progress.is_final_block():
                        self.log.debug("PgCreatingRequest.on_map Creations complete")
                        if self._post_create_commands:
                            self.log.debug("PgCreatingRequest.on_map Issuing post-create commands")
                            self._submit(self._post_create_commands)
                            self._phase = self.JID_WAIT
                        else:
                            self.log.debug("PgCreatingRequest.on_map All done")
                            self.complete()
                    else:
                        self.log.debug("PgCreatingREQUEST.on_map Issuing more creates")
                        self._pg_progress.advance_goal()
                        # Request another tranche of PGs up to _block_size
                        self._submit([('osd pool set', {
                            'pool': self._pool_name,
                            'var': 'pg_num',
                            'val': self._pg_progress.goal
                        })])
                        self._phase = self.JID_WAIT
            elif sync_type == OsdMap:
                # Keep an eye on the OsdMap to check that pg_num is what we expect: otherwise
                # if forces of darkness changed pg_num then our PG creation check could
                # get confused and fail to complete.
                osd_map = sync_object
                pool = osd_map.pools_by_id[self._pool_id]
                if pool['pg_num'] != self._pg_progress.expected_count():
                    self.set_error("PG creation interrupted (unexpected change to pg_num)")
                    self.complete()
                    return
            else:
                # BUGFIX: original used field indices {1}/{2} with only two
                # positional args, which would raise IndexError instead of the
                # intended NotImplementedError.
                raise NotImplementedError("Unexpected map {0} in state {1}".format(
                    sync_type, self._phase
                ))
        elif self._phase == self.OSD_MAP_WAIT:
            # Read back the pg_num for my pool from the OSD map
            osd_map = sync_object
            pool = osd_map.pools_by_id[self._pool_id]

            # In Ceph <= 0.67.7, "osd pool set pg_num" will return success even if it hasn't
            # really increased pg_num, so we must examine the OSD map to see if it really succeded
            if pool['pg_num'] != self._pg_progress.expected_count():
                self.set_error("PG creation failed (check that there aren't already PGs in 'creating' state)")
                self.complete()
                return

            assert self._await_version
            ready = osd_map.version >= self._await_version
            if ready:
                # OSD map advancement either means a PG creation round completed, or that
                # the post_create_commands completed.  Distinguish by looking at pg_progress.
                if self._pg_progress.is_complete():
                    # This was the OSD map update from the post_create_commands, we we're all done!
                    self.complete()
                else:
                    # This was the OSD map update from a PG creation command, so start waiting
                    # for the created PGs to leave state 'creating'.
                    self._phase = self.PG_MAP_WAIT
        else:
            raise NotImplementedError("Unexpected {0} in phase {1}".format(sync_type, self._phase))