ceph.git: ceph/src/pybind/mgr/rbd_support/module.py (ceph 14.2.5)
1 """
2 RBD support module
3 """
4
5 import errno
6 import json
7 import rados
8 import rbd
9 import re
10 import time
11 import traceback
12 import uuid
13
14 from mgr_module import MgrModule
15
16 from contextlib import contextmanager
17 from datetime import datetime, timedelta
18 from functools import partial, wraps
19 from threading import Condition, Lock, Thread
20
21
22 GLOBAL_POOL_KEY = (None, None)
23
24 QUERY_POOL_ID = "pool_id"
25 QUERY_POOL_ID_MAP = "pool_id_map"
26 QUERY_IDS = "query_ids"
27 QUERY_SUM_POOL_COUNTERS = "pool_counters"
28 QUERY_RAW_POOL_COUNTERS = "raw_pool_counters"
29 QUERY_LAST_REQUEST = "last_request"
30
31 OSD_PERF_QUERY_REGEX_MATCH_ALL = '^(.*)$'
32 OSD_PERF_QUERY_COUNTERS = ['write_ops',
33 'read_ops',
34 'write_bytes',
35 'read_bytes',
36 'write_latency',
37 'read_latency']
38 OSD_PERF_QUERY_COUNTERS_INDICES = {
39 OSD_PERF_QUERY_COUNTERS[i]: i for i in range(len(OSD_PERF_QUERY_COUNTERS))}
40
41 OSD_PERF_QUERY_LATENCY_COUNTER_INDICES = [4, 5]
42 OSD_PERF_QUERY_MAX_RESULTS = 256
43
44 POOL_REFRESH_INTERVAL = timedelta(minutes=5)
45 QUERY_EXPIRE_INTERVAL = timedelta(minutes=1)
46 STATS_RATE_INTERVAL = timedelta(minutes=1)
47
48 REPORT_MAX_RESULTS = 64
49
50 RBD_TASK_OID = "rbd_task"
51
52 TASK_SEQUENCE = "sequence"
53 TASK_ID = "id"
54 TASK_REFS = "refs"
55 TASK_MESSAGE = "message"
56 TASK_RETRY_TIME = "retry_time"
57 TASK_IN_PROGRESS = "in_progress"
58 TASK_PROGRESS = "progress"
59 TASK_CANCELED = "canceled"
60
61 TASK_REF_POOL_NAME = "pool_name"
62 TASK_REF_POOL_NAMESPACE = "pool_namespace"
63 TASK_REF_IMAGE_NAME = "image_name"
64 TASK_REF_IMAGE_ID = "image_id"
65 TASK_REF_ACTION = "action"
66
67 TASK_REF_ACTION_FLATTEN = "flatten"
68 TASK_REF_ACTION_REMOVE = "remove"
69 TASK_REF_ACTION_TRASH_REMOVE = "trash remove"
70 TASK_REF_ACTION_MIGRATION_EXECUTE = "migrate execute"
71 TASK_REF_ACTION_MIGRATION_COMMIT = "migrate commit"
72 TASK_REF_ACTION_MIGRATION_ABORT = "migrate abort"
73
74 VALID_TASK_ACTIONS = [TASK_REF_ACTION_FLATTEN,
75 TASK_REF_ACTION_REMOVE,
76 TASK_REF_ACTION_TRASH_REMOVE,
77 TASK_REF_ACTION_MIGRATION_EXECUTE,
78 TASK_REF_ACTION_MIGRATION_COMMIT,
79 TASK_REF_ACTION_MIGRATION_ABORT]
80
81 TASK_RETRY_INTERVAL = timedelta(seconds=30)
82 MAX_COMPLETED_TASKS = 50
83
84
85 def extract_pool_key(pool_spec):
86 if not pool_spec:
87 return GLOBAL_POOL_KEY
88
89 match = re.match(r'^([^/]+)(?:/([^/]+))?$', pool_spec)
90 if not match:
91 raise ValueError("Invalid pool spec: {}".format(pool_spec))
92 return (match.group(1), match.group(2) or '')
93
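# Illustrative examples of the pool-spec parsing implemented by
# extract_pool_key() above (GLOBAL_POOL_KEY selects all pools):
#
#   extract_pool_key(None)        -> GLOBAL_POOL_KEY, i.e. (None, None)
#   extract_pool_key("rbd")       -> ("rbd", "")
#   extract_pool_key("rbd/ns1")   -> ("rbd", "ns1")
#   extract_pool_key("a/b/c")     -> ValueError: Invalid pool spec: a/b/c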
94
95 def get_rbd_pools(module):
96 osd_map = module.get('osd_map')
97 return {pool['pool']: pool['pool_name'] for pool in osd_map['pools']
98 if 'rbd' in pool.get('application_metadata', {})}
99
100
101 class PerfHandler:
102 user_queries = {}
103 image_cache = {}
104
105 lock = Lock()
106 query_condition = Condition(lock)
107 refresh_condition = Condition(lock)
108 thread = None
109
110 image_name_cache = {}
111 image_name_refresh_time = datetime.fromtimestamp(0)
112
113 @classmethod
114 def prepare_regex(cls, value):
115 return '^({})$'.format(value)
116
117 @classmethod
118 def prepare_osd_perf_query(cls, pool_id, namespace, counter_type):
119 pool_id_regex = OSD_PERF_QUERY_REGEX_MATCH_ALL
120 namespace_regex = OSD_PERF_QUERY_REGEX_MATCH_ALL
121 if pool_id:
122 pool_id_regex = cls.prepare_regex(pool_id)
123 if namespace:
124 namespace_regex = cls.prepare_regex(namespace)
125
126 return {
127 'key_descriptor': [
128 {'type': 'pool_id', 'regex': pool_id_regex},
129 {'type': 'namespace', 'regex': namespace_regex},
130 {'type': 'object_name',
131 'regex': '^(?:rbd|journal)_data\\.(?:([0-9]+)\\.)?([^.]+)\\.'},
132 ],
133 'performance_counter_descriptors': OSD_PERF_QUERY_COUNTERS,
134 'limit': {'order_by': counter_type,
135 'max_count': OSD_PERF_QUERY_MAX_RESULTS},
136 }
137
138 @classmethod
139 def pool_spec_search_keys(cls, pool_key):
140 return [pool_key[0:len(pool_key) - x]
141 for x in range(0, len(pool_key) + 1)]
142
143 @classmethod
144 def submatch_pool_key(cls, pool_key, search_key):
145 return ((pool_key[1] == search_key[1] or not search_key[1])
146 and (pool_key[0] == search_key[0] or not search_key[0]))
147
148 def __init__(self, module):
149 self.module = module
150 self.log = module.log
151
152 self.thread = Thread(target=self.run)
153 self.thread.start()
154
155 def run(self):
156 try:
157 self.log.info("PerfHandler: starting")
158 while True:
159 with self.lock:
160 self.scrub_expired_queries()
161 self.process_raw_osd_perf_counters()
162 self.refresh_condition.notify()
163
164 stats_period = int(self.module.get_ceph_option("mgr_stats_period"))
165 self.query_condition.wait(stats_period)
166
167 self.log.debug("PerfHandler: tick")
168
169 except Exception as ex:
170 self.log.fatal("Fatal runtime error: {}\n{}".format(
171 ex, traceback.format_exc()))
172
173 def merge_raw_osd_perf_counters(self, pool_key, query, now_ts,
174 resolve_image_names):
175 pool_id_map = query[QUERY_POOL_ID_MAP]
176
177 # collect and combine the raw counters from all sort orders
178 raw_pool_counters = query.setdefault(QUERY_RAW_POOL_COUNTERS, {})
179 for query_id in query[QUERY_IDS]:
180 res = self.module.get_osd_perf_counters(query_id)
181 for counter in res['counters']:
182 # prefer the pool id parsed from the object name, if present
183 k = counter['k']
184 pool_id = int(k[2][0]) if k[2][0] else int(k[0][0])
185 namespace = k[1][0]
186 image_id = k[2][1]
187
188 # ignore metrics from non-matching pools/namespaces
189 if pool_id not in pool_id_map:
190 continue
191 if pool_key[1] is not None and pool_key[1] != namespace:
192 continue
193
194 # flag the pool (and namespace) for refresh if we cannot find
195 # image name in the cache
196 resolve_image_key = (pool_id, namespace)
197 if image_id not in self.image_name_cache.get(resolve_image_key, {}):
198 resolve_image_names.add(resolve_image_key)
199
200 # copy the 'sum' counter values for each image (ignore count)
201 # if we haven't already processed it for this round
202 raw_namespaces = raw_pool_counters.setdefault(pool_id, {})
203 raw_images = raw_namespaces.setdefault(namespace, {})
204 raw_image = raw_images.setdefault(image_id, [None, None])
205
206 # save the last two perf counters for each image
207 if raw_image[0] and raw_image[0][0] < now_ts:
208 raw_image[1] = raw_image[0]
209 raw_image[0] = None
210 if not raw_image[0]:
211 raw_image[0] = [now_ts, [int(x[0]) for x in counter['c']]]
212
213 self.log.debug("merge_raw_osd_perf_counters: {}".format(raw_pool_counters))
214 return raw_pool_counters
215
216 def sum_osd_perf_counters(self, query, raw_pool_counters, now_ts):
217 # update the cumulative counters for each image
218 sum_pool_counters = query.setdefault(QUERY_SUM_POOL_COUNTERS, {})
219 for pool_id, raw_namespaces in raw_pool_counters.items():
220 sum_namespaces = sum_pool_counters.setdefault(pool_id, {})
221 for namespace, raw_images in raw_namespaces.items():
222 sum_namespace = sum_namespaces.setdefault(namespace, {})
223 for image_id, raw_image in raw_images.items():
224 # zero-out non-updated raw counters
225 if not raw_image[0]:
226 continue
227 elif raw_image[0][0] < now_ts:
228 raw_image[1] = raw_image[0]
229 raw_image[0] = [now_ts, [0 for x in raw_image[1][1]]]
230 continue
231
232 counters = raw_image[0][1]
233
234 # copy raw counters if this is a newly discovered image or
235 # increment existing counters
236 sum_image = sum_namespace.setdefault(image_id, None)
237 if sum_image:
238 for i in range(len(counters)):
239 sum_image[i] += counters[i]
240 else:
241 sum_namespace[image_id] = [x for x in counters]
242
243 self.log.debug("sum_osd_perf_counters: {}".format(sum_pool_counters))
244 return sum_pool_counters
245
246 def refresh_image_names(self, resolve_image_names):
247 for pool_id, namespace in resolve_image_names:
248 image_key = (pool_id, namespace)
249 images = self.image_name_cache.setdefault(image_key, {})
250 with self.module.rados.open_ioctx2(int(pool_id)) as ioctx:
251 ioctx.set_namespace(namespace)
252 for image_meta in rbd.RBD().list2(ioctx):
253 images[image_meta['id']] = image_meta['name']
254 self.log.debug("resolve_image_names: {}={}".format(image_key, images))
255
256 def scrub_missing_images(self):
257 for pool_key, query in self.user_queries.items():
258 raw_pool_counters = query.get(QUERY_RAW_POOL_COUNTERS, {})
259 sum_pool_counters = query.get(QUERY_SUM_POOL_COUNTERS, {})
260 for pool_id, sum_namespaces in sum_pool_counters.items():
261 raw_namespaces = raw_pool_counters.get(pool_id, {})
262 for namespace, sum_images in sum_namespaces.items():
263 raw_images = raw_namespaces.get(namespace, {})
264
265 image_key = (pool_id, namespace)
266 image_names = self.image_name_cache.get(image_key, {})
267 for image_id in list(sum_images.keys()):
268 # scrub image counters if we failed to resolve image name
269 if image_id not in image_names:
270 self.log.debug("scrub_missing_images: dropping {}/{}".format(
271 image_key, image_id))
272 del sum_images[image_id]
273 if image_id in raw_images:
274 del raw_images[image_id]
275
276 def process_raw_osd_perf_counters(self):
277 now = datetime.now()
278 now_ts = int(now.strftime("%s"))
279
280 # clear the image name cache if we need to refresh all active pools
281 if self.image_name_cache and \
282 self.image_name_refresh_time + POOL_REFRESH_INTERVAL < now:
283 self.log.debug("process_raw_osd_perf_counters: expiring image name cache")
284 self.image_name_cache = {}
285
286 resolve_image_names = set()
287 for pool_key, query in self.user_queries.items():
288 if not query[QUERY_IDS]:
289 continue
290
291 raw_pool_counters = self.merge_raw_osd_perf_counters(
292 pool_key, query, now_ts, resolve_image_names)
293 self.sum_osd_perf_counters(query, raw_pool_counters, now_ts)
294
295 if resolve_image_names:
296 self.image_name_refresh_time = now
297 self.refresh_image_names(resolve_image_names)
298 self.scrub_missing_images()
299 elif not self.image_name_cache:
300 self.scrub_missing_images()
301
302 def resolve_pool_id(self, pool_name):
303 pool_id = self.module.rados.pool_lookup(pool_name)
304 if not pool_id:
305 raise rados.ObjectNotFound("Pool '{}' not found".format(pool_name))
306 return pool_id
307
308 def scrub_expired_queries(self):
309 # perf counters need to be periodically refreshed to continue
310 # to be registered
311 expire_time = datetime.now() - QUERY_EXPIRE_INTERVAL
312 for pool_key in list(self.user_queries.keys()):
313 user_query = self.user_queries[pool_key]
314 if user_query[QUERY_LAST_REQUEST] < expire_time:
315 self.unregister_osd_perf_queries(pool_key, user_query[QUERY_IDS])
316 del self.user_queries[pool_key]
317
318 def register_osd_perf_queries(self, pool_id, namespace):
319 query_ids = []
320 try:
321 for counter in OSD_PERF_QUERY_COUNTERS:
322 query = self.prepare_osd_perf_query(pool_id, namespace, counter)
323 self.log.debug("register_osd_perf_queries: {}".format(query))
324
325 query_id = self.module.add_osd_perf_query(query)
326 if query_id is None:
327 raise RuntimeError('Failed to add OSD perf query: {}'.format(query))
328 query_ids.append(query_id)
329
330 except Exception:
331 for query_id in query_ids:
332 self.module.remove_osd_perf_query(query_id)
333 raise
334
335 return query_ids
336
337 def unregister_osd_perf_queries(self, pool_key, query_ids):
338 self.log.info("unregister_osd_perf_queries: pool_key={}, query_ids={}".format(
339 pool_key, query_ids))
340 for query_id in query_ids:
341 self.module.remove_osd_perf_query(query_id)
342 query_ids[:] = []
343
344 def register_query(self, pool_key):
345 if pool_key not in self.user_queries:
346 pool_id = None
347 if pool_key[0]:
348 pool_id = self.resolve_pool_id(pool_key[0])
349
350 user_query = {
351 QUERY_POOL_ID: pool_id,
352 QUERY_POOL_ID_MAP: {pool_id: pool_key[0]},
353 QUERY_IDS: self.register_osd_perf_queries(pool_id, pool_key[1]),
354 QUERY_LAST_REQUEST: datetime.now()
355 }
356
357 self.user_queries[pool_key] = user_query
358
359 # force an immediate stat pull if this is a new query
360 self.query_condition.notify()
361 self.refresh_condition.wait(5)
362
363 else:
364 user_query = self.user_queries[pool_key]
365
366 # ensure query doesn't expire
367 user_query[QUERY_LAST_REQUEST] = datetime.now()
368
369 if pool_key == GLOBAL_POOL_KEY:
370 # refresh the global pool id -> name map upon each
371 # processing period
372 user_query[QUERY_POOL_ID_MAP] = {
373 pool_id: pool_name for pool_id, pool_name
374 in get_rbd_pools(self.module).items()}
375
376 self.log.debug("register_query: pool_key={}, query_ids={}".format(
377 pool_key, user_query[QUERY_IDS]))
378
379 return user_query
380
381 def extract_stat(self, index, raw_image, sum_image):
382 # require two raw counters between a fixed time window
383 if not raw_image or not raw_image[0] or not raw_image[1]:
384 return 0
385
386 current_time = raw_image[0][0]
387 previous_time = raw_image[1][0]
388 if current_time <= previous_time or \
389 current_time - previous_time > STATS_RATE_INTERVAL.total_seconds():
390 return 0
391
392 current_value = raw_image[0][1][index]
393 instant_rate = float(current_value) / (current_time - previous_time)
394
395 # convert latencies from sum to average per op
396 ops_index = None
397 if OSD_PERF_QUERY_COUNTERS[index] == 'write_latency':
398 ops_index = OSD_PERF_QUERY_COUNTERS_INDICES['write_ops']
399 elif OSD_PERF_QUERY_COUNTERS[index] == 'read_latency':
400 ops_index = OSD_PERF_QUERY_COUNTERS_INDICES['read_ops']
401
402 if ops_index is not None:
403 ops = max(1, self.extract_stat(ops_index, raw_image, sum_image))
404 instant_rate /= ops
405
406 return instant_rate
407
408 def extract_counter(self, index, raw_image, sum_image):
409 if sum_image:
410 return sum_image[index]
411 return 0
412
413 def generate_report(self, query, sort_by, extract_data):
414 pool_id_map = query[QUERY_POOL_ID_MAP]
415 sum_pool_counters = query.setdefault(QUERY_SUM_POOL_COUNTERS, {})
416 raw_pool_counters = query.setdefault(QUERY_RAW_POOL_COUNTERS, {})
417
418 sort_by_index = OSD_PERF_QUERY_COUNTERS.index(sort_by)
419
420 # pre-sort and limit the response
421 results = []
422 for pool_id, sum_namespaces in sum_pool_counters.items():
423 if pool_id not in pool_id_map:
424 continue
425 raw_namespaces = raw_pool_counters.get(pool_id, {})
426 for namespace, sum_images in sum_namespaces.items():
427 raw_images = raw_namespaces.get(namespace, {})
428 for image_id, sum_image in sum_images.items():
429 raw_image = raw_images.get(image_id, [])
430
431 # always sort by recent IO activity
432 results.append([(pool_id, namespace, image_id),
433 self.extract_stat(sort_by_index, raw_image,
434 sum_image)])
435 results = sorted(results, key=lambda x: x[1], reverse=True)[:REPORT_MAX_RESULTS]
436
437 # build the report in sorted order
438 pool_descriptors = {}
439 counters = []
440 for key, _ in results:
441 pool_id = key[0]
442 pool_name = pool_id_map[pool_id]
443
444 namespace = key[1]
445 image_id = key[2]
446 image_names = self.image_name_cache.get((pool_id, namespace), {})
447 image_name = image_names[image_id]
448
449 raw_namespaces = raw_pool_counters.get(pool_id, {})
450 raw_images = raw_namespaces.get(namespace, {})
451 raw_image = raw_images.get(image_id, [])
452
453 sum_namespaces = sum_pool_counters[pool_id]
454 sum_images = sum_namespaces[namespace]
455 sum_image = sum_images.get(image_id, [])
456
457 pool_descriptor = pool_name
458 if namespace:
459 pool_descriptor += "/{}".format(namespace)
460 pool_index = pool_descriptors.setdefault(pool_descriptor,
461 len(pool_descriptors))
462 image_descriptor = "{}/{}".format(pool_index, image_name)
463 data = [extract_data(i, raw_image, sum_image)
464 for i in range(len(OSD_PERF_QUERY_COUNTERS))]
465
466 # skip if no data to report
467 if data == [0 for i in range(len(OSD_PERF_QUERY_COUNTERS))]:
468 continue
469
470 counters.append({image_descriptor: data})
471
472 return {idx: descriptor for descriptor, idx
473 in pool_descriptors.items()}, \
474 counters
475
476 def get_perf_data(self, report, pool_spec, sort_by, extract_data):
477 self.log.debug("get_perf_{}s: pool_spec={}, sort_by={}".format(
478 report, pool_spec, sort_by))
479 self.scrub_expired_queries()
480
481 pool_key = extract_pool_key(pool_spec)
482 user_query = self.register_query(pool_key)
483
484 now = datetime.now()
485 pool_descriptors, counters = self.generate_report(
486 user_query, sort_by, extract_data)
487
488 report = {
489 'timestamp': time.mktime(now.timetuple()),
490 '{}_descriptors'.format(report): OSD_PERF_QUERY_COUNTERS,
491 'pool_descriptors': pool_descriptors,
492 '{}s'.format(report): counters
493 }
494
495 return 0, json.dumps(report), ""
496
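# For reference, a trimmed example of the JSON report produced by
# get_perf_data() above for the "stat" flavour (values are illustrative;
# json.dumps renders the integer pool indices as strings):
#
#   {"timestamp": 1570000000.0,
#    "stat_descriptors": ["write_ops", "read_ops", "write_bytes", "read_bytes",
#                         "write_latency", "read_latency"],
#    "pool_descriptors": {"0": "rbd", "1": "rbd/ns1"},
#    "stats": [{"0/image1": [5.2, 0.0, 21298.5, 0.0, 1250000.0, 0.0]}]}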
497 def get_perf_stats(self, pool_spec, sort_by):
498 return self.get_perf_data(
499 "stat", pool_spec, sort_by, self.extract_stat)
500
501 def get_perf_counters(self, pool_spec, sort_by):
502 return self.get_perf_data(
503 "counter", pool_spec, sort_by, self.extract_counter)
504
505 def handle_command(self, inbuf, prefix, cmd):
506 with self.lock:
507 if prefix == 'image stats':
508 return self.get_perf_stats(cmd.get('pool_spec', None),
509 cmd.get('sort_by', OSD_PERF_QUERY_COUNTERS[0]))
510 elif prefix == 'image counters':
511 return self.get_perf_counters(cmd.get('pool_spec', None),
512 cmd.get('sort_by', OSD_PERF_QUERY_COUNTERS[0]))
513
514 raise NotImplementedError(cmd['prefix'])
515
516
517 class Throttle:
518 def __init__(self, throttle_period):
519 self.throttle_period = throttle_period
520 self.time_of_last_call = datetime.min
521
522 def __call__(self, fn):
523 @wraps(fn)
524 def wrapper(*args, **kwargs):
525 now = datetime.now()
526 if self.time_of_last_call + self.throttle_period <= now:
527 self.time_of_last_call = now
528 return fn(*args, **kwargs)
529 return wrapper
530
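# Usage sketch (mirroring the @Throttle(timedelta(seconds=1)) decoration of
# throttled_update_progress() further below): calls that arrive within the
# throttle period are dropped and the wrapper returns None.
#
#   @Throttle(timedelta(seconds=1))
#   def log_progress(progress):
#       print("progress={}".format(progress))
#
#   log_progress(0.1)   # runs
#   log_progress(0.2)   # skipped: called within one second of the previous call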
531
532 class Task:
533 def __init__(self, sequence, task_id, message, refs):
534 self.sequence = sequence
535 self.task_id = task_id
536 self.message = message
537 self.refs = refs
538 self.retry_time = None
539 self.in_progress = False
540 self.progress = 0.0
541 self.canceled = False
542 self.failed = False
543
544 def __str__(self):
545 return self.to_json()
546
547 @property
548 def sequence_key(self):
549 return "{0:016X}".format(self.sequence)
550
551 def cancel(self):
552 self.canceled = True
553 self.fail("Operation canceled")
554
555 def fail(self, message):
556 self.failed = True
557 self.failure_message = message
558
559 def to_dict(self):
560 d = {TASK_SEQUENCE: self.sequence,
561 TASK_ID: self.task_id,
562 TASK_MESSAGE: self.message,
563 TASK_REFS: self.refs
564 }
565 if self.retry_time:
566 d[TASK_RETRY_TIME] = self.retry_time.isoformat()
567 if self.in_progress:
568 d[TASK_IN_PROGRESS] = True
569 d[TASK_PROGRESS] = self.progress
570 if self.canceled:
571 d[TASK_CANCELED] = True
572 return d
573
574 def to_json(self):
575 return str(json.dumps(self.to_dict()))
576
577 @classmethod
578 def from_json(cls, val):
579 try:
580 d = json.loads(val)
581 action = d.get(TASK_REFS, {}).get(TASK_REF_ACTION)
582 if action not in VALID_TASK_ACTIONS:
583 raise ValueError("Invalid task action: {}".format(action))
584
585 return Task(d[TASK_SEQUENCE], d[TASK_ID], d[TASK_MESSAGE], d[TASK_REFS])
586 except json.JSONDecodeError as e:
587 raise ValueError("Invalid JSON ({})".format(str(e)))
588 except KeyError as e:
589 raise ValueError("Invalid task format (missing key {})".format(str(e)))
590
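# For reference, a task serialized by to_json() above (and persisted in the
# rbd_task omap) looks roughly like the following; the uuid and image id are
# placeholders, and in_progress/progress, retry_time and canceled only appear
# once set:
#
#   {"sequence": 7, "id": "8b9b6dc5-...", "message": "Removing image rbd/foo",
#    "refs": {"action": "remove", "pool_name": "rbd", "pool_namespace": "",
#             "image_name": "foo", "image_id": "fab62a8cd1d2"},
#    "in_progress": true, "progress": 0.25}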
591
592 class TaskHandler:
593 lock = Lock()
594 condition = Condition(lock)
595 thread = None
596
597 in_progress_task = None
598 tasks_by_sequence = dict()
599 tasks_by_id = dict()
600
601 completed_tasks = []
602
603 sequence = 0
604
605 def __init__(self, module):
606 self.module = module
607 self.log = module.log
608
609 with self.lock:
610 self.init_task_queue()
611
612 self.thread = Thread(target=self.run)
613 self.thread.start()
614
615 @property
616 def default_pool_name(self):
617 return self.module.get_ceph_option("rbd_default_pool")
618
619 def extract_pool_spec(self, pool_spec):
620 pool_spec = extract_pool_key(pool_spec)
621 if pool_spec == GLOBAL_POOL_KEY:
622 pool_spec = (self.default_pool_name, '')
623 return pool_spec
624
625 def extract_image_spec(self, image_spec):
626 match = re.match(r'^(?:([^/]+)/(?:([^/]+)/)?)?([^/@]+)$',
627 image_spec or '')
628 if not match:
629 raise ValueError("Invalid image spec: {}".format(image_spec))
630 return (match.group(1) or self.default_pool_name, match.group(2) or '',
631 match.group(3))
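# Illustrative examples of the accepted image specs (assuming the default
# value of the rbd_default_pool option, i.e. "rbd"):
#
#   extract_image_spec("image1")            -> ("rbd", "", "image1")
#   extract_image_spec("mypool/image1")     -> ("mypool", "", "image1")
#   extract_image_spec("mypool/ns1/image1") -> ("mypool", "ns1", "image1")
#   extract_image_spec("image1@snap")       -> ValueError (snapshot specs are rejected)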
632
633 def run(self):
634 try:
635 self.log.info("TaskHandler: starting")
636 while True:
637 with self.lock:
638 now = datetime.now()
639 for sequence in sorted([sequence for sequence, task
640 in self.tasks_by_sequence.items()
641 if not task.retry_time or task.retry_time <= now]):
642 self.execute_task(sequence)
643
644 self.condition.wait(5)
645 self.log.debug("TaskHandler: tick")
646
647 except Exception as ex:
648 self.log.fatal("Fatal runtime error: {}\n{}".format(
649 ex, traceback.format_exc()))
650
651 @contextmanager
652 def open_ioctx(self, spec):
653 try:
654 with self.module.rados.open_ioctx(spec[0]) as ioctx:
655 ioctx.set_namespace(spec[1])
656 yield ioctx
657 except rados.ObjectNotFound:
658 self.log.error("Failed to locate pool {}".format(spec[0]))
659 raise
660
661 @classmethod
662 def format_image_spec(cls, image_spec):
663 image = image_spec[2]
664 if image_spec[1]:
665 image = "{}/{}".format(image_spec[1], image)
666 if image_spec[0]:
667 image = "{}/{}".format(image_spec[0], image)
668 return image
669
670 def init_task_queue(self):
671 for pool_id, pool_name in get_rbd_pools(self.module).items():
672 try:
673 with self.module.rados.open_ioctx2(int(pool_id)) as ioctx:
674 self.load_task_queue(ioctx, pool_name)
675
676 try:
677 namespaces = rbd.RBD().namespace_list(ioctx)
678 except rbd.OperationNotSupported:
679 self.log.debug("Namespaces not supported")
680 continue
681
682 for namespace in namespaces:
683 ioctx.set_namespace(namespace)
684 self.load_task_queue(ioctx, pool_name)
685
686 except rados.ObjectNotFound:
687 # pool DNE
688 pass
689
690 if self.tasks_by_sequence:
691 self.sequence = list(sorted(self.tasks_by_sequence.keys()))[-1]
692
693 self.log.debug("sequence={}, tasks_by_sequence={}, tasks_by_id={}".format(
694 self.sequence, str(self.tasks_by_sequence), str(self.tasks_by_id)))
695
696 def load_task_queue(self, ioctx, pool_name):
697 pool_spec = pool_name
698 if ioctx.nspace:
699 pool_spec += "/{}".format(ioctx.nspace)
700
701 start_after = ''
702 try:
703 while True:
704 with rados.ReadOpCtx() as read_op:
705 self.log.info("load_task_task: {}, start_after={}".format(
706 pool_spec, start_after))
707 it, ret = ioctx.get_omap_vals(read_op, start_after, "", 128)
708 ioctx.operate_read_op(read_op, RBD_TASK_OID)
709
710 it = list(it)
711 for k, v in it:
712 start_after = k
713 v = v.decode()
714 self.log.info("load_task_task: task={}".format(v))
715
716 try:
717 task = Task.from_json(v)
718 self.append_task(task)
719 except ValueError:
720 self.log.error("Failed to decode task: pool_spec={}, task={}".format(pool_spec, v))
721
722 if not it:
723 break
724
725 except StopIteration:
726 pass
727 except rados.ObjectNotFound:
728 # rbd_task DNE
729 pass
730
731 def append_task(self, task):
732 self.tasks_by_sequence[task.sequence] = task
733 self.tasks_by_id[task.task_id] = task
734
735 def task_refs_match(self, task_refs, refs):
736 if TASK_REF_IMAGE_ID not in refs and TASK_REF_IMAGE_ID in task_refs:
737 task_refs = task_refs.copy()
738 del task_refs[TASK_REF_IMAGE_ID]
739
740 self.log.debug("task_refs_match: ref1={}, ref2={}".format(task_refs, refs))
741 return task_refs == refs
742
743 def find_task(self, refs):
744 self.log.debug("find_task: refs={}".format(refs))
745
746 # search for dups and return the original
747 for task_id in reversed(sorted(self.tasks_by_id.keys())):
748 task = self.tasks_by_id[task_id]
749 if self.task_refs_match(task.refs, refs):
750 return task
751
752 # search for a completed task (message replay)
753 for task in reversed(self.completed_tasks):
754 if self.task_refs_match(task.refs, refs):
755 return task
756
757 def add_task(self, ioctx, message, refs):
758 self.log.debug("add_task: message={}, refs={}".format(message, refs))
759
760 # ensure unique uuid across all pools
761 while True:
762 task_id = str(uuid.uuid4())
763 if task_id not in self.tasks_by_id:
764 break
765
766 self.sequence += 1
767 task = Task(self.sequence, task_id, message, refs)
768
769 # add the task to the rbd_task omap
770 task_json = task.to_json()
771 omap_keys = (task.sequence_key, )
772 omap_vals = (str.encode(task_json), )
773 self.log.info("adding task: {} {}".format(omap_keys[0], omap_vals[0]))
774
775 with rados.WriteOpCtx() as write_op:
776 ioctx.set_omap(write_op, omap_keys, omap_vals)
777 ioctx.operate_write_op(write_op, RBD_TASK_OID)
778 self.append_task(task)
779
780 self.condition.notify()
781 return task_json
782
783 def remove_task(self, ioctx, task, remove_in_memory=True):
784 self.log.info("remove_task: task={}".format(str(task)))
785 omap_keys = (task.sequence_key, )
786 try:
787 with rados.WriteOpCtx() as write_op:
788 ioctx.remove_omap_keys(write_op, omap_keys)
789 ioctx.operate_write_op(write_op, RBD_TASK_OID)
790 except rados.ObjectNotFound:
791 pass
792
793 if remove_in_memory:
794 try:
795 del self.tasks_by_id[task.task_id]
796 del self.tasks_by_sequence[task.sequence]
797
798 # keep a record of the last N tasks to help avoid command replay
799 # races
800 if not task.failed and not task.canceled:
801 self.log.debug("remove_task: moving to completed tasks")
802 self.completed_tasks.append(task)
803 self.completed_tasks = self.completed_tasks[-MAX_COMPLETED_TASKS:]
804
805 except KeyError:
806 pass
807
808 def execute_task(self, sequence):
809 task = self.tasks_by_sequence[sequence]
810 self.log.info("execute_task: task={}".format(str(task)))
811
812 pool_valid = False
813 try:
814 with self.open_ioctx((task.refs[TASK_REF_POOL_NAME],
815 task.refs[TASK_REF_POOL_NAMESPACE])) as ioctx:
816 pool_valid = True
817
818 action = task.refs[TASK_REF_ACTION]
819 execute_fn = {TASK_REF_ACTION_FLATTEN: self.execute_flatten,
820 TASK_REF_ACTION_REMOVE: self.execute_remove,
821 TASK_REF_ACTION_TRASH_REMOVE: self.execute_trash_remove,
822 TASK_REF_ACTION_MIGRATION_EXECUTE: self.execute_migration_execute,
823 TASK_REF_ACTION_MIGRATION_COMMIT: self.execute_migration_commit,
824 TASK_REF_ACTION_MIGRATION_ABORT: self.execute_migration_abort
825 }.get(action)
826 if not execute_fn:
827 self.log.error("Invalid task action: {}".format(action))
828 else:
829 task.in_progress = True
830 self.in_progress_task = task
831 self.update_progress(task, 0)
832
833 self.lock.release()
834 try:
835 execute_fn(ioctx, task)
836
837 except rbd.OperationCanceled:
838 self.log.info("Operation canceled: task={}".format(
839 str(task)))
840
841 finally:
842 self.lock.acquire()
843
844 task.in_progress = False
845 self.in_progress_task = None
846
847 self.complete_progress(task)
848 self.remove_task(ioctx, task)
849
850 except rados.ObjectNotFound as e:
851 self.log.error("execute_task: {}".format(e))
852 if pool_valid:
853 self.update_progress(task, 0)
854 else:
855 # pool DNE -- remove the task
856 self.complete_progress(task)
857 self.remove_task(ioctx, task)
858
859 except (rados.Error, rbd.Error) as e:
860 self.log.error("execute_task: {}".format(e))
861 self.update_progress(task, 0)
862
863 finally:
864 task.in_progress = False
865 task.retry_time = datetime.now() + TASK_RETRY_INTERVAL
866
867 def progress_callback(self, task, current, total):
868 progress = float(current) / float(total)
869 self.log.debug("progress_callback: task={}, progress={}".format(
870 str(task), progress))
871
872 # avoid deadlocking when a new command comes in during a progress callback
873 if not self.lock.acquire(False):
874 return 0
875
876 try:
877 if not self.in_progress_task or self.in_progress_task.canceled:
878 return -rbd.ECANCELED
879 self.in_progress_task.progress = progress
880 finally:
881 self.lock.release()
882
883 self.throttled_update_progress(task, progress)
884 return 0
885
886 def execute_flatten(self, ioctx, task):
887 self.log.info("execute_flatten: task={}".format(str(task)))
888
889 try:
890 with rbd.Image(ioctx, task.refs[TASK_REF_IMAGE_NAME]) as image:
891 image.flatten(on_progress=partial(self.progress_callback, task))
892 except rbd.InvalidArgument:
893 task.fail("Image does not have parent")
894 self.log.info("{}: task={}".format(task.failure_message, str(task)))
895 except rbd.ImageNotFound:
896 task.fail("Image does not exist")
897 self.log.info("{}: task={}".format(task.failure_message, str(task)))
898
899 def execute_remove(self, ioctx, task):
900 self.log.info("execute_remove: task={}".format(str(task)))
901
902 try:
903 rbd.RBD().remove(ioctx, task.refs[TASK_REF_IMAGE_NAME],
904 on_progress=partial(self.progress_callback, task))
905 except rbd.ImageNotFound:
906 task.fail("Image does not exist")
907 self.log.info("{}: task={}".format(task.failure_message, str(task)))
908
909 def execute_trash_remove(self, ioctx, task):
910 self.log.info("execute_trash_remove: task={}".format(str(task)))
911
912 try:
913 rbd.RBD().trash_remove(ioctx, task.refs[TASK_REF_IMAGE_ID],
914 on_progress=partial(self.progress_callback, task))
915 except rbd.ImageNotFound:
916 task.fail("Image does not exist")
917 self.log.info("{}: task={}".format(task.failure_message, str(task)))
918
919 def execute_migration_execute(self, ioctx, task):
920 self.log.info("execute_migration_execute: task={}".format(str(task)))
921
922 try:
923 rbd.RBD().migration_execute(ioctx, task.refs[TASK_REF_IMAGE_NAME],
924 on_progress=partial(self.progress_callback, task))
925 except rbd.ImageNotFound:
926 task.fail("Image does not exist")
927 self.log.info("{}: task={}".format(task.failure_message, str(task)))
928 except rbd.InvalidArgument:
929 task.fail("Image is not migrating")
930 self.log.info("{}: task={}".format(task.failure_message, str(task)))
931
932 def execute_migration_commit(self, ioctx, task):
933 self.log.info("execute_migration_commit: task={}".format(str(task)))
934
935 try:
936 rbd.RBD().migration_commit(ioctx, task.refs[TASK_REF_IMAGE_NAME],
937 on_progress=partial(self.progress_callback, task))
938 except rbd.ImageNotFound:
939 task.fail("Image does not exist")
940 self.log.info("{}: task={}".format(task.failure_message, str(task)))
941 except rbd.InvalidArgument:
942 task.fail("Image is not migrating or migration not executed")
943 self.log.info("{}: task={}".format(task.failure_message, str(task)))
944
945 def execute_migration_abort(self, ioctx, task):
946 self.log.info("execute_migration_abort: task={}".format(str(task)))
947
948 try:
949 rbd.RBD().migration_abort(ioctx, task.refs[TASK_REF_IMAGE_NAME],
950 on_progress=partial(self.progress_callback, task))
951 except rbd.ImageNotFound:
952 task.fail("Image does not exist")
953 self.log.info("{}: task={}".format(task.failure_message, str(task)))
954 except rbd.InvalidArgument:
955 task.fail("Image is not migrating")
956 self.log.info("{}: task={}".format(task.failure_message, str(task)))
957
958 def complete_progress(self, task):
959 self.log.debug("complete_progress: task={}".format(str(task)))
960 try:
961 if task.failed:
962 self.module.remote("progress", "fail", task.task_id,
963 task.failure_message)
964 else:
965 self.module.remote("progress", "complete", task.task_id)
966 except ImportError:
967 # progress module is disabled
968 pass
969
970 def update_progress(self, task, progress):
971 self.log.debug("update_progress: task={}, progress={}".format(str(task), progress))
972 try:
973 refs = {"origin": "rbd_support"}
974 refs.update(task.refs)
975
976 self.module.remote("progress", "update", task.task_id,
977 task.message, progress, refs)
978 except ImportError:
979 # progress module is disabled
980 pass
981
982 @Throttle(timedelta(seconds=1))
983 def throttled_update_progress(self, task, progress):
984 self.update_progress(task, progress)
985
986 def queue_flatten(self, image_spec):
987 image_spec = self.extract_image_spec(image_spec)
988 self.log.info("queue_flatten: {}".format(image_spec))
989
990 refs = {TASK_REF_ACTION: TASK_REF_ACTION_FLATTEN,
991 TASK_REF_POOL_NAME: image_spec[0],
992 TASK_REF_POOL_NAMESPACE: image_spec[1],
993 TASK_REF_IMAGE_NAME: image_spec[2]}
994
995 with self.open_ioctx(image_spec) as ioctx:
996 try:
997 with rbd.Image(ioctx, image_spec[2]) as image:
998 refs[TASK_REF_IMAGE_ID] = image.id()
999
1000 try:
1001 parent_image_id = image.parent_id()
1002 except rbd.ImageNotFound:
1003 parent_image_id = None
1004
1005 except rbd.ImageNotFound:
1006 pass
1007
1008 task = self.find_task(refs)
1009 if task:
1010 return 0, task.to_json(), ''
1011
1012 if TASK_REF_IMAGE_ID not in refs:
1013 raise rbd.ImageNotFound("Image {} does not exist".format(
1014 self.format_image_spec(image_spec)), errno=errno.ENOENT)
1015 if not parent_image_id:
1016 raise rbd.ImageNotFound("Image {} does not have a parent".format(
1017 self.format_image_spec(image_spec)), errno=errno.ENOENT)
1018
1019 return 0, self.add_task(ioctx,
1020 "Flattening image {}".format(
1021 self.format_image_spec(image_spec)),
1022 refs), ""
1023
1024 def queue_remove(self, image_spec):
1025 image_spec = self.extract_image_spec(image_spec)
1026 self.log.info("queue_remove: {}".format(image_spec))
1027
1028 refs = {TASK_REF_ACTION: TASK_REF_ACTION_REMOVE,
1029 TASK_REF_POOL_NAME: image_spec[0],
1030 TASK_REF_POOL_NAMESPACE: image_spec[1],
1031 TASK_REF_IMAGE_NAME: image_spec[2]}
1032
1033 with self.open_ioctx(image_spec) as ioctx:
1034 try:
1035 with rbd.Image(ioctx, image_spec[2]) as image:
1036 refs[TASK_REF_IMAGE_ID] = image.id()
1037 snaps = list(image.list_snaps())
1038
1039 except rbd.ImageNotFound:
1040 pass
1041
1042 task = self.find_task(refs)
1043 if task:
1044 return 0, task.to_json(), ''
1045
1046 if TASK_REF_IMAGE_ID not in refs:
1047 raise rbd.ImageNotFound("Image {} does not exist".format(
1048 self.format_image_spec(image_spec)), errno=errno.ENOENT)
1049 if snaps:
1050 raise rbd.ImageBusy("Image {} has snapshots".format(
1051 self.format_image_spec(image_spec)), errno=errno.EBUSY)
1052
1053 return 0, self.add_task(ioctx,
1054 "Removing image {}".format(
1055 self.format_image_spec(image_spec)),
1056 refs), ''
1057
1058 def queue_trash_remove(self, image_id_spec):
1059 image_id_spec = self.extract_image_spec(image_id_spec)
1060 self.log.info("queue_trash_remove: {}".format(image_id_spec))
1061
1062 refs = {TASK_REF_ACTION: TASK_REF_ACTION_TRASH_REMOVE,
1063 TASK_REF_POOL_NAME: image_id_spec[0],
1064 TASK_REF_POOL_NAMESPACE: image_id_spec[1],
1065 TASK_REF_IMAGE_ID: image_id_spec[2]}
1066 task = self.find_task(refs)
1067 if task:
1068 return 0, task.to_json(), ''
1069
1070 # verify that image exists in trash
1071 with self.open_ioctx(image_id_spec) as ioctx:
1072 rbd.RBD().trash_get(ioctx, image_id_spec[2])
1073
1074 return 0, self.add_task(ioctx,
1075 "Removing image {} from trash".format(
1076 self.format_image_spec(image_id_spec)),
1077 refs), ''
1078
1079 def get_migration_status(self, ioctx, image_spec):
1080 try:
1081 return rbd.RBD().migration_status(ioctx, image_spec[2])
1082 except (rbd.InvalidArgument, rbd.ImageNotFound):
1083 return None
1084
1085 def validate_image_migrating(self, image_spec, migration_status):
1086 if not migration_status:
1087 raise rbd.InvalidArgument("Image {} is not migrating".format(
1088 self.format_image_spec(image_spec)), errno=errno.EINVAL)
1089
1090 def resolve_pool_name(self, pool_id):
1091 osd_map = self.module.get('osd_map')
1092 for pool in osd_map['pools']:
1093 if pool['pool'] == pool_id:
1094 return pool['pool_name']
1095 return '<unknown>'
1096
1097 def queue_migration_execute(self, image_spec):
1098 image_spec = self.extract_image_spec(image_spec)
1099 self.log.info("queue_migration_execute: {}".format(image_spec))
1100
1101 refs = {TASK_REF_ACTION: TASK_REF_ACTION_MIGRATION_EXECUTE,
1102 TASK_REF_POOL_NAME: image_spec[0],
1103 TASK_REF_POOL_NAMESPACE: image_spec[1],
1104 TASK_REF_IMAGE_NAME: image_spec[2]}
1105
1106 with self.open_ioctx(image_spec) as ioctx:
1107 status = self.get_migration_status(ioctx, image_spec)
1108 if status:
1109 refs[TASK_REF_IMAGE_ID] = status['dest_image_id']
1110
1111 task = self.find_task(refs)
1112 if task:
1113 return 0, task.to_json(), ''
1114
1115 self.validate_image_migrating(image_spec, status)
1116 if status['state'] not in [rbd.RBD_IMAGE_MIGRATION_STATE_PREPARED,
1117 rbd.RBD_IMAGE_MIGRATION_STATE_EXECUTING]:
1118 raise rbd.InvalidArgument("Image {} is not in ready state".format(
1119 self.format_image_spec(image_spec)), errno=errno.EINVAL)
1120
1121 source_pool = self.resolve_pool_name(status['source_pool_id'])
1122 dest_pool = self.resolve_pool_name(status['dest_pool_id'])
1123 return 0, self.add_task(ioctx,
1124 "Migrating image {} to {}".format(
1125 self.format_image_spec((source_pool,
1126 status['source_pool_namespace'],
1127 status['source_image_name'])),
1128 self.format_image_spec((dest_pool,
1129 status['dest_pool_namespace'],
1130 status['dest_image_name']))),
1131 refs), ''
1132
1133 def queue_migration_commit(self, image_spec):
1134 image_spec = self.extract_image_spec(image_spec)
1135 self.log.info("queue_migration_commit: {}".format(image_spec))
1136
1137 refs = {TASK_REF_ACTION: TASK_REF_ACTION_MIGRATION_COMMIT,
1138 TASK_REF_POOL_NAME: image_spec[0],
1139 TASK_REF_POOL_NAMESPACE: image_spec[1],
1140 TASK_REF_IMAGE_NAME: image_spec[2]}
1141
1142 with self.open_ioctx(image_spec) as ioctx:
1143 status = self.get_migration_status(ioctx, image_spec)
1144 if status:
1145 refs[TASK_REF_IMAGE_ID] = status['dest_image_id']
1146
1147 task = self.find_task(refs)
1148 if task:
1149 return 0, task.to_json(), ''
1150
1151 self.validate_image_migrating(image_spec, status)
1152 if status['state'] != rbd.RBD_IMAGE_MIGRATION_STATE_EXECUTED:
1153 raise rbd.InvalidArgument("Image {} has not completed migration".format(
1154 self.format_image_spec(image_spec)), errno=errno.EINVAL)
1155
1156 return 0, self.add_task(ioctx,
1157 "Committing image migration for {}".format(
1158 self.format_image_spec(image_spec)),
1159 refs), ''
1160
1161 def queue_migration_abort(self, image_spec):
1162 image_spec = self.extract_image_spec(image_spec)
1163 self.log.info("queue_migration_abort: {}".format(image_spec))
1164
1165 refs = {TASK_REF_ACTION: TASK_REF_ACTION_MIGRATION_ABORT,
1166 TASK_REF_POOL_NAME: image_spec[0],
1167 TASK_REF_POOL_NAMESPACE: image_spec[1],
1168 TASK_REF_IMAGE_NAME: image_spec[2]}
1169
1170 with self.open_ioctx(image_spec) as ioctx:
1171 status = self.get_migration_status(ioctx, image_spec)
1172 if status:
1173 refs[TASK_REF_IMAGE_ID] = status['dest_image_id']
1174
1175 task = self.find_task(refs)
1176 if task:
1177 return 0, task.to_json(), ''
1178
1179 self.validate_image_migrating(image_spec, status)
1180 return 0, self.add_task(ioctx,
1181 "Aborting image migration for {}".format(
1182 self.format_image_spec(image_spec)),
1183 refs), ''
1184
1185 def task_cancel(self, task_id):
1186 self.log.info("task_cancel: {}".format(task_id))
1187
1188 if task_id not in self.tasks_by_id:
1189 return -errno.ENOENT, '', "No such task {}".format(task_id)
1190
1191 task = self.tasks_by_id[task_id]
1192 task.cancel()
1193
1194 remove_in_memory = True
1195 if self.in_progress_task and self.in_progress_task.task_id == task_id:
1196 self.log.info("Attempting to cancel in-progress task: {}".format(str(self.in_progress_task)))
1197 remove_in_memory = False
1198
1199 # complete any associated event in the progress module
1200 self.complete_progress(task)
1201
1202 # remove from rbd_task omap
1203 with self.open_ioctx((task.refs[TASK_REF_POOL_NAME],
1204 task.refs[TASK_REF_POOL_NAMESPACE])) as ioctx:
1205 self.remove_task(ioctx, task, remove_in_memory)
1206
1207 return 0, "", ""
1208
1209 def task_list(self, task_id):
1210 self.log.info("task_list: {}".format(task_id))
1211
1212 if task_id:
1213 if task_id not in self.tasks_by_id:
1214 return -errno.ENOENT, '', "No such task {}".format(task_id)
1215
1216 result = self.tasks_by_id[task_id].to_dict()
1217 else:
1218 result = []
1219 for sequence in sorted(self.tasks_by_sequence.keys()):
1220 task = self.tasks_by_sequence[sequence]
1221 result.append(task.to_dict())
1222
1223 return 0, json.dumps(result), ""
1224
1225 def handle_command(self, inbuf, prefix, cmd):
1226 with self.lock:
1227 if prefix == 'add flatten':
1228 return self.queue_flatten(cmd['image_spec'])
1229 elif prefix == 'add remove':
1230 return self.queue_remove(cmd['image_spec'])
1231 elif prefix == 'add trash remove':
1232 return self.queue_trash_remove(cmd['image_id_spec'])
1233 elif prefix == 'add migration execute':
1234 return self.queue_migration_execute(cmd['image_spec'])
1235 elif prefix == 'add migration commit':
1236 return self.queue_migration_commit(cmd['image_spec'])
1237 elif prefix == 'add migration abort':
1238 return self.queue_migration_abort(cmd['image_spec'])
1239 elif prefix == 'cancel':
1240 return self.task_cancel(cmd['task_id'])
1241 elif prefix == 'list':
1242 return self.task_list(cmd.get('task_id'))
1243
1244 raise NotImplementedError(cmd['prefix'])
1245
1246
1247 class Module(MgrModule):
1248 COMMANDS = [
1249 {
1250 "cmd": "rbd perf image stats "
1251 "name=pool_spec,type=CephString,req=false "
1252 "name=sort_by,type=CephChoices,strings="
1253 "write_ops|write_bytes|write_latency|"
1254 "read_ops|read_bytes|read_latency,"
1255 "req=false ",
1256 "desc": "Retrieve current RBD IO performance stats",
1257 "perm": "r"
1258 },
1259 {
1260 "cmd": "rbd perf image counters "
1261 "name=pool_spec,type=CephString,req=false "
1262 "name=sort_by,type=CephChoices,strings="
1263 "write_ops|write_bytes|write_latency|"
1264 "read_ops|read_bytes|read_latency,"
1265 "req=false ",
1266 "desc": "Retrieve current RBD IO performance counters",
1267 "perm": "r"
1268 },
1269 {
1270 "cmd": "rbd task add flatten "
1271 "name=image_spec,type=CephString",
1272 "desc": "Flatten a cloned image asynchronously in the background",
1273 "perm": "w"
1274 },
1275 {
1276 "cmd": "rbd task add remove "
1277 "name=image_spec,type=CephString",
1278 "desc": "Remove an image asynchronously in the background",
1279 "perm": "w"
1280 },
1281 {
1282 "cmd": "rbd task add trash remove "
1283 "name=image_id_spec,type=CephString",
1284 "desc": "Remove an image from the trash asynchronously in the background",
1285 "perm": "w"
1286 },
1287 {
1288 "cmd": "rbd task add migration execute "
1289 "name=image_spec,type=CephString",
1290 "desc": "Execute an image migration asynchronously in the background",
1291 "perm": "w"
1292 },
1293 {
1294 "cmd": "rbd task add migration commit "
1295 "name=image_spec,type=CephString",
1296 "desc": "Commit an executed migration asynchronously in the background",
1297 "perm": "w"
1298 },
1299 {
1300 "cmd": "rbd task add migration abort "
1301 "name=image_spec,type=CephString",
1302 "desc": "Abort a prepared migration asynchronously in the background",
1303 "perm": "w"
1304 },
1305 {
1306 "cmd": "rbd task cancel "
1307 "name=task_id,type=CephString ",
1308 "desc": "Cancel a pending or running asynchronous task",
1309 "perm": "r"
1310 },
1311 {
1312 "cmd": "rbd task list "
1313 "name=task_id,type=CephString,req=false ",
1314 "desc": "List pending or running asynchronous tasks",
1315 "perm": "r"
1316 }
1317 ]
1318 MODULE_OPTIONS = []
1319
1320 perf = None
1321 task = None
1322
1323 def __init__(self, *args, **kwargs):
1324 super(Module, self).__init__(*args, **kwargs)
1325 self.perf = PerfHandler(self)
1326 self.task = TaskHandler(self)
1327
1328 def handle_command(self, inbuf, cmd):
1329 prefix = cmd['prefix']
1330 try:
1331 try:
1332 if prefix.startswith('rbd perf '):
1333 return self.perf.handle_command(inbuf, prefix[9:], cmd)
1334 elif prefix.startswith('rbd task '):
1335 return self.task.handle_command(inbuf, prefix[9:], cmd)
1336
1337 except Exception as ex:
1338 # log the full traceback but don't send it to the CLI user
1339 self.log.fatal("Fatal runtime error: {}\n{}".format(
1340 ex, traceback.format_exc()))
1341 raise
1342
1343 except rados.Error as ex:
1344 return -ex.errno, "", str(ex)
1345 except rbd.OSError as ex:
1346 return -ex.errno, "", str(ex)
1347 except rbd.Error as ex:
1348 return -errno.EINVAL, "", str(ex)
1349 except KeyError as ex:
1350 return -errno.ENOENT, "", str(ex)
1351 except ValueError as ex:
1352 return -errno.EINVAL, "", str(ex)
1353
1354 raise NotImplementedError(cmd['prefix'])
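# The commands registered above are ordinary ceph-mgr commands, so they can be
# driven through the usual interfaces. A sketch (assuming a reachable cluster
# and an active rbd_support module, which is normally the case since it ships
# as an always-on mgr module in this release):
#
#   ceph rbd perf image stats rbd write_ops
#   ceph rbd task add remove mypool/myimage
#   ceph rbd task list
#   ceph rbd task cancel <task_id>
#
# or, from Python, via the librados binding (mgr_command() is assumed to be
# available, as in recent python-rados releases):
#
#   import json, rados
#   cluster = rados.Rados(conffile='/etc/ceph/ceph.conf')
#   cluster.connect()
#   ret, outbuf, outs = cluster.mgr_command(
#       json.dumps({'prefix': 'rbd task list', 'format': 'json'}), b'')
#   print(json.loads(outbuf) if ret == 0 else outs)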