]>
Commit | Line | Data |
---|---|---|
7c673cae FG |
1 | import json |
2 | import logging | |
3 | import uuid | |
4 | ||
5 | from rest.app.types import OsdMap, PgSummary, USER_REQUEST_COMPLETE, USER_REQUEST_SUBMITTED | |
6 | from rest.app.util import now | |
7 | from mgr_module import CommandResult | |
8 | ||
9 | from rest.logger import logger | |
10 | log = logger() | |
11 | from rest.module import global_instance as rest_plugin | |
12 | ||
13 | ||
14 | class UserRequestBase(object): | |
15 | """ | |
16 | A request acts on one or more Ceph-managed objects, i.e. | |
17 | mon, mds, osd, pg. | |
18 | ||
19 | Amist the terminology mess of 'jobs', 'commands', 'operations', this class | |
20 | is named for clarity: it's an operation at an end-user level of | |
21 | granularity, something that might be a button in the UI. | |
22 | ||
23 | UserRequests are usually remotely executed on a mon. However, there | |
24 | may be a final step of updating the state of ClusterMonitor in order | |
25 | that subsequent REST API consumer reads return values consistent with | |
26 | the job having completed, e.g. waiting for the OSD map to be up | |
27 | to date before calling a pool creation complete. For this reason, | |
28 | UserRequests have a local ID and completion state that is independent | |
29 | of their remote ID (salt jid). UserRequests may also execute more than | |
30 | one JID in the course of their lifetime. | |
31 | ||
32 | Requests have the following lifecycle: | |
33 | NEW object is created, it has all the information needed to do its job | |
34 | other than where it should execute. | |
35 | SUBMITTED the request has started executing, usually this will have involved sending | |
36 | out a salt job, so .jid is often set but not always. | |
37 | COMPLETE no further action, this instance will remain constant from this point on. | |
38 | this does not indicate anything about success or failure. | |
39 | """ | |
40 | ||
41 | NEW = 'new' | |
42 | SUBMITTED = USER_REQUEST_SUBMITTED | |
43 | COMPLETE = USER_REQUEST_COMPLETE | |
44 | states = [NEW, SUBMITTED, COMPLETE] | |
45 | ||
46 | def __init__(self): | |
47 | """ | |
48 | Requiring cluster_name and fsid is redundant (ideally everything would | |
49 | speak in terms of fsid) but convenient, because the librados interface | |
50 | wants a cluster name when you create a client, and otherwise we would | |
51 | have to look up via ceph.conf. | |
52 | """ | |
53 | # getChild isn't in 2.6 | |
54 | logname = '.'.join((log.name, self.__class__.__name__)) | |
55 | self.log = logging.getLogger(logname) | |
56 | self.requested_at = now() | |
57 | self.completed_at = None | |
58 | ||
59 | # This is actually kind of overkill compared with having a counter, | |
60 | # somewhere but it's easy. | |
61 | self.id = uuid.uuid4().__str__() | |
62 | ||
63 | self.state = self.NEW | |
64 | self.result = None | |
65 | self.error = False | |
66 | self.error_message = "" | |
67 | ||
68 | # Time at which we last believed the current JID to be really running | |
69 | self.alive_at = None | |
70 | ||
71 | def set_error(self, message): | |
72 | self.error = True | |
73 | self.error_message = message | |
74 | ||
75 | @property | |
76 | def associations(self): | |
77 | """ | |
78 | A dictionary of Event-compatible assocations for this request, indicating | |
79 | which cluster/server/services we are affecting. | |
80 | """ | |
81 | return {} | |
82 | ||
83 | @property | |
84 | def headline(self): | |
85 | """ | |
86 | Single line describing what the request is trying to accomplish. | |
87 | """ | |
88 | raise NotImplementedError() | |
89 | ||
90 | @property | |
91 | def status(self): | |
92 | """ | |
93 | Single line describing which phase of the request is currently happening, useful | |
94 | to distinguish what's going on for long running operations. For simple quick | |
95 | operations no need to return anything here as the headline tells all. | |
96 | """ | |
97 | if self.state != self.COMPLETE: | |
98 | return "Running" | |
99 | elif self.error: | |
100 | return "Failed (%s)" % self.error_message | |
101 | else: | |
102 | return "Completed successfully" | |
103 | ||
104 | @property | |
105 | def awaiting_versions(self): | |
106 | """ | |
107 | Requests indicate that they are waiting for particular sync objects, optionally | |
108 | specifying the particular version they are waiting for (otherwise set version | |
109 | to None). | |
110 | ||
111 | :return dict of SyncObject subclass to (version or None) | |
112 | """ | |
113 | return {} | |
114 | ||
115 | def submit(self): | |
116 | """ | |
117 | Start remote execution phase by publishing a job to salt. | |
118 | """ | |
119 | assert self.state == self.NEW | |
120 | ||
121 | self._submit() | |
122 | ||
123 | self.state = self.SUBMITTED | |
124 | ||
125 | def _submit(self): | |
126 | raise NotImplementedError() | |
127 | ||
128 | def complete_jid(self): | |
129 | """ | |
130 | Call this when remote execution is done. | |
131 | ||
132 | Implementations must always update .jid appropriately here: either to the | |
133 | jid of a new job, or to None. | |
134 | """ | |
135 | ||
136 | # This is a default behaviour for UserRequests which don't override this method: | |
137 | # assume completion of a JID means the job is now done. | |
138 | self.complete() | |
139 | ||
140 | def complete(self): | |
141 | """ | |
142 | Call this when you're all done | |
143 | """ | |
144 | assert self.state != self.COMPLETE | |
145 | ||
146 | self.log.info("Request %s completed with error=%s (%s)" % (self.id, self.error, self.error_message)) | |
147 | self.state = self.COMPLETE | |
148 | self.completed_at = now() | |
149 | ||
150 | def on_map(self, sync_type, sync_object): | |
151 | """ | |
152 | It is only valid to call this for sync_types which are currently in awaiting_versions | |
153 | """ | |
154 | pass | |
155 | ||
156 | ||
157 | class UserRequest(UserRequestBase): | |
158 | def __init__(self, headline): | |
159 | super(UserRequest, self).__init__() | |
160 | self._await_version = None | |
161 | self._headline = headline | |
162 | ||
163 | @property | |
164 | def headline(self): | |
165 | return self._headline | |
166 | ||
167 | ||
168 | class RadosCommands(object): | |
169 | def __init__(self, tag, commands): | |
170 | self.result = None | |
171 | self._tag = tag | |
172 | self._commands = commands | |
173 | ||
174 | self.r = None | |
175 | self.outs = None | |
176 | self.outb = None | |
177 | ||
178 | def run(self): | |
179 | cmd = self._commands[0] | |
180 | self._commands = self._commands[1:] | |
181 | self.result = CommandResult(self._tag) | |
182 | ||
183 | log.debug("cmd={0}".format(cmd)) | |
184 | ||
185 | # Commands come in as 2-tuple of args and prefix, convert them | |
186 | # to the form that send_command uses | |
187 | command = cmd[1] | |
188 | command['prefix'] = cmd[0] | |
189 | ||
190 | rest_plugin().send_command(self.result, json.dumps(command), self._tag) | |
191 | ||
192 | def is_complete(self): | |
193 | return self.result is None and not self._commands | |
194 | ||
195 | def advance(self): | |
196 | self.r, self.outb, self.outs = self.result.wait() | |
197 | self.result = None | |
198 | ||
199 | if self.r == 0: | |
200 | if self._commands: | |
201 | self.run() | |
202 | else: | |
203 | # Stop on errors | |
204 | self._commands = [] | |
205 | ||
206 | ||
207 | class RadosRequest(UserRequest): | |
208 | """ | |
209 | A user request whose remote operations consist of librados mon commands | |
210 | """ | |
211 | def __init__(self, headline, commands): | |
212 | super(RadosRequest, self).__init__(headline) | |
213 | self.rados_commands = RadosCommands(self.id, commands) | |
214 | self._commands = commands | |
215 | ||
216 | def _submit(self, commands=None): | |
217 | if commands is None: | |
218 | commands = self._commands | |
219 | else: | |
220 | commands = commands + [["osd stat", {"format": "json-pretty"}]] | |
221 | self.rados_commands = RadosCommands(self.id, commands) | |
222 | ||
223 | self.rados_commands.run() | |
224 | ||
225 | self.log.info("Request %s started" % (self.id,)) | |
226 | self.alive_at = now() | |
227 | ||
228 | return self.id | |
229 | ||
230 | ||
231 | class OsdMapModifyingRequest(RadosRequest): | |
232 | """ | |
233 | Specialization of UserRequest which waits for Calamari's copy of | |
234 | the OsdMap sync object to catch up after execution of RADOS commands. | |
235 | """ | |
236 | ||
237 | def __init__(self, headline, commands): | |
238 | commands = commands + [["osd stat", {"format": "json-pretty"}]] | |
239 | ||
240 | super(OsdMapModifyingRequest, self).__init__(headline, commands) | |
241 | self._await_version = None | |
242 | ||
243 | # FIXME: would be nice to make all ceph command return epochs | |
244 | # on completion, so we don't always do this to find out what | |
245 | # epoch to wait for to see results of command | |
246 | # FIXME: OR we could enforce that the C++ layer of ceph-mgr should | |
247 | # always wait_for_latest before passing notifications to pythno land | |
248 | ||
249 | ||
250 | @property | |
251 | def status(self): | |
252 | if self.state != self.COMPLETE and self._await_version: | |
253 | return "Waiting for OSD map epoch %s" % self._await_version | |
254 | else: | |
255 | return super(OsdMapModifyingRequest, self).status | |
256 | ||
257 | @property | |
258 | def associations(self): | |
259 | return { | |
260 | } | |
261 | ||
262 | @property | |
263 | def awaiting_versions(self): | |
264 | if self._await_version and self.state != self.COMPLETE: | |
265 | return { | |
266 | OsdMap: self._await_version | |
267 | } | |
268 | else: | |
269 | return {} | |
270 | ||
271 | def complete_jid(self): | |
272 | # My remote work is done, record the version of the map that I will wait for | |
273 | # and start waiting for it. | |
274 | log.debug("decoding outb: '{0}'".format(self.rados_commands.outb)) | |
275 | self._await_version = json.loads(self.rados_commands.outb)['epoch'] | |
276 | ||
277 | def on_map(self, sync_type, osd_map): | |
278 | assert sync_type == OsdMap | |
279 | assert self._await_version is not None | |
280 | ||
281 | ready = osd_map.version >= self._await_version | |
282 | if ready: | |
283 | self.log.debug("check passed (%s >= %s)" % (osd_map.version, self._await_version)) | |
284 | self.complete() | |
285 | else: | |
286 | self.log.debug("check pending (%s < %s)" % (osd_map.version, self._await_version)) | |
287 | ||
288 | ||
289 | class PoolCreatingRequest(OsdMapModifyingRequest): | |
290 | """ | |
291 | Like an OsdMapModifyingRequest, but additionally wait for all PGs in the resulting pool | |
292 | to leave state 'creating' before completing. | |
293 | """ | |
294 | ||
295 | def __init__(self, headline, pool_name, commands): | |
296 | super(PoolCreatingRequest, self).__init__(headline, commands) | |
297 | self._awaiting_pgs = False | |
298 | self._pool_name = pool_name | |
299 | ||
300 | self._pool_id = None | |
301 | self._pg_count = None | |
302 | ||
303 | @property | |
304 | def awaiting_versions(self): | |
305 | if self._awaiting_pgs: | |
306 | return {PgSummary: None} | |
307 | elif self._await_version: | |
308 | return {OsdMap: self._await_version} | |
309 | else: | |
310 | return {} | |
311 | ||
312 | def on_map(self, sync_type, sync_object): | |
313 | if self._awaiting_pgs: | |
314 | assert sync_type == PgSummary | |
315 | pg_summary = sync_object | |
316 | pgs_not_creating = 0 | |
317 | for state_tuple, count in pg_summary.data['by_pool'][self._pool_id.__str__()].items(): | |
318 | states = state_tuple.split("+") | |
319 | if 'creating' not in states: | |
320 | pgs_not_creating += count | |
321 | ||
322 | if pgs_not_creating >= self._pg_count: | |
323 | self.complete() | |
324 | ||
325 | elif self._await_version: | |
326 | assert sync_type == OsdMap | |
327 | osd_map = sync_object | |
328 | if osd_map.version >= self._await_version: | |
329 | for pool_id, pool in osd_map.pools_by_id.items(): | |
330 | if pool['pool_name'] == self._pool_name: | |
331 | self._pool_id = pool_id | |
332 | self._pg_count = pool['pg_num'] | |
333 | break | |
334 | ||
335 | if self._pool_id is None: | |
336 | log.error("'{0}' not found, pools are {1}".format( | |
337 | self._pool_name, [p['pool_name'] for p in osd_map.pools_by_id.values()] | |
338 | )) | |
339 | self.set_error("Expected pool '{0}' not found".format(self._pool_name)) | |
340 | self.complete() | |
341 | ||
342 | self._awaiting_pgs = True | |
343 | else: | |
344 | raise NotImplementedError("Unexpected map {0}".format(sync_type)) | |
345 | ||
346 | ||
347 | class PgProgress(object): | |
348 | """ | |
349 | Encapsulate the state that PgCreatingRequest uses for splitting up | |
350 | creation operations into blocks. | |
351 | """ | |
352 | def __init__(self, initial, final, block_size): | |
353 | self.initial = initial | |
354 | self.final = final | |
355 | self._block_size = block_size | |
356 | ||
357 | self._still_to_create = self.final - self.initial | |
358 | ||
359 | self._intermediate_goal = self.initial | |
360 | if self._still_to_create > 0: | |
361 | self.advance_goal() | |
362 | ||
363 | def advance_goal(self): | |
364 | assert not self.is_final_block() | |
365 | self._intermediate_goal = min(self.final, self._intermediate_goal + self._block_size) | |
366 | ||
367 | def set_created_pg_count(self, pg_count): | |
368 | self._still_to_create = max(self.final - pg_count, 0) | |
369 | ||
370 | def get_status(self): | |
371 | total_creating = (self.final - self.initial) | |
372 | created = total_creating - self._still_to_create | |
373 | ||
374 | if self._intermediate_goal != self.final: | |
375 | currently_creating_min = max(self._intermediate_goal - self._block_size, self.initial) | |
376 | currently_creating_max = self._intermediate_goal | |
377 | return "Waiting for PG creation (%s/%s), currently creating PGs %s-%s" % ( | |
378 | created, total_creating, currently_creating_min, currently_creating_max) | |
379 | else: | |
380 | return "Waiting for PG creation (%s/%s)" % (created, total_creating) | |
381 | ||
382 | def expected_count(self): | |
383 | """ | |
384 | After a successful 'osd pool set' operation, what should pg_num be? | |
385 | """ | |
386 | return self._intermediate_goal | |
387 | ||
388 | def is_final_block(self): | |
389 | """ | |
390 | Is the current expansion under way the final one? | |
391 | """ | |
392 | return self._intermediate_goal == self.final | |
393 | ||
394 | def is_complete(self): | |
395 | """ | |
396 | Have all expected PGs been created? | |
397 | """ | |
398 | return self._still_to_create == 0 | |
399 | ||
400 | @property | |
401 | def goal(self): | |
402 | return self._intermediate_goal | |
403 | ||
404 | ||
405 | class PgCreatingRequest(OsdMapModifyingRequest): | |
406 | """ | |
407 | Specialization of OsdMapModifyingRequest to issue a request | |
408 | to issue a second set of commands after PGs created by an | |
409 | initial set of commands have left the 'creating' state. | |
410 | ||
411 | This handles issuing multiple smaller "osd pool set pg_num" calls when | |
412 | the number of new PGs requested is greater than mon_osd_max_split_count, | |
413 | caller is responsible for telling us how many we may create at once. | |
414 | """ | |
415 | ||
416 | # Simple state machine for phases: | |
417 | # - have send a job, waiting for JID to complete | |
418 | # - a jid completed, waiting for corresponding OSD map update | |
419 | # - OSD map has updated, waiting for created PGs to leave state 'creating' | |
420 | JID_WAIT = 'jid_wait' | |
421 | OSD_MAP_WAIT = 'osd_map_wait' | |
422 | PG_MAP_WAIT = 'pg_map_wait' | |
423 | ||
424 | def __init__(self, headline, commands, | |
425 | pool_id, pool_name, pgp_num, | |
426 | initial_pg_count, final_pg_count, block_size): | |
427 | """ | |
428 | :param commands: Commands to execute before creating PGs | |
429 | :param initial_pg_count: How many PGs the pool has before we change anything | |
430 | :param final_pg_count: How many PGs the pool should have when we are done | |
431 | :param block_size: How many PGs we may create in one "osd pool set" command | |
432 | """ | |
433 | ||
434 | self._await_osd_version = None | |
435 | ||
436 | self._pool_id = pool_id | |
437 | self._pool_name = pool_name | |
438 | self._headline = headline | |
439 | ||
440 | self._pg_progress = PgProgress(initial_pg_count, final_pg_count, block_size) | |
441 | if initial_pg_count != final_pg_count: | |
442 | commands.append(('osd pool set', { | |
443 | 'pool': self._pool_name, | |
444 | 'var': 'pg_num', | |
445 | 'val': self._pg_progress.goal | |
446 | })) | |
447 | self._post_create_commands = [("osd pool set", {'pool': pool_name, 'var': 'pgp_num', 'val': pgp_num})] | |
448 | ||
449 | super(PgCreatingRequest, self).__init__(headline, commands) | |
450 | self._phase = self.JID_WAIT | |
451 | ||
452 | @property | |
453 | def status(self): | |
454 | if not self.state == self.COMPLETE and not self._pg_progress.is_complete(): | |
455 | return self._pg_progress.get_status() | |
456 | else: | |
457 | return super(PgCreatingRequest, self).status | |
458 | ||
459 | def complete_jid(self): | |
460 | self._await_version = json.loads(self.rados_commands.outb)['epoch'] | |
461 | self._phase = self.OSD_MAP_WAIT | |
462 | ||
463 | @property | |
464 | def awaiting_versions(self): | |
465 | if self._phase == self.JID_WAIT: | |
466 | return {} | |
467 | elif self._phase == self.OSD_MAP_WAIT: | |
468 | return { | |
469 | OsdMap: self._await_version | |
470 | } | |
471 | elif self._phase == self.PG_MAP_WAIT: | |
472 | return { | |
473 | PgSummary: None, | |
474 | OsdMap: None | |
475 | } | |
476 | ||
477 | def on_map(self, sync_type, sync_object): | |
478 | self.log.debug("PgCreatingRequest %s %s" % (sync_type.str, self._phase)) | |
479 | if self._phase == self.PG_MAP_WAIT: | |
480 | if sync_type == PgSummary: | |
481 | # Count the PGs in this pool which are not in state 'creating' | |
482 | pg_summary = sync_object | |
483 | pgs_not_creating = 0 | |
484 | ||
485 | for state_tuple, count in pg_summary.data['by_pool'][self._pool_id.__str__()].items(): | |
486 | states = state_tuple.split("+") | |
487 | if 'creating' not in states: | |
488 | pgs_not_creating += count | |
489 | ||
490 | self._pg_progress.set_created_pg_count(pgs_not_creating) | |
491 | self.log.debug("PgCreatingRequest.on_map: pg_counter=%s/%s (final %s)" % ( | |
492 | pgs_not_creating, self._pg_progress.goal, self._pg_progress.final)) | |
493 | if pgs_not_creating >= self._pg_progress.goal: | |
494 | if self._pg_progress.is_final_block(): | |
495 | self.log.debug("PgCreatingRequest.on_map Creations complete") | |
496 | if self._post_create_commands: | |
497 | self.log.debug("PgCreatingRequest.on_map Issuing post-create commands") | |
498 | self._submit(self._post_create_commands) | |
499 | self._phase = self.JID_WAIT | |
500 | else: | |
501 | self.log.debug("PgCreatingRequest.on_map All done") | |
502 | self.complete() | |
503 | else: | |
504 | self.log.debug("PgCreatingREQUEST.on_map Issuing more creates") | |
505 | self._pg_progress.advance_goal() | |
506 | # Request another tranche of PGs up to _block_size | |
507 | self._submit([('osd pool set', { | |
508 | 'pool': self._pool_name, | |
509 | 'var': 'pg_num', | |
510 | 'val': self._pg_progress.goal | |
511 | })]) | |
512 | self._phase = self.JID_WAIT | |
513 | elif sync_type == OsdMap: | |
514 | # Keep an eye on the OsdMap to check that pg_num is what we expect: otherwise | |
515 | # if forces of darkness changed pg_num then our PG creation check could | |
516 | # get confused and fail to complete. | |
517 | osd_map = sync_object | |
518 | pool = osd_map.pools_by_id[self._pool_id] | |
519 | if pool['pg_num'] != self._pg_progress.expected_count(): | |
520 | self.set_error("PG creation interrupted (unexpected change to pg_num)") | |
521 | self.complete() | |
522 | return | |
523 | else: | |
524 | raise NotImplementedError("Unexpected map {1} in state {2}".format( | |
525 | sync_type, self._phase | |
526 | )) | |
527 | ||
528 | elif self._phase == self.OSD_MAP_WAIT: | |
529 | # Read back the pg_num for my pool from the OSD map | |
530 | osd_map = sync_object | |
531 | pool = osd_map.pools_by_id[self._pool_id] | |
532 | ||
533 | # In Ceph <= 0.67.7, "osd pool set pg_num" will return success even if it hasn't | |
534 | # really increased pg_num, so we must examine the OSD map to see if it really succeded | |
535 | if pool['pg_num'] != self._pg_progress.expected_count(): | |
536 | self.set_error("PG creation failed (check that there aren't already PGs in 'creating' state)") | |
537 | self.complete() | |
538 | return | |
539 | ||
540 | assert self._await_version | |
541 | ready = osd_map.version >= self._await_version | |
542 | if ready: | |
543 | # OSD map advancement either means a PG creation round completed, or that | |
544 | # the post_create_commands completed. Distinguish by looking at pg_progress. | |
545 | if self._pg_progress.is_complete(): | |
546 | # This was the OSD map update from the post_create_commands, we we're all done! | |
547 | self.complete() | |
548 | else: | |
549 | # This was the OSD map update from a PG creation command, so start waiting | |
550 | # for the pgs | |
551 | self._phase = self.PG_MAP_WAIT | |
552 | else: | |
553 | raise NotImplementedError("Unexpected {0} in phase {1}".format(sync_type, self._phase)) |