]> git.proxmox.com Git - ceph.git/blob - ceph/src/pybind/mgr/rbd_support/module.py
import new upstream nautilus stable release 14.2.8
[ceph.git] / ceph / src / pybind / mgr / rbd_support / module.py
1 """
2 RBD support module
3 """
4
5 import errno
6 import json
7 import rados
8 import rbd
9 import re
10 import time
11 import traceback
12 import uuid
13
14 from mgr_module import MgrModule
15
16 from contextlib import contextmanager
17 from datetime import datetime, timedelta
18 from functools import partial, wraps
19 from threading import Condition, Lock, Thread
20
21
# Sentinel pool key meaning "all pools, all namespaces".
GLOBAL_POOL_KEY = (None, None)

# Keys used within a per-pool-key user query dict (see PerfHandler).
QUERY_POOL_ID = "pool_id"
QUERY_POOL_ID_MAP = "pool_id_map"
QUERY_IDS = "query_ids"
QUERY_SUM_POOL_COUNTERS = "pool_counters"
QUERY_RAW_POOL_COUNTERS = "raw_pool_counters"
QUERY_LAST_REQUEST = "last_request"

OSD_PERF_QUERY_REGEX_MATCH_ALL = '^(.*)$'
# Per-image performance counters collected via the OSD perf query API.
OSD_PERF_QUERY_COUNTERS = ['write_ops',
                           'read_ops',
                           'write_bytes',
                           'read_bytes',
                           'write_latency',
                           'read_latency']
# Reverse lookup: counter name -> its position in OSD_PERF_QUERY_COUNTERS.
OSD_PERF_QUERY_COUNTERS_INDICES = {
    OSD_PERF_QUERY_COUNTERS[i]: i for i in range(len(OSD_PERF_QUERY_COUNTERS))}

# Indices (into OSD_PERF_QUERY_COUNTERS) of the latency counters.
OSD_PERF_QUERY_LATENCY_COUNTER_INDICES = [4, 5]
OSD_PERF_QUERY_MAX_RESULTS = 256

POOL_REFRESH_INTERVAL = timedelta(minutes=5)   # image-name cache lifetime
QUERY_EXPIRE_INTERVAL = timedelta(minutes=1)   # drop queries idle this long
STATS_RATE_INTERVAL = timedelta(minutes=1)     # max window between rate samples

REPORT_MAX_RESULTS = 64

# Name of the per-pool RADOS object whose omap stores the persistent task queue.
RBD_TASK_OID = "rbd_task"

# JSON keys of a serialized Task.
TASK_SEQUENCE = "sequence"
TASK_ID = "id"
TASK_REFS = "refs"
TASK_MESSAGE = "message"
TASK_RETRY_TIME = "retry_time"
TASK_IN_PROGRESS = "in_progress"
TASK_PROGRESS = "progress"
TASK_CANCELED = "canceled"

# Keys of a task's "refs" dict (identifies the target image and action).
TASK_REF_POOL_NAME = "pool_name"
TASK_REF_POOL_NAMESPACE = "pool_namespace"
TASK_REF_IMAGE_NAME = "image_name"
TASK_REF_IMAGE_ID = "image_id"
TASK_REF_ACTION = "action"

TASK_REF_ACTION_FLATTEN = "flatten"
TASK_REF_ACTION_REMOVE = "remove"
TASK_REF_ACTION_TRASH_REMOVE = "trash remove"
TASK_REF_ACTION_MIGRATION_EXECUTE = "migrate execute"
TASK_REF_ACTION_MIGRATION_COMMIT = "migrate commit"
TASK_REF_ACTION_MIGRATION_ABORT = "migrate abort"

VALID_TASK_ACTIONS = [TASK_REF_ACTION_FLATTEN,
                      TASK_REF_ACTION_REMOVE,
                      TASK_REF_ACTION_TRASH_REMOVE,
                      TASK_REF_ACTION_MIGRATION_EXECUTE,
                      TASK_REF_ACTION_MIGRATION_COMMIT,
                      TASK_REF_ACTION_MIGRATION_ABORT]

TASK_RETRY_INTERVAL = timedelta(seconds=30)   # delay before a failed task is retried
MAX_COMPLETED_TASKS = 50                      # completed-task history kept for replay dedup
83
84
class NotAuthorizedError(Exception):
    """Raised when the caller lacks capabilities for a pool/namespace."""
    pass
87
88
def is_authorized(module, pool, namespace):
    """Return whether the current session may act on *pool*/*namespace*."""
    scope = {"pool": pool or '', "namespace": namespace or ''}
    return module.is_authorized(scope)
92
93
def authorize_request(module, pool, namespace):
    """Raise NotAuthorizedError unless the caller may act on *pool*/*namespace*."""
    if is_authorized(module, pool, namespace):
        return
    raise NotAuthorizedError("not authorized on pool={}, namespace={}".format(
        pool, namespace))
98
99
def extract_pool_key(pool_spec):
    """Split a "pool[/namespace]" spec into a (pool, namespace) key.

    An empty spec selects everything (GLOBAL_POOL_KEY); a spec without a
    namespace yields an empty namespace string.  Raises ValueError for a
    malformed spec.
    """
    if not pool_spec:
        return GLOBAL_POOL_KEY

    parsed = re.match(r'^([^/]+)(?:/([^/]+))?$', pool_spec)
    if parsed is None:
        raise ValueError("Invalid pool spec: {}".format(pool_spec))
    pool_name = parsed.group(1)
    namespace = parsed.group(2) or ''
    return (pool_name, namespace)
108
109
def get_rbd_pools(module):
    """Return {pool_id: pool_name} for pools tagged with the 'rbd' application."""
    rbd_pools = {}
    for pool in module.get('osd_map')['pools']:
        if 'rbd' in pool.get('application_metadata', {}):
            rbd_pools[pool['pool']] = pool['pool_name']
    return rbd_pools
114
115
class PerfHandler:
    """Serves per-image performance stats collected via OSD perf queries.

    NOTE(review): all state below is declared at class level and is
    therefore shared by every instance; this looks safe only because the
    module appears to create a single PerfHandler -- confirm.
    """
    # active queries keyed by (pool, namespace); see register_query()
    user_queries = {}
    # NOTE(review): image_cache appears unused in this file
    image_cache = {}

    lock = Lock()
    query_condition = Condition(lock)    # wakes the poller for a new query
    refresh_condition = Condition(lock)  # signals a completed stats round
    thread = None

    # (pool_id, namespace) -> {image_id: image_name}
    image_name_cache = {}
    image_name_refresh_time = datetime.fromtimestamp(0)
127
128 @classmethod
129 def prepare_regex(cls, value):
130 return '^({})$'.format(value)
131
132 @classmethod
133 def prepare_osd_perf_query(cls, pool_id, namespace, counter_type):
134 pool_id_regex = OSD_PERF_QUERY_REGEX_MATCH_ALL
135 namespace_regex = OSD_PERF_QUERY_REGEX_MATCH_ALL
136 if pool_id:
137 pool_id_regex = cls.prepare_regex(pool_id)
138 if namespace:
139 namespace_regex = cls.prepare_regex(namespace)
140
141 return {
142 'key_descriptor': [
143 {'type': 'pool_id', 'regex': pool_id_regex},
144 {'type': 'namespace', 'regex': namespace_regex},
145 {'type': 'object_name',
146 'regex': '^(?:rbd|journal)_data\\.(?:([0-9]+)\\.)?([^.]+)\\.'},
147 ],
148 'performance_counter_descriptors': OSD_PERF_QUERY_COUNTERS,
149 'limit': {'order_by': counter_type,
150 'max_count': OSD_PERF_QUERY_MAX_RESULTS},
151 }
152
153 @classmethod
154 def pool_spec_search_keys(cls, pool_key):
155 return [pool_key[0:len(pool_key) - x]
156 for x in range(0, len(pool_key) + 1)]
157
158 @classmethod
159 def submatch_pool_key(cls, pool_key, search_key):
160 return ((pool_key[1] == search_key[1] or not search_key[1])
161 and (pool_key[0] == search_key[0] or not search_key[0]))
162
    def __init__(self, module):
        # module: the owning MgrModule (provides rados handle, options, log)
        self.module = module
        self.log = module.log

        # background thread that periodically scrapes OSD perf counters
        self.thread = Thread(target=self.run)
        self.thread.start()
169
    def run(self):
        """Main loop: expire idle queries, process scraped counters, sleep.

        Wakes every "mgr_stats_period" seconds, or immediately when
        query_condition is notified (a new query was registered); signals
        waiters via refresh_condition after each round.
        """
        try:
            self.log.info("PerfHandler: starting")
            while True:
                with self.lock:
                    self.scrub_expired_queries()
                    self.process_raw_osd_perf_counters()
                    self.refresh_condition.notify()

                    stats_period = int(self.module.get_ceph_option("mgr_stats_period"))
                    self.query_condition.wait(stats_period)

                self.log.debug("PerfHandler: tick")

        except Exception as ex:
            # a crashed poller thread is never restarted -- log loudly
            self.log.fatal("Fatal runtime error: {}\n{}".format(
                ex, traceback.format_exc()))
187
    def merge_raw_osd_perf_counters(self, pool_key, query, now_ts,
                                    resolve_image_names):
        """Combine per-sort-order OSD counter dumps into one raw sample map.

        Returns (and stores on the query) a nested dict
        pool_id -> namespace -> image_id -> [current, previous], where each
        sample is [timestamp, [counter values]].  (pool_id, namespace) pairs
        whose image ids are missing from the name cache are added to
        *resolve_image_names* so the caller can refresh the cache.
        """
        pool_id_map = query[QUERY_POOL_ID_MAP]

        # collect and combine the raw counters from all sort orders
        raw_pool_counters = query.setdefault(QUERY_RAW_POOL_COUNTERS, {})
        for query_id in query[QUERY_IDS]:
            res = self.module.get_osd_perf_counters(query_id)
            for counter in res['counters']:
                # replace pool id from object name if it exists
                k = counter['k']
                pool_id = int(k[2][0]) if k[2][0] else int(k[0][0])
                namespace = k[1][0]
                image_id = k[2][1]

                # ignore metrics from non-matching pools/namespaces
                if pool_id not in pool_id_map:
                    continue
                if pool_key[1] is not None and pool_key[1] != namespace:
                    continue

                # flag the pool (and namespace) for refresh if we cannot find
                # image name in the cache
                resolve_image_key = (pool_id, namespace)
                if image_id not in self.image_name_cache.get(resolve_image_key, {}):
                    resolve_image_names.add(resolve_image_key)

                # copy the 'sum' counter values for each image (ignore count)
                # if we haven't already processed it for this round
                raw_namespaces = raw_pool_counters.setdefault(pool_id, {})
                raw_images = raw_namespaces.setdefault(namespace, {})
                raw_image = raw_images.setdefault(image_id, [None, None])

                # save the last two perf counters for each image
                if raw_image[0] and raw_image[0][0] < now_ts:
                    raw_image[1] = raw_image[0]
                    raw_image[0] = None
                if not raw_image[0]:
                    raw_image[0] = [now_ts, [int(x[0]) for x in counter['c']]]

        self.log.debug("merge_raw_osd_perf_counters: {}".format(raw_pool_counters))
        return raw_pool_counters
230
    def sum_osd_perf_counters(self, query, raw_pool_counters, now_ts):
        """Fold the latest raw samples into per-image cumulative counters.

        Returns (and stores on the query) pool_id -> namespace ->
        image_id -> [cumulative values].  Images with no fresh sample this
        round get a zeroed current sample so their computed rates decay.
        """
        # update the cumulative counters for each image
        sum_pool_counters = query.setdefault(QUERY_SUM_POOL_COUNTERS, {})
        for pool_id, raw_namespaces in raw_pool_counters.items():
            sum_namespaces = sum_pool_counters.setdefault(pool_id, {})
            for namespace, raw_images in raw_namespaces.items():
                sum_namespace = sum_namespaces.setdefault(namespace, {})
                for image_id, raw_image in raw_images.items():
                    # zero-out non-updated raw counters
                    if not raw_image[0]:
                        continue
                    elif raw_image[0][0] < now_ts:
                        raw_image[1] = raw_image[0]
                        raw_image[0] = [now_ts, [0 for x in raw_image[1][1]]]
                        continue

                    counters = raw_image[0][1]

                    # copy raw counters if this is a newly discovered image or
                    # increment existing counters
                    sum_image = sum_namespace.setdefault(image_id, None)
                    if sum_image:
                        for i in range(len(counters)):
                            sum_image[i] += counters[i]
                    else:
                        sum_namespace[image_id] = [x for x in counters]

        self.log.debug("sum_osd_perf_counters: {}".format(sum_pool_counters))
        return sum_pool_counters
260
    def refresh_image_names(self, resolve_image_names):
        """Re-resolve image id -> name mappings for each flagged (pool_id, namespace)."""
        for pool_id, namespace in resolve_image_names:
            image_key = (pool_id, namespace)
            images = self.image_name_cache.setdefault(image_key, {})
            with self.module.rados.open_ioctx2(int(pool_id)) as ioctx:
                ioctx.set_namespace(namespace)
                for image_meta in rbd.RBD().list2(ioctx):
                    images[image_meta['id']] = image_meta['name']
            self.log.debug("resolve_image_names: {}={}".format(image_key, images))
270
    def scrub_missing_images(self):
        """Drop counters for images whose names could not be resolved.

        An unresolvable id usually means the image was deleted; both the
        cumulative and the raw sample entries are removed.
        """
        for pool_key, query in self.user_queries.items():
            raw_pool_counters = query.get(QUERY_RAW_POOL_COUNTERS, {})
            sum_pool_counters = query.get(QUERY_SUM_POOL_COUNTERS, {})
            for pool_id, sum_namespaces in sum_pool_counters.items():
                raw_namespaces = raw_pool_counters.get(pool_id, {})
                for namespace, sum_images in sum_namespaces.items():
                    raw_images = raw_namespaces.get(namespace, {})

                    image_key = (pool_id, namespace)
                    image_names = self.image_name_cache.get(image_key, {})
                    # iterate over a copy of the keys since we delete while scanning
                    for image_id in list(sum_images.keys()):
                        # scrub image counters if we failed to resolve image name
                        if image_id not in image_names:
                            self.log.debug("scrub_missing_images: dropping {}/{}".format(
                                image_key, image_id))
                            del sum_images[image_id]
                            if image_id in raw_images:
                                del raw_images[image_id]
290
    def process_raw_osd_perf_counters(self):
        """Scrape, merge, and accumulate OSD perf counters for all active queries."""
        now = datetime.now()
        # NOTE(review): "%s" is a platform-specific strftime extension
        # (epoch seconds); works on Linux where the mgr runs
        now_ts = int(now.strftime("%s"))

        # clear the image name cache if we need to refresh all active pools
        if self.image_name_cache and \
                self.image_name_refresh_time + POOL_REFRESH_INTERVAL < now:
            self.log.debug("process_raw_osd_perf_counters: expiring image name cache")
            self.image_name_cache = {}

        resolve_image_names = set()
        for pool_key, query in self.user_queries.items():
            if not query[QUERY_IDS]:
                continue

            raw_pool_counters = self.merge_raw_osd_perf_counters(
                pool_key, query, now_ts, resolve_image_names)
            self.sum_osd_perf_counters(query, raw_pool_counters, now_ts)

        if resolve_image_names:
            self.image_name_refresh_time = now
            self.refresh_image_names(resolve_image_names)
            self.scrub_missing_images()
        elif not self.image_name_cache:
            # nothing resolvable at all -- scrub everything stale
            self.scrub_missing_images()
316
317 def resolve_pool_id(self, pool_name):
318 pool_id = self.module.rados.pool_lookup(pool_name)
319 if not pool_id:
320 raise rados.ObjectNotFound("Pool '{}' not found".format(pool_name),
321 errno.ENOENT)
322 return pool_id
323
    def scrub_expired_queries(self):
        """Unregister queries that no client refreshed within QUERY_EXPIRE_INTERVAL."""
        # perf counters need to be periodically refreshed to continue
        # to be registered
        expire_time = datetime.now() - QUERY_EXPIRE_INTERVAL
        for pool_key in list(self.user_queries.keys()):
            user_query = self.user_queries[pool_key]
            if user_query[QUERY_LAST_REQUEST] < expire_time:
                self.unregister_osd_perf_queries(pool_key, user_query[QUERY_IDS])
                del self.user_queries[pool_key]
333
    def register_osd_perf_queries(self, pool_id, namespace):
        """Register one OSD perf query per counter sort order; return their ids.

        On any failure, queries registered so far are removed before the
        exception propagates, so no orphan queries are leaked.
        """
        query_ids = []
        try:
            for counter in OSD_PERF_QUERY_COUNTERS:
                query = self.prepare_osd_perf_query(pool_id, namespace, counter)
                self.log.debug("register_osd_perf_queries: {}".format(query))

                query_id = self.module.add_osd_perf_query(query)
                if query_id is None:
                    raise RuntimeError('Failed to add OSD perf query: {}'.format(query))
                query_ids.append(query_id)

        except Exception:
            for query_id in query_ids:
                self.module.remove_osd_perf_query(query_id)
            raise

        return query_ids
352
353 def unregister_osd_perf_queries(self, pool_key, query_ids):
354 self.log.info("unregister_osd_perf_queries: pool_key={}, query_ids={}".format(
355 pool_key, query_ids))
356 for query_id in query_ids:
357 self.module.remove_osd_perf_query(query_id)
358 query_ids[:] = []
359
    def register_query(self, pool_key):
        """Return (creating if necessary) the user query for *pool_key*.

        A new query registers the OSD perf queries and blocks up to 5
        seconds waiting for an initial stats round; an existing query just
        has its expiry timestamp refreshed.  Must be called with self.lock
        held (the condition variables share that lock).
        """
        if pool_key not in self.user_queries:
            pool_id = None
            if pool_key[0]:
                pool_id = self.resolve_pool_id(pool_key[0])

            user_query = {
                QUERY_POOL_ID: pool_id,
                QUERY_POOL_ID_MAP: {pool_id: pool_key[0]},
                QUERY_IDS: self.register_osd_perf_queries(pool_id, pool_key[1]),
                QUERY_LAST_REQUEST: datetime.now()
            }

            self.user_queries[pool_key] = user_query

            # force an immediate stat pull if this is a new query
            self.query_condition.notify()
            self.refresh_condition.wait(5)

        else:
            user_query = self.user_queries[pool_key]

            # ensure query doesn't expire
            user_query[QUERY_LAST_REQUEST] = datetime.now()

            if pool_key == GLOBAL_POOL_KEY:
                # refresh the global pool id -> name map upon each
                # processing period
                user_query[QUERY_POOL_ID_MAP] = {
                    pool_id: pool_name for pool_id, pool_name
                    in get_rbd_pools(self.module).items()}

        self.log.debug("register_query: pool_key={}, query_ids={}".format(
            pool_key, user_query[QUERY_IDS]))

        return user_query
396
397 def extract_stat(self, index, raw_image, sum_image):
398 # require two raw counters between a fixed time window
399 if not raw_image or not raw_image[0] or not raw_image[1]:
400 return 0
401
402 current_time = raw_image[0][0]
403 previous_time = raw_image[1][0]
404 if current_time <= previous_time or \
405 current_time - previous_time > STATS_RATE_INTERVAL.total_seconds():
406 return 0
407
408 current_value = raw_image[0][1][index]
409 instant_rate = float(current_value) / (current_time - previous_time)
410
411 # convert latencies from sum to average per op
412 ops_index = None
413 if OSD_PERF_QUERY_COUNTERS[index] == 'write_latency':
414 ops_index = OSD_PERF_QUERY_COUNTERS_INDICES['write_ops']
415 elif OSD_PERF_QUERY_COUNTERS[index] == 'read_latency':
416 ops_index = OSD_PERF_QUERY_COUNTERS_INDICES['read_ops']
417
418 if ops_index is not None:
419 ops = max(1, self.extract_stat(ops_index, raw_image, sum_image))
420 instant_rate /= ops
421
422 return instant_rate
423
424 def extract_counter(self, index, raw_image, sum_image):
425 if sum_image:
426 return sum_image[index]
427 return 0
428
    def generate_report(self, query, sort_by, extract_data):
        """Build the (pool_descriptors, counters) payload for one query.

        Results are ranked by the *sort_by* counter's recent rate and capped
        at REPORT_MAX_RESULTS; *extract_data*(index, raw, sum) produces each
        reported value.  Pool descriptors are "pool[/namespace]" strings
        indexed by first appearance; image descriptors are
        "<pool_index>/<image_name>".
        """
        pool_id_map = query[QUERY_POOL_ID_MAP]
        sum_pool_counters = query.setdefault(QUERY_SUM_POOL_COUNTERS, {})
        raw_pool_counters = query.setdefault(QUERY_RAW_POOL_COUNTERS, {})

        sort_by_index = OSD_PERF_QUERY_COUNTERS.index(sort_by)

        # pre-sort and limit the response
        results = []
        for pool_id, sum_namespaces in sum_pool_counters.items():
            if pool_id not in pool_id_map:
                continue
            raw_namespaces = raw_pool_counters.get(pool_id, {})
            for namespace, sum_images in sum_namespaces.items():
                raw_images = raw_namespaces.get(namespace, {})
                for image_id, sum_image in sum_images.items():
                    raw_image = raw_images.get(image_id, [])

                    # always sort by recent IO activity
                    results.append([(pool_id, namespace, image_id),
                                    self.extract_stat(sort_by_index, raw_image,
                                                      sum_image)])
        results = sorted(results, key=lambda x: x[1], reverse=True)[:REPORT_MAX_RESULTS]

        # build the report in sorted order
        pool_descriptors = {}
        counters = []
        for key, _ in results:
            pool_id = key[0]
            pool_name = pool_id_map[pool_id]

            namespace = key[1]
            image_id = key[2]
            image_names = self.image_name_cache.get((pool_id, namespace), {})
            image_name = image_names[image_id]

            raw_namespaces = raw_pool_counters.get(pool_id, {})
            raw_images = raw_namespaces.get(namespace, {})
            raw_image = raw_images.get(image_id, [])

            sum_namespaces = sum_pool_counters[pool_id]
            sum_images = sum_namespaces[namespace]
            sum_image = sum_images.get(image_id, [])

            # pool descriptor index is assigned on first appearance
            pool_descriptor = pool_name
            if namespace:
                pool_descriptor += "/{}".format(namespace)
            pool_index = pool_descriptors.setdefault(pool_descriptor,
                                                     len(pool_descriptors))
            image_descriptor = "{}/{}".format(pool_index, image_name)
            data = [extract_data(i, raw_image, sum_image)
                    for i in range(len(OSD_PERF_QUERY_COUNTERS))]

            # skip if no data to report
            if data == [0 for i in range(len(OSD_PERF_QUERY_COUNTERS))]:
                continue

            counters.append({image_descriptor: data})

        return {idx: descriptor for descriptor, idx
                in pool_descriptors.items()}, \
            counters
491
    def get_perf_data(self, report, pool_spec, sort_by, extract_data):
        """Shared implementation for the image stats/counters commands.

        Returns the standard (retval, stdout, stderr) command tuple with the
        JSON report on stdout.  *report* is the report name ("stat" or
        "counter") used to build the JSON field names.
        """
        self.log.debug("get_perf_{}s: pool_spec={}, sort_by={}".format(
            report, pool_spec, sort_by))
        self.scrub_expired_queries()

        pool_key = extract_pool_key(pool_spec)
        authorize_request(self.module, pool_key[0], pool_key[1])

        user_query = self.register_query(pool_key)

        now = datetime.now()
        pool_descriptors, counters = self.generate_report(
            user_query, sort_by, extract_data)

        # note: 'report' is rebound here from the report *name* to the result
        report = {
            'timestamp': time.mktime(now.timetuple()),
            '{}_descriptors'.format(report): OSD_PERF_QUERY_COUNTERS,
            'pool_descriptors': pool_descriptors,
            '{}s'.format(report): counters
        }

        return 0, json.dumps(report), ""
514
    def get_perf_stats(self, pool_spec, sort_by):
        """Return rate-style (per-second) image stats for *pool_spec*."""
        return self.get_perf_data(
            "stat", pool_spec, sort_by, self.extract_stat)
518
    def get_perf_counters(self, pool_spec, sort_by):
        """Return cumulative image counters for *pool_spec*."""
        return self.get_perf_data(
            "counter", pool_spec, sort_by, self.extract_counter)
522
    def handle_command(self, inbuf, prefix, cmd):
        """Dispatch the perf "image stats"/"image counters" commands under the lock.

        Raises NotImplementedError for unrecognized prefixes.
        """
        with self.lock:
            if prefix == 'image stats':
                return self.get_perf_stats(cmd.get('pool_spec', None),
                                           cmd.get('sort_by', OSD_PERF_QUERY_COUNTERS[0]))
            elif prefix == 'image counters':
                return self.get_perf_counters(cmd.get('pool_spec', None),
                                              cmd.get('sort_by', OSD_PERF_QUERY_COUNTERS[0]))

        raise NotImplementedError(cmd['prefix'])
533
534
class Throttle:
    """Decorator: invoke the wrapped callable at most once per period.

    Calls made inside the throttle window are silently dropped (the wrapper
    returns None for them).
    """

    def __init__(self, throttle_period):
        self.throttle_period = throttle_period
        self.time_of_last_call = datetime.min

    def __call__(self, fn):
        @wraps(fn)
        def throttled(*args, **kwargs):
            current = datetime.now()
            if current < self.time_of_last_call + self.throttle_period:
                return None
            self.time_of_last_call = current
            return fn(*args, **kwargs)
        return throttled
548
549
class Task:
    """A single queued RBD maintenance operation (flatten/remove/migrate)."""

    def __init__(self, sequence, task_id, message, refs):
        self.sequence = sequence    # monotonic ordering key
        self.task_id = task_id      # unique uuid string
        self.message = message      # human-readable description
        self.refs = refs            # identifies target image + action
        self.retry_time = None
        self.in_progress = False
        self.progress = 0.0
        self.canceled = False
        self.failed = False

    def __str__(self):
        return self.to_json()

    @property
    def sequence_key(self):
        """Zero-padded hex key used to order tasks within the omap."""
        return "{0:016X}".format(self.sequence)

    def cancel(self):
        """Mark the task canceled (treated as a failure)."""
        self.canceled = True
        self.fail("Operation canceled")

    def fail(self, message):
        """Mark the task failed and record the reason."""
        self.failed = True
        self.failure_message = message

    def to_dict(self):
        """Serialize persistent fields plus any transient progress state."""
        result = {TASK_SEQUENCE: self.sequence,
                  TASK_ID: self.task_id,
                  TASK_MESSAGE: self.message,
                  TASK_REFS: self.refs
                  }
        if self.retry_time:
            result[TASK_RETRY_TIME] = self.retry_time.isoformat()
        if self.in_progress:
            result[TASK_IN_PROGRESS] = True
            result[TASK_PROGRESS] = self.progress
        if self.canceled:
            result[TASK_CANCELED] = True
        return result

    def to_json(self):
        return str(json.dumps(self.to_dict()))

    @classmethod
    def from_json(cls, val):
        """Deserialize a task, validating its action; raises ValueError on bad input."""
        try:
            decoded = json.loads(val)
            action = decoded.get(TASK_REFS, {}).get(TASK_REF_ACTION)
            if action not in VALID_TASK_ACTIONS:
                raise ValueError("Invalid task action: {}".format(action))

            return cls(decoded[TASK_SEQUENCE], decoded[TASK_ID],
                       decoded[TASK_MESSAGE], decoded[TASK_REFS])
        except json.JSONDecodeError as e:
            raise ValueError("Invalid JSON ({})".format(str(e)))
        except KeyError as e:
            raise ValueError("Invalid task format (missing key {})".format(str(e)))
608
609
class TaskHandler:
    """Executes long-running RBD maintenance tasks from a persistent queue.

    Tasks are stored as JSON in the omap of an "rbd_task" object per
    pool/namespace and mirrored in memory; a background thread runs them
    in sequence order.

    NOTE(review): state is declared at class level and therefore shared by
    all instances; this looks safe only because the module appears to
    create a single TaskHandler -- confirm.
    """
    lock = Lock()
    condition = Condition(lock)
    thread = None

    in_progress_task = None
    # sequence number -> Task and task uuid -> Task (same Task objects)
    tasks_by_sequence = dict()
    tasks_by_id = dict()

    # recently finished tasks, kept to de-duplicate replayed commands
    completed_tasks = []

    # highest sequence number assigned so far
    sequence = 0
622
    def __init__(self, module):
        # module: the owning MgrModule (provides rados handle, options, log)
        self.module = module
        self.log = module.log

        # load any tasks persisted by a previous mgr instance
        with self.lock:
            self.init_task_queue()

        # background thread that executes queued tasks
        self.thread = Thread(target=self.run)
        self.thread.start()
632
    @property
    def default_pool_name(self):
        """Pool used when an image spec omits the pool (option "rbd_default_pool")."""
        return self.module.get_ceph_option("rbd_default_pool")
636
637 def extract_pool_spec(self, pool_spec):
638 pool_spec = extract_pool_key(pool_spec)
639 if pool_spec == GLOBAL_POOL_KEY:
640 pool_spec = (self.default_pool_name, '')
641 return pool_spec
642
643 def extract_image_spec(self, image_spec):
644 match = re.match(r'^(?:([^/]+)/(?:([^/]+)/)?)?([^/@]+)$',
645 image_spec or '')
646 if not match:
647 raise ValueError("Invalid image spec: {}".format(image_spec))
648 return (match.group(1) or self.default_pool_name, match.group(2) or '',
649 match.group(3))
650
    def run(self):
        """Main loop: execute due tasks in sequence order, waking every 5 seconds.

        A task is due when it has no retry_time or its retry_time has
        passed; condition.notify() (from add_task) wakes the loop early.
        """
        try:
            self.log.info("TaskHandler: starting")
            while True:
                with self.lock:
                    now = datetime.now()
                    for sequence in sorted([sequence for sequence, task
                                            in self.tasks_by_sequence.items()
                                            if not task.retry_time or task.retry_time <= now]):
                        self.execute_task(sequence)

                    self.condition.wait(5)
                    self.log.debug("TaskHandler: tick")

        except Exception as ex:
            # a crashed handler thread is never restarted -- log loudly
            self.log.fatal("Fatal runtime error: {}\n{}".format(
                ex, traceback.format_exc()))
668
    @contextmanager
    def open_ioctx(self, spec):
        """Yield an ioctx for (pool_name, namespace) *spec*.

        Logs and re-raises rados.ObjectNotFound when the pool is missing.
        """
        try:
            with self.module.rados.open_ioctx(spec[0]) as ioctx:
                ioctx.set_namespace(spec[1])
                yield ioctx
        except rados.ObjectNotFound:
            self.log.error("Failed to locate pool {}".format(spec[0]))
            raise
678
679 @classmethod
680 def format_image_spec(cls, image_spec):
681 image = image_spec[2]
682 if image_spec[1]:
683 image = "{}/{}".format(image_spec[1], image)
684 if image_spec[0]:
685 image = "{}/{}".format(image_spec[0], image)
686 return image
687
    def init_task_queue(self):
        """Load persisted tasks from every RBD pool (and each of its namespaces).

        Also restores the sequence counter to the highest loaded value so
        new tasks keep increasing.  Missing pools are skipped silently.
        """
        for pool_id, pool_name in get_rbd_pools(self.module).items():
            try:
                with self.module.rados.open_ioctx2(int(pool_id)) as ioctx:
                    self.load_task_queue(ioctx, pool_name)

                    try:
                        namespaces = rbd.RBD().namespace_list(ioctx)
                    except rbd.OperationNotSupported:
                        self.log.debug("Namespaces not supported")
                        continue

                    for namespace in namespaces:
                        ioctx.set_namespace(namespace)
                        self.load_task_queue(ioctx, pool_name)

            except rados.ObjectNotFound:
                # pool DNE
                pass

        if self.tasks_by_sequence:
            self.sequence = list(sorted(self.tasks_by_sequence.keys()))[-1]

        self.log.debug("sequence={}, tasks_by_sequence={}, tasks_by_id={}".format(
            self.sequence, str(self.tasks_by_sequence), str(self.tasks_by_id)))
713
714 def load_task_queue(self, ioctx, pool_name):
715 pool_spec = pool_name
716 if ioctx.nspace:
717 pool_spec += "/{}".format(ioctx.nspace)
718
719 start_after = ''
720 try:
721 while True:
722 with rados.ReadOpCtx() as read_op:
723 self.log.info("load_task_task: {}, start_after={}".format(
724 pool_spec, start_after))
725 it, ret = ioctx.get_omap_vals(read_op, start_after, "", 128)
726 ioctx.operate_read_op(read_op, RBD_TASK_OID)
727
728 it = list(it)
729 for k, v in it:
730 start_after = k
731 v = v.decode()
732 self.log.info("load_task_task: task={}".format(v))
733
734 try:
735 task = Task.from_json(v)
736 self.append_task(task)
737 except ValueError:
738 self.log.error("Failed to decode task: pool_spec={}, task={}".format(pool_spec, v))
739
740 if not it:
741 break
742
743 except StopIteration:
744 pass
745 except rados.ObjectNotFound:
746 # rbd_task DNE
747 pass
748
    def append_task(self, task):
        # index the task under both its sequence number and its uuid
        self.tasks_by_sequence[task.sequence] = task
        self.tasks_by_id[task.task_id] = task
752
753 def task_refs_match(self, task_refs, refs):
754 if TASK_REF_IMAGE_ID not in refs and TASK_REF_IMAGE_ID in task_refs:
755 task_refs = task_refs.copy()
756 del task_refs[TASK_REF_IMAGE_ID]
757
758 self.log.debug("task_refs_match: ref1={}, ref2={}".format(task_refs, refs))
759 return task_refs == refs
760
761 def find_task(self, refs):
762 self.log.debug("find_task: refs={}".format(refs))
763
764 # search for dups and return the original
765 for task_id in reversed(sorted(self.tasks_by_id.keys())):
766 task = self.tasks_by_id[task_id]
767 if self.task_refs_match(task.refs, refs):
768 return task
769
770 # search for a completed task (message replay)
771 for task in reversed(self.completed_tasks):
772 if self.task_refs_match(task.refs, refs):
773 return task
774
    def add_task(self, ioctx, message, refs):
        """Create a task, persist it to the rbd_task omap, and queue it.

        Returns the task's JSON representation (used as command output).
        Must be called with self.lock held (condition shares that lock).
        """
        self.log.debug("add_task: message={}, refs={}".format(message, refs))

        # ensure unique uuid across all pools
        while True:
            task_id = str(uuid.uuid4())
            if task_id not in self.tasks_by_id:
                break

        self.sequence += 1
        task = Task(self.sequence, task_id, message, refs)

        # add the task to the rbd_task omap
        task_json = task.to_json()
        omap_keys = (task.sequence_key, )
        omap_vals = (str.encode(task_json), )
        self.log.info("adding task: {} {}".format(omap_keys[0], omap_vals[0]))

        with rados.WriteOpCtx() as write_op:
            ioctx.set_omap(write_op, omap_keys, omap_vals)
            ioctx.operate_write_op(write_op, RBD_TASK_OID)
        self.append_task(task)

        # wake the executor thread so the task starts promptly
        self.condition.notify()
        return task_json
800
801 def remove_task(self, ioctx, task, remove_in_memory=True):
802 self.log.info("remove_task: task={}".format(str(task)))
803 omap_keys = (task.sequence_key, )
804 try:
805 with rados.WriteOpCtx() as write_op:
806 ioctx.remove_omap_keys(write_op, omap_keys)
807 ioctx.operate_write_op(write_op, RBD_TASK_OID)
808 except rados.ObjectNotFound:
809 pass
810
811 if remove_in_memory:
812 try:
813 del self.tasks_by_id[task.task_id]
814 del self.tasks_by_sequence[task.sequence]
815
816 # keep a record of the last N tasks to help avoid command replay
817 # races
818 if not task.failed and not task.canceled:
819 self.log.debug("remove_task: moving to completed tasks")
820 self.completed_tasks.append(task)
821 self.completed_tasks = self.completed_tasks[-MAX_COMPLETED_TASKS:]
822
823 except KeyError:
824 pass
825
826 def execute_task(self, sequence):
827 task = self.tasks_by_sequence[sequence]
828 self.log.info("execute_task: task={}".format(str(task)))
829
830 pool_valid = False
831 try:
832 with self.open_ioctx((task.refs[TASK_REF_POOL_NAME],
833 task.refs[TASK_REF_POOL_NAMESPACE])) as ioctx:
834 pool_valid = True
835
836 action = task.refs[TASK_REF_ACTION]
837 execute_fn = {TASK_REF_ACTION_FLATTEN: self.execute_flatten,
838 TASK_REF_ACTION_REMOVE: self.execute_remove,
839 TASK_REF_ACTION_TRASH_REMOVE: self.execute_trash_remove,
840 TASK_REF_ACTION_MIGRATION_EXECUTE: self.execute_migration_execute,
841 TASK_REF_ACTION_MIGRATION_COMMIT: self.execute_migration_commit,
842 TASK_REF_ACTION_MIGRATION_ABORT: self.execute_migration_abort
843 }.get(action)
844 if not execute_fn:
845 self.log.error("Invalid task action: {}".format(action))
846 else:
847 task.in_progress = True
848 self.in_progress_task = task
849 self.update_progress(task, 0)
850
851 self.lock.release()
852 try:
853 execute_fn(ioctx, task)
854
855 except rbd.OperationCanceled:
856 self.log.info("Operation canceled: task={}".format(
857 str(task)))
858
859 finally:
860 self.lock.acquire()
861
862 task.in_progress = False
863 self.in_progress_task = None
864
865 self.complete_progress(task)
866 self.remove_task(ioctx, task)
867
868 except rados.ObjectNotFound as e:
869 self.log.error("execute_task: {}".format(e))
870 if pool_valid:
871 self.update_progress(task, 0)
872 else:
873 # pool DNE -- remove the task
874 self.complete_progress(task)
875 self.remove_task(ioctx, task)
876
877 except (rados.Error, rbd.Error) as e:
878 self.log.error("execute_task: {}".format(e))
879 self.update_progress(task, 0)
880
881 finally:
882 task.in_progress = False
883 task.retry_time = datetime.now() + TASK_RETRY_INTERVAL
884
    def progress_callback(self, task, current, total):
        """librbd progress hook: mirror progress into the task and progress module.

        Returns 0 to continue the operation, or -rbd.ECANCELED to abort it
        when the task was canceled.
        """
        progress = float(current) / float(total)
        self.log.debug("progress_callback: task={}, progress={}".format(
            str(task), progress))

        # avoid deadlocking when a new command comes in during a progress callback
        if not self.lock.acquire(False):
            return 0

        try:
            if not self.in_progress_task or self.in_progress_task.canceled:
                return -rbd.ECANCELED
            self.in_progress_task.progress = progress
        finally:
            self.lock.release()

        # rate-limited; called outside the lock to avoid re-entrancy issues
        self.throttled_update_progress(task, progress)
        return 0
903
    def execute_flatten(self, ioctx, task):
        """Flatten a cloned image; expected librbd errors mark the task failed."""
        self.log.info("execute_flatten: task={}".format(str(task)))

        try:
            with rbd.Image(ioctx, task.refs[TASK_REF_IMAGE_NAME]) as image:
                image.flatten(on_progress=partial(self.progress_callback, task))
        except rbd.InvalidArgument:
            task.fail("Image does not have parent")
            self.log.info("{}: task={}".format(task.failure_message, str(task)))
        except rbd.ImageNotFound:
            task.fail("Image does not exist")
            self.log.info("{}: task={}".format(task.failure_message, str(task)))
916
    def execute_remove(self, ioctx, task):
        """Remove an image by name; a missing image marks the task failed."""
        self.log.info("execute_remove: task={}".format(str(task)))

        try:
            rbd.RBD().remove(ioctx, task.refs[TASK_REF_IMAGE_NAME],
                             on_progress=partial(self.progress_callback, task))
        except rbd.ImageNotFound:
            task.fail("Image does not exist")
            self.log.info("{}: task={}".format(task.failure_message, str(task)))
926
    def execute_trash_remove(self, ioctx, task):
        """Remove an image from the trash by id; a missing image marks the task failed."""
        self.log.info("execute_trash_remove: task={}".format(str(task)))

        try:
            rbd.RBD().trash_remove(ioctx, task.refs[TASK_REF_IMAGE_ID],
                                   on_progress=partial(self.progress_callback, task))
        except rbd.ImageNotFound:
            task.fail("Image does not exist")
            self.log.info("{}: task={}".format(task.failure_message, str(task)))
936
    def execute_migration_execute(self, ioctx, task):
        """Run the data-copy phase of a prepared image migration."""
        self.log.info("execute_migration_execute: task={}".format(str(task)))

        try:
            rbd.RBD().migration_execute(ioctx, task.refs[TASK_REF_IMAGE_NAME],
                                        on_progress=partial(self.progress_callback, task))
        except rbd.ImageNotFound:
            task.fail("Image does not exist")
            self.log.info("{}: task={}".format(task.failure_message, str(task)))
        except rbd.InvalidArgument:
            task.fail("Image is not migrating")
            self.log.info("{}: task={}".format(task.failure_message, str(task)))
949
    def execute_migration_commit(self, ioctx, task):
        """Commit a completed image migration (removes the source)."""
        self.log.info("execute_migration_commit: task={}".format(str(task)))

        try:
            rbd.RBD().migration_commit(ioctx, task.refs[TASK_REF_IMAGE_NAME],
                                       on_progress=partial(self.progress_callback, task))
        except rbd.ImageNotFound:
            task.fail("Image does not exist")
            self.log.info("{}: task={}".format(task.failure_message, str(task)))
        except rbd.InvalidArgument:
            task.fail("Image is not migrating or migration not executed")
            self.log.info("{}: task={}".format(task.failure_message, str(task)))
962
    def execute_migration_abort(self, ioctx, task):
        """Abort an in-progress image migration (reverts to the source image)."""
        self.log.info("execute_migration_abort: task={}".format(str(task)))

        try:
            rbd.RBD().migration_abort(ioctx, task.refs[TASK_REF_IMAGE_NAME],
                                      on_progress=partial(self.progress_callback, task))
        except rbd.ImageNotFound:
            task.fail("Image does not exist")
            self.log.info("{}: task={}".format(task.failure_message, str(task)))
        except rbd.InvalidArgument:
            task.fail("Image is not migrating")
            self.log.info("{}: task={}".format(task.failure_message, str(task)))
975
    def complete_progress(self, task):
        """Close out the task's progress-module event as failed or complete."""
        self.log.debug("complete_progress: task={}".format(str(task)))
        try:
            if task.failed:
                self.module.remote("progress", "fail", task.task_id,
                                   task.failure_message)
            else:
                self.module.remote("progress", "complete", task.task_id)
        except ImportError:
            # progress module is disabled
            pass
987
988 def update_progress(self, task, progress):
989 self.log.debug("update_progress: task={}, progress={}".format(str(task), progress))
990 try:
991 refs = {"origin": "rbd_support"}
992 refs.update(task.refs)
993
994 self.module.remote("progress", "update", task.task_id,
995 task.message, progress, refs)
996 except ImportError:
997 # progress module is disabled
998 pass
999
    @Throttle(timedelta(seconds=1))
    def throttled_update_progress(self, task, progress):
        """Rate-limited update_progress: at most one update per second.

        Used from per-chunk librbd progress callbacks, which can fire far
        more often than the progress module needs.
        """
        self.update_progress(task, progress)
1003
1004 def queue_flatten(self, image_spec):
1005 image_spec = self.extract_image_spec(image_spec)
1006
1007 authorize_request(self.module, image_spec[0], image_spec[1])
1008 self.log.info("queue_flatten: {}".format(image_spec))
1009
1010 refs = {TASK_REF_ACTION: TASK_REF_ACTION_FLATTEN,
1011 TASK_REF_POOL_NAME: image_spec[0],
1012 TASK_REF_POOL_NAMESPACE: image_spec[1],
1013 TASK_REF_IMAGE_NAME: image_spec[2]}
1014
1015 with self.open_ioctx(image_spec) as ioctx:
1016 try:
1017 with rbd.Image(ioctx, image_spec[2]) as image:
1018 refs[TASK_REF_IMAGE_ID] = image.id()
1019
1020 try:
1021 parent_image_id = image.parent_id()
1022 except rbd.ImageNotFound:
1023 parent_image_id = None
1024
1025 except rbd.ImageNotFound:
1026 pass
1027
1028 task = self.find_task(refs)
1029 if task:
1030 return 0, task.to_json(), ''
1031
1032 if TASK_REF_IMAGE_ID not in refs:
1033 raise rbd.ImageNotFound("Image {} does not exist".format(
1034 self.format_image_spec(image_spec)), errno=errno.ENOENT)
1035 if not parent_image_id:
1036 raise rbd.ImageNotFound("Image {} does not have a parent".format(
1037 self.format_image_spec(image_spec)), errno=errno.ENOENT)
1038
1039 return 0, self.add_task(ioctx,
1040 "Flattening image {}".format(
1041 self.format_image_spec(image_spec)),
1042 refs), ""
1043
1044 def queue_remove(self, image_spec):
1045 image_spec = self.extract_image_spec(image_spec)
1046
1047 authorize_request(self.module, image_spec[0], image_spec[1])
1048 self.log.info("queue_remove: {}".format(image_spec))
1049
1050 refs = {TASK_REF_ACTION: TASK_REF_ACTION_REMOVE,
1051 TASK_REF_POOL_NAME: image_spec[0],
1052 TASK_REF_POOL_NAMESPACE: image_spec[1],
1053 TASK_REF_IMAGE_NAME: image_spec[2]}
1054
1055 with self.open_ioctx(image_spec) as ioctx:
1056 try:
1057 with rbd.Image(ioctx, image_spec[2]) as image:
1058 refs[TASK_REF_IMAGE_ID] = image.id()
1059 snaps = list(image.list_snaps())
1060
1061 except rbd.ImageNotFound:
1062 pass
1063
1064 task = self.find_task(refs)
1065 if task:
1066 return 0, task.to_json(), ''
1067
1068 if TASK_REF_IMAGE_ID not in refs:
1069 raise rbd.ImageNotFound("Image {} does not exist".format(
1070 self.format_image_spec(image_spec)), errno=errno.ENOENT)
1071 if snaps:
1072 raise rbd.ImageBusy("Image {} has snapshots".format(
1073 self.format_image_spec(image_spec)), errno=errno.EBUSY)
1074
1075 return 0, self.add_task(ioctx,
1076 "Removing image {}".format(
1077 self.format_image_spec(image_spec)),
1078 refs), ''
1079
1080 def queue_trash_remove(self, image_id_spec):
1081 image_id_spec = self.extract_image_spec(image_id_spec)
1082
1083 authorize_request(self.module, image_id_spec[0], image_id_spec[1])
1084 self.log.info("queue_trash_remove: {}".format(image_id_spec))
1085
1086 refs = {TASK_REF_ACTION: TASK_REF_ACTION_TRASH_REMOVE,
1087 TASK_REF_POOL_NAME: image_id_spec[0],
1088 TASK_REF_POOL_NAMESPACE: image_id_spec[1],
1089 TASK_REF_IMAGE_ID: image_id_spec[2]}
1090 task = self.find_task(refs)
1091 if task:
1092 return 0, task.to_json(), ''
1093
1094 # verify that image exists in trash
1095 with self.open_ioctx(image_id_spec) as ioctx:
1096 rbd.RBD().trash_get(ioctx, image_id_spec[2])
1097
1098 return 0, self.add_task(ioctx,
1099 "Removing image {} from trash".format(
1100 self.format_image_spec(image_id_spec)),
1101 refs), ''
1102
1103 def get_migration_status(self, ioctx, image_spec):
1104 try:
1105 return rbd.RBD().migration_status(ioctx, image_spec[2])
1106 except (rbd.InvalidArgument, rbd.ImageNotFound):
1107 return None
1108
1109 def validate_image_migrating(self, image_spec, migration_status):
1110 if not migration_status:
1111 raise rbd.InvalidArgument("Image {} is not migrating".format(
1112 self.format_image_spec(image_spec)), errno=errno.EINVAL)
1113
1114 def resolve_pool_name(self, pool_id):
1115 osd_map = self.module.get('osd_map')
1116 for pool in osd_map['pools']:
1117 if pool['pool'] == pool_id:
1118 return pool['pool_name']
1119 return '<unknown>'
1120
1121 def queue_migration_execute(self, image_spec):
1122 image_spec = self.extract_image_spec(image_spec)
1123
1124 authorize_request(self.module, image_spec[0], image_spec[1])
1125 self.log.info("queue_migration_execute: {}".format(image_spec))
1126
1127 refs = {TASK_REF_ACTION: TASK_REF_ACTION_MIGRATION_EXECUTE,
1128 TASK_REF_POOL_NAME: image_spec[0],
1129 TASK_REF_POOL_NAMESPACE: image_spec[1],
1130 TASK_REF_IMAGE_NAME: image_spec[2]}
1131
1132 with self.open_ioctx(image_spec) as ioctx:
1133 status = self.get_migration_status(ioctx, image_spec)
1134 if status:
1135 refs[TASK_REF_IMAGE_ID] = status['dest_image_id']
1136
1137 task = self.find_task(refs)
1138 if task:
1139 return 0, task.to_json(), ''
1140
1141 self.validate_image_migrating(image_spec, status)
1142 if status['state'] not in [rbd.RBD_IMAGE_MIGRATION_STATE_PREPARED,
1143 rbd.RBD_IMAGE_MIGRATION_STATE_EXECUTING]:
1144 raise rbd.InvalidArgument("Image {} is not in ready state".format(
1145 self.format_image_spec(image_spec)), errno=errno.EINVAL)
1146
1147 source_pool = self.resolve_pool_name(status['source_pool_id'])
1148 dest_pool = self.resolve_pool_name(status['dest_pool_id'])
1149 return 0, self.add_task(ioctx,
1150 "Migrating image {} to {}".format(
1151 self.format_image_spec((source_pool,
1152 status['source_pool_namespace'],
1153 status['source_image_name'])),
1154 self.format_image_spec((dest_pool,
1155 status['dest_pool_namespace'],
1156 status['dest_image_name']))),
1157 refs), ''
1158
1159 def queue_migration_commit(self, image_spec):
1160 image_spec = self.extract_image_spec(image_spec)
1161
1162 authorize_request(self.module, image_spec[0], image_spec[1])
1163 self.log.info("queue_migration_commit: {}".format(image_spec))
1164
1165 refs = {TASK_REF_ACTION: TASK_REF_ACTION_MIGRATION_COMMIT,
1166 TASK_REF_POOL_NAME: image_spec[0],
1167 TASK_REF_POOL_NAMESPACE: image_spec[1],
1168 TASK_REF_IMAGE_NAME: image_spec[2]}
1169
1170 with self.open_ioctx(image_spec) as ioctx:
1171 status = self.get_migration_status(ioctx, image_spec)
1172 if status:
1173 refs[TASK_REF_IMAGE_ID] = status['dest_image_id']
1174
1175 task = self.find_task(refs)
1176 if task:
1177 return 0, task.to_json(), ''
1178
1179 self.validate_image_migrating(image_spec, status)
1180 if status['state'] != rbd.RBD_IMAGE_MIGRATION_STATE_EXECUTED:
1181 raise rbd.InvalidArgument("Image {} has not completed migration".format(
1182 self.format_image_spec(image_spec)), errno=errno.EINVAL)
1183
1184 return 0, self.add_task(ioctx,
1185 "Committing image migration for {}".format(
1186 self.format_image_spec(image_spec)),
1187 refs), ''
1188
1189 def queue_migration_abort(self, image_spec):
1190 image_spec = self.extract_image_spec(image_spec)
1191
1192 authorize_request(self.module, image_spec[0], image_spec[1])
1193 self.log.info("queue_migration_abort: {}".format(image_spec))
1194
1195 refs = {TASK_REF_ACTION: TASK_REF_ACTION_MIGRATION_ABORT,
1196 TASK_REF_POOL_NAME: image_spec[0],
1197 TASK_REF_POOL_NAMESPACE: image_spec[1],
1198 TASK_REF_IMAGE_NAME: image_spec[2]}
1199
1200 with self.open_ioctx(image_spec) as ioctx:
1201 status = self.get_migration_status(ioctx, image_spec)
1202 if status:
1203 refs[TASK_REF_IMAGE_ID] = status['dest_image_id']
1204
1205 task = self.find_task(refs)
1206 if task:
1207 return 0, task.to_json(), ''
1208
1209 self.validate_image_migrating(image_spec, status)
1210 return 0, self.add_task(ioctx,
1211 "Aborting image migration for {}".format(
1212 self.format_image_spec(image_spec)),
1213 refs), ''
1214
1215 def task_cancel(self, task_id):
1216 self.log.info("task_cancel: {}".format(task_id))
1217
1218 task = self.tasks_by_id.get(task_id)
1219 if not task or not is_authorized(self.module,
1220 task.refs[TASK_REF_POOL_NAME],
1221 task.refs[TASK_REF_POOL_NAMESPACE]):
1222 return -errno.ENOENT, '', "No such task {}".format(task_id)
1223
1224 task.cancel()
1225
1226 remove_in_memory = True
1227 if self.in_progress_task and self.in_progress_task.task_id == task_id:
1228 self.log.info("Attempting to cancel in-progress task: {}".format(str(self.in_progress_task)))
1229 remove_in_memory = False
1230
1231 # complete any associated event in the progress module
1232 self.complete_progress(task)
1233
1234 # remove from rbd_task omap
1235 with self.open_ioctx((task.refs[TASK_REF_POOL_NAME],
1236 task.refs[TASK_REF_POOL_NAMESPACE])) as ioctx:
1237 self.remove_task(ioctx, task, remove_in_memory)
1238
1239 return 0, "", ""
1240
1241 def task_list(self, task_id):
1242 self.log.info("task_list: {}".format(task_id))
1243
1244 if task_id:
1245 task = self.tasks_by_id.get(task_id)
1246 if not task or not is_authorized(self.module,
1247 task.refs[TASK_REF_POOL_NAME],
1248 task.refs[TASK_REF_POOL_NAMESPACE]):
1249 return -errno.ENOENT, '', "No such task {}".format(task_id)
1250
1251 result = task.to_dict()
1252 else:
1253 result = []
1254 for sequence in sorted(self.tasks_by_sequence.keys()):
1255 task = self.tasks_by_sequence[sequence]
1256 if is_authorized(self.module,
1257 task.refs[TASK_REF_POOL_NAME],
1258 task.refs[TASK_REF_POOL_NAMESPACE]):
1259 result.append(task.to_dict())
1260
1261 return 0, json.dumps(result), ""
1262
1263 def handle_command(self, inbuf, prefix, cmd):
1264 with self.lock:
1265 if prefix == 'add flatten':
1266 return self.queue_flatten(cmd['image_spec'])
1267 elif prefix == 'add remove':
1268 return self.queue_remove(cmd['image_spec'])
1269 elif prefix == 'add trash remove':
1270 return self.queue_trash_remove(cmd['image_id_spec'])
1271 elif prefix == 'add migration execute':
1272 return self.queue_migration_execute(cmd['image_spec'])
1273 elif prefix == 'add migration commit':
1274 return self.queue_migration_commit(cmd['image_spec'])
1275 elif prefix == 'add migration abort':
1276 return self.queue_migration_abort(cmd['image_spec'])
1277 elif prefix == 'cancel':
1278 return self.task_cancel(cmd['task_id'])
1279 elif prefix == 'list':
1280 return self.task_list(cmd.get('task_id'))
1281
1282 raise NotImplementedError(cmd['prefix'])
1283
1284
class Module(MgrModule):
    """ceph-mgr module entry point for RBD support commands.

    Registers the 'rbd perf ...' and 'rbd task ...' command families and
    routes invocations to the PerfHandler and TaskHandler instances.
    """

    # Command table consumed by the mgr framework; 'perm' is the minimum
    # capability required to invoke the command ('r' read, 'w' write).
    COMMANDS = [
        {
            "cmd": "rbd perf image stats "
                   "name=pool_spec,type=CephString,req=false "
                   "name=sort_by,type=CephChoices,strings="
                   "write_ops|write_bytes|write_latency|"
                   "read_ops|read_bytes|read_latency,"
                   "req=false ",
            "desc": "Retrieve current RBD IO performance stats",
            "perm": "r"
        },
        {
            "cmd": "rbd perf image counters "
                   "name=pool_spec,type=CephString,req=false "
                   "name=sort_by,type=CephChoices,strings="
                   "write_ops|write_bytes|write_latency|"
                   "read_ops|read_bytes|read_latency,"
                   "req=false ",
            "desc": "Retrieve current RBD IO performance counters",
            "perm": "r"
        },
        {
            "cmd": "rbd task add flatten "
                   "name=image_spec,type=CephString",
            "desc": "Flatten a cloned image asynchronously in the background",
            "perm": "w"
        },
        {
            "cmd": "rbd task add remove "
                   "name=image_spec,type=CephString",
            "desc": "Remove an image asynchronously in the background",
            "perm": "w"
        },
        {
            "cmd": "rbd task add trash remove "
                   "name=image_id_spec,type=CephString",
            "desc": "Remove an image from the trash asynchronously in the background",
            "perm": "w"
        },
        {
            "cmd": "rbd task add migration execute "
                   "name=image_spec,type=CephString",
            "desc": "Execute an image migration asynchronously in the background",
            "perm": "w"
        },
        {
            "cmd": "rbd task add migration commit "
                   "name=image_spec,type=CephString",
            "desc": "Commit an executed migration asynchronously in the background",
            "perm": "w"
        },
        {
            "cmd": "rbd task add migration abort "
                   "name=image_spec,type=CephString",
            "desc": "Abort a prepared migration asynchronously in the background",
            "perm": "w"
        },
        {
            "cmd": "rbd task cancel "
                   "name=task_id,type=CephString ",
            "desc": "Cancel a pending or running asynchronous task",
            "perm": "r"
        },
        {
            "cmd": "rbd task list "
                   "name=task_id,type=CephString,req=false ",
            "desc": "List pending or running asynchronous tasks",
            "perm": "r"
        }
    ]
    MODULE_OPTIONS = []

    # handler singletons; class-level defaults overwritten in __init__
    perf = None
    task = None

    def __init__(self, *args, **kwargs):
        super(Module, self).__init__(*args, **kwargs)
        self.perf = PerfHandler(self)
        self.task = TaskHandler(self)

    def handle_command(self, inbuf, cmd):
        """Route a command to the matching handler and translate raised
        exceptions into (retval, stdout, stderr) CLI results.

        NOTE(review): the except-clause order is significant -- rbd.OSError
        must be tried before the broader rbd.Error.
        """
        prefix = cmd['prefix']
        try:
            try:
                # strip the 9-char 'rbd perf ' / 'rbd task ' prefix before
                # delegating to the sub-handler
                if prefix.startswith('rbd perf '):
                    return self.perf.handle_command(inbuf, prefix[9:], cmd)
                elif prefix.startswith('rbd task '):
                    return self.task.handle_command(inbuf, prefix[9:], cmd)

            except NotAuthorizedError:
                # authorization failures are mapped to EACCES below
                raise
            except Exception as ex:
                # log the full traceback but don't send it to the CLI user
                self.log.fatal("Fatal runtime error: {}\n{}".format(
                    ex, traceback.format_exc()))
                raise

        except rados.Error as ex:
            return -ex.errno, "", str(ex)
        except rbd.OSError as ex:
            return -ex.errno, "", str(ex)
        except rbd.Error as ex:
            return -errno.EINVAL, "", str(ex)
        except KeyError as ex:
            return -errno.ENOENT, "", str(ex)
        except ValueError as ex:
            return -errno.EINVAL, "", str(ex)
        except NotAuthorizedError as ex:
            return -errno.EACCES, "", str(ex)

        raise NotImplementedError(cmd['prefix'])